Skip to main content

Using DDEX with Pandas DataFrames

Learn how to leverage pandas DataFrames for powerful DDEX data analysis, transformation, and visualization.

Overview

The DDEX Suite provides seamless integration with pandas DataFrames, enabling:

  • Data Analysis: Statistical analysis of catalog metadata
  • Data Cleaning: Identifying and fixing data quality issues
  • Transformation: Converting between formats and structures
  • Visualization: Creating charts and reports from DDEX data
  • Machine Learning: Feature extraction for recommendation systems

Basic DataFrame Operations

Converting DDEX to DataFrame

import pandas as pd
from ddex_parser import DDEXParser
from ddex_builder import DDEXBuilder

# Initialize parser
parser = DDEXParser()

# Parse DDEX XML to DataFrame
with open('release.xml', 'r') as f:
xml_content = f.read()

# Direct conversion to DataFrame
df = parser.to_dataframe(xml_content)
print(f"DataFrame shape: {df.shape}")
print(f"Columns: {list(df.columns)}")

# Preview the data
print(df.head())

DataFrame Structure

The DDEX DataFrame has a flattened structure with these key columns:

# Examine DataFrame structure
print("Core columns:")
core_columns = [
'message_id', 'release_id', 'sound_recording_id', 'isrc',
'title', 'display_artist', 'label_name', 'release_date',
'genre', 'territory', 'deal_type', 'distribution_channel'
]

for col in core_columns:
if col in df.columns:
print(f" {col}: {df[col].dtype}")

# Check for missing values
print("\nMissing values:")
print(df.isnull().sum())

Data Analysis Examples

Catalog Statistics

import matplotlib.pyplot as plt
import seaborn as sns

class DDEXAnalyzer:
def __init__(self, df: pd.DataFrame):
self.df = df

def catalog_overview(self) -> dict:
"""Generate comprehensive catalog statistics"""
stats = {
'total_releases': self.df['release_id'].nunique(),
'total_tracks': self.df['sound_recording_id'].nunique(),
'unique_artists': self.df['display_artist'].nunique(),
'unique_labels': self.df['label_name'].nunique(),
'date_range': {
'earliest': self.df['release_date'].min(),
'latest': self.df['release_date'].max()
},
'territories': self.df['territory'].nunique(),
'genres': self.df['genre'].nunique()
}

return stats

def top_entities(self, n: int = 10) -> dict:
"""Find top artists, labels, and genres"""
return {
'artists': self.df['display_artist'].value_counts().head(n),
'labels': self.df['label_name'].value_counts().head(n),
'genres': self.df['genre'].value_counts().head(n)
}

def release_trends(self) -> pd.DataFrame:
"""Analyze release trends over time"""
# Extract year from release_date
self.df['release_year'] = pd.to_datetime(self.df['release_date']).dt.year

trends = self.df.groupby('release_year').agg({
'release_id': 'nunique',
'sound_recording_id': 'nunique',
'display_artist': 'nunique'
}).rename(columns={
'release_id': 'releases',
'sound_recording_id': 'tracks',
'display_artist': 'artists'
})

return trends

def territory_analysis(self) -> pd.DataFrame:
"""Analyze content distribution by territory"""
territory_stats = self.df.groupby('territory').agg({
'release_id': 'nunique',
'display_artist': 'nunique',
'label_name': 'nunique',
'genre': lambda x: x.mode().iloc[0] if not x.empty else None
}).rename(columns={
'release_id': 'releases',
'display_artist': 'artists',
'label_name': 'labels',
'genre': 'top_genre'
})

return territory_stats.sort_values('releases', ascending=False)

# Usage
analyzer = DDEXAnalyzer(df)
stats = analyzer.catalog_overview()
print(f"Catalog contains {stats['total_releases']} releases")
print(f"Date range: {stats['date_range']['earliest']} to {stats['date_range']['latest']}")

top_entities = analyzer.top_entities()
print(f"Top artist: {top_entities['artists'].index[0]} ({top_entities['artists'].iloc[0]} releases)")

Data Quality Assessment

class DataQualityChecker:
def __init__(self, df: pd.DataFrame):
self.df = df
self.issues = []

def check_missing_fields(self) -> pd.DataFrame:
"""Identify missing critical fields"""
critical_fields = ['title', 'display_artist', 'isrc', 'release_date']

missing_report = pd.DataFrame({
'field': critical_fields,
'missing_count': [self.df[field].isnull().sum() for field in critical_fields],
'missing_percentage': [self.df[field].isnull().mean() * 100 for field in critical_fields]
})

return missing_report.sort_values('missing_percentage', ascending=False)

def check_duplicate_isrcs(self) -> pd.DataFrame:
"""Find duplicate ISRCs (potential data quality issue)"""
isrc_counts = self.df['isrc'].value_counts()
duplicates = isrc_counts[isrc_counts > 1]

if len(duplicates) > 0:
duplicate_details = self.df[self.df['isrc'].isin(duplicates.index)][
['isrc', 'title', 'display_artist', 'release_date']
].sort_values('isrc')
return duplicate_details

return pd.DataFrame()

def check_date_consistency(self) -> pd.DataFrame:
"""Check for date inconsistencies"""
# Convert to datetime
self.df['release_date_parsed'] = pd.to_datetime(self.df['release_date'], errors='coerce')

# Find invalid dates
invalid_dates = self.df[self.df['release_date_parsed'].isnull() & self.df['release_date'].notna()]

# Find future dates
future_dates = self.df[self.df['release_date_parsed'] > pd.Timestamp.now()]

issues = pd.DataFrame({
'issue_type': ['Invalid Date Format', 'Future Release Date'],
'count': [len(invalid_dates), len(future_dates)]
})

return issues

def generate_quality_report(self) -> dict:
"""Generate comprehensive data quality report"""
report = {
'missing_fields': self.check_missing_fields(),
'duplicate_isrcs': self.check_duplicate_isrcs(),
'date_issues': self.check_date_consistency(),
'completeness_score': self.calculate_completeness_score()
}

return report

def calculate_completeness_score(self) -> float:
"""Calculate overall data completeness score (0-100)"""
critical_fields = ['title', 'display_artist', 'isrc', 'release_date', 'genre']

total_cells = len(self.df) * len(critical_fields)
filled_cells = total_cells - self.df[critical_fields].isnull().sum().sum()

return (filled_cells / total_cells) * 100

# Usage
quality_checker = DataQualityChecker(df)
quality_report = quality_checker.generate_quality_report()

print(f"Data completeness score: {quality_report['completeness_score']:.1f}%")
print("\nMissing field analysis:")
print(quality_report['missing_fields'])

if not quality_report['duplicate_isrcs'].empty:
print(f"\nFound {len(quality_report['duplicate_isrcs'])} duplicate ISRCs")

Data Transformation

Cleaning and Standardization

class DDEXDataCleaner:
def __init__(self, df: pd.DataFrame):
self.df = df.copy()

def standardize_artist_names(self) -> pd.DataFrame:
"""Standardize artist name formats"""
# Remove extra whitespace
self.df['display_artist'] = self.df['display_artist'].str.strip()

# Standardize featuring formats
self.df['display_artist'] = self.df['display_artist'].str.replace(
r'\s+(ft\.?|feat\.?|featuring)\s+', ' feat. ', regex=True, case=False
)

# Standardize "and" vs "&"
self.df['display_artist'] = self.df['display_artist'].str.replace(' & ', ' and ')

return self.df

def normalize_genres(self) -> pd.DataFrame:
"""Normalize genre categories"""
genre_mapping = {
'Hip-Hop': ['Hip Hop', 'Rap', 'Hip-hop'],
'Electronic': ['EDM', 'House', 'Techno', 'Electronic Dance Music'],
'R&B': ['RnB', 'R and B', 'Rhythm and Blues'],
'Alternative': ['Alt Rock', 'Alternative Rock', 'Indie Rock']
}

for standard_genre, variants in genre_mapping.items():
mask = self.df['genre'].isin(variants)
self.df.loc[mask, 'genre'] = standard_genre

return self.df

def fix_date_formats(self) -> pd.DataFrame:
"""Standardize date formats"""
# Convert to datetime and back to ISO format
self.df['release_date'] = pd.to_datetime(
self.df['release_date'],
errors='coerce'
).dt.strftime('%Y-%m-%d')

return self.df

def deduplicate_releases(self) -> pd.DataFrame:
"""Remove duplicate releases based on ISRC"""
# Keep first occurrence of each ISRC
self.df = self.df.drop_duplicates(subset=['isrc'], keep='first')
return self.df

def clean_all(self) -> pd.DataFrame:
"""Apply all cleaning operations"""
self.standardize_artist_names()
self.normalize_genres()
self.fix_date_formats()
self.deduplicate_releases()
return self.df

# Usage
cleaner = DDEXDataCleaner(df)
cleaned_df = cleaner.clean_all()
print(f"Original: {len(df)} rows, Cleaned: {len(cleaned_df)} rows")

Feature Engineering

class DDEXFeatureEngineer:
def __init__(self, df: pd.DataFrame):
self.df = df.copy()

def extract_temporal_features(self) -> pd.DataFrame:
"""Extract temporal features from release dates"""
self.df['release_date'] = pd.to_datetime(self.df['release_date'])

self.df['release_year'] = self.df['release_date'].dt.year
self.df['release_month'] = self.df['release_date'].dt.month
self.df['release_quarter'] = self.df['release_date'].dt.quarter
self.df['release_weekday'] = self.df['release_date'].dt.day_name()

# Calculate time since release
self.df['days_since_release'] = (pd.Timestamp.now() - self.df['release_date']).dt.days

return self.df

def create_artist_features(self) -> pd.DataFrame:
"""Create artist-level features"""
# Artist productivity
artist_stats = self.df.groupby('display_artist').agg({
'release_id': 'nunique',
'sound_recording_id': 'nunique',
'genre': lambda x: x.mode().iloc[0] if not x.empty else None,
'label_name': 'nunique'
}).rename(columns={
'release_id': 'artist_release_count',
'sound_recording_id': 'artist_track_count',
'genre': 'artist_primary_genre',
'label_name': 'artist_label_count'
})

# Merge back to main DataFrame
self.df = self.df.merge(artist_stats, left_on='display_artist', right_index=True)

# Artist collaboration indicator
self.df['is_collaboration'] = self.df['display_artist'].str.contains(
r'(feat\.?|ft\.?|and|&|,)', regex=True, case=False
)

return self.df

def create_label_features(self) -> pd.DataFrame:
"""Create label-level features"""
label_stats = self.df.groupby('label_name').agg({
'release_id': 'nunique',
'display_artist': 'nunique',
'genre': lambda x: x.mode().iloc[0] if not x.empty else None
}).rename(columns={
'release_id': 'label_release_count',
'display_artist': 'label_artist_count',
'genre': 'label_primary_genre'
})

self.df = self.df.merge(label_stats, left_on='label_name', right_index=True)
return self.df

def create_genre_features(self) -> pd.DataFrame:
"""Create genre-based features"""
# One-hot encode genres
genre_dummies = pd.get_dummies(self.df['genre'], prefix='genre')
self.df = pd.concat([self.df, genre_dummies], axis=1)

# Genre popularity score
genre_popularity = self.df['genre'].value_counts(normalize=True)
self.df['genre_popularity_score'] = self.df['genre'].map(genre_popularity)

return self.df

# Usage
feature_engineer = DDEXFeatureEngineer(cleaned_df)
feature_engineer.extract_temporal_features()
feature_engineer.create_artist_features()
feature_engineer.create_label_features()
feature_engineer.create_genre_features()

enriched_df = feature_engineer.df
print(f"Added features. New shape: {enriched_df.shape}")

Converting DataFrame Back to DDEX

Building DDEX from Modified DataFrame

from ddex_builder import DDEXBuilder

class DataFrameToDDEX:
def __init__(self):
self.builder = DDEXBuilder()

def df_to_ddex(self, df: pd.DataFrame, preset: 'youtube_album'') -> str:
"""Convert DataFrame back to DDEX XML"""

# Group by release to reconstruct release structure
releases = []

for release_id, release_group in df.groupby('release_id'):
# Get release-level info (first row)
release_info = release_group.iloc[0]

# Collect sound recordings for this release
sound_recordings = []
for _, track in release_group.iterrows():
if pd.notna(track['sound_recording_id']):
sound_recordings.append({
'soundRecordingId': track['sound_recording_id'],
'isrc': track['isrc'],
'title': track['title'],
'displayArtist': track['display_artist'],
'duration': track.get('duration', 'PT3M30S')
})

# Create release object
release = {
'releaseId': release_id,
'releaseType': release_info.get('release_type', 'Single'),
'releaseDetailsByTerritory': [{
'territory': release_info.get('territory', 'Worldwide'),
'displayArtist': release_info['display_artist'],
'labelName': release_info['label_name'],
'title': release_info['title'],
'releaseDate': release_info['release_date'],
'genre': release_info['genre']
}],
'soundRecordings': sound_recordings
}
releases.append(release)

# Create build request
build_request = {
'version': '4.3',
'messageHeader': {
'messageId': f"MSG_{pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')}",
'sentOnBehalfOf': 'DataProcessor',
'messageRecipient': 'Platform'
},
'updateIndicator': 'OriginalMessage',
'messageControlType': 'LiveMessage',
'releaseList': releases
}

# Build XML
return self.builder.build(build_request, preset=preset)

# Usage - modify DataFrame and convert back to DDEX
modified_df = enriched_df.copy()

# Example modification: update all pop genre to "Pop/Rock"
modified_df.loc[modified_df['genre'] == 'Pop', 'genre'] = 'Pop/Rock'

# Convert back to DDEX
converter = DataFrameToDDEX()
updated_xml = converter.df_to_ddex(modified_df)

# Save updated DDEX
with open('updated_catalog.xml', 'w') as f:
f.write(updated_xml)

Advanced Analytics

Statistical Analysis

import scipy.stats as stats
from sklearn.preprocessing import LabelEncoder
from sklearn.cluster import KMeans

class DDEXStatistics:
def __init__(self, df: pd.DataFrame):
self.df = df

def release_frequency_analysis(self) -> dict:
"""Analyze release frequency patterns"""
# Releases per artist distribution
releases_per_artist = self.df.groupby('display_artist')['release_id'].nunique()

return {
'mean_releases_per_artist': releases_per_artist.mean(),
'median_releases_per_artist': releases_per_artist.median(),
'distribution_stats': releases_per_artist.describe(),
'prolific_artists': releases_per_artist.nlargest(10)
}

def genre_correlation_analysis(self) -> pd.DataFrame:
"""Analyze correlations between genres and other factors"""
# Encode categorical variables
le_genre = LabelEncoder()
le_label = LabelEncoder()

analysis_df = self.df.copy()
analysis_df['genre_encoded'] = le_genre.fit_transform(analysis_df['genre'])
analysis_df['label_encoded'] = le_label.fit_transform(analysis_df['label_name'])

# Calculate correlations
correlation_matrix = analysis_df[[
'genre_encoded', 'label_encoded', 'release_year',
'artist_release_count', 'label_release_count'
]].corr()

return correlation_matrix

def seasonal_analysis(self) -> pd.DataFrame:
"""Analyze seasonal release patterns"""
monthly_releases = self.df.groupby('release_month').agg({
'release_id': 'nunique',
'genre': lambda x: x.mode().iloc[0] if not x.empty else None
}).rename(columns={
'release_id': 'release_count',
'genre': 'dominant_genre'
})

# Add month names
month_names = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
monthly_releases['month_name'] = [month_names[i-1] for i in monthly_releases.index]

return monthly_releases

# Usage
stats_analyzer = DDEXStatistics(enriched_df)
frequency_analysis = stats_analyzer.release_frequency_analysis()
print(f"Average releases per artist: {frequency_analysis['mean_releases_per_artist']:.2f}")

seasonal_data = stats_analyzer.seasonal_analysis()
print("\nSeasonal release patterns:")
print(seasonal_data[['month_name', 'release_count', 'dominant_genre']])

Clustering and Segmentation

class DDEXClustering:
def __init__(self, df: pd.DataFrame):
self.df = df

def artist_segmentation(self, n_clusters: int = 5) -> pd.DataFrame:
"""Segment artists based on their characteristics"""
# Prepare features for clustering
artist_features = self.df.groupby('display_artist').agg({
'release_id': 'nunique',
'sound_recording_id': 'nunique',
'genre': lambda x: x.mode().iloc[0] if not x.empty else 'Unknown',
'label_name': 'nunique',
'days_since_release': 'mean'
}).rename(columns={
'release_id': 'total_releases',
'sound_recording_id': 'total_tracks',
'genre': 'primary_genre',
'label_name': 'label_count',
'days_since_release': 'avg_days_since_release'
})

# Encode categorical variables
le_genre = LabelEncoder()
artist_features['genre_encoded'] = le_genre.fit_transform(artist_features['primary_genre'])

# Select numeric features for clustering
cluster_features = artist_features[['total_releases', 'total_tracks',
'label_count', 'genre_encoded']]

# Perform clustering
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
artist_features['cluster'] = kmeans.fit_predict(cluster_features)

# Analyze clusters
cluster_analysis = artist_features.groupby('cluster').agg({
'total_releases': ['mean', 'std'],
'total_tracks': ['mean', 'std'],
'primary_genre': lambda x: x.mode().iloc[0] if not x.empty else 'Mixed'
})

return artist_features, cluster_analysis

# Usage
clusterer = DDEXClustering(enriched_df)
artist_segments, cluster_summary = clusterer.artist_segmentation()

print("Artist segments:")
for cluster in cluster_summary.index:
artists_in_cluster = artist_segments[artist_segments['cluster'] == cluster]
print(f"\nCluster {cluster}: {len(artists_in_cluster)} artists")
print(f" Avg releases: {cluster_summary.loc[cluster, ('total_releases', 'mean')]:.1f}")
print(f" Dominant genre: {cluster_summary.loc[cluster, ('primary_genre', '<lambda>')]}")

Visualization

Creating Charts and Reports

import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go

class DDEXVisualizer:
def __init__(self, df: pd.DataFrame):
self.df = df
plt.style.use('seaborn-v0_8')

def create_release_timeline(self) -> None:
"""Create timeline of releases"""
timeline_data = self.df.groupby('release_year')['release_id'].nunique()

plt.figure(figsize=(12, 6))
plt.plot(timeline_data.index, timeline_data.values, marker='o', linewidth=2)
plt.title('Release Count Over Time')
plt.xlabel('Year')
plt.ylabel('Number of Releases')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

def create_genre_distribution(self) -> None:
"""Create genre distribution chart"""
genre_counts = self.df['genre'].value_counts().head(10)

plt.figure(figsize=(10, 6))
sns.barplot(x=genre_counts.values, y=genre_counts.index)
plt.title('Top 10 Genres by Release Count')
plt.xlabel('Number of Releases')
plt.tight_layout()
plt.show()

def create_interactive_dashboard(self) -> None:
"""Create interactive Plotly dashboard"""
# Prepare data
artist_stats = self.df.groupby('display_artist').agg({
'release_id': 'nunique',
'sound_recording_id': 'nunique',
'genre': lambda x: x.mode().iloc[0] if not x.empty else 'Unknown'
}).reset_index()

# Create scatter plot
fig = px.scatter(
artist_stats,
x='release_id',
y='sound_recording_id',
color='genre',
hover_data=['display_artist'],
title='Artist Productivity: Releases vs Tracks',
labels={'release_id': 'Number of Releases',
'sound_recording_id': 'Number of Tracks'}
)

fig.show()

def create_territory_heatmap(self) -> None:
"""Create territory analysis heatmap"""
territory_genre = pd.crosstab(self.df['territory'], self.df['genre'])

plt.figure(figsize=(12, 8))
sns.heatmap(territory_genre, annot=True, fmt='d', cmap='YlOrRd')
plt.title('Release Distribution: Territory vs Genre')
plt.tight_layout()
plt.show()

# Usage
visualizer = DDEXVisualizer(enriched_df)
visualizer.create_release_timeline()
visualizer.create_genre_distribution()
visualizer.create_interactive_dashboard()

Best Practices

Performance Optimization

# Use efficient data types
def optimize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
"""Optimize DataFrame memory usage"""
# Convert object columns to category if appropriate
for col in df.select_dtypes(include=['object']):
if df[col].nunique() / len(df) < 0.5: # Less than 50% unique values
df[col] = df[col].astype('category')

# Optimize integer columns
for col in df.select_dtypes(include=['int64']):
if df[col].min() >= 0:
if df[col].max() < 255:
df[col] = df[col].astype('uint8')
elif df[col].max() < 65535:
df[col] = df[col].astype('uint16')

return df

# Use chunking for large datasets
def process_large_catalog(file_paths: list, chunk_size: int = 1000):
"""Process large catalogs in chunks"""
all_chunks = []

for i in range(0, len(file_paths), chunk_size):
chunk_files = file_paths[i:i + chunk_size]
chunk_df = process_file_batch(chunk_files)
all_chunks.append(chunk_df)

return pd.concat(all_chunks, ignore_index=True)

Error Handling

def safe_dataframe_operations(df: pd.DataFrame) -> pd.DataFrame:
"""Perform DataFrame operations with error handling"""
try:
# Validate required columns
required_cols = ['release_id', 'title', 'display_artist']
missing_cols = [col for col in required_cols if col not in df.columns]

if missing_cols:
raise ValueError(f"Missing required columns: {missing_cols}")

# Check for empty DataFrame
if df.empty:
raise ValueError("DataFrame is empty")

# Perform operations
result_df = df.copy()

# Safe date parsing
if 'release_date' in result_df.columns:
result_df['release_date'] = pd.to_datetime(
result_df['release_date'],
errors='coerce'
)

return result_df

except Exception as e:
print(f"Error processing DataFrame: {e}")
return pd.DataFrame() # Return empty DataFrame on error

Integration Patterns

Combining with Other Tools

# Export to different formats
def export_analysis_results(df: pd.DataFrame, base_path: str):
"""Export analysis results to multiple formats"""

# Excel with multiple sheets
with pd.ExcelWriter(f'{base_path}_analysis.xlsx') as writer:
df.to_excel(writer, sheet_name='Raw Data', index=False)

# Summary statistics
summary = df.describe()
summary.to_excel(writer, sheet_name='Summary Stats')

# Top entities
top_artists = df['display_artist'].value_counts().head(20)
top_artists.to_excel(writer, sheet_name='Top Artists')

# CSV for data science workflows
df.to_csv(f'{base_path}_data.csv', index=False)

# JSON for web applications
df.to_json(f'{base_path}_data.json', orient='records', indent=2)

# Parquet for big data workflows
df.to_parquet(f'{base_path}_data.parquet', index=False)

# Database integration
def save_to_database(df: pd.DataFrame, connection_string: str):
"""Save DataFrame to database"""
from sqlalchemy import create_engine

engine = create_engine(connection_string)
df.to_sql('ddex_releases', engine, if_exists='replace', index=False)

The pandas integration provides powerful capabilities for DDEX data analysis, from basic statistics to advanced machine learning workflows.