πŸ“Š

Batch Processing

Batch LLM and embedding jobs to unlock provider batch discounts and simpler rate limits. This guide covers when to batch, how to chunk inputs, idempotency, and monitoring, so throughput goes up and per-token cost goes down.

Time: 3-4 hours
Difficulty: Intermediate
Potential Savings: $500-2,500/month

Best For: Applications with bulk processing needs

Batch Processing Implementation Guide

Process Non-Urgent AI Requests at 50% Discount

Difficulty: Beginner to Intermediate
Time Required: 2-3 hours
Potential Savings: $1,500-6,000/month (50% reduction on batch workloads)
Best For: Background jobs, data processing, content generation pipelines


What is Batch Processing?

Traditional (Synchronous) Processing:

Process 1,000 documents immediately
    ↓
Pay full price: $100
Wait for each to complete: 30 minutes

Batch Processing (24-hour window):

Queue 1,000 documents for batch processing
    ↓
Pay 50% less: $50
Results delivered within 24 hours

Trade-off: Wait up to 24 hours, save 50% on costs.
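
The trade-off is easy to quantify. A minimal sketch (the figures are the illustrative ones above, not any provider's price list):

```python
def batch_vs_realtime(realtime_cost: float, discount: float = 0.5) -> dict:
    """Compare real-time vs batch cost for the same workload."""
    batch_cost = realtime_cost * (1 - discount)
    return {
        'realtime': realtime_cost,
        'batch': batch_cost,
        'savings': realtime_cost - batch_cost,
    }

# 1,000 documents at $100 real-time, 50% batch discount
comparison = batch_vs_realtime(100.0)
print(comparison)  # {'realtime': 100.0, 'batch': 50.0, 'savings': 50.0}
```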


Why You Need This

Cost Comparison:

| Use Case | Real-Time | Batch | Savings |
|---|---|---|---|
| Classify 10K documents | $50 | $25 | 50% |
| Generate summaries (1K articles) | $200 | $100 | 50% |
| Sentiment analysis (100K reviews) | $500 | $250 | 50% |
| Data enrichment (50K records) | $300 | $150 | 50% |

When to Use Batch:

βœ… Good for Batch:

  • Nightly data processing
  • Content moderation queue
  • Bulk document analysis
  • Email campaign generation
  • Analytics reports
  • Data enrichment
  • SEO content generation

❌ Not for Batch:

  • User-facing chatbots
  • Real-time search
  • Live customer support
  • Interactive applications
  • Anything time-sensitive
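
The two lists above collapse into a simple routing rule. A sketch (the field names and threshold are assumptions, not part of any API):

```python
def should_batch(user_facing: bool, latency_tolerance_s: int) -> bool:
    """
    Route a request to the Batch API only if no user is waiting on it
    and it can tolerate the 24-hour completion window.
    """
    TWENTY_FOUR_HOURS = 24 * 3600
    return (not user_facing) and latency_tolerance_s >= TWENTY_FOUR_HOURS

# Nightly data processing: nobody is waiting, results needed next day
print(should_batch(user_facing=False, latency_tolerance_s=86400))  # True

# Live chatbot turn: a user is waiting
print(should_batch(user_facing=True, latency_tolerance_s=5))  # False
```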

Prerequisites

Before implementing:

  • OpenAI API access (Batch API available)
  • Identify batch-suitable workloads
  • File storage (local or S3/R2)
  • Python 3.8+ (for examples)
  • Understanding of async job processing

Batch API Availability:

  • OpenAI: ✅ Batch API available
  • Anthropic: ✅ Message Batches API available
  • Azure OpenAI: ✅ Batch available

Implementation Steps

Step 1: Identify Batch Workloads

Audit your current usage:

```python
def analyze_batch_potential(usage_logs):
    """
    Analyze which workloads could be batched.
    Returns an estimate of potential savings.
    """
    batch_candidates = {
        'document_processing': 0,
        'content_generation': 0,
        'data_enrichment': 0,
        'sentiment_analysis': 0,
        'other': 0
    }

    for log in usage_logs:
        # A request can be batched if it is low priority or can
        # tolerate more than an hour of latency
        if log.get('priority') == 'low' or log.get('latency_tolerance', 0) > 3600:
            category = classify_request_type(log)
            batch_candidates[category] += log.get('cost', 0)

    total_batchable = sum(batch_candidates.values())
    potential_savings = total_batchable * 0.5  # 50% batch discount

    return {
        'batchable_cost': total_batchable,
        'potential_savings': potential_savings,
        'by_category': batch_candidates
    }

# Example
analysis = analyze_batch_potential(your_usage_logs)
print(f"Potential monthly savings: ${analysis['potential_savings']:,.2f}")
```

Step 2: Create Batch Requests (OpenAI)

Basic Batch Request:

```python
from openai import OpenAI
import json

client = OpenAI()

def create_batch_file(requests: list, filename: str = "batch_requests.jsonl"):
    """
    Create a JSONL file with batch requests.
    Each line is a separate request in JSON format.
    """
    with open(filename, 'w') as f:
        for i, request in enumerate(requests):
            # Format as required by the OpenAI Batch API
            batch_request = {
                "custom_id": f"request-{i}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "gpt-4o-mini",
                    "messages": request['messages'],
                    "max_tokens": request.get('max_tokens', 1000)
                }
            }
            f.write(json.dumps(batch_request) + '\n')
    return filename

# Example: create a batch of 1,000 document summaries
requests = []
for doc in documents[:1000]:
    requests.append({
        'messages': [
            {"role": "system", "content": "Summarize the following document concisely."},
            {"role": "user", "content": doc['text']}
        ],
        'max_tokens': 500
    })

batch_file = create_batch_file(requests)
print(f"Created batch file: {batch_file}")
```

Step 3: Upload and Submit Batch

```python
def submit_batch(batch_file_path: str) -> str:
    """
    Upload batch file and submit for processing.
    Returns batch ID for tracking.
    """
    # Step 1: Upload file
    with open(batch_file_path, 'rb') as f:
        batch_input_file = client.files.create(file=f, purpose="batch")
    print(f"Uploaded file: {batch_input_file.id}")

    # Step 2: Create batch
    batch = client.batches.create(
        input_file_id=batch_input_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",  # Must complete within 24 hours
        metadata={
            "description": "Document summary batch",
            "project": "content-pipeline"
        }
    )
    print(f"Batch submitted: {batch.id}")
    print(f"Status: {batch.status}")
    return batch.id

# Submit batch
batch_id = submit_batch('batch_requests.jsonl')
```

Step 4: Monitor Batch Status

```python
import time

def check_batch_status(batch_id: str) -> dict:
    """
    Check status of batch processing.
    Returns batch details including progress.
    """
    batch = client.batches.retrieve(batch_id)
    return {
        'id': batch.id,
        'status': batch.status,  # validating, in_progress, completed, failed
        'created_at': batch.created_at,
        # request_counts is an object in the SDK, not a dict;
        # copy its attributes into a plain dict
        'request_counts': {
            'total': batch.request_counts.total,
            'completed': batch.request_counts.completed,
            'failed': batch.request_counts.failed
        },
        'output_file_id': batch.output_file_id if batch.status == 'completed' else None,
        'error_file_id': getattr(batch, 'error_file_id', None)
    }

def wait_for_batch(batch_id: str, check_interval: int = 60):
    """
    Wait for batch to complete.
    Polls every check_interval seconds.
    """
    print(f"Waiting for batch {batch_id} to complete...")
    while True:
        status = check_batch_status(batch_id)
        print(f"Status: {status['status']}")

        if status['status'] == 'completed':
            print("✓ Batch completed!")
            print(f"  Total requests: {status['request_counts']['total']}")
            print(f"  Completed: {status['request_counts']['completed']}")
            print(f"  Failed: {status['request_counts']['failed']}")
            return status
        elif status['status'] == 'failed':
            print("✗ Batch failed!")
            return status
        elif status['status'] in ['validating', 'in_progress']:
            print(f"  Progress: {status['request_counts']['completed']}/{status['request_counts']['total']}")
            time.sleep(check_interval)
        else:
            print(f"Unknown status: {status['status']}")
            time.sleep(check_interval)

# Wait for completion, checking every 5 minutes
result = wait_for_batch(batch_id, check_interval=300)
```

Step 5: Retrieve Results

```python
def download_batch_results(batch_id: str, output_filename: str = "batch_results.jsonl"):
    """
    Download and parse batch results.
    Returns list of results.
    """
    # Get batch details
    batch = client.batches.retrieve(batch_id)
    if batch.status != 'completed':
        raise Exception(f"Batch not completed yet. Status: {batch.status}")

    # Download output file
    file_response = client.files.content(batch.output_file_id)

    # Save to file
    with open(output_filename, 'wb') as f:
        f.write(file_response.content)

    # Parse results
    results = []
    with open(output_filename, 'r') as f:
        for line in f:
            if line.strip():
                results.append(json.loads(line))

    print(f"Downloaded {len(results)} results to {output_filename}")
    return results

def process_batch_results(results: list) -> list:
    """
    Extract responses from batch results.
    Returns cleaned results.
    """
    processed = []
    for result in results:
        if result.get('error'):
            # Handle error
            processed.append({
                'custom_id': result['custom_id'],
                'status': 'error',
                'error': result['error']
            })
        else:
            # Extract response
            response = result['response']['body']['choices'][0]['message']['content']
            processed.append({
                'custom_id': result['custom_id'],
                'status': 'success',
                'response': response,
                'usage': result['response']['body']['usage']
            })
    return processed

# Download and process results
results = download_batch_results(batch_id)
processed_results = process_batch_results(results)

# Save to database or process further
for result in processed_results:
    if result['status'] == 'success':
        # Store result
        save_to_database(result['custom_id'], result['response'])
    else:
        # Log error
        log_error(result['custom_id'], result['error'])
```

Step 6: Production Batch Pipeline

Complete production-ready system:

```python
import os
import json
from datetime import datetime
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class BatchProcessor:
    """
    Production-ready batch processing system.

    Features:
    - Automatic batching of queued requests
    - Progress monitoring
    - Error handling
    - Result storage
    - Cost tracking
    """

    def __init__(self, client, batch_size: int = 50000):
        self.client = client
        self.batch_size = batch_size  # Max requests per batch

    def queue_request(self, request_id: str, messages: list, **kwargs):
        """
        Queue a request for batch processing.
        Stores in database with status 'queued'.
        """
        # Store in database (pseudo-code)
        db.insert('batch_queue', {
            'request_id': request_id,
            'messages': json.dumps(messages),
            'params': json.dumps(kwargs),
            'status': 'queued',
            'created_at': datetime.utcnow()
        })
        logger.info(f"Queued request: {request_id}")

    def create_and_submit_batch(self) -> str:
        """
        Create batch from queued requests and submit.
        Returns batch_id.
        """
        # Get queued requests
        queued = db.query('batch_queue', {'status': 'queued'}, limit=self.batch_size)
        if not queued:
            logger.info("No requests in queue")
            return None

        logger.info(f"Creating batch with {len(queued)} requests")

        # Create batch file
        batch_filename = f"batch_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.jsonl"
        with open(batch_filename, 'w') as f:
            for req in queued:
                batch_req = {
                    "custom_id": req['request_id'],
                    "method": "POST",
                    "url": "/v1/chat/completions",
                    "body": {
                        "model": "gpt-4o-mini",
                        "messages": json.loads(req['messages']),
                        **json.loads(req['params'])
                    }
                }
                f.write(json.dumps(batch_req) + '\n')

        # Upload and submit
        with open(batch_filename, 'rb') as f:
            batch_input_file = self.client.files.create(file=f, purpose="batch")

        batch = self.client.batches.create(
            input_file_id=batch_input_file.id,
            endpoint="/v1/chat/completions",
            completion_window="24h"
        )

        # Update database
        for req in queued:
            db.update(
                'batch_queue',
                {'request_id': req['request_id']},
                {
                    'status': 'processing',
                    'batch_id': batch.id,
                    'submitted_at': datetime.utcnow()
                }
            )

        # Store batch metadata
        db.insert('batch_jobs', {
            'batch_id': batch.id,
            'request_count': len(queued),
            'status': 'processing',
            'created_at': datetime.utcnow()
        })

        logger.info(f"Submitted batch: {batch.id}")

        # Clean up file
        os.remove(batch_filename)
        return batch.id

    def check_and_process_completed(self):
        """
        Check all processing batches and handle completed ones.
        Run this periodically (e.g., every 5 minutes via cron).
        """
        # Get processing batches
        processing = db.query('batch_jobs', {'status': 'processing'})

        for job in processing:
            batch = self.client.batches.retrieve(job['batch_id'])

            if batch.status == 'completed':
                logger.info(f"Batch {batch.id} completed, processing results")

                # Download results
                file_response = self.client.files.content(batch.output_file_id)
                results = []
                for line in file_response.content.decode().split('\n'):
                    if line.strip():
                        results.append(json.loads(line))

                # Process each result
                for result in results:
                    custom_id = result['custom_id']
                    if result.get('error'):
                        # Mark as failed
                        db.update(
                            'batch_queue',
                            {'request_id': custom_id},
                            {
                                'status': 'failed',
                                'error': json.dumps(result['error']),
                                'completed_at': datetime.utcnow()
                            }
                        )
                    else:
                        # Extract response
                        response = result['response']['body']['choices'][0]['message']['content']
                        usage = result['response']['body']['usage']

                        # Update database
                        db.update(
                            'batch_queue',
                            {'request_id': custom_id},
                            {
                                'status': 'completed',
                                'response': response,
                                'usage': json.dumps(usage),
                                'completed_at': datetime.utcnow()
                            }
                        )

                # Update batch job
                db.update(
                    'batch_jobs',
                    {'batch_id': batch.id},
                    {
                        'status': 'completed',
                        'completed_at': datetime.utcnow(),
                        'completed_count': batch.request_counts.completed,
                        'failed_count': batch.request_counts.failed
                    }
                )
                logger.info(f"Processed {len(results)} results for batch {batch.id}")

            elif batch.status == 'failed':
                logger.error(f"Batch {batch.id} failed")
                # Mark all requests as failed
                db.update_many(
                    'batch_queue',
                    {'batch_id': batch.id},
                    {'status': 'failed', 'error': 'Batch processing failed'}
                )
                db.update('batch_jobs', {'batch_id': batch.id}, {'status': 'failed'})

    def get_request_status(self, request_id: str) -> dict:
        """Check status of a queued request."""
        return db.query_one('batch_queue', {'request_id': request_id})

    def get_stats(self) -> dict:
        """Get batch processing statistics."""
        return {
            'queued': db.count('batch_queue', {'status': 'queued'}),
            'processing': db.count('batch_queue', {'status': 'processing'}),
            'completed': db.count('batch_queue', {'status': 'completed'}),
            'failed': db.count('batch_queue', {'status': 'failed'})
        }

# Usage
batch_processor = BatchProcessor(client)

# Queue requests
for doc in documents:
    batch_processor.queue_request(
        request_id=doc['id'],
        messages=[
            {"role": "system", "content": "Summarize this document."},
            {"role": "user", "content": doc['text']}
        ],
        max_tokens=500
    )

# Submit batch (run once per day or when queue reaches threshold)
batch_id = batch_processor.create_and_submit_batch()

# Check completed (run every 5 minutes via cron)
batch_processor.check_and_process_completed()

# Check stats
stats = batch_processor.get_stats()
print(f"Queue status: {stats}")
```
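
One detail the class above leaves implicit: the OpenAI Batch API caps the size of a single batch (50,000 requests per batch at the time of writing; check the current limits), which is why `batch_size` exists. If the queue grows beyond the cap, split it into multiple batches. A minimal chunking helper:

```python
def chunk_requests(requests: list, max_per_batch: int = 50_000) -> list:
    """Split a request list into batch-sized chunks, preserving order."""
    return [
        requests[i:i + max_per_batch]
        for i in range(0, len(requests), max_per_batch)
    ]

# 120,000 queued requests -> 3 batches: 50k, 50k, 20k
chunks = chunk_requests(list(range(120_000)))
print([len(c) for c in chunks])  # [50000, 50000, 20000]
```

Each chunk then becomes its own JSONL file and its own `batches.create` call.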

Scheduling Strategy

Cron Jobs for Automation:

```bash
# crontab -e

# Submit new batch every night at 2 AM
0 2 * * * /usr/bin/python3 /app/submit_batch.py

# Check for completed batches every 5 minutes
*/5 * * * * /usr/bin/python3 /app/check_batches.py

# Generate daily batch report
0 8 * * * /usr/bin/python3 /app/batch_report.py
```

Celery Tasks (Alternative):

```python
from celery import Celery
from celery.schedules import crontab

app = Celery('batch_tasks')

@app.task
def submit_daily_batch():
    """Submit queued requests as a batch."""
    batch_processor = BatchProcessor(client)
    batch_id = batch_processor.create_and_submit_batch()
    return batch_id

@app.task
def check_completed_batches():
    """Check and process completed batches."""
    batch_processor = BatchProcessor(client)
    batch_processor.check_and_process_completed()

# Schedule
app.conf.beat_schedule = {
    'submit-daily-batch': {
        'task': 'batch_tasks.submit_daily_batch',
        'schedule': crontab(hour=2, minute=0),  # 2 AM daily
    },
    'check-completed': {
        'task': 'batch_tasks.check_completed_batches',
        'schedule': crontab(minute='*/5'),  # Every 5 minutes
    },
}
```

Cost Tracking

```python
def calculate_batch_savings(batch_id: str) -> dict:
    """
    Calculate cost savings from using the Batch API.
    Returns comparison with real-time processing.
    """
    # Get usage from processed results (the raw results lack the
    # 'status'/'usage' keys that process_batch_results adds)
    results = process_batch_results(download_batch_results(batch_id))

    total_input_tokens = 0
    total_output_tokens = 0
    for result in results:
        if result['status'] == 'success':
            usage = result['usage']
            total_input_tokens += usage['prompt_tokens']
            total_output_tokens += usage['completion_tokens']

    # GPT-4o-mini pricing (per 1K tokens)
    input_cost_per_1k = 0.00015
    output_cost_per_1k = 0.0006

    # Batch: 50% discount
    batch_input_cost = (total_input_tokens / 1000) * input_cost_per_1k * 0.5
    batch_output_cost = (total_output_tokens / 1000) * output_cost_per_1k * 0.5
    batch_total = batch_input_cost + batch_output_cost

    # Real-time: full price
    realtime_input_cost = (total_input_tokens / 1000) * input_cost_per_1k
    realtime_output_cost = (total_output_tokens / 1000) * output_cost_per_1k
    realtime_total = realtime_input_cost + realtime_output_cost

    savings = realtime_total - batch_total
    savings_pct = (savings / realtime_total * 100) if realtime_total > 0 else 0

    return {
        'batch_cost': batch_total,
        'realtime_cost': realtime_total,
        'savings': savings,
        'savings_pct': savings_pct,
        'total_requests': len(results),
        'input_tokens': total_input_tokens,
        'output_tokens': total_output_tokens
    }

# Calculate savings
savings = calculate_batch_savings(batch_id)
print(f"""
Batch Cost Analysis:
--------------------
Batch cost:     ${savings['batch_cost']:.2f}
Real-time cost: ${savings['realtime_cost']:.2f}
Savings:        ${savings['savings']:.2f} ({savings['savings_pct']:.1f}%)
Total requests: {savings['total_requests']:,}
""")
```

Expected Results

Real-World Performance:

Content Pipeline (1,000 articles/day):

  • Real-time cost: $200/day
  • Batch cost: $100/day
  • Savings: $3,000/month

Data Enrichment (50K records/day):

  • Real-time cost: $300/day
  • Batch cost: $150/day
  • Savings: $4,500/month

Sentiment Analysis (100K reviews/month):

  • Real-time cost: $500/month
  • Batch cost: $250/month
  • Savings: $250/month
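
The monthly figures for the two daily workloads above follow directly from the daily deltas; a quick sanity check (assuming a 30-day month):

```python
def monthly_savings(realtime_daily: float, batch_daily: float, days: int = 30) -> float:
    """Project monthly savings from daily real-time vs batch costs."""
    return (realtime_daily - batch_daily) * days

print(monthly_savings(200, 100))  # 3000 -- content pipeline
print(monthly_savings(300, 150))  # 4500 -- data enrichment
```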

Troubleshooting

Issue: Batch stuck in "validating" status

Cause: Malformed JSONL file

Solution: Validate file format

```python
def validate_batch_file(filename: str):
    """Validate JSONL format line by line."""
    with open(filename, 'r') as f:
        for i, line in enumerate(f, 1):
            if not line.strip():
                continue  # skip blank lines (e.g., trailing newline)
            try:
                json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Error on line {i}: {e}")
                return False
    return True
```

Issue: Some requests failing

Cause: Token limits or invalid requests

Solution: Check error file

```python
if batch.error_file_id:
    errors = client.files.content(batch.error_file_id)
    print(errors.content.decode())
```
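
Failed requests don't have to be redone by hand. Since error lines carry the request's `custom_id`, you can collect the failed IDs and rebuild a retry batch from the original input file. A sketch (file names are assumptions; validate and resubmit the retry file as in Step 3):

```python
import json

def build_retry_file(input_file: str, error_file: str,
                     retry_file: str = "batch_retry.jsonl") -> int:
    """Re-emit original requests whose custom_id appears in the error file."""
    # Collect custom_ids of failed requests
    failed_ids = set()
    with open(error_file) as f:
        for line in f:
            if line.strip():
                failed_ids.add(json.loads(line)['custom_id'])

    # Copy matching request lines from the original input file
    count = 0
    with open(input_file) as src, open(retry_file, 'w') as dst:
        for line in src:
            if line.strip() and json.loads(line)['custom_id'] in failed_ids:
                dst.write(line)
                count += 1
    return count
```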

Production Checklist

  • Batch workloads identified
  • Queue system implemented
  • Automated submission scheduled
  • Completion checker running
  • Error handling in place
  • Cost tracking implemented
  • Monitoring/alerting set up
  • Results stored properly
  • Tested with production data

Next Steps

  1. Week 1: Identify batch workloads
  2. Week 2: Implement queue system
  3. Week 3: Test with 10% of workload
  4. Week 4: Roll out to 100%, monitor savings


Support

Need help with batch processing?

Estimated Implementation Time: 2-3 hours
Difficulty: β­β­β­β˜†β˜† (3/5)
Impact: πŸš€πŸš€πŸš€πŸš€β˜† (4/5 - 50% savings on batch workloads)


Last Updated: January 26, 2026
Tested with: OpenAI SDK 1.12.0, Batch API v1