Skip to content

Commit

Permalink
Improve transition order, log primary failover point
Browse files (browse the repository at this point in the history)
  • Loading branch information
geoffxy committed Nov 12, 2023
1 parent 106bcf9 commit 4da25b7
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 63 deletions.
2 changes: 2 additions & 0 deletions src/brad/config/system_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ class SystemEvent(enum.Enum):
PreTransitionCompleted = "pre_transition_completed"
PostTransitionStarted = "post_transition_started"
PostTransitionCompleted = "post_transition_completed"

AuroraPrimaryFailover = "aurora_primary_failover"
2 changes: 1 addition & 1 deletion src/brad/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ async def _handle_new_blueprint(
if self._planner is not None:
self._planner.set_disable_triggers(disable=True)
self._transition_orchestrator = TransitionOrchestrator(
self._config, self._blueprint_mgr
self._config, self._blueprint_mgr, self._system_event_logger
)
self._transition_task = asyncio.create_task(self._run_transition_part_one())

Expand Down
135 changes: 73 additions & 62 deletions src/brad/daemon/transition_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
AURORA_SEQ_COLUMN,
AURORA_EXTRACT_PROGRESS_TABLE_NAME,
)
from brad.config.system_event import SystemEvent
from brad.daemon.system_event_logger import SystemEventLogger
from brad.data_sync.execution.context import ExecutionContext
from brad.data_sync.operators.drop_aurora_triggers import DropAuroraTriggers
from brad.data_sync.execution.executor import DataSyncExecutor
Expand All @@ -37,6 +39,7 @@ def __init__(
self,
config: ConfigFile,
blueprint_mgr: BlueprintManager,
system_event_logger: Optional[SystemEventLogger] = None,
) -> None:
self._config = config
self._blueprint_mgr = blueprint_mgr
Expand All @@ -45,6 +48,7 @@ def __init__(
self._waiting_for_front_ends = 0
self._data_sync_executor = DataSyncExecutor(self._config, self._blueprint_mgr)
self._cxns: Optional[EngineConnections] = None
self._system_event_logger = system_event_logger

self._refresh_transition_metadata()

Expand Down Expand Up @@ -294,48 +298,40 @@ async def _run_aurora_pre_transition(

# NOTE: We will need a more robust process to deal with cases where we
# are at the replica limit (max. 15 replicas).
#
# We should also take into account the kind of transition to make the
# change more graceful (e.g., when switching to lower resourced
# provisionings).

if old.instance_type() != new.instance_type():
# Handle the primary first.
old_primary_instance = (
self._blueprint_mgr.get_directory().aurora_writer().instance_id()
)
new_primary_instance = _AURORA_PRIMARY_FORMAT.format(
cluster_id=self._config.aurora_cluster_id,
version=str(next_version).zfill(5),
)
logger.debug("Creating new Aurora replica: %s", new_primary_instance)
await self._rds.create_replica(
self._config.aurora_cluster_id,
new_primary_instance,
new,
wait_until_available=True,
)
logger.debug(
"Failing over %s to the new replica: %s",
self._config.aurora_cluster_id,
new_primary_instance,
)
await self._rds.run_primary_failover(
self._config.aurora_cluster_id,
new_primary_instance,
wait_until_complete=True,
)
logger.debug("Failover complete for %s", self._config.aurora_cluster_id)

logger.debug("Deleting the old primary: %s", old_primary_instance)
await self._rds.delete_replica(old_primary_instance)
logger.debug("Done deleting the old primary: %s", old_primary_instance)

# Create new replicas first.
new_replica_count = max(new.num_nodes() - 1, 0)
old_replica_count = max(old.num_nodes() - 1, 0)
if new_replica_count > 0 and new_replica_count > old_replica_count:
next_index = old_replica_count
while next_index < new_replica_count:
new_replica_id = _AURORA_REPLICA_FORMAT.format(
cluster_id=self._config.aurora_cluster_id,
version=str(next_version).zfill(5),
index=str(next_index).zfill(2),
)
logger.debug("Creating replica %s", new_replica_id)
# Ideally we wait for the replicas to finish creation in
# parallel. Because of how we make the boto3 client async,
# there's a possibility of having multiple API calls in flight
# at the same time, which boto3 does not support. To be safe, we
# just run these replica creations sequentially.
await self._rds.create_replica(
self._config.aurora_cluster_id,
new_replica_id,
new,
wait_until_available=True,
)
next_index += 1
await self._blueprint_mgr.refresh_directory()
if on_instance_identity_change is not None:
# The primary changed. We run the callback so that clients can
# update any cached state that relies on instance identities
# (e.g., Performance Insights metrics).
on_instance_identity_change()

if old.instance_type() != new.instance_type():
# Handle the replicas first (need to change their instance type).
replicas_to_modify = min(new.num_nodes() - 1, old.num_nodes() - 1)

if replicas_to_modify == 1 and old.num_nodes() - 1 == 1:
# Special case: The current blueprint only has one read replica
# and we need to modify it to transition to the next blueprint.
Expand Down Expand Up @@ -364,7 +360,7 @@ async def _run_aurora_pre_transition(

else:
# Modify replicas one-by-one. At most one reader replica is down
# at any time - but we consider this acceptable.
# at any time, but we consider this acceptable.
for idx, replica in enumerate(
self._blueprint_mgr.get_directory().aurora_readers()
):
Expand All @@ -380,30 +376,45 @@ async def _run_aurora_pre_transition(
replica.instance_id(), new, wait_until_available=True
)

new_replica_count = max(new.num_nodes() - 1, 0)
old_replica_count = max(old.num_nodes() - 1, 0)
if new_replica_count > 0 and new_replica_count > old_replica_count:
next_index = old_replica_count
while next_index < new_replica_count:
new_replica_id = _AURORA_REPLICA_FORMAT.format(
cluster_id=self._config.aurora_cluster_id,
version=str(next_version).zfill(5),
index=str(next_index).zfill(2),
)
logger.debug("Creating replica %s", new_replica_id)
# Ideally we wait for the replicas to finish creation in
# parallel. Because of how we make the boto3 client async,
# there's a possibility of having multiple API calls in flight
# at the same time, which boto3 does not support. To be safe, we
# just run these replica creations sequentially.
await self._rds.create_replica(
self._config.aurora_cluster_id,
new_replica_id,
new,
wait_until_available=True,
)
next_index += 1
# Handle the primary last.
old_primary_instance = (
self._blueprint_mgr.get_directory().aurora_writer().instance_id()
)
new_primary_instance = _AURORA_PRIMARY_FORMAT.format(
cluster_id=self._config.aurora_cluster_id,
version=str(next_version).zfill(5),
)
logger.debug("Creating new Aurora replica: %s", new_primary_instance)
await self._rds.create_replica(
self._config.aurora_cluster_id,
new_primary_instance,
new,
wait_until_available=True,
)
logger.debug(
"Failing over %s to the new replica: %s",
self._config.aurora_cluster_id,
new_primary_instance,
)
await self._rds.run_primary_failover(
self._config.aurora_cluster_id,
new_primary_instance,
wait_until_complete=True,
)
logger.debug("Failover complete for %s", self._config.aurora_cluster_id)
if self._system_event_logger is not None:
self._system_event_logger.log(SystemEvent.AuroraPrimaryFailover)

logger.debug("Deleting the old primary: %s", old_primary_instance)
await self._rds.delete_replica(old_primary_instance)
logger.debug("Done deleting the old primary: %s", old_primary_instance)

await self._blueprint_mgr.refresh_directory()
if on_instance_identity_change is not None:
# The primary changed. We run the callback so that clients can
# update any cached state that relies on instance identities
# (e.g., Performance Insights metrics).
on_instance_identity_change()

# Aurora's pre-transition work is complete!

Expand Down

0 comments on commit 4da25b7

Please sign in to comment.