Legacy System Migration
This guide covers migrating catalog data from legacy music metadata systems to the DDEX Suite.
Overview
Legacy system migration involves:
- Migrating from proprietary formats
- Database schema transformations (a field-mapping sketch follows this list)
- API integration updates
- Workflow modernization
- Data quality improvements
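Before writing any migration code, it pays to document how legacy fields map onto the flattened release and track fields used by the migrators in this guide. A minimal sketch, assuming made-up legacy column names on the left:

    # Hypothetical legacy column names (left) mapped to the flattened,
    # DDEX-oriented field names used by the migrators later in this guide.
    RELEASE_FIELD_MAP = {
        'album_title':  'title',
        'artist_name':  'artist',
        'record_label': 'label',
        'barcode':      'upc',
        'released_on':  'release_date',
    }

    TRACK_FIELD_MAP = {
        'track_title':  'title',
        'song_isrc':    'isrc',
        'length_secs':  'duration_ms',   # remember: convert seconds to milliseconds
        'is_explicit':  'parental_warning_type',
    }

    def remap(row: dict, field_map: dict) -> dict:
        """Rename legacy keys to their DDEX-oriented equivalents, dropping unmapped columns."""
        return {field_map[key]: value for key, value in row.items() if key in field_map}

Keeping the mapping in one place makes the schema-mapping and validation steps later in this guide much easier to audit.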
Common Legacy Systems
CSV/Spreadsheet Migration
Many legacy catalogs live in spreadsheets or CSV exports. The migrator below reads a CSV, groups rows into releases, and builds DDEX XML with ddex-builder.
import hashlib
import re
from datetime import datetime
from typing import Any, Dict, List, Optional

import pandas as pd
from ddex_builder import DDEXBuilder


class CSVToDDEXMigrator:
    def __init__(self):
        self.builder = DDEXBuilder()

    def migrate_csv_to_ddex(self, csv_file: str) -> str:
        """Convert CSV metadata to DDEX XML."""
        # Read CSV data
        df = pd.read_csv(csv_file)

        # Validate required columns
        required_columns = ['album_title', 'artist_name', 'track_title', 'duration']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            raise ValueError(f"Missing required columns: {missing_columns}")

        # Group by album
        releases = self._group_by_album(df)

        # Build DDEX data structure
        ddex_data = {'releases': releases}

        # Generate DDEX XML
        return self.builder.build(ddex_data)

    def _group_by_album(self, df: pd.DataFrame) -> List[Dict[str, Any]]:
        """Group tracks by album."""
        releases = []

        # Group by album identifier (title + artist)
        album_groups = df.groupby(['album_title', 'artist_name'])

        for (album_title, artist_name), tracks_df in album_groups:
            release = {
                'id': self._generate_release_id(album_title, artist_name),
                'title': album_title,
                'artist': artist_name,
                'release_date': self._extract_release_date(tracks_df),
                'label': self._extract_label(tracks_df),
                'upc': self._extract_upc(tracks_df),
                'tracks': []
            }

            # Add tracks; enumerate gives a per-album position as the fallback track number
            for position, (_, track_row) in enumerate(tracks_df.iterrows(), start=1):
                track = {
                    'id': self._generate_track_id(track_row),
                    'title': track_row['track_title'],
                    'artist': track_row.get('track_artist', artist_name),
                    'duration_ms': self._parse_duration(track_row['duration']),
                    'track_number': track_row.get('track_number', position),
                    'isrc': track_row.get('isrc'),
                    'genre': self._parse_genre(track_row.get('genre')),
                    'parental_warning_type': self._parse_explicit(track_row.get('explicit'))
                }
                release['tracks'].append(track)

            releases.append(release)

        return releases

    def _generate_release_id(self, title: str, artist: str) -> str:
        """Generate a deterministic release ID from title and artist."""
        combined = f"{title}:{artist}".encode('utf-8')
        return f"R{hashlib.md5(combined).hexdigest()[:8].upper()}"

    def _generate_track_id(self, track_row) -> str:
        """Generate a deterministic track ID from title and track artist."""
        combined = f"{track_row['track_title']}:{track_row.get('track_artist', '')}".encode('utf-8')
        return f"T{hashlib.md5(combined).hexdigest()[:8].upper()}"

    def _parse_duration(self, duration_str) -> int:
        """Parse a duration value to milliseconds."""
        if pd.isna(duration_str):
            return 0

        # Handle different formats: "3:45", "225", "3m45s"
        duration_str = str(duration_str).strip()

        if ':' in duration_str:
            # Format: "3:45" (minutes:seconds)
            minutes, seconds = duration_str.split(':')[:2]
            return (int(minutes) * 60 + int(seconds)) * 1000
        elif duration_str.isdigit():
            # Format: "225" (seconds)
            return int(duration_str) * 1000
        else:
            # Try to extract numbers for formats like "3m45s"
            matches = re.findall(r'(\d+)[ms]', duration_str.lower())
            if len(matches) >= 2:
                minutes, seconds = int(matches[0]), int(matches[1])
                return (minutes * 60 + seconds) * 1000

        return 0

    def _extract_release_date(self, tracks_df: pd.DataFrame) -> str:
        """Extract and normalize the release date from the track rows."""
        if 'release_date' in tracks_df.columns:
            date_val = tracks_df['release_date'].iloc[0]
            if pd.notna(date_val):
                # Try to parse and normalize the date format
                try:
                    return pd.to_datetime(date_val).strftime('%Y-%m-%d')
                except Exception:
                    pass

        # Default to January 1 of the current year if no date was found
        return f"{datetime.now().year}-01-01"

    def _extract_label(self, tracks_df: pd.DataFrame) -> str:
        """Extract the record label."""
        if 'label' in tracks_df.columns:
            label_val = tracks_df['label'].iloc[0]
            if pd.notna(label_val):
                return str(label_val)
        return "Unknown Label"

    def _extract_upc(self, tracks_df: pd.DataFrame) -> Optional[str]:
        """Extract the UPC code, if present."""
        if 'upc' in tracks_df.columns:
            upc_val = tracks_df['upc'].iloc[0]
            if pd.notna(upc_val):
                return str(upc_val)
        return None

    def _parse_genre(self, genre_str) -> List[str]:
        """Parse a genre string into a list."""
        if pd.isna(genre_str):
            return []

        # Handle comma-separated genres
        genres = [g.strip() for g in str(genre_str).split(',')]
        return [g for g in genres if g]

    def _parse_explicit(self, explicit_val) -> str:
        """Parse an explicit-content flag."""
        if pd.isna(explicit_val):
            return 'Unknown'

        explicit_str = str(explicit_val).lower()
        if explicit_str in ['true', '1', 'yes', 'explicit']:
            return 'Explicit'
        elif explicit_str in ['false', '0', 'no', 'clean']:
            return 'NotExplicit'
        return 'Unknown'


# Usage example
migrator = CSVToDDEXMigrator()

# Convert CSV to DDEX
ddex_xml = migrator.migrate_csv_to_ddex('legacy_catalog.csv')

# Save DDEX output
with open('converted_catalog.xml', 'w') as f:
    f.write(ddex_xml)

print("Successfully migrated CSV to DDEX format")
Database Migration
For catalogs stored in a relational database, you can query the legacy schema and build one DDEX message per release, as in this TypeScript example against PostgreSQL.
import { DDEXBuilder } from 'ddex-builder';
import { Pool, PoolClient } from 'pg';
import { promises as fs } from 'fs';

export class DatabaseToDDEXMigrator {
  private pool: Pool;
  private builder: DDEXBuilder;

  constructor(connectionConfig: any) {
    this.pool = new Pool(connectionConfig);
    this.builder = new DDEXBuilder();
  }

  async migrateFromDatabase(): Promise<void> {
    const client = await this.pool.connect();

    try {
      // Migrate albums/releases
      const releases = await this.fetchReleases(client);

      for (const release of releases) {
        const ddexData = await this.buildDDEXFromRelease(client, release);
        const ddexXml = await this.builder.build(ddexData);

        // Save or process DDEX XML
        await this.saveDDEXFile(release.id, ddexXml);
      }
    } finally {
      client.release();
    }
  }

  private async fetchReleases(client: PoolClient): Promise<any[]> {
    const query = `
      SELECT
        r.id,
        r.title,
        r.release_date,
        r.label_id,
        r.upc_code,
        l.name as label_name,
        a.name as artist_name
      FROM releases r
      LEFT JOIN labels l ON r.label_id = l.id
      LEFT JOIN artists a ON r.primary_artist_id = a.id
      ORDER BY r.created_at
    `;

    const result = await client.query(query);
    return result.rows;
  }

  private async buildDDEXFromRelease(client: PoolClient, release: any): Promise<any> {
    // Fetch tracks for this release
    const tracks = await this.fetchTracksForRelease(client, release.id);

    // Fetch additional metadata
    const genres = await this.fetchGenresForRelease(client, release.id);
    const territories = await this.fetchTerritoriesForRelease(client, release.id);

    return {
      releases: [{
        id: `R${release.id.toString().padStart(6, '0')}`,
        title: release.title,
        artist: release.artist_name,
        release_date: release.release_date,
        label: release.label_name,
        upc: release.upc_code,
        genre: genres,
        territory_codes: territories,
        tracks: tracks.map((track, index) => ({
          id: `T${track.id.toString().padStart(6, '0')}`,
          title: track.title,
          artist: track.artist_name || release.artist_name,
          duration_ms: track.duration_seconds * 1000,
          track_number: track.track_number || index + 1,
          isrc: track.isrc_code,
          parental_warning_type: this.mapExplicitFlag(track.is_explicit),
          genre: track.genre ? [track.genre] : []
        }))
      }]
    };
  }

  private async fetchTracksForRelease(client: PoolClient, releaseId: number): Promise<any[]> {
    const query = `
      SELECT
        t.id,
        t.title,
        t.duration_seconds,
        t.track_number,
        t.isrc_code,
        t.is_explicit,
        t.genre,
        a.name as artist_name
      FROM tracks t
      LEFT JOIN artists a ON t.artist_id = a.id
      WHERE t.release_id = $1
      ORDER BY t.track_number
    `;

    const result = await client.query(query, [releaseId]);
    return result.rows;
  }

  private async fetchGenresForRelease(client: PoolClient, releaseId: number): Promise<string[]> {
    const query = `
      SELECT g.name
      FROM release_genres rg
      JOIN genres g ON rg.genre_id = g.id
      WHERE rg.release_id = $1
    `;

    const result = await client.query(query, [releaseId]);
    return result.rows.map(row => row.name);
  }

  private async fetchTerritoriesForRelease(client: PoolClient, releaseId: number): Promise<string[]> {
    const query = `
      SELECT t.code
      FROM release_territories rt
      JOIN territories t ON rt.territory_id = t.id
      WHERE rt.release_id = $1
    `;

    const result = await client.query(query, [releaseId]);
    return result.rows.map(row => row.code);
  }

  private mapExplicitFlag(isExplicit: boolean | null): string {
    if (isExplicit === true) return 'Explicit';
    if (isExplicit === false) return 'NotExplicit';
    return 'Unknown';
  }

  private async saveDDEXFile(releaseId: number, ddexXml: string): Promise<void> {
    // Ensure the output directory exists before writing
    await fs.mkdir('./output', { recursive: true });
    const filename = `release_${releaseId}_ddex.xml`;
    await fs.writeFile(`./output/${filename}`, ddexXml, 'utf8');
  }

  async close(): Promise<void> {
    await this.pool.end();
  }
}

// Usage
const migrator = new DatabaseToDDEXMigrator({
  host: 'localhost',
  port: 5432,
  database: 'legacy_music_db',
  user: 'username',
  password: 'password'
});

await migrator.migrateFromDatabase();
await migrator.close();
API Migration Strategy
If downstream systems still depend on your legacy API, keep those endpoints in place and route the processing through DDEX Suite internally.
Legacy API Wrapper
from typing import Optional

from flask import Flask, request, jsonify
from ddex_parser import DDEXParser
from ddex_builder import DDEXBuilder


class LegacyAPIAdapter:
    """Adapter that keeps legacy API compatibility while using DDEX Suite internally."""

    def __init__(self):
        self.parser = DDEXParser()
        self.builder = DDEXBuilder()
        self.app = Flask(__name__)
        self._setup_routes()

    def _setup_routes(self):
        """Set up legacy API endpoints."""

        @self.app.route('/api/v1/albums', methods=['POST'])
        def create_album():
            """Legacy endpoint that now uses DDEX internally."""
            legacy_data = request.json

            try:
                # Convert legacy format to DDEX
                ddex_data = self._convert_legacy_to_ddex(legacy_data)

                # Use DDEX Suite for processing
                ddex_xml = self.builder.build(ddex_data)

                # Parse back for validation
                parsed = self.parser.parse(ddex_xml)

                # Convert back to legacy format for the response
                response_data = self._convert_ddex_to_legacy(parsed.flat)

                return jsonify({
                    'status': 'success',
                    'data': response_data
                })
            except Exception as e:
                return jsonify({
                    'status': 'error',
                    'message': str(e)
                }), 400

        @self.app.route('/api/v1/albums/<album_id>', methods=['GET'])
        def get_album(album_id):
            """Legacy endpoint to retrieve album data."""
            try:
                # In a real implementation, load from database/storage
                ddex_xml = self._load_ddex_for_album(album_id)
                if not ddex_xml:
                    return jsonify({'error': 'Album not found'}), 404

                # Parse with DDEX Suite
                parsed = self.parser.parse(ddex_xml)

                # Convert to legacy format
                legacy_data = self._convert_ddex_to_legacy(parsed.flat)

                return jsonify(legacy_data)
            except Exception as e:
                return jsonify({'error': str(e)}), 500

    def _convert_legacy_to_ddex(self, legacy_data: dict) -> dict:
        """Convert the legacy API format to the DDEX format."""
        ddex_release = {
            'id': legacy_data.get('album_id', f"R{hash(legacy_data['title']) % 100000:05d}"),
            'title': legacy_data['title'],
            'artist': legacy_data['artist'],
            'release_date': legacy_data.get('release_date'),
            'label': legacy_data.get('label'),
            'upc': legacy_data.get('upc'),
            'genre': legacy_data.get('genres', []),
            'tracks': []
        }

        # Convert tracks
        for track_data in legacy_data.get('songs', []):
            ddex_track = {
                'id': track_data.get('song_id', f"T{hash(track_data['title']) % 100000:05d}"),
                'title': track_data['title'],
                'artist': track_data.get('artist', legacy_data['artist']),
                'duration_ms': (track_data.get('duration_seconds') or 0) * 1000,
                'track_number': track_data.get('track_number'),
                'isrc': track_data.get('isrc'),
                'parental_warning_type': 'Explicit' if track_data.get('explicit') else 'NotExplicit'
            }
            ddex_release['tracks'].append(ddex_track)

        return {'releases': [ddex_release]}

    def _convert_ddex_to_legacy(self, ddex_flat) -> dict:
        """Convert the DDEX flattened model back to the legacy API format."""
        if not ddex_flat.releases:
            return {}

        release = ddex_flat.releases[0]

        legacy_data = {
            'album_id': release.id,
            'title': release.title,
            'artist': release.artist,
            'release_date': release.release_date,
            'label': release.label,
            'upc': release.upc,
            'genres': release.genre or [],
            'songs': []
        }

        # Convert tracks back to legacy format
        for track in release.tracks or []:
            legacy_track = {
                'song_id': track.id,
                'title': track.title,
                'artist': track.artist,
                'duration_seconds': track.duration_ms // 1000 if track.duration_ms else 0,
                'track_number': track.track_number,
                'isrc': track.isrc,
                'explicit': track.parental_warning_type == 'Explicit'
            }
            legacy_data['songs'].append(legacy_track)

        return legacy_data

    def _load_ddex_for_album(self, album_id: str) -> Optional[str]:
        """Load DDEX XML for an album (implementation depends on your storage)."""
        # This would load from your storage system
        # For demo purposes, return None
        return None

    def run(self, host='0.0.0.0', port=5000, debug=False):
        """Run the legacy API server."""
        self.app.run(host=host, port=port, debug=debug)


# Usage
adapter = LegacyAPIAdapter()
adapter.run(debug=True)
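To exercise the wrapper, here is a made-up request in the legacy shape that _convert_legacy_to_ddex expects; the field names come from the conversion code above, while the values are purely illustrative:

    import requests

    # Illustrative payload only; names, dates, and ISRCs are placeholders
    payload = {
        'title': 'Example Album',
        'artist': 'Example Artist',
        'release_date': '2024-06-01',
        'label': 'Example Records',
        'genres': ['Pop'],
        'songs': [
            {'title': 'Opening Song', 'duration_seconds': 225, 'track_number': 1,
             'isrc': 'USXXX2400001', 'explicit': False},
        ],
    }

    response = requests.post('http://localhost:5000/api/v1/albums', json=payload)
    print(response.status_code, response.json())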
Migration Validation
After each migration run, compare the migrated data against the original to catch missing releases, field mismatches, and quality problems before they reach production.
import json
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class MigrationValidationResult:
    is_valid: bool
    errors: List[str]
    warnings: List[str]
    data_quality_score: float
    missing_fields: List[str]
    recommendations: List[str]


class MigrationValidator:
    """Validate migrated data quality and completeness."""

    def __init__(self):
        self.required_fields = {
            'release': ['id', 'title', 'artist'],
            'track': ['id', 'title', 'duration_ms']
        }
        self.recommended_fields = {
            'release': ['release_date', 'label', 'upc', 'genre'],
            'track': ['track_number', 'isrc', 'artist']
        }

    def validate_migration(self,
                           original_data: Dict[str, Any],
                           migrated_data: Dict[str, Any]) -> MigrationValidationResult:
        """Comprehensive migration validation."""
        errors = []
        warnings = []
        missing_fields = []

        # Data completeness check
        completeness_score = self._check_data_completeness(
            original_data, migrated_data, errors, warnings, missing_fields
        )

        # Data accuracy check
        accuracy_score = self._check_data_accuracy(
            original_data, migrated_data, errors, warnings
        )

        # Data quality check
        quality_score = self._check_data_quality(migrated_data, warnings)

        # Overall score
        overall_score = (completeness_score + accuracy_score + quality_score) / 3

        # Generate recommendations
        recommendations = self._generate_recommendations(
            errors, warnings, missing_fields, overall_score
        )

        return MigrationValidationResult(
            is_valid=len(errors) == 0,
            errors=errors,
            warnings=warnings,
            data_quality_score=overall_score,
            missing_fields=missing_fields,
            recommendations=recommendations
        )

    def _check_data_completeness(self, original, migrated, errors, warnings, missing_fields):
        """Check whether all essential data was migrated."""
        score = 100.0

        # Check release counts
        orig_releases = len(original.get('releases', []))
        migr_releases = len(migrated.get('releases', []))
        if migr_releases < orig_releases:
            errors.append(f"Missing releases: {orig_releases - migr_releases}")
            score -= 20

        # Check fields and tracks for each release
        for i, release in enumerate(migrated.get('releases', [])):
            release_path = f"releases[{i}]"

            # Required fields
            for field_name in self.required_fields['release']:
                if not release.get(field_name):
                    missing_fields.append(f"{release_path}.{field_name}")
                    score -= 10

            # Recommended fields
            for field_name in self.recommended_fields['release']:
                if not release.get(field_name):
                    warnings.append(f"Missing recommended field: {release_path}.{field_name}")
                    score -= 2

            # Track completeness
            orig_tracks = self._get_original_track_count(original, i)
            migr_tracks = len(release.get('tracks', []))
            if migr_tracks < orig_tracks:
                errors.append(f"{release_path}: Missing tracks ({orig_tracks - migr_tracks})")
                score -= 15

        return max(0, score)

    def _check_data_accuracy(self, original, migrated, errors, warnings):
        """Check whether migrated data matches the original data."""
        score = 100.0

        # Compare key fields for accuracy
        for i, (orig_rel, migr_rel) in enumerate(zip(
            original.get('releases', []),
            migrated.get('releases', [])
        )):
            release_path = f"releases[{i}]"

            # Title comparison
            if orig_rel.get('title') != migr_rel.get('title'):
                errors.append(f"{release_path}: Title mismatch")
                score -= 15

            # Artist comparison
            if orig_rel.get('artist') != migr_rel.get('artist'):
                errors.append(f"{release_path}: Artist mismatch")
                score -= 15

            # Track comparison
            for j, (orig_track, migr_track) in enumerate(zip(
                orig_rel.get('tracks', []),
                migr_rel.get('tracks', [])
            )):
                track_path = f"{release_path}.tracks[{j}]"

                if orig_track.get('title') != migr_track.get('title'):
                    warnings.append(f"{track_path}: Track title mismatch")
                    score -= 5

        return max(0, score)

    def _check_data_quality(self, migrated, warnings):
        """Check overall data quality."""
        score = 100.0

        for i, release in enumerate(migrated.get('releases', [])):
            release_path = f"releases[{i}]"

            # Check for suspiciously short titles
            if release.get('title') and len(release['title']) < 3:
                warnings.append(f"{release_path}: Very short title")
                score -= 2

            # Check for missing ISRCs
            tracks_with_isrc = sum(1 for track in release.get('tracks', [])
                                   if track.get('isrc'))
            total_tracks = len(release.get('tracks', []))

            if total_tracks > 0:
                isrc_percentage = tracks_with_isrc / total_tracks
                if isrc_percentage < 0.5:
                    warnings.append(f"{release_path}: Low ISRC coverage ({isrc_percentage:.1%})")
                    score -= 10

            # Check track duration reasonableness
            for j, track in enumerate(release.get('tracks', [])):
                track_path = f"{release_path}.tracks[{j}]"
                duration = track.get('duration_ms', 0)

                if duration > 0 and (duration < 30000 or duration > 1800000):  # 30s - 30min
                    warnings.append(f"{track_path}: Unusual duration ({duration / 1000:.1f}s)")
                    score -= 1

        return max(0, score)

    def _get_original_track_count(self, original, release_index):
        """Get the track count from the original data."""
        if release_index < len(original.get('releases', [])):
            return len(original['releases'][release_index].get('tracks', []))
        return 0

    def _generate_recommendations(self, errors, warnings, missing_fields, score):
        """Generate migration recommendations."""
        recommendations = []

        if score < 70:
            recommendations.append("Migration quality is below the acceptable threshold - review before proceeding")

        if errors:
            recommendations.append("Address all errors before proceeding with the migration")

        if len(missing_fields) > 10:
            recommendations.append("High number of missing fields - review the data mapping")

        if len(warnings) > 20:
            recommendations.append("Many warnings detected - review data quality")

        if score > 90:
            recommendations.append("Migration looks good - ready for production use")

        return recommendations


# Usage example
validator = MigrationValidator()

# Load original and migrated data for comparison
with open('original_data.json', 'r') as f:
    original = json.load(f)

with open('migrated_data.json', 'r') as f:
    migrated = json.load(f)

# Validate migration
result = validator.validate_migration(original, migrated)

print(f"Migration Valid: {result.is_valid}")
print(f"Quality Score: {result.data_quality_score:.1f}%")

for error in result.errors:
    print(f"ERROR: {error}")

for warning in result.warnings:
    print(f"WARNING: {warning}")

for rec in result.recommendations:
    print(f"RECOMMENDATION: {rec}")
Best Practices
- Incremental Migration: Migrate data in small batches to identify issues early (see the batching sketch after this list)
- Data Validation: Thoroughly validate migrated data against original
- Backup Strategy: Always backup original data before migration
- Schema Mapping: Document field mappings between legacy and DDEX formats
- Quality Assurance: Implement comprehensive quality checks
- Testing: Test migration process with representative sample data
- Rollback Plan: Have a clear rollback strategy if migration fails
- Documentation: Document migration process and decisions
- Performance Monitoring: Monitor migration performance and resource usage
- User Communication: Keep stakeholders informed of migration progress and impacts
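Combining the first two practices, here is a hedged sketch of an incremental run that migrates a small batch, validates it with the MigrationValidator above, and stops as soon as a batch fails. The migrate_batch callable and the shape of records are placeholders for your own pipeline:

    def run_incremental_migration(records, migrate_batch, validator, batch_size=100):
        """Migrate releases in small batches, validating each batch before moving on.

        `records` is assumed to already be a list of release dicts in the shape
        MigrationValidator compares; `migrate_batch` is a placeholder for your
        own legacy-to-DDEX conversion step.
        """
        for start in range(0, len(records), batch_size):
            batch = records[start:start + batch_size]

            original = {'releases': batch}
            migrated = migrate_batch(batch)  # your conversion step

            result = validator.validate_migration(original, migrated)
            if not result.is_valid:
                # Fail fast so problems surface on a small batch, not the full catalog
                raise RuntimeError(
                    f"Batch starting at record {start} failed validation: {result.errors}"
                )

            print(f"Batch {start // batch_size + 1}: quality {result.data_quality_score:.1f}%")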