Skip to content

Commit

Permalink
Setup logical replication
Browse files Browse the repository at this point in the history
  • Loading branch information
sravfeyn committed Jan 27, 2025
1 parent afea4f6 commit 630d48c
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 8 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,22 @@ Bootstrap v5 is installed using npm and customised by tweaking your variables in
You can find a list of available variables [in the bootstrap source](https://github.com/twbs/bootstrap/blob/v5.1.3/scss/_variables.scss), or get explanations on them in the [Bootstrap docs](https://getbootstrap.com/docs/5.1/customize/sass/).

Bootstrap's javascript as well as its dependencies are concatenated into a single file: `static/js/vendors.js`.

### Setting up logical replication

- Create another PostgreSQL database (in a separate cluster)
- Add `SECONDARY_DATABASE_URL` to the `.env` file
- Enable replication on the default database (requires restart)
```
- "wal_level=logical"
- "max_replication_slots=5"
- "max_wal_senders=5"
```
- Create table schemas in the secondary database
```
./manage.py migrate_multi
```
- Initialize replication
```
./manage.py setup_logical_replication
```
85 changes: 85 additions & 0 deletions commcare_connect/opportunity/management/commands/migrate_multi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import sys
import traceback
from copy import copy

import gevent
from django.conf import settings
from django.core.management import call_command
from django.core.management.base import BaseCommand


def get_traceback_string():
    """Return the formatted traceback of the exception currently being handled.

    Intended to be called from inside an ``except`` block. The result is the
    same text ``traceback.print_exc()`` would print, returned as a string.
    """
    return traceback.format_exc()


class Command(BaseCommand):
    """Management command that invokes ``migrate`` once per configured database."""

    help = "Call 'migrate' for each configured database"

    def add_arguments(self, parser):
        """Register the positional arguments and flags that are forwarded to ``migrate``."""
        parser.add_argument(
            "app_label",
            nargs="?",
            help="App label of an application to synchronize the state.",
        )
        parser.add_argument(
            "migration_name",
            nargs="?",
            help=(
                "Database state will be brought to the state after that "
                'migration. Use the name "zero" to unapply all migrations.'
            ),
        )
        parser.add_argument(
            "--noinput",
            dest="interactive",
            action="store_false",
            default=True,
            help="Tells Django to NOT prompt the user for input of any kind.",
        )
        parser.add_argument(
            "--fake",
            dest="fake",
            action="store_true",
            default=False,
            help="Mark migrations as run without actually running them.",
        )

    def handle(self, app_label, migration_name, **options):
        """Spawn one gevent greenlet per database to run ``migrate``, then report failures.

        Databases whose settings entry has a falsy ``MIGRATE`` key are skipped.
        The process exits with status 1 if any of the migrations raised.
        """
        # Only forward the positional arguments that were actually supplied.
        positional = [value for value in (app_label, migration_name) if value is not None]

        # The wrapped ``migrate`` calls always run with verbosity 0.
        options["verbosity"] = 0

        # NOTE: the function name shows up in ``repr(greenlet)`` in the error report below.
        def migrate_db(db_alias):
            per_db_options = copy(options)
            per_db_options["database"] = db_alias
            call_command("migrate", *positional, **per_db_options)

        migratable = [alias for alias, config in settings.DATABASES.items() if config.get("MIGRATE", True)]
        skipped = list(set(settings.DATABASES) - set(migratable))

        print("\nThe following databases will be migrated:\n * {}\n".format("\n * ".join(migratable)))
        if skipped:
            print("\nThe following databases will be skipped:\n * {}\n".format("\n * ".join(skipped)))

        greenlets = [gevent.spawn(migrate_db, alias) for alias in migratable]
        gevent.joinall(greenlets)

        any_failed = False
        for greenlet in greenlets:
            try:
                # Surfaces the exception (if any) raised inside the greenlet.
                greenlet.get()
            except Exception:
                print("\n======================= Error During Migration =======================")
                print(repr(greenlet))
                print(get_traceback_string())
                any_failed = True

        if any_failed:
            sys.exit(1)
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.db import DEFAULT_DB_ALIAS, connections

from commcare_connect.opportunity.models import (
Assessment,
CompletedModule,
CompletedWork,
LearnModule,
Opportunity,
OpportunityAccess,
Payment,
PaymentUnit,
UserVisit,
)
from commcare_connect.users.models import User

# Models whose database tables are listed in the logical-replication publication
# created by the command below (``CREATE PUBLICATION ... FOR TABLE <these tables>``).
REPLICATION_ALLOWED_MODELS = [
    Opportunity,
    OpportunityAccess,
    LearnModule,
    CompletedModule,
    Payment,
    User,
    UserVisit,
    CompletedWork,
    PaymentUnit,
    Assessment,
]


class Command(BaseCommand):
    """Set up PostgreSQL logical replication from the default database to the secondary one.

    A publication covering the tables of ``REPLICATION_ALLOWED_MODELS`` is created
    on the default database, and a subscription to that publication is created on
    the database named by ``settings.SECONDARY_DB_ALIAS``. Each step is skipped
    (with a warning) when the publication/subscription already exists.
    """

    help = "Create a publication for the default database and a subscription for the secondary database alias."

    def handle(self, *args, **options):
        """Ask for confirmation, then create the publication and the subscription.

        Raises:
            CommandError: if no secondary database is configured, no tables
                could be resolved for the publication, or a SQL step fails.
        """
        secondary_db_alias = settings.SECONDARY_DB_ALIAS
        if secondary_db_alias == DEFAULT_DB_ALIAS:
            raise CommandError("'secondary' database needs to be configured")

        # Remind the operator of the prerequisites this command cannot verify itself.
        self.stdout.write(
            self.style.WARNING(
                "Ensure that default database has logical replication enabled and "
                "that the secondary database has django migration run."
            )
        )
        confirm_migrate = input("Proceed? (yes/no): ").strip().lower()
        if confirm_migrate != "yes":
            self.stdout.write(self.style.ERROR("Aborting: Please run 'migrate --database=secondary' and try again."))
            return

        publication_name = "tables_for_superset_pub"
        subscription_name = "tables_for_superset_sub"

        # Create publication in the default database
        default_conn = connections[DEFAULT_DB_ALIAS]
        self.stdout.write("Creating publication in the default database...")
        table_list = self._get_publication_tables()
        self._create_publication(default_conn, publication_name, table_list)

        # Create subscription in the secondary database
        secondary_conn = connections[secondary_db_alias]
        self.stdout.write("Creating subscription in the secondary database...")
        self._create_subscription(
            secondary_conn,
            subscription_name,
            publication_name,
            default_conn.settings_dict,
        )

        self.stdout.write(self.style.SUCCESS("Publication and subscription setup completed."))

    @staticmethod
    def _get_publication_tables():
        """Return the table names of every model in ``REPLICATION_ALLOWED_MODELS``."""
        table_list = []
        for model in REPLICATION_ALLOWED_MODELS:
            try:
                table_list.append(model._meta.db_table)
            except Exception as e:
                raise CommandError(f"Error resolving model {model}: {e}") from e

        if not table_list:
            raise CommandError("No valid tables found for publication.")
        return table_list

    @staticmethod
    def _build_conninfo(db_settings):
        """Build a libpq keyword/value connection string from a Django database settings dict.

        Every value is single-quoted with backslashes and single quotes escaped,
        so values containing spaces or quotes survive libpq parsing. Empty or
        missing settings are omitted: an unquoted empty value such as ``host=``
        would make libpq read the following keyword as the value.
        """
        keyword_to_setting = (
            ("host", "HOST"),
            ("port", "PORT"),
            ("dbname", "NAME"),
            ("user", "USER"),
            ("password", "PASSWORD"),
        )
        parts = []
        for keyword, setting in keyword_to_setting:
            value = db_settings.get(setting)
            if value is None or value == "":
                continue
            escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
            parts.append(f"{keyword}='{escaped}'")
        return " ".join(parts)

    def _create_publication(self, conn, publication_name, table_list):
        """Create ``publication_name`` for ``table_list`` on ``conn`` unless it already exists."""
        with conn.cursor() as cursor:
            try:
                # Check if publication exists
                cursor.execute("SELECT pubname FROM pg_publication WHERE pubname = %s;", [publication_name])
                if cursor.fetchone():
                    self.stdout.write(
                        self.style.WARNING(f"Publication '{publication_name}' already exists. Skipping creation.")
                    )
                    return

                # Identifiers cannot be bound as parameters; the names come from
                # model metadata and a constant, not from user input.
                tables = ", ".join(f'"{table}"' for table in table_list)
                cursor.execute(f"CREATE PUBLICATION {publication_name} FOR TABLE {tables};")
                self.stdout.write(self.style.SUCCESS(f"Publication '{publication_name}' created successfully."))
            except Exception as e:
                raise CommandError(f"Failed to create publication: {e}") from e

    def _create_subscription(self, conn, subscription_name, publication_name, primary_db_settings):
        """Create ``subscription_name`` on ``conn`` pointing at the primary database unless it already exists."""
        with conn.cursor() as cursor:
            try:
                # Check if subscription exists
                cursor.execute("SELECT subname FROM pg_subscription WHERE subname = %s;", [subscription_name])
                if cursor.fetchone():
                    self.stdout.write(
                        self.style.WARNING(f"Subscription '{subscription_name}' already exists. Skipping creation.")
                    )
                    return

                # The connection string contains credentials, so it is passed as a
                # bound parameter and quoted by the driver instead of being
                # interpolated into the SQL text.
                primary_conn_info = self._build_conninfo(primary_db_settings)
                cursor.execute(
                    f"CREATE SUBSCRIPTION {subscription_name} CONNECTION %s PUBLICATION {publication_name};",
                    [primary_conn_info],
                )
                self.stdout.write(self.style.SUCCESS(f"Subscription '{subscription_name}' created successfully."))
            except Exception as e:
                raise CommandError(f"Failed to create subscription: {e}") from e
63 changes: 63 additions & 0 deletions config/db_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from django.conf import settings
from django.db import DEFAULT_DB_ALIAS


class SecondaryDatabaseRouter:
    """
    Router that pins all ORM reads and writes to the default database and
    decides which migration operations may be applied to the secondary database.
    """

    def db_for_read(self, model, **hints):
        # Every read is served by the default database.
        return DEFAULT_DB_ALIAS

    def db_for_write(self, model, **hints):
        # Every write goes to the default database.
        return DEFAULT_DB_ALIAS

    def allow_relation(self, obj1, obj2, **hints):
        # Returning None expresses no opinion about the relation.
        return None

    def allow_migrate(self, db, app_label, model_name=None, **hints):
        """
        Return whether a migration operation may run on ``db``.

        Every database other than the secondary one is always allowed. On the
        secondary database a call without an ``operation`` hint is allowed, and a
        call with one is allowed only if the operation is one of the schema
        operations listed below; data is expected to arrive through
        database-level replication instead.

        NOTE(review): the filter only takes effect when the caller supplies an
        ``operation`` hint. Django's documented router API does not appear to
        pass one by itself -- confirm that the relevant migrations provide it.
        """
        if db == DEFAULT_DB_ALIAS or db != settings.SECONDARY_DB_ALIAS:
            return True

        requested_operation = hints.get("operation")
        if requested_operation is None:
            return True

        # Imported lazily, only when an operation actually has to be classified.
        from django.db.migrations.operations import (
            AddField,
            AlterField,
            AlterIndexTogether,
            AlterModelOptions,
            AlterModelTable,
            AlterUniqueTogether,
            CreateModel,
            DeleteModel,
            RemoveField,
            RenameField,
        )

        schema_only_operations = (
            CreateModel,
            DeleteModel,
            AlterModelTable,
            AlterModelOptions,
            AlterUniqueTogether,
            AlterIndexTogether,
            AddField,
            RemoveField,
            AlterField,
            RenameField,
        )
        return isinstance(requested_operation, schema_only_operations)
8 changes: 8 additions & 0 deletions config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@
DATABASES["default"]["ATOMIC_REQUESTS"] = True
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"

# Alias of the optional secondary database (the logical-replication target).
# It stays "default" when no secondary database is configured.
SECONDARY_DB_ALIAS = "default"

# Register the secondary database and its router only when
# SECONDARY_DATABASE_URL resolves to a non-empty (truthy) database config.
if env.db("SECONDARY_DATABASE_URL", default=""):
    SECONDARY_DB_ALIAS = "secondary"
    DATABASES[SECONDARY_DB_ALIAS] = env.db("SECONDARY_DATABASE_URL")
    DATABASE_ROUTERS = ["config.db_router.SecondaryDatabaseRouter"]


# URLS
# ------------------------------------------------------------------------------
ROOT_URLCONF = "config.urls"
Expand Down
2 changes: 1 addition & 1 deletion docker/start_migrate
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ set -o nounset


echo "Django migrate"
python manage.py migrate --noinput
python manage.py migrate_multi --noinput
echo "Run Gunicorn"
gunicorn config.wsgi --bind 0.0.0.0:8000 --chdir=/app --timeout 300 -w 3
1 change: 1 addition & 0 deletions requirements/base.in
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ tablib[xlsx]
flatten-dict
twilio
geopy
gevent

# Django
# ------------------------------------------------------------------------------
Expand Down
11 changes: 11 additions & 0 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ geographiclib==2.0
# via geopy
geopy==2.4.1
# via -r requirements/base.in
gevent==24.11.1
# via -r requirements/base.in
greenlet==3.1.1
# via gevent
h11==0.14.0
# via httpcore
h2==4.1.0
Expand Down Expand Up @@ -275,3 +279,10 @@ wrapt==1.15.0
# via deprecated
yarl==1.9.2
# via aiohttp
zope-event==5.0
# via gevent
zope-interface==7.2
# via gevent

# The following packages are considered to be unsafe in a requirements file:
# setuptools
2 changes: 2 additions & 0 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ anyio==3.7.1
# -c requirements/base.txt
# httpcore
# watchfiles
appnope==0.1.4
# via ipython
asgiref==3.7.2
# via
# -c requirements/base.txt
Expand Down
22 changes: 15 additions & 7 deletions requirements/production.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,14 @@ django-anymail[amazon-ses]==10.1
# via -r requirements/production.in
django-storages[boto3]==1.13.2
# via -r requirements/production.in
gevent==24.2.1
# via gunicorn
greenlet==3.1.0
# via gevent
gevent==24.11.1
# via
# -c requirements/base.txt
# gunicorn
greenlet==3.1.1
# via
# -c requirements/base.txt
# gevent
gunicorn[gevent]==21.2.0
# via -r requirements/production.in
idna==3.4
Expand Down Expand Up @@ -97,9 +101,13 @@ urllib3==1.26.16
# requests
# sentry-sdk
zope-event==5.0
# via gevent
zope-interface==7.0.3
# via gevent
# via
# -c requirements/base.txt
# gevent
zope-interface==7.2
# via
# -c requirements/base.txt
# gevent

# The following packages are considered to be unsafe in a requirements file:
# setuptools

0 comments on commit 630d48c

Please sign in to comment.