
Batch Processing Implementation Guide

Process Non-Urgent AI Requests at 50% Discount

Difficulty: Beginner to Intermediate
Time Required: 2-3 hours
Potential Savings: $1,500-6,000/month (50% reduction on batch workloads)
Best For: Background jobs, data processing, content generation pipelines


What is Batch Processing?

Traditional (Synchronous) Processing:

Process 1,000 documents immediately
    ↓
Pay full price: $100
Wait for each to complete: 30 minutes

Batch Processing (24-hour window):

Queue 1,000 documents for batch processing
    ↓
Pay 50% less: $50
Results delivered within 24 hours

Trade-off: Wait up to 24 hours, save 50% on costs.
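The trade-off above is simple arithmetic. A minimal sketch (the per-request price is a made-up figure for illustration, not a real rate):

```python
def compare_costs(num_requests: int, cost_per_request: float,
                  batch_discount: float = 0.5) -> dict:
    """Compare real-time vs. batch cost for the same workload."""
    realtime = round(num_requests * cost_per_request, 2)
    batch = round(realtime * (1 - batch_discount), 2)
    return {
        'realtime': realtime,
        'batch': batch,
        'savings': round(realtime - batch, 2),
    }

# 1,000 documents at a hypothetical $0.10 each, as in the diagram above
costs = compare_costs(1000, 0.10)
print(costs)  # {'realtime': 100.0, 'batch': 50.0, 'savings': 50.0}
```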


Why You Need This

Cost Comparison:

| Use Case | Real-Time | Batch | Savings |
|----------|-----------|-------|---------|
| Classify 10K documents | $50 | $25 | 50% |
| Generate summaries (1K articles) | $200 | $100 | 50% |
| Sentiment analysis (100K reviews) | $500 | $250 | 50% |
| Data enrichment (50K records) | $300 | $150 | 50% |

When to Use Batch:

βœ… Good for Batch:

  • Nightly data processing
  • Content moderation queue
  • Bulk document analysis
  • Email campaign generation
  • Analytics reports
  • Data enrichment
  • SEO content generation

❌ Not for Batch:

  • User-facing chatbots
  • Real-time search
  • Live customer support
  • Interactive applications
  • Anything time-sensitive

Prerequisites

Before implementing:

  • OpenAI API access (Batch API available)
  • Identify batch-suitable workloads
  • File storage (local or S3/R2)
  • Python 3.8+ (for examples)
  • Understanding of async job processing

Batch API Availability:

  • OpenAI: βœ… Batch API available
  • Anthropic: ❌ No batch API yet
  • Azure OpenAI: βœ… Batch available

Implementation Steps

Step 1: Identify Batch Workloads

Audit your current usage:

```python
def analyze_batch_potential(usage_logs):
    """
    Analyze which workloads could be batched.
    Returns estimation of potential savings.
    """
    batch_candidates = {
        'document_processing': 0,
        'content_generation': 0,
        'data_enrichment': 0,
        'sentiment_analysis': 0,
        'other': 0
    }

    for log in usage_logs:
        # Check if request is time-sensitive (default tolerance 0 = must be real-time)
        if log.get('priority') == 'low' or log.get('latency_tolerance', 0) > 3600:
            # This could be batched; classify_request_type is your own classifier
            category = classify_request_type(log)
            batch_candidates[category] += log.get('cost', 0)

    total_batchable = sum(batch_candidates.values())
    potential_savings = total_batchable * 0.5  # 50% discount

    return {
        'batchable_cost': total_batchable,
        'potential_savings': potential_savings,
        'by_category': batch_candidates
    }

# Example (your_usage_logs: your own request log records)
analysis = analyze_batch_potential(your_usage_logs)
print(f"Potential monthly savings: ${analysis['potential_savings']:,.2f}")
```

Step 2: Create Batch Requests (OpenAI)

Basic Batch Request:

```python
from openai import OpenAI
import json

client = OpenAI()

def create_batch_file(requests: list, filename: str = "batch_requests.jsonl"):
    """
    Create a JSONL file with batch requests.
    Each line is a separate request in JSON format.
    """
    with open(filename, 'w') as f:
        for i, request in enumerate(requests):
            # Format as required by the OpenAI Batch API
            batch_request = {
                "custom_id": f"request-{i}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "gpt-4o-mini",
                    "messages": request['messages'],
                    "max_tokens": request.get('max_tokens', 1000)
                }
            }
            f.write(json.dumps(batch_request) + '\n')
    return filename

# Example: create a batch of 1,000 document summaries
requests = []
for doc in documents[:1000]:
    requests.append({
        'messages': [
            {"role": "system", "content": "Summarize the following document concisely."},
            {"role": "user", "content": doc['text']}
        ],
        'max_tokens': 500
    })

batch_file = create_batch_file(requests)
print(f"Created batch file: {batch_file}")
```

Step 3: Upload and Submit Batch

```python
def submit_batch(batch_file_path: str) -> str:
    """
    Upload batch file and submit for processing.
    Returns batch ID for tracking.
    """
    # Step 1: Upload file
    with open(batch_file_path, 'rb') as f:
        batch_input_file = client.files.create(
            file=f,
            purpose="batch"
        )
    print(f"Uploaded file: {batch_input_file.id}")

    # Step 2: Create batch
    batch = client.batches.create(
        input_file_id=batch_input_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",  # Must complete within 24 hours
        metadata={
            "description": "Document summary batch",
            "project": "content-pipeline"
        }
    )
    print(f"Batch submitted: {batch.id}")
    print(f"Status: {batch.status}")
    return batch.id

# Submit batch
batch_id = submit_batch('batch_requests.jsonl')
```

Step 4: Monitor Batch Status

```python
import time

def check_batch_status(batch_id: str) -> dict:
    """
    Check status of batch processing.
    Returns batch details including progress.
    """
    batch = client.batches.retrieve(batch_id)
    counts = batch.request_counts  # object with .total / .completed / .failed
    return {
        'id': batch.id,
        'status': batch.status,  # validating, in_progress, finalizing, completed, failed, expired
        'created_at': batch.created_at,
        'request_counts': {
            'total': counts.total,
            'completed': counts.completed,
            'failed': counts.failed
        },
        'output_file_id': batch.output_file_id if batch.status == 'completed' else None,
        'error_file_id': batch.error_file_id
    }

def wait_for_batch(batch_id: str, check_interval: int = 60):
    """
    Wait for batch to complete.
    Polls every check_interval seconds.
    """
    print(f"Waiting for batch {batch_id} to complete...")
    while True:
        status = check_batch_status(batch_id)
        print(f"Status: {status['status']}")

        if status['status'] == 'completed':
            print("βœ“ Batch completed!")
            print(f"  Total requests: {status['request_counts']['total']}")
            print(f"  Completed: {status['request_counts']['completed']}")
            print(f"  Failed: {status['request_counts']['failed']}")
            return status
        elif status['status'] in ('failed', 'expired', 'cancelled'):
            print(f"βœ— Batch ended with status: {status['status']}")
            return status
        elif status['status'] in ('validating', 'in_progress', 'finalizing'):
            print(f"  Progress: {status['request_counts']['completed']}/{status['request_counts']['total']}")
            time.sleep(check_interval)
        else:
            print(f"Unknown status: {status['status']}")
            time.sleep(check_interval)

# Wait for completion, checking every 5 minutes
result = wait_for_batch(batch_id, check_interval=300)
```

Step 5: Retrieve Results

```python
def download_batch_results(batch_id: str, output_filename: str = "batch_results.jsonl"):
    """
    Download and parse batch results.
    Returns list of raw result objects.
    """
    # Get batch details
    batch = client.batches.retrieve(batch_id)
    if batch.status != 'completed':
        raise Exception(f"Batch not completed yet. Status: {batch.status}")

    # Download output file
    file_response = client.files.content(batch.output_file_id)
    with open(output_filename, 'wb') as f:
        f.write(file_response.content)

    # Parse results
    results = []
    with open(output_filename, 'r') as f:
        for line in f:
            if line.strip():
                results.append(json.loads(line))

    print(f"Downloaded {len(results)} results to {output_filename}")
    return results

def process_batch_results(results: list) -> list:
    """
    Extract responses from raw batch results.
    Returns cleaned results.
    """
    processed = []
    for result in results:
        if result.get('error'):
            # Handle error
            processed.append({
                'custom_id': result['custom_id'],
                'status': 'error',
                'error': result['error']
            })
        else:
            # Extract response
            body = result['response']['body']
            processed.append({
                'custom_id': result['custom_id'],
                'status': 'success',
                'response': body['choices'][0]['message']['content'],
                'usage': body['usage']
            })
    return processed

# Download and process results
results = download_batch_results(batch_id)
processed_results = process_batch_results(results)

# Save to database or process further (save_to_database / log_error are your own helpers)
for result in processed_results:
    if result['status'] == 'success':
        save_to_database(result['custom_id'], result['response'])
    else:
        log_error(result['custom_id'], result['error'])
```

Step 6: Production Batch Pipeline

Complete production-ready system:

```python
import os
import json
from datetime import datetime
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class BatchProcessor:
    """
    Production-ready batch processing system.

    Features:
    - Automatic batching of queued requests
    - Progress monitoring
    - Error handling
    - Result storage
    - Cost tracking
    """

    def __init__(self, client, batch_size: int = 50000):
        self.client = client
        self.batch_size = batch_size  # Max requests per batch

    def queue_request(self, request_id: str, messages: list, **kwargs):
        """
        Queue a request for batch processing.
        Stores in database with status 'queued'.
        """
        # Store in database (db is pseudo-code for your storage layer)
        db.insert('batch_queue', {
            'request_id': request_id,
            'messages': json.dumps(messages),
            'params': json.dumps(kwargs),
            'status': 'queued',
            'created_at': datetime.utcnow()
        })
        logger.info(f"Queued request: {request_id}")

    def create_and_submit_batch(self) -> str:
        """
        Create batch from queued requests and submit.
        Returns batch_id.
        """
        # Get queued requests
        queued = db.query('batch_queue', {'status': 'queued'}, limit=self.batch_size)
        if not queued:
            logger.info("No requests in queue")
            return None

        logger.info(f"Creating batch with {len(queued)} requests")

        # Create batch file
        batch_filename = f"batch_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.jsonl"
        with open(batch_filename, 'w') as f:
            for req in queued:
                batch_req = {
                    "custom_id": req['request_id'],
                    "method": "POST",
                    "url": "/v1/chat/completions",
                    "body": {
                        "model": "gpt-4o-mini",
                        "messages": json.loads(req['messages']),
                        **json.loads(req['params'])
                    }
                }
                f.write(json.dumps(batch_req) + '\n')

        # Upload and submit
        with open(batch_filename, 'rb') as f:
            batch_input_file = self.client.files.create(file=f, purpose="batch")

        batch = self.client.batches.create(
            input_file_id=batch_input_file.id,
            endpoint="/v1/chat/completions",
            completion_window="24h"
        )

        # Update database
        for req in queued:
            db.update('batch_queue', {'request_id': req['request_id']}, {
                'status': 'processing',
                'batch_id': batch.id,
                'submitted_at': datetime.utcnow()
            })

        # Store batch metadata
        db.insert('batch_jobs', {
            'batch_id': batch.id,
            'request_count': len(queued),
            'status': 'processing',
            'created_at': datetime.utcnow()
        })

        logger.info(f"Submitted batch: {batch.id}")

        # Clean up file
        os.remove(batch_filename)
        return batch.id

    def check_and_process_completed(self):
        """
        Check all processing batches and handle completed ones.
        Run this periodically (e.g., every 5 minutes via cron).
        """
        # Get processing batches
        processing = db.query('batch_jobs', {'status': 'processing'})

        for job in processing:
            batch = self.client.batches.retrieve(job['batch_id'])

            if batch.status == 'completed':
                logger.info(f"Batch {batch.id} completed, processing results")

                # Download results
                file_response = self.client.files.content(batch.output_file_id)
                results = []
                for line in file_response.content.decode().split('\n'):
                    if line.strip():
                        results.append(json.loads(line))

                # Process each result
                for result in results:
                    custom_id = result['custom_id']
                    if result.get('error'):
                        # Mark as failed
                        db.update('batch_queue', {'request_id': custom_id}, {
                            'status': 'failed',
                            'error': json.dumps(result['error']),
                            'completed_at': datetime.utcnow()
                        })
                    else:
                        # Extract response and update database
                        body = result['response']['body']
                        db.update('batch_queue', {'request_id': custom_id}, {
                            'status': 'completed',
                            'response': body['choices'][0]['message']['content'],
                            'usage': json.dumps(body['usage']),
                            'completed_at': datetime.utcnow()
                        })

                # Update batch job
                db.update('batch_jobs', {'batch_id': batch.id}, {
                    'status': 'completed',
                    'completed_at': datetime.utcnow(),
                    'completed_count': batch.request_counts.completed,
                    'failed_count': batch.request_counts.failed
                })
                logger.info(f"Processed {len(results)} results for batch {batch.id}")

            elif batch.status == 'failed':
                logger.error(f"Batch {batch.id} failed")
                # Mark all requests as failed
                db.update_many('batch_queue', {'batch_id': batch.id}, {
                    'status': 'failed',
                    'error': 'Batch processing failed'
                })
                db.update('batch_jobs', {'batch_id': batch.id}, {'status': 'failed'})

    def get_request_status(self, request_id: str) -> dict:
        """Check status of a queued request"""
        return db.query_one('batch_queue', {'request_id': request_id})

    def get_stats(self) -> dict:
        """Get batch processing statistics"""
        return {
            'queued': db.count('batch_queue', {'status': 'queued'}),
            'processing': db.count('batch_queue', {'status': 'processing'}),
            'completed': db.count('batch_queue', {'status': 'completed'}),
            'failed': db.count('batch_queue', {'status': 'failed'})
        }

# Usage
batch_processor = BatchProcessor(client)

# Queue requests
for doc in documents:
    batch_processor.queue_request(
        request_id=doc['id'],
        messages=[
            {"role": "system", "content": "Summarize this document."},
            {"role": "user", "content": doc['text']}
        ],
        max_tokens=500
    )

# Submit batch (run once per day or when queue reaches threshold)
batch_id = batch_processor.create_and_submit_batch()

# Check completed (run every 5 minutes via cron)
batch_processor.check_and_process_completed()

# Check stats
stats = batch_processor.get_stats()
print(f"Queue status: {stats}")
```

Scheduling Strategy

Cron Jobs for Automation:

```shell
# crontab -e

# Submit new batch every night at 2 AM
0 2 * * * /usr/bin/python3 /app/submit_batch.py

# Check for completed batches every 5 minutes
*/5 * * * * /usr/bin/python3 /app/check_batches.py

# Generate daily batch report
0 8 * * * /usr/bin/python3 /app/batch_report.py
```
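Inside a script like the `submit_batch.py` referenced above, it can help to flush the queue only when it is worth submitting. A sketch of that decision logic, where `min_batch` and `max_wait` are assumed thresholds you would tune:

```python
from datetime import datetime, timedelta

def ready_to_submit(queue_size: int, oldest_queued_at: datetime,
                    min_batch: int = 100,
                    max_wait: timedelta = timedelta(hours=12),
                    now: datetime = None) -> bool:
    """
    Decide whether the scheduled job should submit a batch:
    either enough requests have accumulated, or the oldest one
    has waited too long (so small queues still get flushed).
    """
    if queue_size == 0:
        return False
    if queue_size >= min_batch:
        return True
    now = now or datetime.utcnow()
    return (now - oldest_queued_at) >= max_wait

now = datetime(2026, 1, 26, 2, 0)
print(ready_to_submit(500, now - timedelta(hours=1), now=now))   # True  (queue big enough)
print(ready_to_submit(10, now - timedelta(hours=13), now=now))   # True  (waited too long)
print(ready_to_submit(10, now - timedelta(hours=1), now=now))    # False
```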

Celery Tasks (Alternative):

```python
from celery import Celery
from celery.schedules import crontab

app = Celery('batch_tasks')

@app.task
def submit_daily_batch():
    """Submit queued requests as batch"""
    batch_processor = BatchProcessor(client)
    return batch_processor.create_and_submit_batch()

@app.task
def check_completed_batches():
    """Check and process completed batches"""
    batch_processor = BatchProcessor(client)
    batch_processor.check_and_process_completed()

# Schedule
app.conf.beat_schedule = {
    'submit-daily-batch': {
        'task': 'batch_tasks.submit_daily_batch',
        'schedule': crontab(hour=2, minute=0),  # 2 AM daily
    },
    'check-completed': {
        'task': 'batch_tasks.check_completed_batches',
        'schedule': crontab(minute='*/5'),  # Every 5 minutes
    },
}
```

Cost Tracking

```python
def calculate_batch_savings(batch_id: str) -> dict:
    """
    Calculate cost savings from using the batch API.
    Returns comparison with real-time processing.
    """
    # Get usage from processed results (process_batch_results adds 'status'/'usage' keys)
    results = process_batch_results(download_batch_results(batch_id))

    total_input_tokens = 0
    total_output_tokens = 0
    for result in results:
        if result['status'] == 'success':
            usage = result['usage']
            total_input_tokens += usage['prompt_tokens']
            total_output_tokens += usage['completion_tokens']

    # GPT-4o-mini pricing (per 1K tokens)
    input_cost_per_1k = 0.00015
    output_cost_per_1k = 0.0006

    # Batch: 50% discount
    batch_input_cost = (total_input_tokens / 1000) * input_cost_per_1k * 0.5
    batch_output_cost = (total_output_tokens / 1000) * output_cost_per_1k * 0.5
    batch_total = batch_input_cost + batch_output_cost

    # Real-time: full price
    realtime_total = ((total_input_tokens / 1000) * input_cost_per_1k
                      + (total_output_tokens / 1000) * output_cost_per_1k)

    savings = realtime_total - batch_total
    savings_pct = (savings / realtime_total * 100) if realtime_total > 0 else 0

    return {
        'batch_cost': batch_total,
        'realtime_cost': realtime_total,
        'savings': savings,
        'savings_pct': savings_pct,
        'total_requests': len(results),
        'input_tokens': total_input_tokens,
        'output_tokens': total_output_tokens
    }

# Calculate savings
savings = calculate_batch_savings(batch_id)
print(f"""
Batch Cost Analysis:
--------------------
Batch cost:     ${savings['batch_cost']:.2f}
Real-time cost: ${savings['realtime_cost']:.2f}
Savings:        ${savings['savings']:.2f} ({savings['savings_pct']:.1f}%)
Total requests: {savings['total_requests']:,}
""")
```

Expected Results

Real-World Performance:

Content Pipeline (1,000 articles/day):

  • Real-time cost: $200/day
  • Batch cost: $100/day
  • Savings: $3,000/month

Data Enrichment (50K records/day):

  • Real-time cost: $300/day
  • Batch cost: $150/day
  • Savings: $4,500/month

Sentiment Analysis (100K reviews/month):

  • Real-time cost: $500/month
  • Batch cost: $250/month
  • Savings: $250/month

Troubleshooting

Issue: Batch stuck in "validating" status

Cause: Malformed JSONL file

Solution: Validate file format

```python
def validate_batch_file(filename: str) -> bool:
    """Validate JSONL format and required fields before upload."""
    required_keys = {"custom_id", "method", "url", "body"}
    with open(filename, 'r') as f:
        for i, line in enumerate(f, 1):
            if not line.strip():
                continue
            try:
                request = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Error on line {i}: {e}")
                return False
            missing = required_keys - request.keys()
            if missing:
                print(f"Line {i} missing keys: {missing}")
                return False
    return True
```

Issue: Some requests failing

Cause: Token limits or invalid requests

Solution: Check error file

```python
batch = client.batches.retrieve(batch_id)
if batch.error_file_id:
    errors = client.files.content(batch.error_file_id)
    print(errors.content.decode())
```
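Failed requests can then be collected and resubmitted in a follow-up batch. A sketch of the filtering step, operating on result lines in the format shown in Step 5:

```python
import json

def collect_failed_ids(result_lines: list) -> list:
    """Return custom_ids of requests that errored, for resubmission."""
    failed = []
    for line in result_lines:
        if not line.strip():
            continue
        result = json.loads(line)
        if result.get('error') is not None:
            failed.append(result['custom_id'])
    return failed

# Hypothetical result lines for illustration
lines = [
    '{"custom_id": "request-0", "error": null, "response": {"status_code": 200}}',
    '{"custom_id": "request-1", "error": {"message": "max_tokens too large"}}',
]
print(collect_failed_ids(lines))  # ['request-1']
```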

Production Checklist

  • Batch workloads identified
  • Queue system implemented
  • Automated submission scheduled
  • Completion checker running
  • Error handling in place
  • Cost tracking implemented
  • Monitoring/alerting set up
  • Results stored properly
  • Tested with production data

Next Steps

  1. Week 1: Identify batch workloads
  2. Week 2: Implement queue system
  3. Week 3: Test with 10% of workload
  4. Week 4: Roll out to 100%, monitor savings
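Week 3's partial rollout can be done with a deterministic hash so a given request always takes the same path while you compare costs. A minimal sketch:

```python
import hashlib

def route_to_batch(request_id: str, rollout_pct: int) -> bool:
    """
    Deterministically send rollout_pct% of requests to the batch path.
    Hash-based, so the same request_id always routes the same way.
    """
    bucket = int(hashlib.sha256(request_id.encode()).hexdigest(), 16) % 100
    return bucket < rollout_pct

# At 10%, roughly a tenth of requests take the batch path
batched = sum(route_to_batch(f"doc-{i}", 10) for i in range(1000))
print(f"{batched} of 1000 routed to batch")
```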


Estimated Implementation Time: 2-3 hours
Difficulty: β­β­β­β˜†β˜† (3/5)
Impact: πŸš€πŸš€πŸš€πŸš€β˜† (4/5 - 50% savings on batch workloads)


Last Updated: January 26, 2026
Tested with: OpenAI SDK 1.12.0, Batch API v1