Published: September 18, 2020
Author: Fernando McKenzie
Tags: Database Migration, PostgreSQL, AWS RDS, Data Engineering, Zero Downtime
During Q2 2020, while also managing the shift to remote work, we executed a critical database migration: moving our 2TB PostgreSQL database from on-premise hardware to AWS RDS. This article details our migration strategy, the challenges we hit, and the lessons learned from a zero-downtime transition supporting critical supply chain operations.
Database Analysis Script:
-- Comprehensive database assessment
WITH table_sizes AS (
    SELECT
        t.schemaname,
        t.tablename,
        pg_size_pretty(pg_total_relation_size(t.schemaname||'.'||t.tablename)) AS size,
        pg_total_relation_size(t.schemaname||'.'||t.tablename) AS size_bytes,
        s.n_tup_ins + s.n_tup_upd + s.n_tup_del AS total_operations
    FROM pg_tables t
    JOIN pg_stat_user_tables s ON t.tablename = s.relname
                               AND t.schemaname = s.schemaname
    WHERE t.schemaname NOT IN ('information_schema', 'pg_catalog')
),
index_usage AS (
    -- pg_stat_user_indexes exposes relname/indexrelname, so no extra
    -- join against pg_indexes is needed here
    SELECT
        schemaname,
        relname AS tablename,
        indexrelname AS indexname,
        idx_scan,
        idx_tup_read,
        idx_tup_fetch
    FROM pg_stat_user_indexes
)
SELECT
    ts.schemaname,
    ts.tablename,
    ts.size,
    ts.total_operations,
    COUNT(iu.indexname) AS index_count,
    SUM(iu.idx_scan) AS total_index_scans
FROM table_sizes ts
LEFT JOIN index_usage iu ON ts.tablename = iu.tablename
                         AND ts.schemaname = iu.schemaname
GROUP BY ts.schemaname, ts.tablename, ts.size, ts.size_bytes, ts.total_operations
ORDER BY ts.size_bytes DESC;
Key Findings:
AWS RDS Configuration:
# RDS parameter optimization
rds_configuration:
  instance_class: "db.r5.2xlarge"
  engine_version: "12.8"
  allocated_storage: 3000  # 50% buffer over the current 2TB
  iops: 9000

  parameter_group:
    shared_preload_libraries: "pg_stat_statements,auto_explain"
    effective_cache_size: "24GB"
    shared_buffers: "8GB"
    work_mem: "256MB"
    maintenance_work_mem: "2GB"

  backup_configuration:
    backup_retention_period: 7
    backup_window: "03:00-04:00"
    maintenance_window: "sun:04:00-sun:05:00"

  monitoring:
    performance_insights: true
    enhanced_monitoring: true
    monitoring_interval: 60
Read Replica Strategy:
# Connection routing configuration
import random

import psycopg2.pool


class DatabaseRouter:
    def __init__(self, db_user, db_password):
        self.primary_endpoint = "prod-primary.cluster-xyz.us-west-2.rds.amazonaws.com"
        self.read_endpoints = [
            "prod-reader-1.cluster-xyz.us-west-2.rds.amazonaws.com",
            "prod-reader-2.cluster-xyz.us-west-2.rds.amazonaws.com"
        ]
        self.db_user = db_user
        self.db_password = db_password
        self.connection_pool = {}

    def get_connection(self, operation_type='read'):
        """Route connections based on operation type."""
        if operation_type in ['insert', 'update', 'delete']:
            return self.get_primary_connection()
        else:
            return self.get_read_connection()

    def get_primary_connection(self):
        """Writes always go to the primary endpoint."""
        return self._pool_for(self.primary_endpoint).getconn()

    def get_read_connection(self):
        """Load balance across read replicas."""
        return self._pool_for(random.choice(self.read_endpoints)).getconn()

    def _pool_for(self, endpoint):
        """Lazily create one connection pool per endpoint."""
        if endpoint not in self.connection_pool:
            self.connection_pool[endpoint] = psycopg2.pool.ThreadedConnectionPool(
                minconn=5,
                maxconn=20,
                host=endpoint,
                database="production",
                user=self.db_user,
                password=self.db_password
            )
        return self.connection_pool[endpoint]
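Call sites then only declare the operation type. A minimal usage sketch (credentials are placeholders, and real code would return connections to the pool with putconn):
# Hypothetical usage: writes route to the primary, reads are load-balanced
router = DatabaseRouter(db_user='app_user', db_password='***')  # placeholders

write_conn = router.get_connection('update')  # -> primary endpoint
read_conn = router.get_connection('read')     # -> one of the reader endpoints

with read_conn.cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM inventory_transactions")
    print(cur.fetchone()[0])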
AWS DMS Configuration:
{
  "replication_instance": {
    "allocated_storage": 500,
    "apply_immediately": true,
    "auto_minor_version_upgrade": true,
    "availability_zone": "us-west-2a",
    "engine_version": "3.4.7",
    "multi_az": false,
    "publicly_accessible": false,
    "replication_instance_class": "dms.r5.xlarge",
    "replication_instance_identifier": "postgres-migration-instance"
  },
  "source_endpoint": {
    "database_name": "production",
    "endpoint_identifier": "postgres-source",
    "endpoint_type": "source",
    "engine_name": "postgres",
    "server_name": "on-premise-db.company.local",
    "port": 5432,
    "ssl_mode": "require"
  },
  "target_endpoint": {
    "database_name": "production",
    "endpoint_identifier": "rds-target",
    "endpoint_type": "target",
    "engine_name": "postgres",
    "server_name": "prod-primary.cluster-xyz.us-west-2.rds.amazonaws.com",
    "port": 5432,
    "ssl_mode": "require"
  }
}
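For reference, a minimal sketch of provisioning the same resources with boto3 (usernames and passwords are placeholders; error handling omitted):
import boto3

dms = boto3.client('dms')

# Replication instance, mirroring the JSON configuration above
dms.create_replication_instance(
    ReplicationInstanceIdentifier='postgres-migration-instance',
    ReplicationInstanceClass='dms.r5.xlarge',
    AllocatedStorage=500,
    EngineVersion='3.4.7',
    AvailabilityZone='us-west-2a',
    MultiAZ=False,
    PubliclyAccessible=False,
)

# Source and target endpoints
for identifier, endpoint_type, server in [
    ('postgres-source', 'source', 'on-premise-db.company.local'),
    ('rds-target', 'target', 'prod-primary.cluster-xyz.us-west-2.rds.amazonaws.com'),
]:
    dms.create_endpoint(
        EndpointIdentifier=identifier,
        EndpointType=endpoint_type,
        EngineName='postgres',
        ServerName=server,
        Port=5432,
        DatabaseName='production',
        SslMode='require',
        Username='dms_user',  # placeholder
        Password='***',       # placeholder
    )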
Custom Replication Monitoring:
# DMS monitoring and alerting
import boto3
from datetime import datetime


class DMSMonitoring:
    def __init__(self):
        self.dms_client = boto3.client('dms')
        self.cloudwatch = boto3.client('cloudwatch')

    def monitor_replication_lag(self, replication_task_arn):
        """Monitor and alert on replication lag."""
        response = self.dms_client.describe_replication_tasks(
            Filters=[
                {
                    'Name': 'replication-task-arn',
                    'Values': [replication_task_arn]
                }
            ]
        )

        task = response['ReplicationTasks'][0]
        stats = task.get('ReplicationTaskStats', {})

        # Extract key metrics. Note: apply latency is published via the
        # CDCLatencySource/CDCLatencyTarget CloudWatch metrics rather than
        # in ReplicationTaskStats, so this lookup defaults to 0 and the
        # CloudWatch metric should be treated as the source of truth.
        lag_seconds = stats.get('ApplyLatency', 0)
        tables_loaded = stats.get('TablesLoaded', 0)
        tables_loading = stats.get('TablesLoading', 0)
        tables_errored = stats.get('TablesErrored', 0)

        # Send custom metrics to CloudWatch
        self.cloudwatch.put_metric_data(
            Namespace='DMS/Migration',
            MetricData=[
                {
                    'MetricName': 'ReplicationLag',
                    'Value': lag_seconds,
                    'Unit': 'Seconds',
                    'Timestamp': datetime.utcnow()
                },
                {
                    'MetricName': 'TablesErrored',
                    'Value': tables_errored,
                    'Unit': 'Count'
                }
            ]
        )

        # Alert if lag > 5 minutes (send_alert is a notification helper
        # defined elsewhere in the codebase)
        if lag_seconds > 300:
            self.send_alert(f"Replication lag is {lag_seconds} seconds")

        return {
            'lag_seconds': lag_seconds,
            'tables_loaded': tables_loaded,
            'tables_loading': tables_loading,
            'tables_errored': tables_errored
        }
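A simple way to drive this class is a polling loop; a sketch (the task ARN is a placeholder):
import time

monitor = DMSMonitoring()
TASK_ARN = 'arn:aws:dms:us-west-2:123456789012:task:EXAMPLETASK'  # placeholder

while True:
    metrics = monitor.monitor_replication_lag(TASK_ARN)
    print(f"lag={metrics['lag_seconds']}s "
          f"loaded={metrics['tables_loaded']} errored={metrics['tables_errored']}")
    time.sleep(60)  # poll once a minute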
Database Abstraction Layer:
# Database abstraction for seamless migration
from contextlib import contextmanager
import logging


class DatabaseManager:
    def __init__(self, migration_mode=False):
        self.migration_mode = migration_mode
        # _connect_to_primary/_connect_to_secondary (defined elsewhere)
        # are expected to return SQLAlchemy engines
        self.primary_db = self._connect_to_primary()
        self.secondary_db = self._connect_to_secondary() if migration_mode else None

    @contextmanager
    def get_transaction(self, operation_type='read'):
        """Context manager for database transactions."""
        connection = self._get_connection(operation_type)
        transaction = connection.begin()
        try:
            yield connection
            transaction.commit()
        except Exception as e:
            transaction.rollback()
            logging.error(f"Database transaction failed: {e}")
            raise
        finally:
            connection.close()

    def _get_connection(self, operation_type):
        """Route connections during migration."""
        if not self.migration_mode:
            return self.primary_db.connect()

        # During migration, route based on operation
        if operation_type in ['insert', 'update', 'delete']:
            # Writes go to primary (on-premise during migration)
            return self.primary_db.connect()
        else:
            # Reads can use secondary (RDS) if available
            if self.secondary_db and self._check_replication_lag() < 30:
                return self.secondary_db.connect()
            else:
                return self.primary_db.connect()

    def _check_replication_lag(self):
        """Check replication lag before routing reads."""
        try:
            # Query both databases for the latest timestamp
            with self.primary_db.connect() as primary_conn:
                primary_max = primary_conn.execute(
                    "SELECT MAX(updated_at) FROM inventory_transactions"
                ).scalar()
            with self.secondary_db.connect() as secondary_conn:
                secondary_max = secondary_conn.execute(
                    "SELECT MAX(updated_at) FROM inventory_transactions"
                ).scalar()

            if primary_max and secondary_max:
                lag = (primary_max - secondary_max).total_seconds()
                return max(0, lag)
            return float('inf')  # Assume high lag if it can't be determined
        except Exception as e:
            logging.warning(f"Could not check replication lag: {e}")
            return float('inf')
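A hypothetical call site, showing how reads transparently pick up the RDS replica once lag is low (table and column names are illustrative):
# Hypothetical usage during the migration window
db = DatabaseManager(migration_mode=True)

# Reads may be served from RDS if replication lag is under 30 seconds
with db.get_transaction('read') as conn:
    rows = conn.execute("SELECT sku, quantity FROM inventory").fetchall()

# Writes always land on the on-premise primary until cutover
with db.get_transaction('update') as conn:
    conn.execute("UPDATE inventory SET quantity = quantity - 1 WHERE sku = 'ABC-123'")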
Feature Flag Implementation:
# Feature flags for gradual migration
import hashlib


class MigrationFeatureFlags:
    def __init__(self):
        self.flags = {
            'use_rds_for_reports': False,
            'use_rds_for_inventory_reads': False,
            'use_rds_for_order_reads': False,
            'enable_dual_writes': False,
            'cutover_ready': False
        }

    def enable_flag(self, flag_name: str, percentage: int = 100):
        """Gradually enable features for a percentage of users."""
        if flag_name not in self.flags:
            raise ValueError(f"Unknown flag: {flag_name}")

        # Use a stable hash for user-based rollout (the builtin hash()
        # is salted per process, so it cannot give consistent buckets)
        user_id = str(self.get_current_user_id())
        user_hash = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 100
        if user_hash < percentage:
            self.flags[flag_name] = True

        return self.flags[flag_name]

    def is_enabled(self, flag_name: str) -> bool:
        return self.flags.get(flag_name, False)


# Usage in application code
def get_inventory_data(product_id):
    feature_flags = MigrationFeatureFlags()
    if feature_flags.is_enabled('use_rds_for_inventory_reads'):
        return get_inventory_from_rds(product_id)
    else:
        return get_inventory_from_onprem(product_id)
Data Validation Framework:
# Comprehensive data validation
import time
from concurrent.futures import ThreadPoolExecutor

import pandas as pd


class DataValidator:
    def __init__(self, source_conn, target_conn):
        self.source_conn = source_conn
        self.target_conn = target_conn
        self.validation_results = {}

    def validate_row_counts(self, tables: list):
        """Validate row counts match between source and target."""
        def count_rows(table_name, connection):
            query = f"SELECT COUNT(*) FROM {table_name}"
            return connection.execute(query).scalar()

        with ThreadPoolExecutor(max_workers=10) as executor:
            for table in tables:
                source_future = executor.submit(count_rows, table, self.source_conn)
                target_future = executor.submit(count_rows, table, self.target_conn)

                source_count = source_future.result()
                target_count = target_future.result()

                self.validation_results[table] = {
                    'source_count': source_count,
                    'target_count': target_count,
                    'match': source_count == target_count
                }
        return self.validation_results

    def validate_data_integrity(self, table_name, key_column):
        """Validate data integrity using checksums."""
        # Hash each sampled row by casting the whole row to text
        # (CONCAT_WS over * is not valid PostgreSQL)
        sample_query = f"""
            SELECT {key_column},
                   MD5(t::text) AS row_hash
            FROM {table_name} t
            ORDER BY {key_column}
            LIMIT 10000
        """
        source_df = pd.read_sql(sample_query, self.source_conn)
        target_df = pd.read_sql(sample_query, self.target_conn)

        # Compare checksums
        merged = source_df.merge(
            target_df,
            on=key_column,
            suffixes=('_source', '_target')
        )
        mismatches = merged[
            merged['row_hash_source'] != merged['row_hash_target']
        ]
        return {
            'total_compared': len(merged),
            'mismatches': len(mismatches),
            'integrity_percentage': (len(merged) - len(mismatches)) / len(merged) * 100
        }

    def validate_performance(self, query_set):
        """Compare query performance between databases."""
        performance_results = {}
        for query_name, query in query_set.items():
            # Time the query on the source
            source_start = time.time()
            self.source_conn.execute(query)
            source_time = time.time() - source_start

            # Time the query on the target
            target_start = time.time()
            self.target_conn.execute(query)
            target_time = time.time() - target_start

            performance_results[query_name] = {
                'source_time': source_time,
                'target_time': target_time,
                'improvement_factor': source_time / target_time if target_time > 0 else 0
            }
        return performance_results
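Wiring the validator up is straightforward; a sketch assuming SQLAlchemy engines (DSNs, the orders table, and the id key column are placeholders):
from sqlalchemy import create_engine

# Placeholder DSNs for the two sides of the migration
source_engine = create_engine(
    'postgresql://app:***@on-premise-db.company.local/production')
target_engine = create_engine(
    'postgresql://app:***@prod-primary.cluster-xyz.us-west-2.rds.amazonaws.com/production')

validator = DataValidator(source_engine.connect(), target_engine.connect())

counts = validator.validate_row_counts(['inventory_transactions', 'orders'])
integrity = validator.validate_data_integrity('inventory_transactions', 'id')
print(counts)
print(integrity)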
Cutover Automation Script:
#!/bin/bash
# Zero-downtime cutover automation

set -e  # Exit on any error

LOGFILE="/var/log/migration-cutover.log"
ROLLBACK_POINT=""

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a "$LOGFILE"
}

# Pre-cutover checks
pre_cutover_checks() {
    log "Starting pre-cutover checks..."

    # Check replication lag
    LAG=$(python3 check_replication_lag.py)
    if [ "$LAG" -gt 30 ]; then
        log "ERROR: Replication lag is ${LAG} seconds. Aborting cutover."
        exit 1
    fi

    # Validate data integrity (the 'if !' form is required because set -e
    # would otherwise abort before we could log the failure)
    if ! python3 validate_data_integrity.py; then
        log "ERROR: Data validation failed. Aborting cutover."
        exit 1
    fi

    # Check application health
    if ! curl -f http://healthcheck.internal/api/health; then
        log "ERROR: Application health check failed. Aborting cutover."
        exit 1
    fi

    log "Pre-cutover checks passed."
}

# Create rollback point
create_rollback_point() {
    log "Creating rollback point..."

    # Remember the current replica count, then stop application writes
    API_REPLICAS=$(kubectl get deployment api-server -o jsonpath='{.spec.replicas}')
    kubectl scale deployment api-server --replicas=0

    # Take a final snapshot (compute the name once so the log matches)
    ROLLBACK_POINT="pre-cutover-$(date +%Y%m%d%H%M%S)"
    aws rds create-db-cluster-snapshot \
        --db-cluster-identifier prod-cluster \
        --db-cluster-snapshot-identifier "$ROLLBACK_POINT"

    log "Rollback point created: $ROLLBACK_POINT"
}

# Execute cutover
execute_cutover() {
    log "Executing cutover..."

    # Update application configuration
    kubectl create configmap database-config \
        --from-literal=primary_host="prod-primary.cluster-xyz.us-west-2.rds.amazonaws.com" \
        --from-literal=read_hosts="prod-reader-1.cluster-xyz.us-west-2.rds.amazonaws.com,prod-reader-2.cluster-xyz.us-west-2.rds.amazonaws.com" \
        --dry-run=client -o yaml | kubectl apply -f -

    # Scale the API back up, then rolling-restart everything so the new
    # configuration is picked up
    kubectl scale deployment api-server --replicas="$API_REPLICAS"
    kubectl rollout restart deployment api-server
    kubectl rollout restart deployment worker-service
    kubectl rollout restart deployment reporting-service

    # Wait for rollouts to complete
    kubectl rollout status deployment api-server --timeout=300s
    kubectl rollout status deployment worker-service --timeout=300s
    kubectl rollout status deployment reporting-service --timeout=300s

    log "Application restarted with new database configuration."
}

# Post-cutover validation
post_cutover_validation() {
    log "Running post-cutover validation..."

    # Health checks
    for i in {1..30}; do
        if curl -f http://healthcheck.internal/api/health; then
            log "Application health check passed."
            break
        fi
        log "Health check attempt $i failed, retrying in 10 seconds..."
        sleep 10
    done

    # Test critical functionality
    if ! python3 test_critical_functions.py; then
        log "ERROR: Critical function tests failed. Consider rollback."
        return 1
    fi

    # Monitor for 30 minutes
    log "Monitoring system for 30 minutes..."
    python3 monitor_post_cutover.py --duration=30

    log "Post-cutover validation completed successfully."
}

# Rollback function
rollback() {
    log "INITIATING ROLLBACK..."

    # Restore original configuration
    kubectl create configmap database-config \
        --from-literal=primary_host="on-premise-db.company.local" \
        --from-literal=read_hosts="on-premise-db.company.local" \
        --dry-run=client -o yaml | kubectl apply -f -

    # Rolling restart back to original configuration
    kubectl rollout restart deployment api-server
    kubectl rollout restart deployment worker-service
    kubectl rollout restart deployment reporting-service

    log "Rollback completed."
}

# Main execution
main() {
    log "Starting database migration cutover..."

    # Roll back automatically if any step fails
    trap rollback ERR

    pre_cutover_checks
    create_rollback_point
    execute_cutover

    if post_cutover_validation; then
        log "Migration cutover completed successfully!"
        # Remove the rollback trap since we succeeded
        trap - ERR
    else
        log "Post-cutover validation failed. Initiating rollback."
        rollback
        exit 1
    fi
}

# Execute main function
main "$@"
Timeline:
Data Validation Results:
Table Validation Summary:
├── Row count matches: 147/150 tables (98%)
├── Data integrity: 99.97% match rate
├── Schema consistency: 100% match
└── Performance tests: All passed
Discrepancies Found:
├── 3 tables with timestamp precision differences
├── 2 auto-increment sequences out of sync
└── 1 table with encoding differences (resolved)
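Out-of-sync sequences have a standard fix: point each sequence at its table's current maximum. A sketch of that repair with psycopg2 (table and column names are illustrative):
def resync_sequence(conn, table, column='id'):
    """Reset a serial column's sequence to the table's current MAX value."""
    # Identifiers cannot be parameterized, so only pass trusted names here
    query = (
        f"SELECT setval(pg_get_serial_sequence('{table}', '{column}'), "
        f"COALESCE((SELECT MAX({column}) FROM {table}), 1))"
    )
    with conn.cursor() as cur:
        cur.execute(query)
    conn.commit()

# e.g. resync_sequence(rds_conn, 'inventory_transactions')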
Query Performance Comparison:
-- Before (On-premise): Average query times
Inventory lookups: 450ms
Order history: 1.2s
Reporting queries: 15-30s
Complex aggregations: 45s
-- After (RDS): Average query times
Inventory lookups: 120ms (73% improvement)
Order history: 280ms (77% improvement)
Reporting queries: 3-8s (80% improvement)
Complex aggregations: 12s (73% improvement)
Cost Analysis:
Monthly Infrastructure Costs:
On-Premise (Previous):
├── Hardware depreciation: $3,500
├── Maintenance contracts: $1,200
├── Power and cooling: $800
├── IT staff allocation: $2,500
└── Total: $8,000
AWS RDS (New):
├── Primary instance: $1,100
├── Read replicas (2): $1,800
├── Storage and IOPS: $600
├── Backup storage: $200
├── Data transfer: $100
└── Total: $3,800
Monthly Savings: $4,200 (52% reduction)
Annual Savings: $50,400
Problem: 200+ stored procedures carrying legacy, non-portable syntax (e.g., Oracle-style ROWNUM) that had to be normalized for RDS
Solution:
-- Automated procedure conversion
CREATE OR REPLACE FUNCTION migrate_procedure_syntax()
RETURNS void AS $$
DECLARE
    proc_record RECORD;
    new_definition TEXT;
BEGIN
    FOR proc_record IN
        SELECT p.oid, p.proname
        FROM pg_proc p
        WHERE p.pronamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')
          AND p.prokind = 'f'  -- plain functions only
    LOOP
        -- Work from the full definition so argument lists and return
        -- types survive the rewrite (rebuilding from prosrc alone would
        -- reset every signature to no-argument/void)
        new_definition := pg_get_functiondef(proc_record.oid);

        -- Convert common syntax differences
        new_definition := REPLACE(new_definition, 'LIMIT 1 INTO', 'INTO LIMIT 1');
        new_definition := REPLACE(new_definition, 'ROWNUM', 'ROW_NUMBER()');

        -- Re-create the function with the converted definition
        EXECUTE new_definition;
    END LOOP;
END;
$$ LANGUAGE plpgsql;
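The converter is then invoked once against the target database with SELECT migrate_procedure_syntax(); and the rewritten definitions take effect immediately.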
Problem: Connection pool configuration needed optimization for the cloud environment
Solution:
# Optimized connection pooling for RDS
import boto3
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool


class CloudDatabasePool:
    def __init__(self):
        self.rds_client = boto3.client('rds')

    def create_optimized_pool(self, connection_string):
        """Create a connection pool optimized for RDS."""
        # get_rds_instance_info() wraps describe_db_instances and the
        # max_connections parameter lookup (implementation elsewhere)
        instance_info = self.get_rds_instance_info()
        max_connections = instance_info['max_connections']

        # Configure the pool based on instance size
        pool_size = min(20, max_connections // 4)  # Conservative sizing

        return create_engine(
            connection_string,
            poolclass=QueuePool,
            pool_size=pool_size,
            max_overflow=pool_size // 2,
            pool_timeout=30,
            pool_recycle=3600,   # Recycle connections hourly
            pool_pre_ping=True,  # Validate connections before use
            # RDS-specific optimizations (psycopg2 connect args; the
            # statement timeout is set via options, in milliseconds)
            connect_args={
                'application_name': 'supply_chain_app',
                'connect_timeout': 10,
                'options': '-c statement_timeout=30000'
            }
        )
Problem: Existing monitoring needed to adapt to the cloud environment
Solution:
# Comprehensive RDS monitoring
import boto3


class RDSMonitoring:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.rds = boto3.client('rds')

    def setup_alerts(self, cluster_identifier):
        """Set up comprehensive RDS monitoring."""
        alerts = [
            {
                'name': 'HighCPUUtilization',
                'metric': 'CPUUtilization',
                'threshold': 80,
                'comparison': 'GreaterThanThreshold',
                'description': 'Database CPU usage is high'
            },
            {
                'name': 'HighDatabaseConnections',
                'metric': 'DatabaseConnections',
                'threshold': 40,
                'comparison': 'GreaterThanThreshold',
                'description': 'High number of database connections'
            },
            {
                'name': 'ReadLatencyHigh',
                'metric': 'ReadLatency',
                'threshold': 0.2,  # seconds
                'comparison': 'GreaterThanThreshold',
                'description': 'Database read latency is high'
            }
        ]

        for alert in alerts:
            self.cloudwatch.put_metric_alarm(
                AlarmName=f"{cluster_identifier}-{alert['name']}",
                ComparisonOperator=alert['comparison'],
                EvaluationPeriods=2,
                MetricName=alert['metric'],
                Namespace='AWS/RDS',
                Period=300,
                Statistic='Average',
                Threshold=alert['threshold'],
                ActionsEnabled=True,
                AlarmActions=[
                    'arn:aws:sns:us-west-2:123456789012:database-alerts'
                ],
                AlarmDescription=alert['description'],
                Dimensions=[
                    {
                        'Name': 'DBClusterIdentifier',
                        'Value': cluster_identifier
                    }
                ]
            )
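Applying the alarms is then a single call per cluster:
monitor = RDSMonitoring()
monitor.setup_alerts('prod-cluster')  # same identifier as in the cutover script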
Key Learning: We spent roughly 60% of project time on testing and validation
Implementation:
Key Learning: Gradual migration reduces risk significantly
Benefits Realized:
Key Learning: Cloud monitoring requires different approaches
New Capabilities:
1. Multi-Region Setup
# Multi-region RDS configuration
regions:
  primary: "us-west-2"
  secondary: "us-east-1"

cross_region_backup:
  automated: true
  retention: 30_days

disaster_recovery:
  rpo: "15_minutes"
  rto: "30_minutes"
2. Advanced Analytics Integration
# Real-time analytics pipeline
def setup_analytics_pipeline():
    # DMS to Kinesis for real-time change streaming
    kinesis_config = {
        'stream_name': 'database_changes',
        'shard_count': 4,
        'retention_period': 24  # hours
    }

    # Lambda for transformation
    lambda_config = {
        'function_name': 'transform_db_changes',
        'runtime': 'python3.8',
        'memory': 512,  # MB
        'timeout': 60   # seconds
    }

    # S3 + Athena for analytics
    analytics_config = {
        'bucket': 'supply-chain-analytics',
        'partitioning': 'year/month/day',
        'format': 'parquet'
    }

    return kinesis_config, lambda_config, analytics_config
The PostgreSQL to RDS migration was a critical success that delivered immediate business value while positioning us for future growth. The zero-downtime approach proved that even the most critical systems can be modernized without business disruption.
Key Success Factors:
Business Impact:
The migration experience reinforced that database modernization isn’t just about technology—it’s about building organizational confidence in cloud transformation.
Planning a database migration? Let’s connect on LinkedIn to discuss strategies and lessons learned.