Streaming API
The streaming API enables memory-efficient processing of large DDEX XML files by parsing content incrementally rather than loading everything into memory.
Core Streaming Classes
StreamingParser
Main class for streaming DDEX content:
class StreamingParser {
  constructor(options?: StreamingOptions);

  parseStream(source: ReadableStream | NodeJS.ReadableStream): AsyncIterableIterator<StreamedRelease>;
  getProgress(): StreamProgress;
  pause(): void;
  resume(): void;
  abort(): void;
}
StreamingOptions
Configuration for streaming operations:
interface StreamingOptions {
  chunkSize?: number;           // Size of chunks to process (default: 8192 bytes)
  maxMemory?: number;           // Maximum memory usage (default: 64MB)
  bufferSize?: number;          // Internal buffer size (default: 16384 bytes)
  timeout?: number;             // Timeout per chunk (default: 5000ms)
  skipValidation?: boolean;     // Skip XML schema validation for speed
  preserveWhitespace?: boolean; // Preserve whitespace in text content
  encoding?: string;            // Input encoding (default: 'utf-8')
}
Stream Processing
Streaming Parse Results
interface StreamedRelease {
  releaseId: string;
  title: string;
  releaseDate?: string;
  mainArtist?: string;
  tracks: StreamedTrack[];
  metadata: ReleaseMetadata;
  xmlFragment?: string; // Raw XML if preserveRaw: true
}

interface StreamedTrack {
  resourceId: string;
  title: string;
  isrc?: string;
  duration?: string;
  sequenceNumber?: number;
  metadata: TrackMetadata;
}
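A common pattern is to flatten each StreamedRelease into track-level rows for export or indexing. A minimal sketch of that mapping (the TrackRow shape and toTrackRows helper are illustrative, not part of the API):

interface TrackRow {
  releaseId: string;
  releaseTitle: string;
  trackTitle: string;
  isrc?: string;
  sequenceNumber?: number;
}

// Flatten a streamed release into one row per track (hypothetical helper)
function toTrackRows(release: StreamedRelease): TrackRow[] {
  return release.tracks.map(track => ({
    releaseId: release.releaseId,
    releaseTitle: release.title,
    trackTitle: track.title,
    isrc: track.isrc,
    sequenceNumber: track.sequenceNumber
  }));
}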
Progress Tracking
interface StreamProgress {
  bytesProcessed: number;
  bytesTotal?: number;
  releasesProcessed: number;
  tracksProcessed: number;
  elapsedTimeMs: number;
  estimatedRemainingMs?: number;
  throughputBytesPerSec: number;
  memoryUsage: MemoryUsage;
}

interface MemoryUsage {
  heapUsed: number;   // bytes
  heapTotal: number;  // bytes
  external: number;   // bytes
  rss: number;        // bytes
}
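The progress and memory figures are raw numbers (bytes and milliseconds), so most callers will want a small formatting helper before logging them. A minimal sketch (formatProgress is an illustrative helper, not part of the API):

// Turn a StreamProgress snapshot into a human-readable log line (hypothetical helper)
function formatProgress(progress: StreamProgress): string {
  const mb = (bytes: number) => (bytes / 1024 / 1024).toFixed(1);
  const percent = progress.bytesTotal
    ? `${(progress.bytesProcessed / progress.bytesTotal * 100).toFixed(1)}%`
    : 'n/a';

  return [
    `${progress.releasesProcessed} releases`,
    `${progress.tracksProcessed} tracks`,
    `${percent} of input`,
    `${mb(progress.throughputBytesPerSec)} MB/s`,
    `heap ${mb(progress.memoryUsage.heapUsed)} MB`
  ].join(' | ');
}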
Usage Examples
Basic Streaming
import { StreamingParser } from 'ddex-parser';
import { createReadStream } from 'fs';

const parser = new StreamingParser({
  chunkSize: 16384,             // 16KB chunks
  maxMemory: 128 * 1024 * 1024, // 128MB limit
  skipValidation: false
});

const fileStream = createReadStream('large-catalog.xml');
const releaseStream = parser.parseStream(fileStream);

// Process releases as they're parsed
for await (const release of releaseStream) {
  console.log(`Processing: ${release.title} (${release.tracks.length} tracks)`);

  // Process individual release
  await processRelease(release);

  // Check progress periodically
  const progress = parser.getProgress();
  if (progress.releasesProcessed % 100 === 0) {
    const completion = progress.bytesTotal
      ? (progress.bytesProcessed / progress.bytesTotal * 100).toFixed(1)
      : 'unknown';
    console.log(`Progress: ${progress.releasesProcessed} releases, ${completion}% complete`);
  }
}

console.log('Streaming parse complete!');
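The examples on this page call processRelease, a user-supplied handler that receives each parsed release. A minimal sketch that appends each release to a JSON Lines file (the output path and record shape are illustrative assumptions):

import { appendFile } from 'fs/promises';

// Hypothetical handler: persist each release as one JSON line
async function processRelease(release: StreamedRelease): Promise<void> {
  const record = {
    releaseId: release.releaseId,
    title: release.title,
    mainArtist: release.mainArtist,
    trackCount: release.tracks.length
  };
  await appendFile('releases.jsonl', JSON.stringify(record) + '\n', 'utf-8');
}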
Memory-Constrained Streaming
const constrainedParser = new StreamingParser({
  maxMemory: 32 * 1024 * 1024, // 32MB limit
  chunkSize: 4096,             // Smaller chunks
  bufferSize: 8192             // Smaller buffer
});

const stream = createReadStream('massive-catalog.xml');
const releases = constrainedParser.parseStream(stream);

let processedCount = 0;
const batchSize = 50;

for await (const release of releases) {
  await processRelease(release);
  processedCount++;

  // Force garbage collection periodically (requires running Node with --expose-gc)
  if (processedCount % batchSize === 0) {
    if (global.gc) {
      global.gc();
    }

    const memory = constrainedParser.getProgress().memoryUsage;
    console.log(`Memory usage: ${Math.round(memory.heapUsed / 1024 / 1024)}MB`);

    // Pause if memory usage is too high
    if (memory.heapUsed > 30 * 1024 * 1024) { // 30MB threshold
      console.log('Pausing due to high memory usage...');
      constrainedParser.pause();

      // Wait for memory to be freed
      await new Promise(resolve => setTimeout(resolve, 1000));
      constrainedParser.resume();
    }
  }
}
Real-time Progress Monitoring
const parser = new StreamingParser({
  chunkSize: 8192,
  maxMemory: 100 * 1024 * 1024
});

// Set up progress monitoring
const progressInterval = setInterval(() => {
  const progress = parser.getProgress();
  const memoryMB = Math.round(progress.memoryUsage.heapUsed / 1024 / 1024);
  const throughputMBps = (progress.throughputBytesPerSec / 1024 / 1024).toFixed(2);
  const eta = progress.estimatedRemainingMs
    ? new Date(Date.now() + progress.estimatedRemainingMs).toLocaleTimeString()
    : 'unknown';

  console.log(`📊 Progress: ${progress.releasesProcessed} releases | ${memoryMB}MB | ${throughputMBps} MB/s | ETA: ${eta}`);
}, 5000); // Update every 5 seconds

const fileStream = createReadStream('catalog.xml');
const releases = parser.parseStream(fileStream);

try {
  for await (const release of releases) {
    await processRelease(release);
  }
} finally {
  clearInterval(progressInterval);
  console.log('✅ Streaming complete!');
}
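The parser also exposes abort() for cancelling a parse that should not run to completion. A minimal sketch that stops the stream after a time budget (the 10-minute limit is an illustrative assumption; depending on the implementation, the iterator may end or throw after abort(), so production code should handle both):

const parser = new StreamingParser({ chunkSize: 8192 });
const maxRuntimeMs = 10 * 60 * 1000; // Illustrative 10-minute budget

// Abort the parse if it exceeds the time budget
const abortTimer = setTimeout(() => {
  console.warn('⏱️ Time budget exceeded, aborting stream');
  parser.abort();
}, maxRuntimeMs);

try {
  for await (const release of parser.parseStream(createReadStream('catalog.xml'))) {
    await processRelease(release);
  }
} finally {
  clearTimeout(abortTimer);
}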
Advanced Streaming Features
Selective Processing
Process only specific types of content:
interface SelectiveStreamingOptions extends StreamingOptions {
  filter?: {
    releaseTypes?: string[];  // Only process specific release types
    territories?: string[];   // Only process specific territories
    dateRange?: {             // Only process releases in a date range
      start: string;
      end: string;
    };
    artistFilter?: string[];  // Only process specific artists
    labelFilter?: string[];   // Only process specific labels
  };
}
const selectiveParser = new StreamingParser({
  chunkSize: 8192,
  filter: {
    releaseTypes: ['Album', 'EP'],
    territories: ['US', 'CA'],
    dateRange: {
      start: '2024-01-01',
      end: '2024-12-31'
    }
  }
} as SelectiveStreamingOptions);

const stream = createReadStream('catalog.xml');

// Only albums and EPs for US/CA released in 2024 will be yielded
for await (const release of selectiveParser.parseStream(stream)) {
  // Process the filtered release
  console.log(`${release.title} - matches filter criteria`);
}
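For criteria the filter option does not cover, the same effect can be achieved by filtering inside the consumption loop: releases are yielded one at a time, so skipped items are discarded before they accumulate in memory. A minimal sketch (the minimum track count rule is an illustrative assumption):

const parser = new StreamingParser({ chunkSize: 8192 });
const minTracks = 5; // Illustrative application-level criterion

for await (const release of parser.parseStream(createReadStream('catalog.xml'))) {
  // Skip releases that do not meet the application-level criterion
  if (release.tracks.length < minTracks) {
    continue;
  }
  await processRelease(release);
}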
Parallel Processing
Process multiple streams concurrently:
async function processMultipleFiles(filePaths: string[]): Promise<void> {
  const concurrency = 3; // Process at most 3 files at once

  const processFile = async (filePath: string): Promise<void> => {
    const parser = new StreamingParser({
      chunkSize: 8192,
      maxMemory: 50 * 1024 * 1024 // 50MB per parser
    });

    const stream = createReadStream(filePath);
    const releases = parser.parseStream(stream);

    console.log(`📁 Starting: ${filePath}`);

    let count = 0;
    for await (const release of releases) {
      await processRelease(release);
      count++;
    }

    console.log(`✅ Completed: ${filePath} (${count} releases)`);
  };

  // Process files in batches to respect the concurrency limit
  for (let i = 0; i < filePaths.length; i += concurrency) {
    const batch = filePaths.slice(i, i + concurrency);
    await Promise.all(batch.map(processFile));
  }
}
// Usage
await processMultipleFiles([
  'catalog-2023.xml',
  'catalog-2024.xml',
  'catalog-updates.xml'
]);
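Batching waits for the slowest file in each batch before starting the next one. If the files vary widely in size, a small worker pool keeps every slot busy instead; a minimal sketch under the same assumptions:

// Run one worker per concurrency slot; each worker pulls the next unprocessed file
async function processWithPool(filePaths: string[], concurrency = 3): Promise<void> {
  let nextIndex = 0;

  const worker = async (): Promise<void> => {
    while (nextIndex < filePaths.length) {
      const filePath = filePaths[nextIndex++];
      const parser = new StreamingParser({ chunkSize: 8192 });
      for await (const release of parser.parseStream(createReadStream(filePath))) {
        await processRelease(release);
      }
    }
  };

  await Promise.all(Array.from({ length: concurrency }, () => worker()));
}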
Stream Transformation
Transform releases during streaming:
import { Readable, Transform, TransformCallback } from 'stream';

class DDEXTransformStream extends Transform {
  constructor(private transformer: (release: StreamedRelease) => StreamedRelease) {
    super({ objectMode: true });
  }

  _transform(release: StreamedRelease, encoding: string, callback: TransformCallback) {
    try {
      const transformed = this.transformer(release);
      this.push(transformed);
      callback();
    } catch (error) {
      callback(error as Error);
    }
  }
}
// Usage
const parser = new StreamingParser();
const fileStream = createReadStream('catalog.xml');
const releaseStream = parser.parseStream(fileStream);

const transformer = new DDEXTransformStream((release) => {
  // Normalize artist names
  release.mainArtist = release.mainArtist?.trim().toUpperCase();

  // Add computed fields
  release.metadata.totalDurationMs = release.tracks.reduce((total, track) => {
    const duration = parseDuration(track.duration || 'PT0S');
    return total + duration * 1000;
  }, 0);

  return release;
});
// Chain streams: Parser output -> Transform -> Output
// parseStream() returns an async iterator, so wrap it in a Readable before piping
Readable.from(releaseStream)
  .pipe(transformer)
  .on('data', (release: StreamedRelease) => {
    console.log(`Processed: ${release.title} (${release.metadata.totalDurationMs}ms)`);
  })
  .on('end', () => {
    console.log('Stream processing complete!');
  });
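The transformer above relies on parseDuration to turn ISO 8601 durations (the format DDEX uses, e.g. PT3M45S) into seconds. A minimal sketch of such a helper, assuming hours, minutes, and seconds components only (parseDuration is not part of the library API):

// Parse an ISO 8601 duration such as 'PT1H2M3.5S' into seconds (illustrative helper)
function parseDuration(iso: string): number {
  const match = /^PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+(?:\.\d+)?)S)?$/.exec(iso);
  if (!match) {
    return 0; // Unrecognized format; treat as zero rather than failing the stream
  }
  const hours = Number(match[1] ?? 0);
  const minutes = Number(match[2] ?? 0);
  const seconds = Number(match[3] ?? 0);
  return hours * 3600 + minutes * 60 + seconds;
}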
Error Handling in Streams
Resilient Streaming
Handle errors gracefully during streaming:
const resilientParser = new StreamingParser({
  chunkSize: 8192,
  continueOnError: true, // Continue processing after errors
  maxErrors: 10          // Stop after 10 errors
});

const stream = createReadStream('catalog.xml');
const releases = resilientParser.parseStream(stream);

let errorCount = 0;
const errors: Error[] = [];

try {
  for await (const release of releases) {
    try {
      await processRelease(release);
    } catch (error) {
      errorCount++;
      errors.push(error as Error);
      console.warn(`⚠️ Error processing release ${release.releaseId}: ${(error as Error).message}`);

      // Continue with the next release, up to the error limit
      if (errorCount < 10) {
        continue;
      } else {
        console.error('❌ Too many errors, stopping');
        break;
      }
    }
  }
} catch (streamError) {
  console.error('💥 Fatal streaming error:', (streamError as Error).message);
} finally {
  console.log(`📊 Summary: ${errorCount} errors encountered`);
  if (errors.length > 0) {
    console.log('Errors:', errors.map(e => e.message));
  }
}
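Rather than only counting failures, it can help to record which releases failed so they can be reprocessed later. A minimal sketch that appends failures to a dead-letter file (the file name and record shape are illustrative assumptions):

import { appendFile } from 'fs/promises';

// Record a failed release so it can be retried in a later run (hypothetical helper)
async function recordFailure(release: StreamedRelease, error: Error): Promise<void> {
  const entry = {
    releaseId: release.releaseId,
    title: release.title,
    error: error.message,
    failedAt: new Date().toISOString()
  };
  await appendFile('failed-releases.jsonl', JSON.stringify(entry) + '\n', 'utf-8');
}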
Performance Optimization
Memory Management
const optimizedParser = new StreamingParser({
  chunkSize: 16384,             // Larger chunks for better throughput
  maxMemory: 100 * 1024 * 1024, // 100MB limit
  bufferSize: 32768,            // Larger buffer
  gcThreshold: 0.8,             // Trigger GC at 80% memory usage
  poolingEnabled: true          // Pool objects to reduce GC pressure
});

const stream = createReadStream('catalog.xml');

// Monitor memory usage
const memoryMonitor = setInterval(() => {
  const progress = optimizedParser.getProgress();
  const memoryPercent = (progress.memoryUsage.heapUsed / (100 * 1024 * 1024) * 100).toFixed(1);

  if (parseFloat(memoryPercent) > 80) {
    console.warn(`⚠️ High memory usage: ${memoryPercent}%`);
  }
}, 1000);

// Process with monitoring
try {
  for await (const release of optimizedParser.parseStream(stream)) {
    await processRelease(release);
  }
} finally {
  clearInterval(memoryMonitor);
}
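getProgress() can also be read once after the loop finishes to produce an end-of-run summary, which is useful when comparing chunkSize and bufferSize settings. A minimal sketch, continuing the example above:

// Summarize a completed run from the final progress snapshot
const finalProgress = optimizedParser.getProgress();
const totalMB = finalProgress.bytesProcessed / 1024 / 1024;
const seconds = finalProgress.elapsedTimeMs / 1000;

console.log(
  `Parsed ${finalProgress.releasesProcessed} releases ` +
  `(${totalMB.toFixed(1)} MB in ${seconds.toFixed(1)}s, ` +
  `${(totalMB / seconds).toFixed(2)} MB/s average)`
);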
See Also
- Parser API - Main parser documentation
- Memory Management - Memory optimization strategies
- Performance Guide - Performance optimization
- Large Files Guide - Working with large DDEX files