Database Migration Strategies: PostgreSQL to Cloud - A Production Case Study

Published: September 18, 2020
Author: Fernando McKenzie
Tags: Database Migration, PostgreSQL, AWS RDS, Data Engineering, Zero Downtime

Introduction

During Q2 2020, while managing remote work challenges, we simultaneously executed a critical database migration: moving our 2TB PostgreSQL database from on-premise hardware to AWS RDS. This article details our migration strategy, challenges, and lessons learned from a zero-downtime transition supporting critical supply chain operations.

The Migration Challenge

Legacy Database Environment

Business Requirements

Migration Strategy Overview

Phase 1: Assessment and Planning (4 weeks)

Database Analysis Script:

-- Comprehensive database assessment
WITH table_sizes AS (
    SELECT 
        t.schemaname,
        t.tablename,
        pg_size_pretty(pg_total_relation_size(t.schemaname||'.'||t.tablename)) as size,
        pg_total_relation_size(t.schemaname||'.'||t.tablename) as size_bytes,
        s.n_tup_ins + s.n_tup_upd + s.n_tup_del as total_operations
    FROM pg_tables t
    JOIN pg_stat_user_tables s
      ON t.schemaname = s.schemaname AND t.tablename = s.relname
    WHERE t.schemaname NOT IN ('information_schema', 'pg_catalog')
),
index_usage AS (
    SELECT 
        schemaname,
        relname AS tablename,
        indexrelname AS indexname,
        idx_scan,
        idx_tup_read,
        idx_tup_fetch
    FROM pg_stat_user_indexes
)
SELECT 
    ts.schemaname,
    ts.tablename,
    ts.size,
    ts.total_operations,
    COUNT(iu.indexname) as index_count,
    SUM(iu.idx_scan) as total_index_scans
FROM table_sizes ts
LEFT JOIN index_usage iu
  ON ts.schemaname = iu.schemaname AND ts.tablename = iu.tablename
GROUP BY ts.schemaname, ts.tablename, ts.size, ts.size_bytes, ts.total_operations
ORDER BY ts.size_bytes DESC;

Key Findings:

Phase 2: Target Architecture Design

AWS RDS Configuration:

# RDS parameter optimization
rds_configuration:
  instance_class: "db.r5.2xlarge"
  engine_version: "12.8"
  allocated_storage: 3000  # 50% buffer
  iops: 9000
  
  parameter_group:
    shared_preload_libraries: "pg_stat_statements,auto_explain"
    effective_cache_size: "24GB"
    shared_buffers: "8GB"
    work_mem: "256MB"
    maintenance_work_mem: "2GB"
    
  backup_configuration:
    backup_retention_period: 7
    backup_window: "03:00-04:00"
    maintenance_window: "sun:04:00-sun:05:00"
    
  monitoring:
    performance_insights: true
    enhanced_monitoring: true
    monitoring_interval: 60
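
For reference, a minimal boto3 sketch of how a parameter group like this could be created and applied. The group name is a placeholder, and the parameter values mirror the YAML above (PostgreSQL expects work_mem and maintenance_work_mem in kB):

# Hypothetical sketch: creating and tuning the RDS parameter group with boto3.
# The group name is a placeholder; values mirror the configuration above.
import boto3

rds = boto3.client('rds')

rds.create_db_parameter_group(
    DBParameterGroupName='prod-postgres12-params',   # assumed name
    DBParameterGroupFamily='postgres12',
    Description='Tuned parameters for the migrated production database'
)

rds.modify_db_parameter_group(
    DBParameterGroupName='prod-postgres12-params',
    Parameters=[
        {'ParameterName': 'work_mem',
         'ParameterValue': '262144',                  # 256MB, expressed in kB
         'ApplyMethod': 'immediate'},
        {'ParameterName': 'maintenance_work_mem',
         'ParameterValue': '2097152',                 # 2GB, expressed in kB
         'ApplyMethod': 'immediate'},
        {'ParameterName': 'shared_preload_libraries',
         'ParameterValue': 'pg_stat_statements,auto_explain',
         'ApplyMethod': 'pending-reboot'},            # static parameter, needs a reboot
    ]
)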

Read Replica Strategy:

# Connection routing configuration
import os
import random

import psycopg2.pool

class DatabaseRouter:
    def __init__(self):
        self.primary_endpoint = "prod-primary.cluster-xyz.us-west-2.rds.amazonaws.com"
        self.read_endpoints = [
            "prod-reader-1.cluster-xyz.us-west-2.rds.amazonaws.com",
            "prod-reader-2.cluster-xyz.us-west-2.rds.amazonaws.com"
        ]
        self.db_user = os.environ["DB_USER"]
        self.db_password = os.environ["DB_PASSWORD"]
        self.connection_pool = {}
        
    def get_connection(self, operation_type='read'):
        """Route connections based on operation type"""
        
        if operation_type in ('insert', 'update', 'delete'):
            return self.get_primary_connection()
        return self.get_read_connection()
    
    def get_primary_connection(self):
        """Writes always go to the primary endpoint"""
        return self._pooled_connection(self.primary_endpoint)
    
    def get_read_connection(self):
        """Load balance across read replicas"""
        return self._pooled_connection(random.choice(self.read_endpoints))
    
    def _pooled_connection(self, endpoint):
        """Create a per-endpoint pool lazily and hand out a connection"""
        if endpoint not in self.connection_pool:
            self.connection_pool[endpoint] = psycopg2.pool.ThreadedConnectionPool(
                minconn=5,
                maxconn=20,
                host=endpoint,
                dbname="production",
                user=self.db_user,
                password=self.db_password
            )
        return self.connection_pool[endpoint].getconn()

Migration Execution

Phase 3: Data Replication Setup (Week 1)

AWS DMS Configuration:

{
  "replication_instance": {
    "allocated_storage": 500,
    "apply_immediately": true,
    "auto_minor_version_upgrade": true,
    "availability_zone": "us-west-2a",
    "engine_version": "3.4.7",
    "multi_az": false,
    "publicly_accessible": false,
    "replication_instance_class": "dms.r5.xlarge",
    "replication_instance_identifier": "postgres-migration-instance"
  },
  
  "source_endpoint": {
    "database_name": "production",
    "endpoint_identifier": "postgres-source",
    "endpoint_type": "source",
    "engine_name": "postgres",
    "server_name": "on-premise-db.company.local",
    "port": 5432,
    "ssl_mode": "require"
  },
  
  "target_endpoint": {
    "database_name": "production", 
    "endpoint_identifier": "rds-target",
    "endpoint_type": "target",
    "engine_name": "postgres",
    "server_name": "prod-primary.cluster-xyz.us-west-2.rds.amazonaws.com",
    "port": 5432,
    "ssl_mode": "require"
  }
}
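
With the replication instance and endpoints defined, the full-load-plus-CDC task itself can be created programmatically. A minimal sketch, assuming placeholder ARNs and a simple "replicate everything in public" table mapping:

# Hypothetical sketch: creating the DMS full-load + CDC task with boto3.
# All ARNs and the table-mapping rule are placeholders.
import json
import boto3

dms = boto3.client('dms')

table_mappings = {
    "rules": [{
        "rule-type": "selection",
        "rule-id": "1",
        "rule-name": "include-public-schema",
        "object-locator": {"schema-name": "public", "table-name": "%"},
        "rule-action": "include"
    }]
}

dms.create_replication_task(
    ReplicationTaskIdentifier='postgres-migration-task',
    SourceEndpointArn='arn:aws:dms:us-west-2:123456789012:endpoint:SOURCE',
    TargetEndpointArn='arn:aws:dms:us-west-2:123456789012:endpoint:TARGET',
    ReplicationInstanceArn='arn:aws:dms:us-west-2:123456789012:rep:INSTANCE',
    MigrationType='full-load-and-cdc',
    TableMappings=json.dumps(table_mappings)
)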

Custom Replication Monitoring:

# DMS monitoring and alerting
import boto3
from datetime import datetime

class DMSMonitoring:
    def __init__(self):
        self.dms_client = boto3.client('dms')
        self.cloudwatch = boto3.client('cloudwatch')
        
    def monitor_replication_lag(self, replication_task_arn):
        """Monitor and alert on replication lag"""
        
        response = self.dms_client.describe_replication_tasks(
            Filters=[
                {
                    'Name': 'replication-task-arn',
                    'Values': [replication_task_arn]
                }
            ]
        )
        
        task = response['ReplicationTasks'][0]
        stats = task.get('ReplicationTaskStats', {})
        
        # Extract key metrics
        lag_seconds = stats.get('ApplyLatency', 0)
        tables_loaded = stats.get('TablesLoaded', 0)
        tables_loading = stats.get('TablesLoading', 0)
        tables_errored = stats.get('TablesErrored', 0)
        
        # Send custom metrics to CloudWatch
        self.cloudwatch.put_metric_data(
            Namespace='DMS/Migration',
            MetricData=[
                {
                    'MetricName': 'ReplicationLag',
                    'Value': lag_seconds,
                    'Unit': 'Seconds',
                    'Timestamp': datetime.utcnow()
                },
                {
                    'MetricName': 'TablesErrored',
                    'Value': tables_errored,
                    'Unit': 'Count'
                }
            ]
        )
        
        # Alert if lag > 5 minutes
        if lag_seconds > 300:
            self.send_alert(f"Replication lag is {lag_seconds} seconds")
        
        return {
            'lag_seconds': lag_seconds,
            'tables_loaded': tables_loaded,
            'tables_loading': tables_loading,
            'tables_errored': tables_errored
        }
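
The send_alert helper is referenced but not shown above; a minimal sketch of how it could publish to SNS as a method on the same class (the topic ARN is a placeholder):

    # Hypothetical sketch of the send_alert method used above.
    # The SNS topic ARN is a placeholder.
    def send_alert(self, message):
        sns = boto3.client('sns')
        sns.publish(
            TopicArn='arn:aws:sns:us-west-2:123456789012:database-alerts',
            Subject='DMS replication alert',
            Message=message
        )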

Phase 4: Application Preparation (Week 2)

Database Abstraction Layer:

# Database abstraction for seamless migration
from contextlib import contextmanager
import logging

class DatabaseManager:
    def __init__(self, migration_mode=False):
        self.migration_mode = migration_mode
        # _connect_to_primary/_connect_to_secondary (not shown) return database engines
        self.primary_db = self._connect_to_primary()
        self.secondary_db = self._connect_to_secondary() if migration_mode else None
        
    @contextmanager
    def get_transaction(self, operation_type='read'):
        """Context manager for database transactions"""
        
        connection = self._get_connection(operation_type)
        transaction = connection.begin()
        
        try:
            yield connection
            transaction.commit()
        except Exception as e:
            transaction.rollback()
            logging.error(f"Database transaction failed: {e}")
            raise
        finally:
            connection.close()
    
    def _get_connection(self, operation_type):
        """Route connections during migration"""
        
        if not self.migration_mode:
            return self.primary_db.connect()
        
        # During migration, route based on operation
        if operation_type in ['insert', 'update', 'delete']:
            # Writes go to primary (on-premise during migration)
            return self.primary_db.connect()
        else:
            # Reads can use secondary (RDS) if available
            if self.secondary_db and self._check_replication_lag() < 30:
                return self.secondary_db.connect()
            else:
                return self.primary_db.connect()
    
    def _check_replication_lag(self):
        """Check replication lag before routing reads"""
        try:
            # Query both databases for latest timestamp
            with self.primary_db.connect() as primary_conn:
                primary_max = primary_conn.execute(
                    "SELECT MAX(updated_at) FROM inventory_transactions"
                ).scalar()
            
            with self.secondary_db.connect() as secondary_conn:
                secondary_max = secondary_conn.execute(
                    "SELECT MAX(updated_at) FROM inventory_transactions"
                ).scalar()
            
            if primary_max and secondary_max:
                lag = (primary_max - secondary_max).total_seconds()
                return max(0, lag)
            
            return float('inf')  # Assume high lag if can't determine
            
        except Exception as e:
            logging.warning(f"Could not check replication lag: {e}")
            return float('inf')
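
Application code then goes through the context manager and never needs to know which database served the request. An illustrative usage sketch (column names are illustrative):

# Illustrative usage of the abstraction layer above
db = DatabaseManager(migration_mode=True)

# Reads may be served from RDS when replication lag is low
with db.get_transaction('read') as conn:
    rows = conn.execute(
        "SELECT product_id, quantity FROM inventory_transactions LIMIT 10"
    ).fetchall()

# Writes always land on the current primary (on-premise during migration)
with db.get_transaction('update') as conn:
    conn.execute(
        "UPDATE inventory_transactions SET last_checked = NOW() WHERE product_id = 42"
    )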

Feature Flag Implementation:

# Feature flags for gradual migration
class MigrationFeatureFlags:
    def __init__(self):
        self.flags = {
            'use_rds_for_reports': False,
            'use_rds_for_inventory_reads': False,
            'use_rds_for_order_reads': False,
            'enable_dual_writes': False,
            'cutover_ready': False
        }
    
    def enable_flag(self, flag_name: str, percentage: int = 100):
        """Gradually enable features for a percentage of users"""
        import hashlib
        
        if flag_name not in self.flags:
            raise ValueError(f"Unknown flag: {flag_name}")
        
        # Stable user-based rollout: hash the user id so the same user
        # always lands in the same bucket across processes
        user_id = str(self.get_current_user_id())
        user_hash = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 100
        
        if user_hash < percentage:
            self.flags[flag_name] = True
        
        return self.flags[flag_name]
    
    def is_enabled(self, flag_name: str) -> bool:
        return self.flags.get(flag_name, False)

# Usage in application code
def get_inventory_data(product_id):
    feature_flags = MigrationFeatureFlags()
    
    if feature_flags.is_enabled('use_rds_for_inventory_reads'):
        return get_inventory_from_rds(product_id)
    else:
        return get_inventory_from_onprem(product_id)

Phase 5: Validation and Testing (Week 3)

Data Validation Framework:

# Comprehensive data validation
import time

import pandas as pd
from concurrent.futures import ThreadPoolExecutor

class DataValidator:
    def __init__(self, source_conn, target_conn):
        self.source_conn = source_conn
        self.target_conn = target_conn
        self.validation_results = {}
    
    def validate_row_counts(self, tables: list):
        """Validate row counts match between source and target"""
        
        def count_rows(table_name, connection):
            query = f"SELECT COUNT(*) FROM {table_name}"
            return connection.execute(query).scalar()
        
        with ThreadPoolExecutor(max_workers=10) as executor:
            for table in tables:
                source_future = executor.submit(count_rows, table, self.source_conn)
                target_future = executor.submit(count_rows, table, self.target_conn)
                
                source_count = source_future.result()
                target_count = target_future.result()
                
                self.validation_results[table] = {
                    'source_count': source_count,
                    'target_count': target_count,
                    'match': source_count == target_count
                }
        
        return self.validation_results
    
    def validate_data_integrity(self, table_name, key_column):
        """Validate data integrity using checksums"""
        
        # Get sample of data for comparison
        sample_query = f"""
        SELECT {key_column}, 
               MD5(t::text) as row_hash
        FROM {table_name} t
        ORDER BY {key_column}
        LIMIT 10000
        """
        
        source_df = pd.read_sql(sample_query, self.source_conn)
        target_df = pd.read_sql(sample_query, self.target_conn)
        
        # Compare checksums
        merged = source_df.merge(
            target_df, 
            on=key_column, 
            suffixes=('_source', '_target')
        )
        
        mismatches = merged[
            merged['row_hash_source'] != merged['row_hash_target']
        ]
        
        return {
            'total_compared': len(merged),
            'mismatches': len(mismatches),
            'integrity_percentage': (len(merged) - len(mismatches)) / len(merged) * 100
        }
    
    def validate_performance(self, query_set):
        """Compare query performance between databases"""
        
        performance_results = {}
        
        for query_name, query in query_set.items():
            # Time source query
            source_start = time.time()
            source_result = self.source_conn.execute(query)
            source_time = time.time() - source_start
            
            # Time target query  
            target_start = time.time()
            target_result = self.target_conn.execute(query)
            target_time = time.time() - target_start
            
            performance_results[query_name] = {
                'source_time': source_time,
                'target_time': target_time,
                'improvement_factor': source_time / target_time if target_time > 0 else 0
            }
        
        return performance_results
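
A minimal driver for the validator, assuming SQLAlchemy engines on both sides (connection URLs and the table list are placeholders):

# Illustrative driver for the validation framework above
from sqlalchemy import create_engine

source_engine = create_engine("postgresql://user:pass@on-premise-db.company.local/production")
target_engine = create_engine("postgresql://user:pass@prod-primary.cluster-xyz.us-west-2.rds.amazonaws.com/production")

validator = DataValidator(source_engine.connect(), target_engine.connect())

counts = validator.validate_row_counts(['inventory_transactions', 'orders', 'shipments'])
mismatched = [table for table, result in counts.items() if not result['match']]
print(f"Tables with row-count mismatches: {mismatched}")

integrity = validator.validate_data_integrity('inventory_transactions', 'transaction_id')
print(f"Row-hash match rate: {integrity['integrity_percentage']:.2f}%")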

Phase 6: Cutover Weekend (Week 4)

Cutover Automation Script:

#!/bin/bash
# Zero-downtime cutover automation

set -e  # Exit on any error

LOGFILE="/var/log/migration-cutover.log"
ROLLBACK_POINT=""

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a $LOGFILE
}

# Pre-cutover checks
pre_cutover_checks() {
    log "Starting pre-cutover checks..."
    
    # Check replication lag
    LAG=$(python3 check_replication_lag.py)
    if [ "$LAG" -gt 30 ]; then
        log "ERROR: Replication lag is ${LAG} seconds. Aborting cutover."
        exit 1
    fi
    
    # Validate data integrity (wrapped in 'if !' so set -e does not exit before we log)
    if ! python3 validate_data_integrity.py; then
        log "ERROR: Data validation failed. Aborting cutover."
        exit 1
    fi
    
    # Check application health
    if ! curl -f http://healthcheck.internal/api/health; then
        log "ERROR: Application health check failed. Aborting cutover."
        exit 1
    fi
    
    log "Pre-cutover checks passed."
}

# Create rollback point
create_rollback_point() {
    log "Creating rollback point..."
    
    # Remember the current replica count, then stop application writes temporarily
    API_REPLICAS=$(kubectl get deployment api-server -o jsonpath='{.spec.replicas}')
    kubectl scale deployment api-server --replicas=0
    
    # Take final snapshot (compute the identifier once so the log matches the snapshot)
    ROLLBACK_POINT="pre-cutover-$(date +%Y%m%d%H%M%S)"
    aws rds create-db-cluster-snapshot \
        --db-cluster-identifier prod-cluster \
        --db-cluster-snapshot-identifier "$ROLLBACK_POINT"
    
    log "Rollback point created: $ROLLBACK_POINT"
}

# Execute cutover
execute_cutover() {
    log "Executing cutover..."
    
    # Update application configuration
    kubectl create configmap database-config \
        --from-literal=primary_host="prod-primary.cluster-xyz.us-west-2.rds.amazonaws.com" \
        --from-literal=read_hosts="prod-reader-1.cluster-xyz.us-west-2.rds.amazonaws.com,prod-reader-2.cluster-xyz.us-west-2.rds.amazonaws.com" \
        --dry-run=client -o yaml | kubectl apply -f -
    
    # Scale the API tier back up, then roll every service onto the new configuration
    kubectl scale deployment api-server --replicas="${API_REPLICAS:-3}"
    kubectl rollout restart deployment api-server
    kubectl rollout restart deployment worker-service
    kubectl rollout restart deployment reporting-service
    
    # Wait for rollout to complete
    kubectl rollout status deployment api-server --timeout=300s
    kubectl rollout status deployment worker-service --timeout=300s
    kubectl rollout status deployment reporting-service --timeout=300s
    
    log "Application restarted with new database configuration."
}

# Post-cutover validation
post_cutover_validation() {
    log "Running post-cutover validation..."
    
    # Health checks
    for i in {1..30}; do
        if curl -f http://healthcheck.internal/api/health; then
            log "Application health check passed."
            break
        fi
        log "Health check attempt $i failed, retrying in 10 seconds..."
        sleep 10
    done
    
    # Test critical functionality
    python3 test_critical_functions.py
    if [ $? -ne 0 ]; then
        log "ERROR: Critical function tests failed. Consider rollback."
        return 1
    fi
    
    # Monitor for 30 minutes
    log "Monitoring system for 30 minutes..."
    python3 monitor_post_cutover.py --duration=30
    
    log "Post-cutover validation completed successfully."
}

# Rollback function
rollback() {
    log "INITIATING ROLLBACK..."
    
    # Restore original configuration
    kubectl create configmap database-config \
        --from-literal=primary_host="on-premise-db.company.local" \
        --from-literal=read_hosts="on-premise-db.company.local" \
        --dry-run=client -o yaml | kubectl apply -f -
    
    # Ensure the API tier is running, then roll back to the original configuration
    kubectl scale deployment api-server --replicas="${API_REPLICAS:-3}"
    kubectl rollout restart deployment api-server
    kubectl rollout restart deployment worker-service
    kubectl rollout restart deployment reporting-service
    
    log "Rollback completed."
}

# Main execution
main() {
    log "Starting database migration cutover..."
    
    # Set trap for cleanup on exit
    trap rollback ERR
    
    pre_cutover_checks
    create_rollback_point
    execute_cutover
    
    if post_cutover_validation; then
        log "Migration cutover completed successfully!"
        # Remove rollback trap since we succeeded
        trap - ERR
    else
        log "Post-cutover validation failed. Initiating rollback."
        rollback
        exit 1
    fi
}

# Execute main function
main "$@"

Results and Performance Impact

Migration Metrics

Timeline:

Data Validation Results:

Table Validation Summary:
├── Row count matches: 147/150 tables (98%)
├── Data integrity: 99.97% match rate
├── Schema consistency: 100% match
└── Performance tests: All passed

Discrepancies Found:
├── 3 tables with timestamp precision differences
├── 2 auto-increment sequences out of sync
└── 1 table with encoding differences (resolved)
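
Out-of-sync sequences are straightforward to correct on the target: reset each sequence to the table's current maximum key. An illustrative example (table and column names are placeholders):

-- Illustrative: re-align a serial sequence with its table after replication
SELECT setval(
    pg_get_serial_sequence('inventory_transactions', 'transaction_id'),
    COALESCE((SELECT MAX(transaction_id) FROM inventory_transactions), 1)
);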

Performance Improvements

Query Performance Comparison:

-- Before (On-premise): Average query times
Inventory lookups:     450ms
Order history:         1.2s
Reporting queries:     15-30s
Complex aggregations:  45s

-- After (RDS): Average query times  
Inventory lookups:     120ms (73% improvement)
Order history:         280ms (77% improvement)
Reporting queries:     3-8s (80% improvement)
Complex aggregations:  12s (73% improvement)

Cost Analysis:

Monthly Infrastructure Costs:

On-Premise (Previous):
├── Hardware depreciation:    $3,500
├── Maintenance contracts:    $1,200
├── Power and cooling:        $800
├── IT staff allocation:      $2,500
└── Total:                    $8,000

AWS RDS (New):
├── Primary instance:         $1,100
├── Read replicas (2):        $1,800
├── Storage and IOPS:         $600
├── Backup storage:           $200
├── Data transfer:            $100
└── Total:                    $3,800

Monthly Savings:              $4,200 (52% reduction)
Annual Savings:               $50,400

Challenges and Solutions

Challenge 1: Complex Stored Procedures

Problem: 200+ stored procedures with PostgreSQL-specific syntax

Solution:

-- Automated procedure conversion
CREATE OR REPLACE FUNCTION migrate_procedure_syntax()
RETURNS void AS $$
DECLARE
    proc_record RECORD;
    new_definition TEXT;
BEGIN
    FOR proc_record IN 
        SELECT proname, prosrc 
        FROM pg_proc 
        WHERE pronamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')
    LOOP
        -- Convert common syntax differences
        new_definition := proc_record.prosrc;
        
        -- Replace proprietary functions
        new_definition := REPLACE(new_definition, 'LIMIT 1 INTO', 'INTO LIMIT 1');
        new_definition := REPLACE(new_definition, 'ROWNUM', 'ROW_NUMBER()');
        
        -- Update procedure definition
        EXECUTE format('CREATE OR REPLACE FUNCTION %I() RETURNS void AS $func$ %s $func$ LANGUAGE plpgsql',
                      proc_record.proname, new_definition);
    END LOOP;
END;
$$ LANGUAGE plpgsql;

Challenge 2: Application Connection Pooling

Problem: Connection pool configuration needed optimization for cloud environment

Solution:

# Optimized connection pooling for RDS
import boto3
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

class CloudDatabasePool:
    def __init__(self):
        self.rds_client = boto3.client('rds')
        
    def create_optimized_pool(self, connection_string):
        """Create connection pool optimized for RDS"""
        
        # Get RDS instance information (helper not shown; wraps describe_db_instances)
        instance_info = self.get_rds_instance_info()
        max_connections = instance_info['max_connections']
        
        # Configure pool based on instance size
        pool_size = min(20, max_connections // 4)  # Conservative sizing
        
        return create_engine(
            connection_string,
            poolclass=QueuePool,
            pool_size=pool_size,
            max_overflow=pool_size // 2,
            pool_timeout=30,
            pool_recycle=3600,  # Recycle connections hourly
            pool_pre_ping=True,  # Validate connections
            
            # RDS-specific optimizations (psycopg2 connection arguments)
            connect_args={
                'application_name': 'supply_chain_app',
                'connect_timeout': 10,
                'options': '-c statement_timeout=30000'
            }
        )
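
Typical usage, assuming the engine is created once at application start-up (the connection URL is a placeholder):

# Illustrative: build the pooled engine once and reuse it
from sqlalchemy import text

pool = CloudDatabasePool()
engine = pool.create_optimized_pool(
    "postgresql+psycopg2://user:pass@prod-primary.cluster-xyz.us-west-2.rds.amazonaws.com/production"
)

with engine.connect() as conn:
    conn.execute(text("SELECT 1"))  # simple connectivity check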

Challenge 3: Monitoring and Alerting

Problem: Existing monitoring needed to adapt to cloud environment

Solution:

# Comprehensive RDS monitoring
import boto3

class RDSMonitoring:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.rds = boto3.client('rds')
        
    def setup_alerts(self, cluster_identifier):
        """Setup comprehensive RDS monitoring"""
        
        alerts = [
            {
                'name': 'HighCPUUtilization',
                'metric': 'CPUUtilization',
                'threshold': 80,
                'comparison': 'GreaterThanThreshold',
                'description': 'Database CPU usage is high'
            },
            {
                'name': 'HighDatabaseConnections',
                'metric': 'DatabaseConnections', 
                'threshold': 40,
                'comparison': 'GreaterThanThreshold',
                'description': 'High number of database connections'
            },
            {
                'name': 'ReadLatencyHigh',
                'metric': 'ReadLatency',
                'threshold': 0.2,
                'comparison': 'GreaterThanThreshold',
                'description': 'Database read latency is high'
            }
        ]
        
        for alert in alerts:
            self.cloudwatch.put_metric_alarm(
                AlarmName=f"{cluster_identifier}-{alert['name']}",
                ComparisonOperator=alert['comparison'],
                EvaluationPeriods=2,
                MetricName=alert['metric'],
                Namespace='AWS/RDS',
                Period=300,
                Statistic='Average',
                Threshold=alert['threshold'],
                ActionsEnabled=True,
                AlarmActions=[
                    'arn:aws:sns:us-west-2:123456789012:database-alerts'
                ],
                AlarmDescription=alert['description'],
                Dimensions=[
                    {
                        'Name': 'DBClusterIdentifier',
                        'Value': cluster_identifier
                    }
                ]
            )
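
Registering the alarms is then a single call per cluster:

# Illustrative: create the alarms above for the production cluster
monitor = RDSMonitoring()
monitor.setup_alerts('prod-cluster')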

Lessons Learned

1. Thorough Testing is Non-Negotiable

Key Learning: Spent 60% of project time on testing and validation

Implementation:

2. Feature Flags Enable Safe Rollouts

Key Learning: Gradual migration reduces risk significantly

Benefits Realized:

3. Monitoring Must Evolve with Infrastructure

Key Learning: Cloud monitoring requires different approaches

New Capabilities:

Future Enhancements

2021 Database Roadmap

1. Multi-Region Setup

# Multi-region RDS configuration
regions:
  primary: "us-west-2"
  secondary: "us-east-1"
  
cross_region_backup:
  automated: true
  retention: 30_days
  
disaster_recovery:
  rpo: "15_minutes"
  rto: "30_minutes"
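
One way to realize this with RDS for PostgreSQL is a cross-region read replica that can be promoted during a regional failover; a hedged sketch (identifiers and the source ARN are placeholders):

# Hypothetical sketch: cross-region read replica for disaster recovery.
# Identifiers and the source ARN are placeholders.
import boto3

rds_east = boto3.client('rds', region_name='us-east-1')

rds_east.create_db_instance_read_replica(
    DBInstanceIdentifier='prod-dr-replica',
    SourceDBInstanceIdentifier='arn:aws:rds:us-west-2:123456789012:db:prod-primary',
    DBInstanceClass='db.r5.2xlarge',
    PubliclyAccessible=False
)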

2. Advanced Analytics Integration

# Real-time analytics pipeline
def setup_analytics_pipeline():
    # DMS to Kinesis for real-time streaming
    kinesis_config = {
        'stream_name': 'database_changes',
        'shard_count': 4,
        'retention_period': 24
    }
    
    # Lambda for transformation
    lambda_config = {
        'function_name': 'transform_db_changes',
        'runtime': 'python3.8',
        'memory': 512,
        'timeout': 60
    }
    
    # S3 + Athena for analytics
    analytics_config = {
        'bucket': 'supply-chain-analytics',
        'partitioning': 'year/month/day',
        'format': 'parquet'
    }
    
    return kinesis_config, lambda_config, analytics_config

Conclusion

The PostgreSQL to RDS migration was a critical success that delivered immediate business value while positioning us for future growth. The zero-downtime approach proved that even the most critical systems can be modernized without business disruption.

Key Success Factors:

Business Impact:

The migration experience reinforced that database modernization isn’t just about technology—it’s about building organizational confidence in cloud transformation.


Planning a database migration? Let’s connect on LinkedIn to discuss strategies and lessons learned.