# Database Integration

Integrate DDEX Suite with your database systems for automated data processing and storage.

## Overview

Database integration enables you to:
- Process DDEX data directly from database queries (see the query sketch after this list)
- Store parsed results in structured database tables
- Implement automated ETL pipelines
- Scale processing across multiple database instances
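
For example, once releases have been stored using the PostgreSQL schema shown below, they can be read back with ordinary SQL. A minimal sketch, assuming the `releases` table from the Setup section and an illustrative connection string:

```python
import psycopg2

# Query stored DDEX releases by territory (assumes the PostgreSQL schema
# from the Setup section below; connection string is illustrative)
conn = psycopg2.connect("postgresql://user:pass@localhost/ddex")
with conn, conn.cursor() as cur:
    cur.execute("""
        SELECT ddex_id, title, artist, release_date
        FROM releases
        WHERE 'US' = ANY(territory_codes)
        ORDER BY release_date DESC
        LIMIT 10
    """)
    for ddex_id, title, artist, release_date in cur.fetchall():
        print(ddex_id, title, artist, release_date)
conn.close()
```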

## PostgreSQL Integration

### Setup

```sql
-- Create tables for DDEX data
CREATE TABLE releases (
    id SERIAL PRIMARY KEY,
    ddex_id VARCHAR(255) UNIQUE,
    title VARCHAR(500) NOT NULL,
    artist VARCHAR(500) NOT NULL,
    label VARCHAR(255),
    release_date DATE,
    territory_codes TEXT[],
    metadata JSONB,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE tracks (
    id SERIAL PRIMARY KEY,
    release_id INTEGER REFERENCES releases(id),
    ddex_id VARCHAR(255),
    title VARCHAR(500) NOT NULL,
    artist VARCHAR(500) NOT NULL,
    isrc VARCHAR(12),
    duration_ms INTEGER,
    track_number INTEGER,
    metadata JSONB,
    created_at TIMESTAMP DEFAULT NOW(),
    -- Required by the ON CONFLICT (release_id, ddex_id) upsert in the pipeline below
    UNIQUE (release_id, ddex_id)
);

-- Processing log written by the pipeline example below
CREATE TABLE processing_log (
    id SERIAL PRIMARY KEY,
    source_id VARCHAR(255),
    status VARCHAR(50),
    processed_at TIMESTAMP DEFAULT NOW()
);

-- Indexes for performance
CREATE INDEX idx_releases_ddex_id ON releases(ddex_id);
CREATE INDEX idx_releases_artist ON releases(artist);
CREATE INDEX idx_releases_release_date ON releases(release_date);
CREATE INDEX idx_tracks_isrc ON tracks(isrc);
CREATE INDEX idx_tracks_release_id ON tracks(release_id);
```

### Data Processing Pipeline

```python
import asyncpg
import asyncio
from ddex_parser import DDEXParser
from ddex_builder import DDEXBuilder


class DDEXDatabaseProcessor:
    def __init__(self, database_url):
        self.database_url = database_url
        self.parser = DDEXParser()
        self.builder = DDEXBuilder()

    async def process_incoming_ddex(self, xml_content, source_id):
        """Process DDEX XML and store in database"""
        pool = await asyncpg.create_pool(self.database_url)

        try:
            # Parse DDEX content
            parsed = self.parser.parse(xml_content)

            async with pool.acquire() as conn:
                async with conn.transaction():
                    # Insert or update release
                    release_id = await self.upsert_release(conn, parsed.flat.releases[0])

                    # Process tracks
                    for track in parsed.flat.tracks:
                        await self.upsert_track(conn, release_id, track)

                    # Log processing
                    await conn.execute("""
                        INSERT INTO processing_log (source_id, status, processed_at)
                        VALUES ($1, 'success', NOW())
                    """, source_id)
        finally:
            await pool.close()

    async def upsert_release(self, conn, release):
        """Insert or update release data"""
        return await conn.fetchval("""
            INSERT INTO releases (ddex_id, title, artist, label, release_date, territory_codes, metadata)
            VALUES ($1, $2, $3, $4, $5, $6, $7)
            ON CONFLICT (ddex_id)
            DO UPDATE SET
                title = EXCLUDED.title,
                artist = EXCLUDED.artist,
                label = EXCLUDED.label,
                release_date = EXCLUDED.release_date,
                territory_codes = EXCLUDED.territory_codes,
                metadata = EXCLUDED.metadata,
                updated_at = NOW()
            RETURNING id
        """, release.id, release.title, release.artist, release.label,
             release.release_date, release.territory_codes, release.metadata)

    async def upsert_track(self, conn, release_id, track):
        """Insert or update track data"""
        await conn.execute("""
            INSERT INTO tracks (release_id, ddex_id, title, artist, isrc, duration_ms, track_number, metadata)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            ON CONFLICT (release_id, ddex_id)
            DO UPDATE SET
                title = EXCLUDED.title,
                artist = EXCLUDED.artist,
                isrc = EXCLUDED.isrc,
                duration_ms = EXCLUDED.duration_ms,
                track_number = EXCLUDED.track_number,
                metadata = EXCLUDED.metadata
        """, release_id, track.id, track.title, track.artist,
             track.isrc, track.duration_ms, track.track_number, track.metadata)


# Usage
async def main():
    processor = DDEXDatabaseProcessor('postgresql://user:pass@localhost/ddex')

    with open('new_release.xml', 'r') as f:
        xml_content = f.read()

    await processor.process_incoming_ddex(xml_content, 'source_123')

asyncio.run(main())
```

## MongoDB Integration

Store DDEX data in MongoDB for flexible document-based storage:
```python
from datetime import datetime

from pymongo import MongoClient
from ddex_parser import DDEXParser


class DDEXMongoProcessor:
    def __init__(self, connection_string):
        self.client = MongoClient(connection_string)
        self.db = self.client.ddex_suite
        self.parser = DDEXParser()

    def process_ddex_document(self, xml_content, collection_name='releases'):
        """Process DDEX and store as MongoDB document"""
        # Parse DDEX
        parsed = self.parser.parse(xml_content)

        # Convert to MongoDB document format
        document = {
            '_id': parsed.flat.releases[0].id,
            'ddex_version': parsed.version,
            'parsed_at': datetime.utcnow(),
            'graph_data': parsed.graph.to_dict(),
            'flat_data': {
                'releases': [r.to_dict() for r in parsed.flat.releases],
                'tracks': [t.to_dict() for t in parsed.flat.tracks],
                'artists': [a.to_dict() for a in parsed.flat.artists]
            },
            'metadata': {
                'source': 'ddex-parser',
                'file_size': len(xml_content),
                'processing_time_ms': parsed.processing_time_ms
            }
        }

        # Store in MongoDB
        collection = self.db[collection_name]
        result = collection.replace_one(
            {'_id': document['_id']},
            document,
            upsert=True
        )

        return result.upserted_id or result.matched_count


# Usage
processor = DDEXMongoProcessor('mongodb://localhost:27017/')

with open('release.xml', 'r') as f:
    result = processor.process_ddex_document(f.read())
    print(f"Stored document: {result}")
```

## SQLite Integration

Lightweight database integration for smaller applications:
```typescript
import sqlite3 from 'sqlite3';
import { DDEXParser } from 'ddex-parser';

class DDEXSQLiteProcessor {
  private db: sqlite3.Database;
  private parser: DDEXParser;

  constructor(dbPath: string) {
    this.db = new sqlite3.Database(dbPath);
    this.parser = new DDEXParser();
    this.initDatabase();
  }

  private initDatabase() {
    const schema = `
      CREATE TABLE IF NOT EXISTS releases (
        id TEXT PRIMARY KEY,
        title TEXT NOT NULL,
        artist TEXT NOT NULL,
        release_date TEXT,
        metadata TEXT,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
      );

      CREATE TABLE IF NOT EXISTS tracks (
        id TEXT PRIMARY KEY,
        release_id TEXT REFERENCES releases(id),
        title TEXT NOT NULL,
        isrc TEXT,
        duration_ms INTEGER,
        metadata TEXT
      );
    `;

    this.db.exec(schema);
  }

  async processDDEX(xmlContent: string): Promise<void> {
    const parsed = await this.parser.parse(xmlContent);

    return new Promise((resolve, reject) => {
      this.db.serialize(() => {
        this.db.run('BEGIN TRANSACTION');

        // Insert release
        const release = parsed.flat.releases[0];
        this.db.run(
          'INSERT OR REPLACE INTO releases (id, title, artist, release_date, metadata) VALUES (?, ?, ?, ?, ?)',
          [release.id, release.title, release.artist, release.releaseDate, JSON.stringify(release)],
          (err) => {
            if (err) reject(err);
          }
        );

        // Insert tracks
        parsed.flat.tracks.forEach(track => {
          this.db.run(
            'INSERT OR REPLACE INTO tracks (id, release_id, title, isrc, duration_ms, metadata) VALUES (?, ?, ?, ?, ?, ?)',
            [track.id, release.id, track.title, track.isrc, track.durationMs, JSON.stringify(track)]
          );
        });

        this.db.run('COMMIT', (err) => {
          if (err) reject(err);
          else resolve();
        });
      });
    });
  }
}
```

## MySQL Integration

```python
import json

import mysql.connector
from mysql.connector import Error
from ddex_parser import DDEXParser


class DDEXMySQLProcessor:
    def __init__(self, config):
        self.config = config
        self.parser = DDEXParser()

    def process_batch(self, xml_files):
        """Process multiple DDEX files in batch"""
        connection = None
        cursor = None

        try:
            connection = mysql.connector.connect(**self.config)
            cursor = connection.cursor()

            # Prepare batch insert statements
            release_data = []
            track_data = []

            for xml_file in xml_files:
                with open(xml_file, 'r') as f:
                    parsed = self.parser.parse(f.read())

                for release in parsed.flat.releases:
                    release_data.append((
                        release.id,
                        release.title,
                        release.artist,
                        release.label,
                        release.release_date,
                        json.dumps(release.to_dict())
                    ))

                for track in parsed.flat.tracks:
                    track_data.append((
                        track.id,
                        track.release_id,
                        track.title,
                        track.isrc,
                        track.duration_ms,
                        json.dumps(track.to_dict())
                    ))

            # Batch insert releases
            cursor.executemany("""
                INSERT INTO releases (ddex_id, title, artist, label, release_date, metadata)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    title = VALUES(title),
                    artist = VALUES(artist),
                    label = VALUES(label),
                    metadata = VALUES(metadata)
            """, release_data)

            # Batch insert tracks
            cursor.executemany("""
                INSERT INTO tracks (ddex_id, release_id, title, isrc, duration_ms, metadata)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    title = VALUES(title),
                    isrc = VALUES(isrc),
                    duration_ms = VALUES(duration_ms),
                    metadata = VALUES(metadata)
            """, track_data)

            connection.commit()
            print(f"Processed {len(release_data)} releases and {len(track_data)} tracks")

        except Error as e:
            print(f"Database error: {e}")
            if connection and connection.is_connected():
                connection.rollback()
        finally:
            if connection and connection.is_connected():
                cursor.close()
                connection.close()


# Usage
config = {
    'host': 'localhost',
    'database': 'ddex_suite',
    'user': 'your_user',
    'password': 'your_password'
}

processor = DDEXMySQLProcessor(config)
processor.process_batch(['release1.xml', 'release2.xml', 'release3.xml'])
```

## Data Warehouse Integration

Integrate with data warehouses for analytics:
```python
from google.cloud import bigquery
from ddex_parser import DDEXParser
import pandas as pd


class DDEXBigQueryProcessor:
    def __init__(self, project_id, dataset_id):
        self.client = bigquery.Client(project=project_id)
        self.dataset_id = dataset_id
        self.parser = DDEXParser()

    def create_table(self, table_name, schema):
        """Create a table in the target dataset if it does not already exist"""
        table_ref = f"{self.client.project}.{self.dataset_id}.{table_name}"
        table = bigquery.Table(table_ref, schema=schema)
        self.client.create_table(table, exists_ok=True)

    def create_tables(self):
        """Create BigQuery tables for DDEX data"""
        release_schema = [
            bigquery.SchemaField("ddex_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("title", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("artist", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("release_date", "DATE"),
            bigquery.SchemaField("territory_codes", "STRING", mode="REPEATED"),
            bigquery.SchemaField("metadata", "JSON"),
            bigquery.SchemaField("processed_at", "TIMESTAMP"),
        ]

        track_schema = [
            bigquery.SchemaField("ddex_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("release_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("title", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("isrc", "STRING"),
            bigquery.SchemaField("duration_ms", "INTEGER"),
            bigquery.SchemaField("metadata", "JSON"),
        ]

        # Create tables
        self.create_table("releases", release_schema)
        self.create_table("tracks", track_schema)

    def process_to_dataframe(self, xml_content):
        """Process DDEX into a DataFrame format suitable for BigQuery"""
        parsed = self.parser.parse(xml_content)
        df = self.parser.to_dataframe(xml_content)

        # Add processing metadata
        df['processed_at'] = pd.Timestamp.now()
        df['ddex_version'] = parsed.version

        return df

    def upload_dataframe(self, df, table_name):
        """Upload a DataFrame to BigQuery"""
        table_id = f"{self.dataset_id}.{table_name}"

        job_config = bigquery.LoadJobConfig(
            write_disposition="WRITE_APPEND",
            autodetect=True
        )

        job = self.client.load_table_from_dataframe(df, table_id, job_config=job_config)
        job.result()  # Wait for the job to complete

        print(f"Loaded {len(df)} rows into {table_id}")


# Usage
processor = DDEXBigQueryProcessor('my-project', 'ddex_analytics')
processor.create_tables()

with open('releases.xml', 'r') as f:
    df = processor.process_to_dataframe(f.read())
    processor.upload_dataframe(df, 'releases')
```

## Performance Optimization

### Connection Pooling

```python
from sqlalchemy import create_engine, pool
from sqlalchemy.orm import sessionmaker
from ddex_parser import DDEXParser


class PooledDDEXProcessor:
    def __init__(self, database_url):
        self.engine = create_engine(
            database_url,
            poolclass=pool.QueuePool,
            pool_size=20,
            max_overflow=0,
            pool_pre_ping=True,
            pool_recycle=300
        )
        self.Session = sessionmaker(bind=self.engine)
        self.parser = DDEXParser()

    def process_with_pool(self, xml_files):
        """Process files using the connection pool"""
        for xml_file in xml_files:
            session = self.Session()
            try:
                with open(xml_file, 'r') as f:
                    parsed = self.parser.parse(f.read())

                # Store data with the current session
                # (store_parsed_data maps parsed releases/tracks to your own
                #  tables or ORM models; one possible shape is sketched below)
                self.store_parsed_data(session, parsed)
                session.commit()

            except Exception as e:
                session.rollback()
                print(f"Error processing {xml_file}: {e}")
            finally:
                session.close()
```
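
The `store_parsed_data` call above is left to your application. One possible shape for it, sketched here as a free function using SQLAlchemy's `text()` construct against the PostgreSQL schema from the Setup section (bind it as a method or adapt the call as needed):

```python
from sqlalchemy import text


def store_parsed_data(session, parsed):
    """Hypothetical helper: upsert parsed releases with SQLAlchemy Core.

    Column names follow the PostgreSQL schema from the Setup section;
    adapt to your own tables or ORM models.
    """
    upsert = text("""
        INSERT INTO releases (ddex_id, title, artist, label, release_date)
        VALUES (:ddex_id, :title, :artist, :label, :release_date)
        ON CONFLICT (ddex_id) DO UPDATE SET
            title = EXCLUDED.title,
            artist = EXCLUDED.artist,
            label = EXCLUDED.label,
            release_date = EXCLUDED.release_date,
            updated_at = NOW()
    """)
    for release in parsed.flat.releases:
        session.execute(upsert, {
            "ddex_id": release.id,
            "title": release.title,
            "artist": release.artist,
            "label": release.label,
            "release_date": release.release_date,
        })
```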

### Bulk Operations

```sql
-- Optimize with bulk operations
COPY releases (ddex_id, title, artist, metadata)
FROM '/path/to/bulk_releases.csv'
WITH (FORMAT csv, HEADER true);

-- Use UPSERT for efficient updates
-- (the %s placeholder is expanded client-side, e.g. by psycopg2's execute_values;
--  see the sketch after this block)
INSERT INTO releases (ddex_id, title, artist, metadata)
VALUES %s
ON CONFLICT (ddex_id)
DO UPDATE SET
    title = EXCLUDED.title,
    artist = EXCLUDED.artist,
    metadata = EXCLUDED.metadata;
```
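
The `VALUES %s` form above is not standalone SQL; it is a placeholder expanded client-side by drivers such as psycopg2. A minimal sketch using `psycopg2.extras.execute_values`, with illustrative row tuples standing in for real parsed data:

```python
import json

import psycopg2
from psycopg2.extras import execute_values

conn = psycopg2.connect("postgresql://user:pass@localhost/ddex")
with conn, conn.cursor() as cur:
    # Each tuple is (ddex_id, title, artist, metadata); values here are illustrative
    rows = [
        ("R1", "Release One", "Artist A", json.dumps({"label": "Example"})),
        ("R2", "Release Two", "Artist B", json.dumps({"label": "Example"})),
    ]
    execute_values(cur, """
        INSERT INTO releases (ddex_id, title, artist, metadata)
        VALUES %s
        ON CONFLICT (ddex_id)
        DO UPDATE SET
            title = EXCLUDED.title,
            artist = EXCLUDED.artist,
            metadata = EXCLUDED.metadata
    """, rows)
conn.close()
```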

## Best Practices

- Use connection pooling for high-throughput applications
- Implement proper indexing on frequently queried fields
- Use transactions for data consistency
- Batch operations for better performance
- Handle errors gracefully with proper rollback mechanisms
- Monitor database performance and optimize queries
- Implement data retention policies for large datasets (see the retention sketch after this list)
- Use appropriate data types for storage efficiency
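
As a sketch of the retention-policy point, the `processing_log` table from the Setup section can be pruned on a schedule; the 90-day window and connection string are illustrative:

```python
import psycopg2

# Hypothetical retention job: prune old processing_log rows
# (the 90-day window is an arbitrary illustration)
RETENTION_SQL = """
    DELETE FROM processing_log
    WHERE processed_at < NOW() - INTERVAL '90 days'
"""


def apply_retention_policy(database_url: str) -> None:
    """Run the retention DELETE in its own transaction."""
    conn = psycopg2.connect(database_url)
    try:
        with conn, conn.cursor() as cur:
            cur.execute(RETENTION_SQL)
            print(f"Removed {cur.rowcount} expired log rows")
    finally:
        conn.close()


apply_retention_policy("postgresql://user:pass@localhost/ddex")
```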