Advanced Usage
Master advanced DDEX Parser features for production applications, including streaming for large files, performance optimization, and integration patterns.
Streaming Large Files
When processing large DDEX catalogs (>10MB), streaming prevents memory issues and provides better performance.
Basic Streaming
JavaScript / TypeScript
import { DDEXParser } from 'ddex-parser';
async function processLargeCatalog(filePath: string) {
const parser = new DDEXParser({
streaming: true,
bufferSize: 8192,
maxMemoryMB: 100
});
let totalReleases = 0;
let totalTracks = 0;
for await (const batch of parser.streamFile(filePath)) {
// Process each batch
console.log(`Processing batch: ${batch.releases.length} releases`);
// Your processing logic here
await processBatch(batch);
totalReleases += batch.releases.length;
totalTracks += batch.soundRecordings.length;
// Optional: Add delay to prevent overwhelming downstream systems
await new Promise(resolve => setTimeout(resolve, 10));
}
console.log(`Processed ${totalReleases} releases, ${totalTracks} tracks`);
}
async function processBatch(batch: any) {
// Insert into database, send to API, etc.
for (const release of batch.releases) {
await database.insertRelease(release);
}
}
Python
from ddex_parser import DDEXParser, ParseOptions
import asyncio
async def process_large_catalog(file_path: str):
parser = DDEXParser()
options = ParseOptions(
streaming=True,
max_memory_mb=100,
timeout_seconds=300 # 5 minutes
)
total_releases = 0
total_tracks = 0
    # Stream processing: read the XML once (and close the file), then iterate batches
    with open(file_path, 'r', encoding='utf-8') as f:
        xml_content = f.read()
    for batch in parser.stream(xml_content, options):
releases = batch.get('releases', [])
tracks = batch.get('sound_recordings', [])
print(f"Processing batch: {len(releases)} releases")
# Process batch
await process_batch(releases, tracks)
total_releases += len(releases)
total_tracks += len(tracks)
# Optional: Rate limiting
await asyncio.sleep(0.01)
print(f"Processed {total_releases} releases, {total_tracks} tracks")
async def process_batch(releases, tracks):
# Your processing logic
for release in releases:
await database.insert_release(release)
Advanced Streaming Patterns
Parallel Batch Processing
import { DDEXParser } from 'ddex-parser';
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';
// Main thread: Stream and distribute batches
async function parallelStreamProcessor(filePath: string, numWorkers: number = 4) {
if (!isMainThread) return;
const parser = new DDEXParser({ streaming: true });
const workers: Worker[] = [];
// Create workers
for (let i = 0; i < numWorkers; i++) {
const worker = new Worker(__filename, {
workerData: { workerId: i }
});
workers.push(worker);
}
// Distribute work
let workerIndex = 0;
for await (const batch of parser.streamFile(filePath)) {
const worker = workers[workerIndex % workers.length];
worker.postMessage({ type: 'process', batch });
workerIndex++;
}
  // Signal completion, then wait for each worker to exit on its own;
  // terminating immediately could kill a worker mid-batch
  workers.forEach(worker => worker.postMessage({ type: 'complete' }));
  await Promise.all(
    workers.map(worker => new Promise(resolve => worker.once('exit', resolve)))
  );
}
// Worker thread: Process individual batches
if (!isMainThread) {
const { workerId } = workerData;
parentPort?.on('message', async ({ type, batch }) => {
if (type === 'process') {
console.log(`Worker ${workerId} processing ${batch.releases.length} releases`);
await processBatchInWorker(batch);
parentPort?.postMessage({ type: 'completed', workerId });
} else if (type === 'complete') {
process.exit(0);
}
});
}
async function processBatchInWorker(batch: any) {
// CPU-intensive processing here
for (const release of batch.releases) {
// Transform, validate, enrich data
const enrichedRelease = await enrichReleaseData(release);
await sendToDatabase(enrichedRelease);
}
}
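Note that worker.postMessage structured-clones each batch into the worker, so large batches are copied in full across the thread boundary; if that copy becomes the bottleneck, have each worker parse its own file (or byte range) instead of shipping parsed batches between threads.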
Stream with Backpressure Control
import { DDEXParser } from 'ddex-parser';
async function streamWithBackpressure(filePath: string) {
const parser = new DDEXParser({ streaming: true });
let inFlight = 0;
const maxConcurrent = 10;
for await (const batch of parser.streamFile(filePath)) {
// Wait if too many concurrent operations
while (inFlight >= maxConcurrent) {
await new Promise(resolve => setTimeout(resolve, 10));
}
inFlight++;
processBatchAsync(batch)
.finally(() => inFlight--);
}
// Wait for all operations to complete
while (inFlight > 0) {
await new Promise(resolve => setTimeout(resolve, 10));
}
}
async function processBatchAsync(batch: any) {
try {
// Async processing
await Promise.all(
batch.releases.map(release => processRelease(release))
);
} catch (error) {
console.error('Batch processing error:', error);
}
}
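The polling loops above work but burn cycles while waiting. A small counting semaphore removes the busy-wait entirely. A minimal sketch, reusing processBatchAsync from above; the Semaphore class is illustrative, not part of ddex-parser:
class Semaphore {
  private queue: (() => void)[] = [];
  private count: number;
  constructor(max: number) { this.count = max; }
  async acquire(): Promise<void> {
    if (this.count > 0) { this.count--; return; }
    await new Promise<void>(resolve => this.queue.push(resolve));
  }
  release(): void {
    // Hand the permit straight to the next waiter, if any
    const next = this.queue.shift();
    if (next) next(); else this.count++;
  }
}
async function streamWithSemaphore(filePath: string) {
  const parser = new DDEXParser({ streaming: true });
  const semaphore = new Semaphore(10);
  const pending: Promise<void>[] = [];
  for await (const batch of parser.streamFile(filePath)) {
    await semaphore.acquire(); // Blocks once 10 batches are in flight
    pending.push(
      processBatchAsync(batch).finally(() => semaphore.release())
    );
  }
  await Promise.all(pending); // Drain remaining work
}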
Graph vs Flattened Models
Understanding when to use each data representation:
Graph Model Use Cases
Use the graph model when you need:
- Compliance: Exact DDEX structure preservation
- Round-trip fidelity: Parse → modify → build workflows
- Reference resolution: Working with DDEX references and IDs
- Extension access: Custom DDEX extensions and namespaces
// Graph model: Faithful DDEX structure
const result = await parser.parseFile('release.xml');
// Access original structure
const messageHeader = result.graph.messageHeader;
const parties = result.graph.partyList.party;
const releases = result.graph.releaseList.release;
// Resolve party references
const labelParty = parties.find(p =>
p.partyReference === releases[0].releaseDetailsByTerritory[0].labelName.partyReference
);
// Access extensions
if (result.graph.extensions) {
const customData = result.graph.extensions['custom:namespace'];
}
Flattened Model Use Cases
Use the flattened model when you need:
- Rapid development: Quick access to common fields
- Analytics: Data analysis and reporting
- API responses: Clean JSON for web applications
- Database insertion: Direct mapping to relational schemas
// Flattened model: Developer-friendly
const result = await parser.parseFile('release.xml');
// Direct access to denormalized data
const releases = result.flat.releases;
releases.forEach(release => {
console.log(`${release.title} by ${release.displayArtist}`);
console.log(`Label: ${release.label}`);
console.log(`Territories: ${release.territories.join(', ')}`);
// Perfect for database insertion
database.insertRelease({
title: release.title,
artist: release.displayArtist,
label: release.label,
release_date: release.releaseDate,
territories: release.territories
});
});
Model Conversion and Switching
// Parse once, use both models
const result = await parser.parseFile('release.xml');
// Use graph model for compliance checks
function validateCompliance(graph: GraphModel): string[] {
  const errors: string[] = [];
if (!graph.messageHeader.messageId) {
errors.push('Missing required MessageId');
}
if (graph.releaseList.release.length === 0) {
errors.push('No releases found');
}
return errors;
}
// Use flattened model for API response
function createApiResponse(flat: FlattenedModel) {
return {
releases: flat.releases.map(r => ({
title: r.title,
artist: r.displayArtist,
tracks: flat.soundRecordings.filter(t =>
r.soundRecordingReferences.includes(t.reference)
).length
}))
};
}
const complianceErrors = validateCompliance(result.graph);
const apiData = createApiResponse(result.flat);
Extension Preservation
DDEX files often contain custom extensions. The parser preserves these for round-trip compatibility.
Handling Custom Extensions
// Parse with extension preservation
const parser = new DDEXParser({
includeRawExtensions: true,
validation: 'permissive' // Allow non-standard elements
});
const result = await parser.parseFile('extended-release.xml');
// Access preserved extensions
if (result.graph.extensions) {
// Custom namespace data
const spotifyData = result.graph.extensions['spotify:metadata'];
const appleData = result.graph.extensions['itunes:info'];
console.log('Spotify playlist ID:', spotifyData?.playlistId);
console.log('Apple Music ID:', appleData?.adamId);
}
// Extensions are also available in individual elements
result.graph.releaseList.release.forEach(release => {
if (release.extensions) {
const customReleaseData = release.extensions['custom:releaseInfo'];
console.log('Custom release data:', customReleaseData);
}
});
Round-Trip with Extensions
import { DDEXParser } from 'ddex-parser';
import { DDEXBuilder } from 'ddex-builder';
async function roundTripWithExtensions(originalXml: string) {
// Parse with full extension preservation
const parser = new DDEXParser({
includeRawExtensions: true,
includeComments: true,
validation: 'permissive'
});
const parsed = await parser.parseString(originalXml);
// Modify data while preserving extensions
parsed.flat.releases[0].title = "Updated Title";
// Build back to XML with extensions intact
const builder = new DDEXBuilder({
preserveExtensions: true,
deterministic: true
});
const newXml = await builder.build(parsed.toBuildRequest());
// Verify round-trip fidelity
const reparsed = await parser.parseString(newXml);
console.log('Original extensions preserved:',
JSON.stringify(reparsed.graph.extensions) === JSON.stringify(parsed.graph.extensions)
);
return newXml;
}
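A typical call site reads the source file, round-trips it, and writes the result back out; a minimal sketch (file names are illustrative):
import { readFile, writeFile } from 'fs/promises';

async function main() {
  const originalXml = await readFile('extended-release.xml', 'utf-8');
  const updatedXml = await roundTripWithExtensions(originalXml);
  await writeFile('extended-release-updated.xml', updatedXml);
}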
Performance Optimization
Memory Management
// Configure memory limits
const parser = new DDEXParser({
maxMemoryMB: 256, // Hard memory limit
streaming: true, // Enable streaming for large files
bufferSize: 16384, // Larger buffer for better I/O
parallelProcessing: false // Disable if memory-constrained
});
// Monitor memory usage
async function parseWithMemoryMonitoring(filePath: string) {
const initialMemory = process.memoryUsage();
try {
const result = await parser.parseFile(filePath);
const finalMemory = process.memoryUsage();
const usedMB = (finalMemory.heapUsed - initialMemory.heapUsed) / 1024 / 1024;
console.log(`Memory used: ${usedMB.toFixed(2)} MB`);
    console.log(`Heap total: ${(finalMemory.heapTotal / 1024 / 1024).toFixed(2)} MB`);
return result;
} catch (error) {
if (error.message.includes('memory')) {
console.error('Out of memory. Try streaming mode or increase memory limit.');
}
throw error;
}
}
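Note that maxMemoryMB limits the parser, not the Node process; if you raise it, raise V8's heap ceiling as well (for example, node --max-old-space-size=1024), or the process can be killed by V8 before the parser's own limit is reached.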
CPU Optimization
// CPU-optimized settings
const fastParser = new DDEXParser({
validation: 'none', // Skip validation for trusted sources
includeComments: false, // Skip comment preservation
includeRawExtensions: false, // Skip extension preservation
parallelProcessing: true, // Use multiple cores
caching: true // Cache parsed schemas
});
// Batch processing optimization
async function optimizedBatchProcessing(files: string[]) {
// Pre-warm parser cache
const warmupXml = generateMinimalDDEX();
await fastParser.parseString(warmupXml);
// Process files in optimal batch sizes
const batchSize = Math.min(10, Math.ceil(files.length / 4));
for (let i = 0; i < files.length; i += batchSize) {
const batch = files.slice(i, i + batchSize);
await Promise.all(
batch.map(async file => {
const start = performance.now();
const result = await fastParser.parseFile(file);
const duration = performance.now() - start;
console.log(`${file}: ${duration.toFixed(2)}ms`);
return result;
})
);
}
}
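The warm-up call above assumes a generateMinimalDDEX() helper. One possible sketch is below: a skeletal ERN-style document that is just enough to prime the schema cache, not a complete or valid DDEX message:
function generateMinimalDDEX(): string {
  return `<?xml version="1.0" encoding="UTF-8"?>
<ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
  <MessageHeader>
    <MessageId>WARMUP-0001</MessageId>
  </MessageHeader>
  <ReleaseList/>
</ern:NewReleaseMessage>`;
}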
Caching Strategies
import { DDEXParser } from 'ddex-parser';
import { createHash } from 'crypto';
class CachingParser {
private parser: DDEXParser;
private cache = new Map<string, any>();
private maxCacheSize = 100;
constructor() {
this.parser = new DDEXParser({ caching: true });
}
async parseWithCache(xml: string): Promise<any> {
// Generate cache key
const hash = createHash('sha256').update(xml).digest('hex');
// Check cache
if (this.cache.has(hash)) {
console.log('Cache hit');
return this.cache.get(hash);
}
// Parse and cache
const result = await this.parser.parseString(xml);
// Manage cache size
if (this.cache.size >= this.maxCacheSize) {
const firstKey = this.cache.keys().next().value;
this.cache.delete(firstKey);
}
this.cache.set(hash, result);
console.log('Parsed and cached');
return result;
}
clearCache() {
this.cache.clear();
}
}
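Usage is a drop-in replacement for calling the parser directly. Note the eviction policy is FIFO (Map preserves insertion order); swap in an LRU if your workload rereads old entries:
const cachingParser = new CachingParser();
// Given some DDEX XML string, e.g. loaded via fs/promises readFile
const first = await cachingParser.parseWithCache(xml);  // Logs 'Parsed and cached'
const second = await cachingParser.parseWithCache(xml); // Logs 'Cache hit'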
Production Deployment Patterns
Microservice Architecture
// DDEX Parser Service
import express from 'express';
import { DDEXParser } from 'ddex-parser';
import multer from 'multer';
const app = express();
const parser = new DDEXParser({
validation: 'strict',
maxMemoryMB: 512,
timeoutSeconds: 30
});
// Configure file upload
const upload = multer({
  storage: multer.memoryStorage(), // Keep uploads in memory so file.buffer is available
  limits: { fileSize: 100 * 1024 * 1024 }, // 100MB
  fileFilter: (req, file, cb) => {
    const isXml = file.mimetype === 'text/xml' ||
      file.mimetype === 'application/xml' ||
      file.originalname.endsWith('.xml');
    cb(null, isXml);
  }
});
// Parse endpoint
app.post('/parse', upload.single('ddex'), async (req, res) => {
try {
if (!req.file) {
return res.status(400).json({ error: 'No DDEX file uploaded' });
}
const xmlContent = req.file.buffer.toString('utf-8');
const result = await parser.parseString(xmlContent);
res.json({
success: true,
data: {
messageId: result.messageId,
version: result.version,
releases: result.flat.releases.length,
tracks: result.flat.soundRecordings.length
},
metadata: {
fileSize: req.file.size,
processedAt: new Date().toISOString()
}
});
} catch (error) {
console.error('Parse error:', error);
res.status(400).json({
success: false,
error: error.message,
type: error.constructor.name
});
}
});
// Streaming endpoint for large files
app.post('/parse/stream', upload.single('ddex'), async (req, res) => {
  try {
    if (!req.file) {
      return res.status(400).json({ error: 'No DDEX file uploaded' });
    }
    const xmlContent = req.file.buffer.toString('utf-8');
res.writeHead(200, {
'Content-Type': 'application/json',
'Transfer-Encoding': 'chunked'
});
for await (const batch of parser.stream(xmlContent)) {
const chunk = JSON.stringify({
type: 'batch',
data: batch
}) + '\n';
res.write(chunk);
}
res.end(JSON.stringify({ type: 'complete' }) + '\n');
} catch (error) {
res.end(JSON.stringify({ type: 'error', error: error.message }) + '\n');
}
});
app.listen(3000, () => {
console.log('DDEX Parser service running on port 3000');
});
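A client can exercise the /parse endpoint with a multipart upload. A minimal sketch assuming Node 18+ (built-in fetch, FormData, and Blob); the field name must match the 'ddex' field multer expects:
import { readFile } from 'fs/promises';

async function uploadToParser(filePath: string) {
  const xml = await readFile(filePath);
  const form = new FormData();
  form.append('ddex', new Blob([xml], { type: 'text/xml' }), 'release.xml');
  const response = await fetch('http://localhost:3000/parse', {
    method: 'POST',
    body: form
  });
  const body = await response.json();
  if (!body.success) throw new Error(body.error);
  console.log(`Parsed ${body.data.releases} releases (${body.data.tracks} tracks)`);
}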
Event-Driven Processing
import asyncio
import aioredis
from ddex_parser import DDEXParser
import json
from datetime import datetime
class DDEXProcessor:
    def __init__(self, redis_url: str):
        self.parser = DDEXParser()
        self.redis_url = redis_url
        self.redis = None
    async def start(self):
        """Start the event processor."""
        self.redis = aioredis.from_url(self.redis_url)
# Listen for DDEX processing jobs
while True:
try:
# Block for up to 1 second for new jobs
result = await self.redis.brpop(['ddex:queue'], timeout=1)
if result:
queue_name, job_data = result
job = json.loads(job_data)
await self.process_job(job)
except Exception as e:
print(f"Error processing job: {e}")
await asyncio.sleep(1)
async def process_job(self, job: dict):
"""Process a DDEX parsing job."""
job_id = job['id']
xml_content = job['xml']
try:
# Update job status
await self.redis.hset(f"ddex:job:{job_id}", "status", "processing")
# Parse DDEX
result = await self.parser.parse_async(xml_content)
# Store results
result_data = {
'message_id': result.message_id,
'version': result.version,
'releases': len(result.releases),
'processed_at': datetime.utcnow().isoformat()
}
await self.redis.hset(f"ddex:job:{job_id}", mapping={
"status": "completed",
"result": json.dumps(result_data)
})
# Emit completion event
await self.redis.lpush('ddex:completed', json.dumps({
'job_id': job_id,
'result': result_data
}))
print(f"✅ Completed job {job_id}")
except Exception as e:
await self.redis.hset(f"ddex:job:{job_id}", mapping={
"status": "failed",
"error": str(e)
})
print(f"❌ Failed job {job_id}: {e}")
# Start processor
if __name__ == "__main__":
processor = DDEXProcessor("redis://localhost:6379")
asyncio.run(processor.start())
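Producers on the other side of the queue only need to LPUSH a JSON job with the shape process_job expects ({ id, xml }). A sketch in TypeScript using the ioredis client (an assumed dependency; any Redis client works):
import Redis from 'ioredis';
import { randomUUID } from 'crypto';

async function enqueueDdexJob(xml: string): Promise<string> {
  const redis = new Redis('redis://localhost:6379');
  const jobId = randomUUID();
  // LPUSH pairs with the processor's BRPOP for FIFO delivery
  await redis.lpush('ddex:queue', JSON.stringify({ id: jobId, xml }));
  redis.disconnect();
  return jobId;
}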
Database Integration
import { DDEXParser } from 'ddex-parser';
import { Pool } from 'pg';
class DDEXDatabaseIntegrator {
private parser: DDEXParser;
private db: Pool;
constructor(dbConfig: any) {
this.parser = new DDEXParser({
validation: 'strict',
streaming: true
});
this.db = new Pool(dbConfig);
}
async processFile(filePath: string): Promise<void> {
const client = await this.db.connect();
try {
await client.query('BEGIN');
for await (const batch of this.parser.streamFile(filePath)) {
await this.processBatch(client, batch);
}
await client.query('COMMIT');
console.log('✅ File processed successfully');
} catch (error) {
await client.query('ROLLBACK');
console.error('❌ Processing failed:', error);
throw error;
} finally {
client.release();
}
}
private async processBatch(client: any, batch: any): Promise<void> {
// Insert releases
for (const release of batch.releases) {
await client.query(`
INSERT INTO releases (title, artist, label, release_date, territories, genres)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (title, artist) DO UPDATE SET
release_date = EXCLUDED.release_date,
territories = EXCLUDED.territories
`, [
release.title,
release.displayArtist,
release.label,
release.releaseDate,
JSON.stringify(release.territories),
JSON.stringify(release.genres)
]);
}
// Insert tracks
for (const track of batch.soundRecordings) {
await client.query(`
INSERT INTO tracks (title, artist, isrc, duration, territories)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (isrc) DO UPDATE SET
title = EXCLUDED.title,
artist = EXCLUDED.artist
`, [
track.title,
track.displayArtist,
track.isrc,
track.duration,
JSON.stringify(track.territories)
]);
}
// Insert deals
for (const deal of batch.deals) {
await client.query(`
INSERT INTO deals (commercial_model, territories, use_types, valid_from, valid_until)
VALUES ($1, $2, $3, $4, $5)
`, [
deal.commercialModelType,
JSON.stringify(deal.territories),
JSON.stringify(deal.useTypes),
deal.validityPeriod.startDate,
deal.validityPeriod.endDate
]);
}
}
}
// Usage
const integrator = new DDEXDatabaseIntegrator({
host: 'localhost',
port: 5432,
database: 'music_catalog',
user: 'ddex_user',
password: 'password'
});
await integrator.processFile('large-catalog.xml');
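Note that the ON CONFLICT clauses above assume matching unique constraints exist: a composite unique index on releases(title, artist) and a unique index on tracks(isrc). Create them before running the integrator or the upserts will error.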
Error Recovery and Resilience
Retry Strategies
import { DDEXParser, SecurityError, ValidationError } from 'ddex-parser';
class ResilientParser {
private parser: DDEXParser;
private maxRetries: number = 3;
private retryDelay: number = 1000;
constructor() {
this.parser = new DDEXParser({
validation: 'strict',
timeoutSeconds: 30
});
}
async parseWithRetry(xml: string): Promise<any> {
    let lastError: Error | undefined;
for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
try {
return await this.parser.parseString(xml);
} catch (error) {
lastError = error;
// Don't retry security errors or validation errors
if (error instanceof SecurityError || error instanceof ValidationError) {
throw error;
}
if (attempt < this.maxRetries) {
console.log(`Parse attempt ${attempt} failed, retrying in ${this.retryDelay}ms...`);
          await this.delay(this.retryDelay * 2 ** (attempt - 1)); // Exponential backoff
}
}
}
    throw new Error(`Failed to parse after ${this.maxRetries} attempts: ${lastError?.message}`);
}
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// Circuit breaker pattern
class CircuitBreakerParser {
private parser: DDEXParser;
private failureCount = 0;
private failureThreshold = 5;
private resetTimeout = 60000; // 1 minute
private state: 'closed' | 'open' | 'half-open' = 'closed';
private nextAttempt = 0;
constructor() {
this.parser = new DDEXParser();
}
async parse(xml: string): Promise<any> {
if (this.state === 'open') {
if (Date.now() < this.nextAttempt) {
throw new Error('Circuit breaker is OPEN');
} else {
this.state = 'half-open';
}
}
try {
const result = await this.parser.parseString(xml);
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}
private onSuccess() {
this.failureCount = 0;
this.state = 'closed';
}
private onFailure() {
this.failureCount++;
if (this.failureCount >= this.failureThreshold) {
this.state = 'open';
this.nextAttempt = Date.now() + this.resetTimeout;
}
}
}
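A usage sketch for the circuit breaker: treat an open circuit as a signal to back off rather than to retry immediately:
const breaker = new CircuitBreakerParser();

async function parseGuarded(xml: string) {
  try {
    return await breaker.parse(xml);
  } catch (error) {
    if (error instanceof Error && error.message === 'Circuit breaker is OPEN') {
      // Too many recent failures; wait for resetTimeout before probing again
      return null;
    }
    throw error;
  }
}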
These advanced patterns help you build robust, production-ready applications with the DDEX Parser. For integration with the DDEX Builder for complete round-trip workflows, see the Builder Documentation.