Batch Processing
Process multiple requests together to reduce costs by 50%
Best For: Applications with bulk processing needs
Batch Processing Implementation Guide
Process Non-Urgent AI Requests at 50% Discount
Difficulty: Beginner to Intermediate
Time Required: 2-3 hours
Potential Savings: $1,500-6,000/month (50% reduction on batch workloads)
Best For: Background jobs, data processing, content generation pipelines
What is Batch Processing?
Traditional (Synchronous) Processing:
Process 1,000 documents immediately
↓
Pay full price: $100
Wait for each to complete: 30 minutes
Batch Processing (24-hour window):
Queue 1,000 documents for batch processing
↓
Pay 50% less: $50
Results delivered within 24 hours
Trade-off: Wait up to 24 hours, save 50% on costs.
Why You Need This
Cost Comparison:
| Use Case | Real-Time | Batch | Savings |
|---|---|---|---|
| Classify 10K documents | $50 | $25 | 50% |
| Generate summaries (1K articles) | $200 | $100 | 50% |
| Sentiment analysis (100K reviews) | $500 | $250 | 50% |
| Data enrichment (50K records) | $300 | $150 | 50% |
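Every row in the table is just the real-time cost halved. A quick sketch (not part of any SDK; the function name and fields are illustrative) for projecting the same math onto your own spend:

```python
# Hypothetical helper: project monthly savings if a fraction of your
# real-time spend moves to the Batch API's 50% discount.
def project_batch_savings(realtime_monthly_cost: float, batchable_fraction: float) -> dict:
    batchable = realtime_monthly_cost * batchable_fraction
    savings = batchable * 0.5  # Batch API discount
    return {
        'batchable_spend': batchable,
        'monthly_savings': savings,
        'new_monthly_cost': realtime_monthly_cost - savings,
    }

# Example: $500/month spend, 80% of it batch-suitable
print(project_batch_savings(500, 0.8))
```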
When to Use Batch:
✅ Good for Batch:
- Nightly data processing
- Content moderation queue
- Bulk document analysis
- Email campaign generation
- Analytics reports
- Data enrichment
- SEO content generation
❌ Not for Batch:
- User-facing chatbots
- Real-time search
- Live customer support
- Interactive applications
- Anything time-sensitive
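The two lists above boil down to one question: is anyone waiting on the result? One way to encode that as a routing heuristic (the `job` fields `user_facing` and `max_latency_s` are illustrative, not a standard schema):

```python
def is_batch_suitable(job: dict) -> bool:
    """Route a job to the Batch API only if nothing is waiting on it."""
    if job.get('user_facing', False):
        return False  # chatbots, live support, interactive apps
    # Batch results can take up to 24 hours (86,400 seconds)
    return job.get('max_latency_s', 0) >= 86_400

print(is_batch_suitable({'user_facing': True, 'max_latency_s': 5}))        # False
print(is_batch_suitable({'user_facing': False, 'max_latency_s': 86_400}))  # True
```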
Prerequisites
Before implementing:
- OpenAI API access (Batch API available)
- Identify batch-suitable workloads
- File storage (local or S3/R2)
- Python 3.8+ (for examples)
- Understanding of async job processing
Batch API Availability:
- OpenAI: ✅ Batch API available
- Anthropic: ✅ Message Batches API available (also a 50% discount)
- Azure OpenAI: ✅ Global Batch available
Implementation Steps
Step 1: Identify Batch Workloads
Audit your current usage:
```python
def analyze_batch_potential(usage_logs):
    """
    Analyze which workloads could be batched.
    Returns an estimate of potential savings.
    """
    batch_candidates = {
        'document_processing': 0,
        'content_generation': 0,
        'data_enrichment': 0,
        'sentiment_analysis': 0,
        'other': 0
    }

    for log in usage_logs:
        # Check whether the request is time-sensitive
        if log.get('priority') == 'low' or log.get('latency_tolerance', 0) > 3600:
            # This could be batched
            category = classify_request_type(log)  # your own categorizer
            batch_candidates[category] += log.get('cost', 0)

    total_batchable = sum(batch_candidates.values())
    potential_savings = total_batchable * 0.5  # 50% discount

    return {
        'batchable_cost': total_batchable,
        'potential_savings': potential_savings,
        'by_category': batch_candidates
    }

# Example (your_usage_logs is your own request log data)
analysis = analyze_batch_potential(your_usage_logs)
print(f"Potential monthly savings: ${analysis['potential_savings']:,.2f}")
```
Step 2: Create Batch Requests (OpenAI)
Basic Batch Request:
```python
from openai import OpenAI
import json

client = OpenAI()

def create_batch_file(requests: list, filename: str = "batch_requests.jsonl"):
    """
    Create a JSONL file of batch requests.
    Each line is a separate request in JSON format.
    """
    with open(filename, 'w') as f:
        for i, request in enumerate(requests):
            # Format as required by the OpenAI Batch API
            batch_request = {
                "custom_id": f"request-{i}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "gpt-4o-mini",
                    "messages": request['messages'],
                    "max_tokens": request.get('max_tokens', 1000)
                }
            }
            f.write(json.dumps(batch_request) + '\n')
    return filename

# Example: create a batch of 1,000 document summaries
requests = []
for doc in documents[:1000]:
    requests.append({
        'messages': [
            {"role": "system", "content": "Summarize the following document concisely."},
            {"role": "user", "content": doc['text']}
        ],
        'max_tokens': 500
    })

batch_file = create_batch_file(requests)
print(f"Created batch file: {batch_file}")
```
Step 3: Upload and Submit Batch
```python
def submit_batch(batch_file_path: str) -> str:
    """
    Upload the batch file and submit it for processing.
    Returns the batch ID for tracking.
    """
    # Step 1: Upload file
    with open(batch_file_path, 'rb') as f:
        batch_input_file = client.files.create(
            file=f,
            purpose="batch"
        )
    print(f"Uploaded file: {batch_input_file.id}")

    # Step 2: Create batch
    batch = client.batches.create(
        input_file_id=batch_input_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",  # Must complete within 24 hours
        metadata={
            "description": "Document summary batch",
            "project": "content-pipeline"
        }
    )
    print(f"Batch submitted: {batch.id}")
    print(f"Status: {batch.status}")
    return batch.id

# Submit batch
batch_id = submit_batch('batch_requests.jsonl')
```
Step 4: Monitor Batch Status
```python
import time

def check_batch_status(batch_id: str) -> dict:
    """
    Check the status of a batch.
    Returns batch details including progress.
    """
    batch = client.batches.retrieve(batch_id)
    return {
        'id': batch.id,
        'status': batch.status,  # validating, in_progress, finalizing, completed, failed, expired, cancelled
        'created_at': batch.created_at,
        'request_counts': batch.request_counts,  # object with .total, .completed, .failed
        'output_file_id': batch.output_file_id if batch.status == 'completed' else None,
        'error_file_id': getattr(batch, 'error_file_id', None)
    }

def wait_for_batch(batch_id: str, check_interval: int = 60):
    """
    Wait for a batch to complete, polling every check_interval seconds.
    """
    print(f"Waiting for batch {batch_id} to complete...")
    while True:
        status = check_batch_status(batch_id)
        counts = status['request_counts']
        print(f"Status: {status['status']}")

        if status['status'] == 'completed':
            print("✅ Batch completed!")
            print(f"  Total requests: {counts.total}")
            print(f"  Completed: {counts.completed}")
            print(f"  Failed: {counts.failed}")
            return status
        elif status['status'] in ('failed', 'expired', 'cancelled'):
            print(f"❌ Batch ended with status: {status['status']}")
            return status
        elif status['status'] in ('validating', 'in_progress', 'finalizing'):
            print(f"  Progress: {counts.completed}/{counts.total}")
            time.sleep(check_interval)
        else:
            print(f"Unknown status: {status['status']}")
            time.sleep(check_interval)

# Wait for completion, checking every 5 minutes
result = wait_for_batch(batch_id, check_interval=300)
```
Step 5: Retrieve Results
```python
def download_batch_results(batch_id: str, output_filename: str = "batch_results.jsonl"):
    """
    Download and parse batch results.
    Returns a list of raw results.
    """
    # Get batch details
    batch = client.batches.retrieve(batch_id)
    if batch.status != 'completed':
        raise Exception(f"Batch not completed yet. Status: {batch.status}")

    # Download the output file
    file_response = client.files.content(batch.output_file_id)

    # Save to disk
    with open(output_filename, 'wb') as f:
        f.write(file_response.content)

    # Parse results
    results = []
    with open(output_filename, 'r') as f:
        for line in f:
            if line.strip():
                results.append(json.loads(line))

    print(f"Downloaded {len(results)} results to {output_filename}")
    return results

def process_batch_results(results: list) -> list:
    """
    Extract responses from raw batch results.
    Returns cleaned results.
    """
    processed = []
    for result in results:
        if result.get('error'):
            # Handle error
            processed.append({
                'custom_id': result['custom_id'],
                'status': 'error',
                'error': result['error']
            })
        else:
            # Extract response
            response = result['response']['body']['choices'][0]['message']['content']
            processed.append({
                'custom_id': result['custom_id'],
                'status': 'success',
                'response': response,
                'usage': result['response']['body']['usage']
            })
    return processed

# Download and process results
results = download_batch_results(batch_id)
processed_results = process_batch_results(results)

# Save to database or process further (save_to_database / log_error are your own functions)
for result in processed_results:
    if result['status'] == 'success':
        save_to_database(result['custom_id'], result['response'])
    else:
        log_error(result['custom_id'], result['error'])
```
Step 6: Production Batch Pipeline
Complete production-ready system:
```python
import os
import json
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class BatchProcessor:
    """
    Production-ready batch processing system.

    Features:
    - Automatic batching of queued requests
    - Progress monitoring
    - Error handling
    - Result storage
    - Cost tracking
    """

    def __init__(self, client, batch_size: int = 50000):
        self.client = client
        self.batch_size = batch_size  # Max requests per batch

    def queue_request(self, request_id: str, messages: list, **kwargs):
        """
        Queue a request for batch processing.
        Stores it in the database with status 'queued'.
        """
        # Store in database (db is pseudo-code for your own data layer)
        db.insert('batch_queue', {
            'request_id': request_id,
            'messages': json.dumps(messages),
            'params': json.dumps(kwargs),
            'status': 'queued',
            'created_at': datetime.utcnow()
        })
        logger.info(f"Queued request: {request_id}")

    def create_and_submit_batch(self) -> str:
        """
        Create a batch from queued requests and submit it.
        Returns the batch_id.
        """
        # Get queued requests
        queued = db.query(
            'batch_queue',
            {'status': 'queued'},
            limit=self.batch_size
        )
        if not queued:
            logger.info("No requests in queue")
            return None

        logger.info(f"Creating batch with {len(queued)} requests")

        # Create batch file
        batch_filename = f"batch_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.jsonl"
        with open(batch_filename, 'w') as f:
            for req in queued:
                batch_req = {
                    "custom_id": req['request_id'],
                    "method": "POST",
                    "url": "/v1/chat/completions",
                    "body": {
                        "model": "gpt-4o-mini",
                        "messages": json.loads(req['messages']),
                        **json.loads(req['params'])
                    }
                }
                f.write(json.dumps(batch_req) + '\n')

        # Upload and submit
        with open(batch_filename, 'rb') as f:
            batch_input_file = self.client.files.create(
                file=f,
                purpose="batch"
            )

        batch = self.client.batches.create(
            input_file_id=batch_input_file.id,
            endpoint="/v1/chat/completions",
            completion_window="24h"
        )

        # Update database
        for req in queued:
            db.update(
                'batch_queue',
                {'request_id': req['request_id']},
                {
                    'status': 'processing',
                    'batch_id': batch.id,
                    'submitted_at': datetime.utcnow()
                }
            )

        # Store batch metadata
        db.insert('batch_jobs', {
            'batch_id': batch.id,
            'request_count': len(queued),
            'status': 'processing',
            'created_at': datetime.utcnow()
        })

        logger.info(f"Submitted batch: {batch.id}")

        # Clean up file
        os.remove(batch_filename)
        return batch.id

    def check_and_process_completed(self):
        """
        Check all processing batches and handle completed ones.
        Run this periodically (e.g., every 5 minutes via cron).
        """
        # Get processing batches
        processing = db.query('batch_jobs', {'status': 'processing'})

        for job in processing:
            batch = self.client.batches.retrieve(job['batch_id'])

            if batch.status == 'completed':
                logger.info(f"Batch {batch.id} completed, processing results")

                # Download results
                file_response = self.client.files.content(batch.output_file_id)
                results = []
                for line in file_response.content.decode().split('\n'):
                    if line.strip():
                        results.append(json.loads(line))

                # Process each result
                for result in results:
                    custom_id = result['custom_id']
                    if result.get('error'):
                        # Mark as failed
                        db.update(
                            'batch_queue',
                            {'request_id': custom_id},
                            {
                                'status': 'failed',
                                'error': json.dumps(result['error']),
                                'completed_at': datetime.utcnow()
                            }
                        )
                    else:
                        # Extract response
                        response = result['response']['body']['choices'][0]['message']['content']
                        usage = result['response']['body']['usage']
                        db.update(
                            'batch_queue',
                            {'request_id': custom_id},
                            {
                                'status': 'completed',
                                'response': response,
                                'usage': json.dumps(usage),
                                'completed_at': datetime.utcnow()
                            }
                        )

                # Update batch job
                db.update(
                    'batch_jobs',
                    {'batch_id': batch.id},
                    {
                        'status': 'completed',
                        'completed_at': datetime.utcnow(),
                        'completed_count': batch.request_counts.completed,
                        'failed_count': batch.request_counts.failed
                    }
                )
                logger.info(f"Processed {len(results)} results for batch {batch.id}")

            elif batch.status == 'failed':
                logger.error(f"Batch {batch.id} failed")
                # Mark all requests as failed
                db.update_many(
                    'batch_queue',
                    {'batch_id': batch.id},
                    {
                        'status': 'failed',
                        'error': 'Batch processing failed'
                    }
                )
                db.update(
                    'batch_jobs',
                    {'batch_id': batch.id},
                    {'status': 'failed'}
                )

    def get_request_status(self, request_id: str) -> dict:
        """Check the status of a queued request"""
        return db.query_one('batch_queue', {'request_id': request_id})

    def get_stats(self) -> dict:
        """Get batch processing statistics"""
        return {
            'queued': db.count('batch_queue', {'status': 'queued'}),
            'processing': db.count('batch_queue', {'status': 'processing'}),
            'completed': db.count('batch_queue', {'status': 'completed'}),
            'failed': db.count('batch_queue', {'status': 'failed'})
        }

# Usage
batch_processor = BatchProcessor(client)

# Queue requests
for doc in documents:
    batch_processor.queue_request(
        request_id=doc['id'],
        messages=[
            {"role": "system", "content": "Summarize this document."},
            {"role": "user", "content": doc['text']}
        ],
        max_tokens=500
    )

# Submit batch (run once per day or when the queue reaches a threshold)
batch_id = batch_processor.create_and_submit_batch()

# Check for completed batches (run every 5 minutes via cron)
batch_processor.check_and_process_completed()

# Check stats
stats = batch_processor.get_stats()
print(f"Queue status: {stats}")
```
Scheduling Strategy
Cron Jobs for Automation:
```bash
# crontab -e

# Submit new batch every night at 2 AM
0 2 * * * /usr/bin/python3 /app/submit_batch.py

# Check for completed batches every 5 minutes
*/5 * * * * /usr/bin/python3 /app/check_batches.py

# Generate daily batch report
0 8 * * * /usr/bin/python3 /app/batch_report.py
```
Celery Tasks (Alternative):
```python
from celery import Celery
from celery.schedules import crontab

app = Celery('batch_tasks')

@app.task
def submit_daily_batch():
    """Submit queued requests as a batch"""
    batch_processor = BatchProcessor(client)
    batch_id = batch_processor.create_and_submit_batch()
    return batch_id

@app.task
def check_completed_batches():
    """Check and process completed batches"""
    batch_processor = BatchProcessor(client)
    batch_processor.check_and_process_completed()

# Schedule
app.conf.beat_schedule = {
    'submit-daily-batch': {
        'task': 'batch_tasks.submit_daily_batch',
        'schedule': crontab(hour=2, minute=0),  # 2 AM daily
    },
    'check-completed': {
        'task': 'batch_tasks.check_completed_batches',
        'schedule': crontab(minute='*/5'),  # Every 5 minutes
    },
}
```
Cost Tracking
```python
def calculate_batch_savings(batch_id: str) -> dict:
    """
    Calculate cost savings from using the Batch API.
    Returns a comparison with real-time processing.
    """
    # Get usage from the processed results (see Step 5); the raw download
    # lacks the 'status'/'usage' keys this loop reads.
    results = process_batch_results(download_batch_results(batch_id))

    total_input_tokens = 0
    total_output_tokens = 0
    for result in results:
        if result['status'] == 'success':
            usage = result['usage']
            total_input_tokens += usage['prompt_tokens']
            total_output_tokens += usage['completion_tokens']

    # GPT-4o-mini real-time pricing (per 1K tokens)
    input_cost_per_1k = 0.00015
    output_cost_per_1k = 0.0006

    # Batch: 50% discount
    batch_input_cost = (total_input_tokens / 1000) * input_cost_per_1k * 0.5
    batch_output_cost = (total_output_tokens / 1000) * output_cost_per_1k * 0.5
    batch_total = batch_input_cost + batch_output_cost

    # Real-time: full price
    realtime_input_cost = (total_input_tokens / 1000) * input_cost_per_1k
    realtime_output_cost = (total_output_tokens / 1000) * output_cost_per_1k
    realtime_total = realtime_input_cost + realtime_output_cost

    savings = realtime_total - batch_total
    savings_pct = (savings / realtime_total * 100) if realtime_total > 0 else 0

    return {
        'batch_cost': batch_total,
        'realtime_cost': realtime_total,
        'savings': savings,
        'savings_pct': savings_pct,
        'total_requests': len(results),
        'input_tokens': total_input_tokens,
        'output_tokens': total_output_tokens
    }

# Calculate savings
savings = calculate_batch_savings(batch_id)
print(f"""
Batch Cost Analysis:
--------------------
Batch cost:     ${savings['batch_cost']:.2f}
Real-time cost: ${savings['realtime_cost']:.2f}
Savings:        ${savings['savings']:.2f} ({savings['savings_pct']:.1f}%)
Total requests: {savings['total_requests']:,}
""")
```
Expected Results
Real-World Performance:
Content Pipeline (1,000 articles/day):
- Real-time cost: $200/day
- Batch cost: $100/day
- Savings: $3,000/month
Data Enrichment (50K records/day):
- Real-time cost: $300/day
- Batch cost: $150/day
- Savings: $4,500/month
Sentiment Analysis (100K reviews/month):
- Real-time cost: $500/month
- Batch cost: $250/month
- Savings: $250/month
Troubleshooting
Issue: Batch stuck in "validating" status
Cause: Malformed JSONL file
Solution: Validate file format
```python
def validate_batch_file(filename: str):
    """Validate JSONL format"""
    with open(filename, 'r') as f:
        for i, line in enumerate(f, 1):
            if not line.strip():
                continue  # skip blank lines rather than flagging them
            try:
                json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Error on line {i}: {e}")
                return False
    return True
```
Issue: Some requests failing
Cause: Token limits or invalid requests
Solution: Check error file
```python
if batch.error_file_id:
    errors = client.files.content(batch.error_file_id)
    print(errors.content.decode())
```
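Failed requests can be resubmitted by collecting their `custom_id`s from the error file and rebuilding a fresh JSONL. A sketch, assuming you keep the original request bodies keyed by `custom_id` (the helper name and its arguments are illustrative):

```python
import json

def build_retry_file(error_lines: list, original_bodies: dict,
                     filename: str = "batch_retry.jsonl") -> int:
    """Write a new batch file containing only the failed requests.

    error_lines: decoded lines from the batch's error file
    original_bodies: custom_id -> original request body
    Returns the number of requests queued for retry.
    """
    retried = 0
    with open(filename, 'w') as f:
        for line in error_lines:
            if not line.strip():
                continue
            failed = json.loads(line)
            custom_id = failed['custom_id']
            if custom_id in original_bodies:
                f.write(json.dumps({
                    "custom_id": custom_id,
                    "method": "POST",
                    "url": "/v1/chat/completions",
                    "body": original_bodies[custom_id],
                }) + '\n')
                retried += 1
    return retried
```

The resulting file can be submitted like any other batch via `submit_batch()` from Step 3.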
Production Checklist
- Batch workloads identified
- Queue system implemented
- Automated submission scheduled
- Completion checker running
- Error handling in place
- Cost tracking implemented
- Monitoring/alerting set up
- Results stored properly
- Tested with production data
Next Steps
- Week 1: Identify batch workloads
- Week 2: Implement queue system
- Week 3: Test with 10% of workload
- Week 4: Roll out to 100%, monitor savings
Additional Resources
- OpenAI Batch API: https://platform.openai.com/docs/guides/batch
- Batch Pricing: https://openai.com/pricing
- Best Practices: https://platform.openai.com/docs/guides/production-best-practices
Support
Need help with batch processing?
- Onaro Support: support@onaro.io
- Book implementation call: https://onaro.io/support
Estimated Implementation Time: 2-3 hours
Difficulty: ⭐⭐⭐☆☆ (3/5)
Impact: ⭐⭐⭐⭐☆ (4/5 - 50% savings on batch workloads)
Last Updated: January 26, 2026
Tested with: OpenAI SDK 1.12.0, Batch API v1