# Batch Processing

Process multiple DDEX documents efficiently using the builder's batch processing capabilities.

## Overview

Batch processing allows you to:

- Process hundreds of releases simultaneously
- Optimize memory usage for large datasets
- Implement parallel processing workflows
- Handle bulk operations efficiently
## Basic Batch Processing

### JavaScript/TypeScript

```typescript
import { DDEXBuilder } from 'ddex-builder';

const builder = new DDEXBuilder();

const releases = [
  { title: "Album 1", /* ... */ },
  { title: "Album 2", /* ... */ },
  // ... more releases
];

// Process all releases in a single operation
const batchResult = await builder.buildBatch(releases, {
  version: '4.3',
  batchSize: 50,      // Process 50 at a time
  parallel: true,     // Use parallel processing
  validateEach: true  // Validate each document
});

console.log(`Processed ${batchResult.success.length} releases`);
console.log(`Failed: ${batchResult.errors.length}`);
```
### Python

```python
from ddex_builder import DDEXBuilder
import pandas as pd

builder = DDEXBuilder()

# Process a DataFrame containing multiple releases
df = pd.read_csv('releases.csv')

results = builder.from_dataframe_batch(
    df,
    group_by='release_id',
    batch_size=100,
    parallel=True
)

print(f"Processed {len(results.success)} releases")
for error in results.errors:
    print(f"Error in {error.id}: {error.message}")
```
### Rust

```rust
use ddex_builder::{BatchOptions, BuildRequest, DDEXBuilder};

let builder = DDEXBuilder::new();
let requests: Vec<BuildRequest> = vec![/* ... */];

let results = builder.build_batch(
    requests,
    BatchOptions {
        batch_size: 50,
        parallel: true,
        fail_fast: false,
    },
)?;

println!("Processed {} documents", results.len());
```
## Streaming Batch Processing

For very large datasets, use streaming to minimize memory usage:

```typescript
import { DDEXBuilder, StreamingBatchProcessor } from 'ddex-builder';

const processor = new StreamingBatchProcessor({
  batchSize: 25,
  maxConcurrency: 4,
  onProgress: (processed, total) => {
    console.log(`Progress: ${processed}/${total}`);
  },
  onError: (error, item) => {
    console.error(`Failed to process ${item.id}:`, error);
  }
});

// Process from a stream
await processor.processStream(inputStream, outputStream);
```
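The `inputStream` and `outputStream` above are left undefined. A minimal way to wire them, assuming the processor accepts ordinary Node.js streams and the input is newline-delimited JSON (both are assumptions, not confirmed API), looks like this:

```typescript
import { createReadStream, createWriteStream } from 'node:fs';

// Assumption: one release record per NDJSON line in, XML documents out.
const inputStream = createReadStream('releases.ndjson', { encoding: 'utf8' });
const outputStream = createWriteStream('output/releases.xml');

await processor.processStream(inputStream, outputStream);
```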
## Database Integration

Process releases directly from database queries:

```python
import asyncio

import pandas as pd
from ddex_builder import DDEXBuilder
from sqlalchemy import create_engine

async def process_from_database():
    builder = DDEXBuilder()
    engine = create_engine('postgresql://...')

    # Process in chunks to manage memory
    chunk_size = 100
    offset = 0

    while True:
        query = f"""
            SELECT * FROM releases
            ORDER BY created_at
            LIMIT {chunk_size} OFFSET {offset}
        """
        df = pd.read_sql(query, engine)
        if df.empty:
            break

        results = builder.from_dataframe_batch(df)

        # Save results
        for i, xml in enumerate(results.success):
            filename = f"release_{offset + i}.xml"
            with open(f"output/{filename}", 'w') as f:
                f.write(xml)

        offset += chunk_size
```
## Error Handling

Implement robust error handling for batch operations:

```typescript
const results = await builder.buildBatch(data, {
  continueOnError: true,  // Don't stop on individual failures
  maxRetries: 3,          // Retry failed items up to 3 times
  retryDelay: 1000,       // Wait 1000 ms between retries
});

// Process successful results
for (const success of results.success) {
  await saveToFile(success.id, success.xml);
}

// Handle errors
for (const error of results.errors) {
  logger.error(`Failed to process ${error.id}:`, error.message);

  // Optionally retry with different settings
  if (error.retryable) {
    await retryLater(error.data);
  }
}
```
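Here, `saveToFile`, `logger`, and `retryLater` are application-level helpers rather than builder APIs. As one possible shape for `retryLater`, a hypothetical sketch that re-queues failed items with exponential backoff:

```typescript
// Hypothetical helper: delay, then re-queue the failed item for another pass.
const retryQueue: Array<{ data: unknown; attempts: number }> = [];

async function retryLater(data: unknown, attempts = 0): Promise<void> {
  const delayMs = 1000 * 2 ** attempts; // 1s, 2s, 4s, ...
  await new Promise((resolve) => setTimeout(resolve, delayMs));
  retryQueue.push({ data, attempts: attempts + 1 });
}
```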
## Performance Optimization

### Memory Management

```rust
use ddex_builder::BatchOptions;

let options = BatchOptions {
    batch_size: 50,                  // Balance memory use vs. parallelism
    memory_limit: 1024 * 1024 * 100, // 100 MB limit
    gc_frequency: 10,                // Clean up every 10 batches
    ..Default::default()
};

let results = builder.build_batch_with_options(requests, options)?;
```
### Parallel Processing

```python
import concurrent.futures

from ddex_builder import DDEXBuilder

def build_release(release_data):
    builder = DDEXBuilder()  # Thread-local instance
    return builder.build(release_data)

# Process with a thread pool
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(build_release, release)
               for release in releases]
    results = [future.result() for future in futures]
```
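The same pattern in TypeScript can be written with fixed-size chunks and `Promise.allSettled`, so one bad release doesn't abort the whole run. This sketch assumes a single-document `builder.build()` method, mirroring the Python example above:

```typescript
import { DDEXBuilder } from 'ddex-builder';

const builder = new DDEXBuilder();

// Process releases in chunks of `concurrency`, collecting failures
// instead of throwing on the first error.
async function buildAll(releases: object[], concurrency = 8) {
  const succeeded: unknown[] = [];
  const failed: unknown[] = [];

  for (let i = 0; i < releases.length; i += concurrency) {
    const chunk = releases.slice(i, i + concurrency);
    const settled = await Promise.allSettled(
      chunk.map((release) => builder.build(release))
    );
    for (const outcome of settled) {
      if (outcome.status === 'fulfilled') succeeded.push(outcome.value);
      else failed.push(outcome.reason);
    }
  }
  return { succeeded, failed };
}
```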
## Monitoring and Metrics

Track batch processing performance:

```typescript
const processor = new BatchProcessor({
  onBatchComplete: (batchStats) => {
    console.log({
      batchNumber: batchStats.batchNumber,
      itemsProcessed: batchStats.itemsProcessed,
      processingTime: batchStats.processingTimeMs,
      throughput: batchStats.itemsPerSecond,
      memoryUsage: batchStats.memoryUsageMB
    });
  },
  onComplete: (totalStats) => {
    console.log({
      totalItems: totalStats.totalItems,
      totalTime: totalStats.totalTimeMs,
      averageThroughput: totalStats.averageItemsPerSecond,
      peakMemory: totalStats.peakMemoryMB,
      errorRate: totalStats.errorRate
    });
  }
});
```
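These callbacks fire as each batch and the overall run complete; how the processor is actually driven depends on its API. As a purely hypothetical usage line (`processAll` is an illustrative name, not a confirmed method):

```typescript
// Hypothetical driver call: run the batch and react to the aggregate stats.
const totalStats = await processor.processAll(releases);
if (totalStats.errorRate > 0.05) {
  console.warn('Error rate above 5%; consider smaller batches or more retries.');
}
```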
## Best Practices

- Choose appropriate batch sizes: Start with 25-50 items per batch
- Monitor memory usage: Implement memory limits to prevent out-of-memory failures
- Handle errors gracefully: Use `continueOnError` for resilient processing
- Use streaming for large datasets: Avoid loading everything into memory
- Implement progress tracking: Provide feedback for long-running operations
- Test with representative data: Validate performance with real-world datasets
- Consider database connection pooling for database-driven workflows (see the sketch after this list)
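For the pooling point, a minimal sketch using `pg`'s built-in `Pool` (the connection string, limits, and query are illustrative):

```typescript
import { Pool } from 'pg';
import { DDEXBuilder } from 'ddex-builder';

const builder = new DDEXBuilder();

// A shared pool caps concurrent connections across batch workers.
const pool = new Pool({
  connectionString: 'postgresql://...',
  max: 10,                   // At most 10 concurrent connections
  idleTimeoutMillis: 30_000, // Release idle connections after 30s
});

const { rows } = await pool.query(
  'SELECT * FROM releases ORDER BY created_at LIMIT $1',
  [100]
);
const batchResult = await builder.buildBatch(rows, { version: '4.3' });
```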
## Common Patterns

### Producer-Consumer with Queue

```python
import asyncio
from asyncio import Queue

from ddex_builder import DDEXBuilder

async def producer(queue: Queue, data_source):
    async for batch in data_source:
        await queue.put(batch)
    await queue.put(None)  # Signal completion

async def consumer(queue: Queue, builder: DDEXBuilder):
    while True:
        batch = await queue.get()
        if batch is None:
            break
        results = builder.build_batch(batch)
        await process_results(results)

# Run producer and consumer concurrently
async def main():
    queue = Queue(maxsize=5)
    await asyncio.gather(
        producer(queue, data_source),
        consumer(queue, builder)
    )

asyncio.run(main())
```