Data Migration with UUID-Forge¶
Learn how to use deterministic UUIDs for seamless data migration across systems, databases, and platforms.
Overview¶
Data migration often involves moving data between different systems while maintaining referential integrity and consistency. UUID-Forge's deterministic generation ensures that the same entities receive the same UUIDs across different environments and migration phases.
Migration Challenges¶
Traditional Migration Problems¶
- ID Mapping: Translating IDs between different systems
- Referential Integrity: Maintaining relationships during migration
- Incremental Migration: Handling partial migrations over time
- Rollback Scenarios: Reverting migrations safely
- Cross-System Consistency: Ensuring same entity has same ID everywhere
UUID-Forge Solutions¶
- Deterministic IDs: Same input always generates same UUID
- No ID Mapping Required: UUIDs are consistent across systems
- Referential Integrity Maintained: Foreign keys are regenerated from the same identifying attributes, so relationships stay intact across systems
- Idempotent Migration: Running migration multiple times is safe
- Cross-Platform Consistency: Same UUIDs on any system
Migration Patterns¶
Database-to-Database Migration¶
Legacy System to Modern Database¶
from uuid_forge import UUIDGenerator, IDConfig, Namespace
from uuid import UUID
import psycopg2
import sqlite3
class DatabaseMigrator:
    """Migrate users and orders from a legacy SQLite database to PostgreSQL.

    Deterministic UUIDs let the migration run repeatedly: the same entity
    always receives the same UUID, so no legacy-ID mapping table is needed.
    """

    def __init__(self):
        # One generator per entity type, each isolated in its own namespace.
        self.user_gen = UUIDGenerator(IDConfig(namespace=Namespace("users"), salt="v1"))
        self.order_gen = UUIDGenerator(IDConfig(namespace=Namespace("orders"), salt="v1"))
        self.product_gen = UUIDGenerator(IDConfig(namespace=Namespace("products"), salt="v1"))

        # Source (legacy) and destination (modern) connections.
        self.legacy_db = sqlite3.connect("legacy.db")
        self.modern_db = psycopg2.connect("postgresql://...")

    def migrate_users(self):
        """Migrate users from legacy SQLite to PostgreSQL"""
        src = self.legacy_db.cursor()
        dst = self.modern_db.cursor()

        src.execute("SELECT email, name, created_at FROM users")
        legacy_users = src.fetchall()

        for email, name, created_at in legacy_users:
            # Email is the stable identifying attribute for a user, so the
            # UUID is reproducible on any later (re-)run of the migration.
            user_uuid = self.user_gen.generate("user", email=email)
            dst.execute(
                "INSERT INTO users (id, email, name, created_at) VALUES (%s, %s, %s, %s)",
                (user_uuid, email, name, created_at),
            )

        self.modern_db.commit()
        print(f"Migrated {len(legacy_users)} users")

    def migrate_orders(self):
        """Migrate orders maintaining user relationships"""
        src = self.legacy_db.cursor()
        dst = self.modern_db.cursor()

        # Join to the legacy users table so each order row carries the email
        # needed to re-derive the owning user's deterministic UUID.
        src.execute("""
        SELECT o.id, u.email, o.total, o.created_at
        FROM orders o
        JOIN users u ON o.user_id = u.id
        """)
        legacy_orders = src.fetchall()

        for legacy_order_id, user_email, total, created_at in legacy_orders:
            # Regenerate the same user UUID that migrate_users() produced.
            user_uuid = self.user_gen.generate("user", email=user_email)
            order_uuid = self.order_gen.generate(
                "order",
                user_email=user_email,
                legacy_id=str(legacy_order_id),
                total=str(total),
                created_at=str(created_at),
            )
            dst.execute(
                "INSERT INTO orders (id, user_id, total, created_at) VALUES (%s, %s, %s, %s)",
                (order_uuid, user_uuid, total, created_at),
            )

        self.modern_db.commit()
        print(f"Migrated {len(legacy_orders)} orders")
NoSQL to SQL Migration¶
from pymongo import MongoClient
import psycopg2

from uuid_forge import IDConfig, Namespace, UUIDGenerator
class NoSQLToSQLMigrator:
    """Flatten a nested MongoDB document structure into relational tables.

    Users embed posts, which embed comments; each level gets a deterministic
    UUID derived from its own identifying attributes, so the foreign keys
    stay stable across repeated migration runs.
    """

    def __init__(self):
        self.mongo_client = MongoClient("mongodb://localhost:27017/")
        self.mongo_db = self.mongo_client.legacy_app
        self.postgres_conn = psycopg2.connect("postgresql://...")

        # One generator per entity type, each in its own namespace.
        self.user_gen = UUIDGenerator(IDConfig(namespace=Namespace("users"), salt="v1"))
        self.post_gen = UUIDGenerator(IDConfig(namespace=Namespace("posts"), salt="v1"))
        self.comment_gen = UUIDGenerator(IDConfig(namespace=Namespace("comments"), salt="v1"))

    def migrate_user_posts(self):
        """Migrate nested document structure to relational tables"""
        cursor = self.postgres_conn.cursor()

        for user_doc in self.mongo_db.users.find():
            user_email = user_doc["email"]
            user_uuid = self.user_gen.generate("user", email=user_email)

            cursor.execute(
                "INSERT INTO users (id, email, name) VALUES (%s, %s, %s)",
                (user_uuid, user_doc["email"], user_doc["name"]),
            )

            # Embedded posts become rows referencing the user's UUID.
            for post in user_doc.get("posts", []):
                # A post is identified by its owner plus title/content/time.
                post_uuid = self.post_gen.generate(
                    "post",
                    user_email=user_email,
                    title=post["title"],
                    content=post["content"],
                    created_at=post["created_at"].isoformat(),
                )
                cursor.execute(
                    "INSERT INTO posts (id, user_id, title, content, created_at) VALUES (%s, %s, %s, %s, %s)",
                    (post_uuid, user_uuid, post["title"], post["content"], post["created_at"]),
                )

                # Embedded comments become rows referencing the post's UUID.
                for comment in post.get("comments", []):
                    comment_uuid = self.comment_gen.generate(
                        "comment",
                        post_id=str(post_uuid),
                        author=comment["author"],
                        content=comment["content"],
                        created_at=comment["created_at"].isoformat(),
                    )
                    cursor.execute(
                        "INSERT INTO comments (id, post_id, author, content, created_at) VALUES (%s, %s, %s, %s, %s)",
                        (comment_uuid, post_uuid, comment["author"], comment["content"], comment["created_at"]),
                    )

        self.postgres_conn.commit()
Cloud Migration¶
On-Premises to Cloud Migration¶
import boto3
from uuid_forge import UUIDGenerator
class CloudMigrator:
def __init__(self):
# Local database connection
self.local_db = psycopg2.connect("postgresql://localhost/app")
# AWS services
self.dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
self.s3 = boto3.client('s3')
# UUID generators
config = IDConfig(namespace=Namespace("cloud-users"), salt="v1")
self.user_gen = UUIDGenerator(config)
config = IDConfig(namespace=Namespace("cloud-files"), salt="v1")
self.file_gen = UUIDGenerator(config)
def migrate_to_dynamodb(self):
"""Migrate relational data to DynamoDB"""
cursor = self.local_db.cursor()
table = self.dynamodb.Table('Users')
cursor.execute("SELECT email, name, profile_data FROM users")
for email, name, profile_data in cursor.fetchall():
user_uuid = self.user_gen.generate("user", email=email)
# Store in DynamoDB with UUID as partition key
table.put_item(
Item={
'user_id': user_uuid,
'email': email,
'name': name,
'profile_data': profile_data,
'migrated_at': datetime.utcnow().isoformat()
}
)
def migrate_files_to_s3(self):
"""Migrate files to S3 with deterministic keys"""
cursor = self.local_db.cursor()
cursor.execute("SELECT file_path, metadata, content FROM files")
for file_path, metadata, content in cursor.fetchall():
# Generate deterministic S3 key
file_data = {
"original_path": file_path,
"size": len(content),
"metadata": metadata
}
file_uuid = self.file_gen.generate("user", email=file_data)
s3_key = f"migrated-files/{file_uuid}"
# Upload to S3
self.s3.put_object(
Bucket='migration-bucket',
Key=s3_key,
Body=content,
Metadata={
'original-path': file_path,
'file-uuid': file_uuid,
**metadata
}
)
Incremental Migration¶
Phased Migration Strategy¶
class IncrementalMigrator:
    """Copy users between databases in resumable, idempotent batches."""

    def __init__(self):
        self.source_db = psycopg2.connect("postgresql://source/")
        self.target_db = psycopg2.connect("postgresql://target/")
        self.user_gen = UUIDGenerator(IDConfig(namespace=Namespace("incremental-users"), salt="v1"))

        # Progress bookkeeping: resume point, window size, running total.
        self.migration_state = {
            "last_migrated_id": 0,
            "batch_size": 1000,
            "total_migrated": 0,
        }

    def migrate_batch(self):
        """Migrate a batch of records"""
        src = self.source_db.cursor()
        dst = self.target_db.cursor()
        state = self.migration_state

        # Fetch the next window of rows past the resume point.
        src.execute(
            "SELECT id, email, name FROM users WHERE id > %s ORDER BY id LIMIT %s",
            (state["last_migrated_id"], state["batch_size"]),
        )
        batch = src.fetchall()

        if not batch:
            print("Migration complete!")
            return False

        for source_id, email, name in batch:
            uuid_for_user = self.user_gen.generate("user", email=email)
            # ON CONFLICT makes re-running the same batch a harmless update.
            dst.execute(
                """
                INSERT INTO users (id, email, name, source_id, migrated_at)
                VALUES (%s, %s, %s, %s, %s)
                ON CONFLICT (email) DO UPDATE SET
                name = EXCLUDED.name,
                migrated_at = EXCLUDED.migrated_at
                """,
                (uuid_for_user, email, name, source_id, datetime.utcnow()),
            )
            state["last_migrated_id"] = source_id

        self.target_db.commit()
        state["total_migrated"] += len(batch)
        print(f"Migrated batch: {len(batch)} records, Total: {state['total_migrated']}")
        return True

    def run_incremental_migration(self):
        """Run migration in batches"""
        while self.migrate_batch():
            time.sleep(1)  # Brief pause between batches
Data Synchronization¶
Bidirectional Sync¶
class DataSynchronizer:
    """One-way sync of user changes from system A to system B.

    A row's UUID is derived from its email, so both systems independently
    compute the same primary key without exchanging ID mappings.
    """

    def __init__(self):
        self.system_a = psycopg2.connect("postgresql://system-a/")
        self.system_b = psycopg2.connect("postgresql://system-b/")
        self.user_gen = UUIDGenerator(IDConfig(namespace=Namespace("sync-users"), salt="v1"))
        self.sync_log = []
        # Start from the epoch so the first sync picks up every row; this
        # attribute was previously read without ever being initialized.
        self.last_sync_time = datetime.min

    def sync_user_changes(self):
        """Synchronize user changes between systems"""
        cursor_a = self.system_a.cursor()
        cursor_b = self.system_b.cursor()

        # Capture the window start before reading, so rows updated mid-sync
        # are retried on the next pass instead of being silently skipped.
        sync_started_at = datetime.utcnow()

        # Get changes from system A since the last successful sync.
        cursor_a.execute(
            "SELECT email, name, updated_at FROM users WHERE updated_at > %s",
            (self.last_sync_time,),
        )
        changes_a = cursor_a.fetchall()

        for email, name, updated_at in changes_a:
            user_uuid = self.user_gen.generate("user", email=email)
            # Last-writer-wins: only overwrite when the incoming row is newer.
            cursor_b.execute(
                """
                INSERT INTO users (id, email, name, updated_at)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (id) DO UPDATE SET
                name = EXCLUDED.name,
                updated_at = EXCLUDED.updated_at
                WHERE users.updated_at < EXCLUDED.updated_at
                """,
                (user_uuid, email, name, updated_at),
            )
            self.sync_log.append({
                "user_id": user_uuid,
                "direction": "A->B",
                "timestamp": datetime.utcnow(),
            })

        self.system_b.commit()
        # Advance the watermark only after a successful commit.
        self.last_sync_time = sync_started_at
Migration Validation¶
Data Integrity Verification¶
class MigrationValidator:
    """Post-migration checks: per-row data equality and referential integrity."""

    def __init__(self):
        self.source_db = psycopg2.connect("postgresql://source/")
        self.target_db = psycopg2.connect("postgresql://target/")
        self.user_gen = UUIDGenerator(IDConfig(namespace=Namespace("validation-users"), salt="v1"))

    def validate_user_migration(self):
        """Validate that all users migrated correctly"""
        src = self.source_db.cursor()
        dst = self.target_db.cursor()

        src.execute("SELECT email, name FROM users ORDER BY email")
        source_rows = src.fetchall()
        dst.execute("SELECT email, name FROM users ORDER BY email")
        target_rows = dst.fetchall()

        # Index both sides by email for O(1) lookup during comparison.
        source_by_email = dict(source_rows)
        target_by_email = dict(target_rows)

        report = {
            "total_source": len(source_rows),
            "total_target": len(target_rows),
            "missing_users": [],
            "data_mismatches": [],
        }

        for email, source_name in source_by_email.items():
            if email not in target_by_email:
                report["missing_users"].append(email)
            elif target_by_email[email] != source_name:
                report["data_mismatches"].append({
                    "email": email,
                    "source_name": source_name,
                    "target_name": target_by_email[email],
                })

        return report

    def validate_referential_integrity(self):
        """Validate that relationships are maintained"""
        cursor = self.target_db.cursor()

        # Orders whose user_id matches no user row are orphans.
        cursor.execute("""
        SELECT COUNT(*) FROM orders o
        LEFT JOIN users u ON o.user_id = u.id
        WHERE u.id IS NULL
        """)
        orphan_count = cursor.fetchone()[0]

        return {
            "orphaned_orders": orphan_count,
            "integrity_valid": orphan_count == 0,
        }
Rollback Strategies¶
Safe Migration Rollback¶
class MigrationRollback:
    """Snapshot-and-restore of the target users table around a migration."""

    def __init__(self):
        self.target_db = psycopg2.connect("postgresql://target/")
        self.backup_db = psycopg2.connect("postgresql://backup/")

    def _copy_users(self, from_conn, to_conn):
        # Replace the destination's users table wholesale with the source's rows.
        reader = from_conn.cursor()
        writer = to_conn.cursor()

        reader.execute("SELECT * FROM users")
        rows = reader.fetchall()

        writer.execute("DELETE FROM users")
        for row in rows:
            writer.execute("INSERT INTO users VALUES (%s, %s, %s, %s)", row)

        to_conn.commit()

    def create_rollback_point(self):
        """Create a rollback point before migration"""
        self._copy_users(self.target_db, self.backup_db)
        print("Rollback point created")

    def rollback_migration(self):
        """Rollback to previous state"""
        self._copy_users(self.backup_db, self.target_db)
        print("Migration rolled back successfully")
Migration Monitoring¶
Progress Tracking¶
class MigrationMonitor:
    """Tracks progress, errors, and estimated completion for a migration run."""

    def __init__(self):
        # All state lives in one dict so it can be reported/serialized easily.
        self.metrics = {
            "start_time": None,
            "records_processed": 0,
            "records_failed": 0,
            "current_phase": None,
            "estimated_completion": None,
        }

    def start_monitoring(self, total_records):
        """Start migration monitoring.

        Args:
            total_records: Expected record count; used for progress % and ETA.
        """
        self.metrics["start_time"] = datetime.utcnow()
        self.metrics["total_records"] = total_records
        print(f"Migration started: {total_records} records to process")

    def update_progress(self, records_processed, current_phase="processing"):
        """Update migration progress and, when possible, estimate completion.

        Silently skips the percentage/ETA report when no records have been
        processed yet or the expected total is zero (avoids division by zero).
        """
        self.metrics["records_processed"] = records_processed
        self.metrics["current_phase"] = current_phase

        total = self.metrics.get("total_records", 0)
        if total <= 0 or records_processed <= 0:
            return

        elapsed = datetime.utcnow() - self.metrics["start_time"]
        progress_pct = (records_processed / total) * 100
        avg_time_per_record = elapsed / records_processed
        remaining_records = total - records_processed
        eta = datetime.utcnow() + (avg_time_per_record * remaining_records)
        self.metrics["estimated_completion"] = eta
        print(f"Progress: {progress_pct:.1f}% ({records_processed}/{total}) - ETA: {eta}")

    def log_error(self, record_id, error):
        """Log migration error"""
        self.metrics["records_failed"] += 1
        print(f"Error processing record {record_id}: {error}")

    def complete_monitoring(self):
        """Print a final summary; safe even when zero records were expected."""
        total_time = datetime.utcnow() - self.metrics["start_time"]
        total = self.metrics.get("total_records", 0)
        succeeded = self.metrics["records_processed"] - self.metrics["records_failed"]
        # Guard against division by zero for empty migrations.
        success_rate = (succeeded / total) * 100 if total else 0.0
        print(f"Migration completed in {total_time}")
        print(f"Success rate: {success_rate:.1f}%")
        print(f"Records processed: {self.metrics['records_processed']}")
        print(f"Records failed: {self.metrics['records_failed']}")
Next Steps¶
- Testing Use Case - Testing migration strategies
- Best Practices - Migration optimization
- Development Guide - Development environment setup