From ab7acf3932665065583cb7be22b44624af364673 Mon Sep 17 00:00:00 2001 From: Sravan Reddy Date: Wed, 29 Jan 2025 21:01:22 +0530 Subject: [PATCH] Address review comments --- .../management/commands/migrate_multi.py | 8 +- .../commands/setup_logical_replication.py | 92 +++++++++---------- config/db_router.py | 16 ++-- config/settings/base.py | 11 +-- 4 files changed, 59 insertions(+), 68 deletions(-) diff --git a/commcare_connect/opportunity/management/commands/migrate_multi.py b/commcare_connect/opportunity/management/commands/migrate_multi.py index f3cea395..724c6fc4 100644 --- a/commcare_connect/opportunity/management/commands/migrate_multi.py +++ b/commcare_connect/opportunity/management/commands/migrate_multi.py @@ -58,14 +58,8 @@ def migrate_db(db_alias, options=options): call_options["database"] = db_alias call_command("migrate", *args, **call_options) - dbs_to_migrate = [ - db_alias for db_alias in settings.DATABASES.keys() if settings.DATABASES[db_alias].get("MIGRATE", True) - ] - dbs_to_skip = list(set(settings.DATABASES) - set(dbs_to_migrate)) - + dbs_to_migrate = settings.DATABASES.keys() print("\nThe following databases will be migrated:\n * {}\n".format("\n * ".join(dbs_to_migrate))) - if dbs_to_skip: - print("\nThe following databases will be skipped:\n * {}\n".format("\n * ".join(dbs_to_skip))) jobs = [gevent.spawn(migrate_db, db_alias) for db_alias in dbs_to_migrate] diff --git a/commcare_connect/opportunity/management/commands/setup_logical_replication.py b/commcare_connect/opportunity/management/commands/setup_logical_replication.py index 42a504f1..6e0d4344 100644 --- a/commcare_connect/opportunity/management/commands/setup_logical_replication.py +++ b/commcare_connect/opportunity/management/commands/setup_logical_replication.py @@ -13,19 +13,23 @@ PaymentUnit, UserVisit, ) +from commcare_connect.organization.models import Organization +from commcare_connect.program.models import Program from commcare_connect.users.models import User REPLICATION_ALLOWED_MODELS = [ + Assessment, + CompletedModule, + CompletedWork, + LearnModule, Opportunity, OpportunityAccess, - LearnModule, - CompletedModule, + Organization, Payment, + PaymentUnit, + Program, User, UserVisit, - CompletedWork, - PaymentUnit, - Assessment, ] @@ -34,7 +38,7 @@ class Command(BaseCommand): def handle(self, *args, **options): secondary_db_alias = settings.SECONDARY_DB_ALIAS - if secondary_db_alias == "default": + if not secondary_db_alias: raise CommandError("'secondary' database needs to be configured") # Ensure secondary database has table schemas @@ -68,20 +72,17 @@ def handle(self, *args, **options): # Create publication with default_conn.cursor() as cursor: - try: - # Check if publication exists - cursor.execute("SELECT pubname FROM pg_publication WHERE pubname = %s;", [publication_name]) - if cursor.fetchone(): - self.stdout.write( - self.style.WARNING(f"Publication '{publication_name}' already exists. Skipping creation.") - ) - else: - # Create new publication - tables = ", ".join([f'"{table}"' for table in table_list]) - cursor.execute(f"CREATE PUBLICATION {publication_name} FOR TABLE {tables};") - self.stdout.write(self.style.SUCCESS(f"Publication '{publication_name}' created successfully.")) - except Exception as e: - raise CommandError(f"Failed to create publication: {e}") + # Check if publication exists + cursor.execute("SELECT pubname FROM pg_publication WHERE pubname = %s;", [publication_name]) + if cursor.fetchone(): + self.stdout.write( + self.style.WARNING(f"Publication '{publication_name}' already exists. Skipping creation.") + ) + else: + # Create new publication + tables = ", ".join([f'"{table}"' for table in table_list]) + cursor.execute(f"CREATE PUBLICATION {publication_name} FOR TABLE {tables};") + self.stdout.write(self.style.SUCCESS(f"Publication '{publication_name}' created successfully.")) # Create subscription in the secondary database secondary_conn = connections[secondary_db_alias] @@ -89,32 +90,29 @@ def handle(self, *args, **options): subscription_name = "tables_for_superset_sub" with secondary_conn.cursor() as cursor: - try: - # Check if subscription exists - cursor.execute("SELECT subname FROM pg_subscription WHERE subname = %s;", [subscription_name]) - if cursor.fetchone(): - self.stdout.write( - self.style.WARNING(f"Subscription '{subscription_name}' already exists. Skipping creation.") - ) - else: - # Create new subscription - default_db_settings = default_conn.settings_dict - primary_conn_info = ( - f"host={default_db_settings['HOST']} " - f"port={default_db_settings['PORT']} " - f"dbname={default_db_settings['NAME']} " - f"user={default_db_settings['USER']} " - f"password={default_db_settings['PASSWORD']}" - ) - cursor.execute( - f""" - CREATE SUBSCRIPTION {subscription_name} - CONNECTION '{primary_conn_info}' - PUBLICATION {publication_name}; - """ - ) - self.stdout.write(self.style.SUCCESS(f"Subscription '{subscription_name}' created successfully.")) - except Exception as e: - raise CommandError(f"Failed to create subscription: {e}") + # Check if subscription exists + cursor.execute("SELECT subname FROM pg_subscription WHERE subname = %s;", [subscription_name]) + if cursor.fetchone(): + self.stdout.write( + self.style.WARNING(f"Subscription '{subscription_name}' already exists. Skipping creation.") + ) + else: + # Create new subscription + default_db_settings = default_conn.settings_dict + primary_conn_info = ( + f"host={default_db_settings['HOST']} " + f"port={default_db_settings['PORT']} " + f"dbname={default_db_settings['NAME']} " + f"user={default_db_settings['USER']} " + f"password={default_db_settings['PASSWORD']}" + ) + cursor.execute( + f""" + CREATE SUBSCRIPTION {subscription_name} + CONNECTION '{primary_conn_info}' + PUBLICATION {publication_name}; + """ + ) + self.stdout.write(self.style.SUCCESS(f"Subscription '{subscription_name}' created successfully.")) self.stdout.write(self.style.SUCCESS("Publication and subscription setup completed.")) diff --git a/config/db_router.py b/config/db_router.py index 57084ef4..f5d75ae2 100644 --- a/config/db_router.py +++ b/config/db_router.py @@ -2,7 +2,7 @@ from django.db import DEFAULT_DB_ALIAS -class SecondaryDatabaseRouter: +class ConnectDatabaseRouter: """ A router to direct migration operations for specific models to the secondary database. """ @@ -22,7 +22,7 @@ def allow_relation(self, obj1, obj2, **hints): def allow_migrate(self, db, app_label, model_name=None, **hints): if db == DEFAULT_DB_ALIAS: return True - elif db == settings.SECONDARY_DB_ALIAS: + elif db and db == settings.SECONDARY_DB_ALIAS: # Data migrations using RunPython don't need to be # applied on secondary DB as they are replicated # at the database level. @@ -45,15 +45,15 @@ def allow_migrate(self, db, app_label, model_name=None, **hints): ) ALLOWED_OPERATIONS = ( - CreateModel, - DeleteModel, - AlterModelTable, + AddField, + AlterField, + AlterIndexTogether, AlterModelOptions, + AlterModelTable, AlterUniqueTogether, - AlterIndexTogether, - AddField, + CreateModel, + DeleteModel, RemoveField, - AlterField, RenameField, ) diff --git a/config/settings/base.py b/config/settings/base.py index bbb86f1f..02072a6a 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -30,15 +30,14 @@ default="postgres:///commcare_connect", ), } -DATABASES["default"]["ATOMIC_REQUESTS"] = True -DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField" - -SECONDARY_DB_ALIAS = "default" - +SECONDARY_DB_ALIAS = None if env.db("SECONDARY_DATABASE_URL", default=""): SECONDARY_DB_ALIAS = "secondary" DATABASES[SECONDARY_DB_ALIAS] = env.db("SECONDARY_DATABASE_URL") - DATABASE_ROUTERS = ["config.db_router.SecondaryDatabaseRouter"] + DATABASE_ROUTERS = ["config.db_router.ConnectDatabaseRouter"] + +DATABASES["default"]["ATOMIC_REQUESTS"] = True +DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField" # URLS