Batch Processing
Batch LLM and embedding jobs to unlock provider batch discounts and simpler rate limits. This guide covers when to batch, how to chunk inputs, idempotency, and monitoring, so throughput goes up and per-token cost goes down.
Best For: Applications with bulk processing needs
Batch Processing Implementation Guide
Process Non-Urgent AI Requests at 50% Discount
Difficulty: Beginner to Intermediate
Time Required: 2-3 hours
Potential Savings: $1,500-6,000/month (50% reduction on batch workloads)
Best For: Background jobs, data processing, content generation pipelines
What is Batch Processing?
Traditional (Synchronous) Processing:
Process 1,000 documents immediately
  ↓
Pay full price: $100
Wait for each to complete: 30 minutes

Batch Processing (24-hour window):
Queue 1,000 documents for batch processing
  ↓
Pay 50% less: $50
Results delivered within 24 hours
Trade-off: Wait up to 24 hours, save 50% on costs.
Why You Need This
Cost Comparison:
| Use Case | Real-Time | Batch | Savings |
|---|---|---|---|
| Classify 10K documents | $50 | $25 | 50% |
| Generate summaries (1K articles) | $200 | $100 | 50% |
| Sentiment analysis (100K reviews) | $500 | $250 | 50% |
| Data enrichment (50K records) | $300 | $150 | 50% |
When to Use Batch:
✅ Good for Batch:
- Nightly data processing
- Content moderation queue
- Bulk document analysis
- Email campaign generation
- Analytics reports
- Data enrichment
- SEO content generation
❌ Not for Batch:
- User-facing chatbots
- Real-time search
- Live customer support
- Interactive applications
- Anything time-sensitive
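The split above can be expressed as a simple routing helper. This is a minimal sketch: the `user_facing` and `latency_tolerance_s` fields and the one-hour threshold are assumptions your application would define, not part of any SDK.

```python
def should_batch(request: dict) -> bool:
    """Route a request to the Batch API when it can tolerate delay.

    `user_facing` and `latency_tolerance_s` are hypothetical fields
    your app would attach to each request.
    """
    if request.get('user_facing', False):
        return False  # chatbots, live support, interactive apps stay real-time
    # Anything that can wait an hour or more is a batch candidate
    return request.get('latency_tolerance_s', 0) >= 3600

# Nightly analytics job: fine to batch
print(should_batch({'user_facing': False, 'latency_tolerance_s': 86400}))  # True
# Chatbot turn: keep real-time
print(should_batch({'user_facing': True, 'latency_tolerance_s': 5}))       # False
```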
Prerequisites
Before implementing:
- OpenAI API access (Batch API available)
- Identify batch-suitable workloads
- File storage (local or S3/R2)
- Python 3.8+ (for examples)
- Understanding of async job processing
Batch API Availability:
- OpenAI: ✅ Batch API available (50% discount)
- Anthropic: ✅ Message Batches API available (50% discount)
- Azure OpenAI: ✅ Global Batch available
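Anthropic's Message Batches API takes the request list directly through the SDK rather than a JSONL upload. A minimal sketch of building that list, mirroring the OpenAI examples below — the model name and exact submission call are assumptions, so check the current Anthropic docs before relying on them:

```python
def build_anthropic_batch_requests(docs: list) -> list:
    """Build the request list for Anthropic's Message Batches API.

    Each entry carries a custom_id plus normal Messages API params.
    The model name below is a placeholder.
    """
    return [
        {
            "custom_id": doc["id"],
            "params": {
                "model": "claude-3-5-haiku-latest",  # assumption: any current model
                "max_tokens": 500,
                "messages": [
                    {"role": "user", "content": f"Summarize: {doc['text']}"}
                ],
            },
        }
        for doc in docs
    ]

requests = build_anthropic_batch_requests([{"id": "doc-1", "text": "..."}])
# Submission (requires the anthropic SDK and an API key):
# import anthropic
# batch = anthropic.Anthropic().messages.batches.create(requests=requests)
```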
Implementation Steps
Step 1: Identify Batch Workloads
Audit your current usage:
```python
def analyze_batch_potential(usage_logs):
    """
    Analyze which workloads could be batched.
    Returns estimation of potential savings.
    """
    batch_candidates = {
        'document_processing': 0,
        'content_generation': 0,
        'data_enrichment': 0,
        'sentiment_analysis': 0,
        'other': 0
    }

    for log in usage_logs:
        # Check if request is time-sensitive
        # (default latency_tolerance to 0 so missing keys don't raise)
        if log.get('priority') == 'low' or log.get('latency_tolerance', 0) > 3600:
            # This could be batched
            category = classify_request_type(log)  # your own classifier
            batch_candidates[category] += log.get('cost', 0)

    total_batchable = sum(batch_candidates.values())
    potential_savings = total_batchable * 0.5  # 50% discount

    return {
        'batchable_cost': total_batchable,
        'potential_savings': potential_savings,
        'by_category': batch_candidates
    }

# Example
analysis = analyze_batch_potential(your_usage_logs)
print(f"Potential monthly savings: ${analysis['potential_savings']:,.2f}")
```
Step 2: Create Batch Requests (OpenAI)
Basic Batch Request:
```python
from openai import OpenAI
import json

client = OpenAI()

def create_batch_file(requests: list, filename: str = "batch_requests.jsonl"):
    """
    Create JSONL file with batch requests.
    Each line is a separate request in JSON format.
    """
    with open(filename, 'w') as f:
        for i, request in enumerate(requests):
            # Format as required by OpenAI Batch API
            batch_request = {
                "custom_id": f"request-{i}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "gpt-4o-mini",
                    "messages": request['messages'],
                    "max_tokens": request.get('max_tokens', 1000)
                }
            }
            f.write(json.dumps(batch_request) + '\n')

    return filename

# Example: Create batch of 1000 document summaries
requests = []
for doc in documents[:1000]:
    requests.append({
        'messages': [
            {"role": "system", "content": "Summarize the following document concisely."},
            {"role": "user", "content": doc['text']}
        ],
        'max_tokens': 500
    })

batch_file = create_batch_file(requests)
print(f"Created batch file: {batch_file}")
```
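Batch jobs have per-file limits (on the order of 50,000 requests per batch — check the current docs for exact caps), so larger workloads need to be chunked across multiple JSONL files. A minimal sketch reusing the request format above; `max_per_file` is an assumption you should set from the documented limit:

```python
import json

def chunk_batch_requests(requests: list, max_per_file: int = 50000,
                         prefix: str = "batch_part") -> list:
    """Split requests across multiple JSONL files, one per batch job.

    custom_ids stay globally unique across files so results can be
    joined back to their inputs later.
    """
    filenames = []
    for part, start in enumerate(range(0, len(requests), max_per_file)):
        filename = f"{prefix}_{part}.jsonl"
        with open(filename, 'w') as f:
            for i, request in enumerate(requests[start:start + max_per_file], start):
                f.write(json.dumps({
                    "custom_id": f"request-{i}",
                    "method": "POST",
                    "url": "/v1/chat/completions",
                    "body": {
                        "model": "gpt-4o-mini",
                        "messages": request['messages'],
                        "max_tokens": request.get('max_tokens', 1000),
                    },
                }) + '\n')
        filenames.append(filename)
    return filenames

# 120 requests with a 50-per-file cap -> 3 files
files = chunk_batch_requests(
    [{'messages': [{"role": "user", "content": "hi"}]}] * 120,
    max_per_file=50,
)
```

Each resulting file is then uploaded and submitted as its own batch job.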
Step 3: Upload and Submit Batch
```python
def submit_batch(batch_file_path: str) -> str:
    """
    Upload batch file and submit for processing.
    Returns batch ID for tracking.
    """
    # Step 1: Upload file
    with open(batch_file_path, 'rb') as f:
        batch_input_file = client.files.create(
            file=f,
            purpose="batch"
        )

    print(f"Uploaded file: {batch_input_file.id}")

    # Step 2: Create batch
    batch = client.batches.create(
        input_file_id=batch_input_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",  # Must complete within 24 hours
        metadata={
            "description": "Document summary batch",
            "project": "content-pipeline"
        }
    )

    print(f"Batch submitted: {batch.id}")
    print(f"Status: {batch.status}")

    return batch.id

# Submit batch
batch_id = submit_batch('batch_requests.jsonl')
```
Step 4: Monitor Batch Status
```python
import time

def check_batch_status(batch_id: str) -> dict:
    """
    Check status of batch processing.
    Returns batch details including progress.
    """
    batch = client.batches.retrieve(batch_id)

    return {
        'id': batch.id,
        'status': batch.status,  # validating, in_progress, finalizing, completed, failed, expired, cancelled
        'created_at': batch.created_at,
        'request_counts': batch.request_counts,
        'output_file_id': batch.output_file_id if batch.status == 'completed' else None,
        'error_file_id': getattr(batch, 'error_file_id', None)
    }

def wait_for_batch(batch_id: str, check_interval: int = 60):
    """
    Wait for batch to complete.
    Polls every check_interval seconds.
    """
    print(f"Waiting for batch {batch_id} to complete...")

    while True:
        status = check_batch_status(batch_id)
        counts = status['request_counts']
        print(f"Status: {status['status']}")

        if status['status'] == 'completed':
            print("✅ Batch completed!")
            print(f"  Total requests: {counts.total}")
            print(f"  Completed: {counts.completed}")
            print(f"  Failed: {counts.failed}")
            return status
        elif status['status'] in ['failed', 'expired', 'cancelled']:
            print(f"❌ Batch ended with status: {status['status']}")
            return status
        elif status['status'] in ['validating', 'in_progress', 'finalizing']:
            print(f"  Progress: {counts.completed}/{counts.total}")
            time.sleep(check_interval)
        else:
            print(f"Unknown status: {status['status']}")
            time.sleep(check_interval)

# Wait for completion
result = wait_for_batch(batch_id, check_interval=300)  # Check every 5 minutes
```
Step 5: Retrieve Results
```python
def download_batch_results(batch_id: str, output_filename: str = "batch_results.jsonl"):
    """
    Download and parse batch results.
    Returns list of results.
    """
    # Get batch details
    batch = client.batches.retrieve(batch_id)

    if batch.status != 'completed':
        raise Exception(f"Batch not completed yet. Status: {batch.status}")

    # Download output file
    file_response = client.files.content(batch.output_file_id)

    # Save to file
    with open(output_filename, 'wb') as f:
        f.write(file_response.content)

    # Parse results
    results = []
    with open(output_filename, 'r') as f:
        for line in f:
            if line.strip():
                results.append(json.loads(line))

    print(f"Downloaded {len(results)} results to {output_filename}")
    return results

def process_batch_results(results: list) -> list:
    """
    Extract responses from batch results.
    Returns cleaned results.
    """
    processed = []

    for result in results:
        if result.get('error'):
            # Handle error
            processed.append({
                'custom_id': result['custom_id'],
                'status': 'error',
                'error': result['error']
            })
        else:
            # Extract response
            response = result['response']['body']['choices'][0]['message']['content']
            processed.append({
                'custom_id': result['custom_id'],
                'status': 'success',
                'response': response,
                'usage': result['response']['body']['usage']
            })

    return processed

# Download and process results
results = download_batch_results(batch_id)
processed_results = process_batch_results(results)

# Save to database or process further
for result in processed_results:
    if result['status'] == 'success':
        save_to_database(result['custom_id'], result['response'])  # your own helper
    else:
        log_error(result['custom_id'], result['error'])  # your own helper
```
Step 6: Production Batch Pipeline
Complete production-ready system:
```python
import os
import json
from typing import List, Dict, Any
from datetime import datetime
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class BatchProcessor:
    """
    Production-ready batch processing system.

    Features:
    - Automatic batching of queued requests
    - Progress monitoring
    - Error handling
    - Result storage
    - Cost tracking
    """

    def __init__(self, client, batch_size: int = 50000):
        self.client = client
        self.batch_size = batch_size  # Max requests per batch

    def queue_request(self, request_id: str, messages: list, **kwargs):
        """
        Queue a request for batch processing.
        Stores in database with status 'queued'.
        """
        # Store in database (pseudo-code; replace `db` with your data layer)
        db.insert('batch_queue', {
            'request_id': request_id,
            'messages': json.dumps(messages),
            'params': json.dumps(kwargs),
            'status': 'queued',
            'created_at': datetime.utcnow()
        })
        logger.info(f"Queued request: {request_id}")

    def create_and_submit_batch(self) -> str:
        """
        Create batch from queued requests and submit.
        Returns batch_id.
        """
        # Get queued requests
        queued = db.query('batch_queue', {'status': 'queued'}, limit=self.batch_size)

        if not queued:
            logger.info("No requests in queue")
            return None

        logger.info(f"Creating batch with {len(queued)} requests")

        # Create batch file
        batch_filename = f"batch_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.jsonl"
        with open(batch_filename, 'w') as f:
            for req in queued:
                batch_req = {
                    "custom_id": req['request_id'],
                    "method": "POST",
                    "url": "/v1/chat/completions",
                    "body": {
                        "model": "gpt-4o-mini",
                        "messages": json.loads(req['messages']),
                        **json.loads(req['params'])
                    }
                }
                f.write(json.dumps(batch_req) + '\n')

        # Upload and submit
        with open(batch_filename, 'rb') as f:
            batch_input_file = self.client.files.create(file=f, purpose="batch")

        batch = self.client.batches.create(
            input_file_id=batch_input_file.id,
            endpoint="/v1/chat/completions",
            completion_window="24h"
        )

        # Update database
        for req in queued:
            db.update('batch_queue', {'request_id': req['request_id']}, {
                'status': 'processing',
                'batch_id': batch.id,
                'submitted_at': datetime.utcnow()
            })

        # Store batch metadata
        db.insert('batch_jobs', {
            'batch_id': batch.id,
            'request_count': len(queued),
            'status': 'processing',
            'created_at': datetime.utcnow()
        })

        logger.info(f"Submitted batch: {batch.id}")

        # Clean up file
        os.remove(batch_filename)

        return batch.id

    def check_and_process_completed(self):
        """
        Check all processing batches and handle completed ones.
        Run this periodically (e.g., every 5 minutes via cron).
        """
        # Get processing batches
        processing = db.query('batch_jobs', {'status': 'processing'})

        for job in processing:
            batch = self.client.batches.retrieve(job['batch_id'])

            if batch.status == 'completed':
                logger.info(f"Batch {batch.id} completed, processing results")

                # Download results
                file_response = self.client.files.content(batch.output_file_id)
                results = []
                for line in file_response.content.decode().split('\n'):
                    if line.strip():
                        results.append(json.loads(line))

                # Process each result
                for result in results:
                    custom_id = result['custom_id']
                    if result.get('error'):
                        # Mark as failed
                        db.update('batch_queue', {'request_id': custom_id}, {
                            'status': 'failed',
                            'error': json.dumps(result['error']),
                            'completed_at': datetime.utcnow()
                        })
                    else:
                        # Extract response
                        response = result['response']['body']['choices'][0]['message']['content']
                        usage = result['response']['body']['usage']

                        # Update database
                        db.update('batch_queue', {'request_id': custom_id}, {
                            'status': 'completed',
                            'response': response,
                            'usage': json.dumps(usage),
                            'completed_at': datetime.utcnow()
                        })

                # Update batch job
                db.update('batch_jobs', {'batch_id': batch.id}, {
                    'status': 'completed',
                    'completed_at': datetime.utcnow(),
                    'completed_count': batch.request_counts.completed,
                    'failed_count': batch.request_counts.failed
                })

                logger.info(f"Processed {len(results)} results for batch {batch.id}")

            elif batch.status == 'failed':
                logger.error(f"Batch {batch.id} failed")

                # Mark all requests as failed
                db.update_many('batch_queue', {'batch_id': batch.id}, {
                    'status': 'failed',
                    'error': 'Batch processing failed'
                })
                db.update('batch_jobs', {'batch_id': batch.id}, {'status': 'failed'})

    def get_request_status(self, request_id: str) -> dict:
        """Check status of a queued request"""
        return db.query_one('batch_queue', {'request_id': request_id})

    def get_stats(self) -> dict:
        """Get batch processing statistics"""
        return {
            'queued': db.count('batch_queue', {'status': 'queued'}),
            'processing': db.count('batch_queue', {'status': 'processing'}),
            'completed': db.count('batch_queue', {'status': 'completed'}),
            'failed': db.count('batch_queue', {'status': 'failed'})
        }

# Usage
batch_processor = BatchProcessor(client)

# Queue requests
for doc in documents:
    batch_processor.queue_request(
        request_id=doc['id'],
        messages=[
            {"role": "system", "content": "Summarize this document."},
            {"role": "user", "content": doc['text']}
        ],
        max_tokens=500
    )

# Submit batch (run once per day or when queue reaches threshold)
batch_id = batch_processor.create_and_submit_batch()

# Check completed (run every 5 minutes via cron)
batch_processor.check_and_process_completed()

# Check stats
stats = batch_processor.get_stats()
print(f"Queue status: {stats}")
```
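One property worth adding to a pipeline like this is idempotency: if the same document is queued twice (a retried job, a re-run cron), a deterministic custom_id lets you drop the duplicate before submitting instead of paying for it twice. A minimal sketch using a content hash — the `req-` prefix and 16-hex-digit truncation are arbitrary choices, not an API requirement:

```python
import hashlib
import json

def deterministic_custom_id(messages: list, model: str = "gpt-4o-mini") -> str:
    """Derive a stable custom_id from the request content.

    Identical (model, messages) pairs always map to the same id,
    so duplicates can be detected before a batch is submitted.
    """
    payload = json.dumps({"model": model, "messages": messages},
                         sort_keys=True).encode()
    return "req-" + hashlib.sha256(payload).hexdigest()[:16]

def dedupe_requests(requests: list) -> list:
    """Keep only the first occurrence of each distinct request."""
    seen, unique = set(), []
    for req in requests:
        cid = deterministic_custom_id(req['messages'])
        if cid not in seen:
            seen.add(cid)
            unique.append({**req, 'custom_id': cid})
    return unique

msgs = [{"role": "user", "content": "Summarize: quarterly report"}]
deduped = dedupe_requests([{'messages': msgs}, {'messages': msgs}])
print(len(deduped))  # 1 -- the duplicate was dropped
```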
Scheduling Strategy
Cron Jobs for Automation:
```shell
# crontab -e

# Submit new batch every night at 2 AM
0 2 * * * /usr/bin/python3 /app/submit_batch.py

# Check for completed batches every 5 minutes
*/5 * * * * /usr/bin/python3 /app/check_batches.py

# Generate daily batch report
0 8 * * * /usr/bin/python3 /app/batch_report.py
```
Celery Tasks (Alternative):
```python
from celery import Celery
from celery.schedules import crontab

app = Celery('batch_tasks')

@app.task
def submit_daily_batch():
    """Submit queued requests as batch"""
    batch_processor = BatchProcessor(client)
    batch_id = batch_processor.create_and_submit_batch()
    return batch_id

@app.task
def check_completed_batches():
    """Check and process completed batches"""
    batch_processor = BatchProcessor(client)
    batch_processor.check_and_process_completed()

# Schedule
app.conf.beat_schedule = {
    'submit-daily-batch': {
        'task': 'batch_tasks.submit_daily_batch',
        'schedule': crontab(hour=2, minute=0),  # 2 AM daily
    },
    'check-completed': {
        'task': 'batch_tasks.check_completed_batches',
        'schedule': crontab(minute='*/5'),  # Every 5 minutes
    },
}
```
Cost Tracking
```python
def calculate_batch_savings(batch_id: str) -> dict:
    """
    Calculate cost savings from using batch API.
    Returns comparison with real-time processing.
    """
    # Get usage from processed results (see Step 5)
    results = process_batch_results(download_batch_results(batch_id))

    total_input_tokens = 0
    total_output_tokens = 0
    for result in results:
        if result['status'] == 'success':
            usage = result['usage']
            total_input_tokens += usage['prompt_tokens']
            total_output_tokens += usage['completion_tokens']

    # Calculate costs
    # GPT-4o-mini pricing (check current rates)
    input_cost_per_1k = 0.00015
    output_cost_per_1k = 0.0006

    # Batch: 50% discount
    batch_input_cost = (total_input_tokens / 1000) * input_cost_per_1k * 0.5
    batch_output_cost = (total_output_tokens / 1000) * output_cost_per_1k * 0.5
    batch_total = batch_input_cost + batch_output_cost

    # Real-time: full price
    realtime_input_cost = (total_input_tokens / 1000) * input_cost_per_1k
    realtime_output_cost = (total_output_tokens / 1000) * output_cost_per_1k
    realtime_total = realtime_input_cost + realtime_output_cost

    savings = realtime_total - batch_total
    savings_pct = (savings / realtime_total * 100) if realtime_total > 0 else 0

    return {
        'batch_cost': batch_total,
        'realtime_cost': realtime_total,
        'savings': savings,
        'savings_pct': savings_pct,
        'total_requests': len(results),
        'input_tokens': total_input_tokens,
        'output_tokens': total_output_tokens
    }

# Calculate savings
savings = calculate_batch_savings(batch_id)
print(f"""
Batch Cost Analysis:
--------------------
Batch cost:     ${savings['batch_cost']:.2f}
Real-time cost: ${savings['realtime_cost']:.2f}
Savings:        ${savings['savings']:.2f} ({savings['savings_pct']:.1f}%)
Total requests: {savings['total_requests']:,}
""")
```
Expected Results
Real-World Performance:
Content Pipeline (1,000 articles/day):
- Real-time cost: $200/day
- Batch cost: $100/day
- Savings: $3,000/month
Data Enrichment (50K records/day):
- Real-time cost: $300/day
- Batch cost: $150/day
- Savings: $4,500/month
Sentiment Analysis (100K reviews/month):
- Real-time cost: $500/month
- Batch cost: $250/month
- Savings: $250/month
Troubleshooting
Issue: Batch stuck in "validating" status
Cause: Malformed JSONL file
Solution: Validate file format
```python
def validate_batch_file(filename: str):
    """Validate JSONL format"""
    with open(filename, 'r') as f:
        for i, line in enumerate(f, 1):
            if not line.strip():
                continue  # allow a trailing blank line
            try:
                json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Error on line {i}: {e}")
                return False
    return True
```
Issue: Some requests failing
Cause: Token limits or invalid requests
Solution: Check error file
```python
if batch.error_file_id:
    errors = client.files.content(batch.error_file_id)
    print(errors.content.decode())
```
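Failed requests don't have to be abandoned: parse the error file's JSONL records, collect the failed custom_ids, and rebuild a batch containing only those (transient errors such as rate limits often succeed on a second pass). A minimal sketch over already-parsed error records; the `original_requests` mapping is an assumption about how your app stores inputs:

```python
def failed_custom_ids(error_records: list) -> list:
    """Extract custom_ids from parsed error-file records."""
    return [rec['custom_id'] for rec in error_records if rec.get('error')]

def build_retry_requests(original_requests: dict, error_records: list) -> list:
    """Rebuild batch requests for only the failed custom_ids.

    `original_requests` maps custom_id -> the original request body
    (hypothetical storage layout for this sketch).
    """
    retry = []
    for cid in failed_custom_ids(error_records):
        if cid in original_requests:
            retry.append({
                "custom_id": cid,
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": original_requests[cid],
            })
    return retry

errors = [{"custom_id": "request-7", "error": {"code": "rate_limit_exceeded"}}]
originals = {"request-7": {"model": "gpt-4o-mini",
                           "messages": [{"role": "user", "content": "hi"}]}}
print(len(build_retry_requests(originals, errors)))  # 1
```

The retry list can then be written to a fresh JSONL file and submitted as a new batch with the same steps as above.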
Production Checklist
- Batch workloads identified
- Queue system implemented
- Automated submission scheduled
- Completion checker running
- Error handling in place
- Cost tracking implemented
- Monitoring/alerting set up
- Results stored properly
- Tested with production data
Next Steps
- Week 1: Identify batch workloads
- Week 2: Implement queue system
- Week 3: Test with 10% of workload
- Week 4: Roll out to 100%, monitor savings
Additional Resources
- OpenAI Batch API: https://platform.openai.com/docs/guides/batch
- Batch Pricing: https://openai.com/pricing
- Best Practices: https://platform.openai.com/docs/guides/production-best-practices
Support
Need help with batch processing?
- Onaro Support: support@onaro.io
- Book implementation call: https://onaro.io/support
Estimated Implementation Time: 2-3 hours
Difficulty: ★★★☆☆ (3/5)
Impact: ★★★★☆ (4/5 - 50% savings on batch workloads)
Last Updated: January 26, 2026
Tested with: OpenAI SDK 1.12.0, Batch API v1