Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Point In Time Recovery #531

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4c65f78
Add binlog_utils_udf plugin.
Zvirovyi Nov 24, 2024
b8287d7
Enable gtid_mode and enforce_gtid_consistency for the MySQL.
Zvirovyi Nov 24, 2024
d8aabf4
Add S3 compatibility check based on the group replication id.
Zvirovyi Nov 24, 2024
cc61944
Point-in-time-recovery.
Zvirovyi Dec 8, 2024
373fc04
Merge branch 'refs/heads/main' into pitr
Zvirovyi Dec 8, 2024
dd1a2ed
Fix constants.
Zvirovyi Dec 9, 2024
19e05ef
Integration tests.
Zvirovyi Dec 9, 2024
ff4f865
Merge branch 'main' into pitr
Zvirovyi Dec 28, 2024
9979001
Binlogs collector service improvement.
Zvirovyi Jan 4, 2025
170268b
Format restore function.
Zvirovyi Jan 4, 2025
160857e
Use context manager for ca_file in s3_helpers.
Zvirovyi Jan 7, 2025
6776fbf
Rename start_stop_binlogs_collecting to reconcile_binlogs_collection.
Zvirovyi Jan 8, 2025
7b8f6d8
Delete binlogs collector config when not needed.
Zvirovyi Jan 9, 2025
9d3abbf
Improve update_binlogs_collector_config.
Zvirovyi Jan 9, 2025
904ab13
Merge branch 'main' into pitr
Zvirovyi Jan 9, 2025
39aad21
Add restore-to-time validation and format notice.
Zvirovyi Jan 11, 2025
7469ec4
Merge branch 'main' into pitr
Zvirovyi Jan 17, 2025
ddd58e7
Merge branch 'main' into pitr
Zvirovyi Jan 22, 2025
2f8d898
Sync lib changes from VM PR.
Zvirovyi Jan 23, 2025
9c7b122
Merge branch 'main' into pitr
Zvirovyi Jan 23, 2025
e02519a
Improve binlogs collection service.
Zvirovyi Jan 23, 2025
c5e0965
Merge branch 'main' into pitr
Zvirovyi Jan 25, 2025
d8fd9cd
Increment LIBPATCH for libs.
Zvirovyi Jan 25, 2025
9466460
Fix errors after main merge.
Zvirovyi Jan 26, 2025
99ab5a0
Merge branch 'main' into pitr
Zvirovyi Jan 29, 2025
7d4733d
LIBPATCH
Zvirovyi Jan 29, 2025
c656eae
Move binlogs collector config to the env.
Zvirovyi Jan 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add S3 compatibility check based on the group replication id.
  • Loading branch information
Zvirovyi committed Nov 24, 2024
commit d8aabf448f003a389d392964ce8b40f1a4e703bd
88 changes: 81 additions & 7 deletions lib/charms/mysql/v0/backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ def is_unit_blocked(self) -> bool:
import typing
from typing import Dict, List, Optional, Tuple

from charms.data_platform_libs.v0.s3 import S3Requirer
from charms.data_platform_libs.v0.s3 import (
CredentialsChangedEvent,
CredentialsGoneEvent,
S3Requirer,
)
from charms.mysql.v0.mysql import (
MySQLConfigureInstanceError,
MySQLCreateClusterError,
Expand All @@ -76,6 +80,7 @@ def is_unit_blocked(self) -> bool:
MySQLUnableToGetMemberStateError,
)
from charms.mysql.v0.s3_helpers import (
ensure_s3_compatible_group_replication_id,
fetch_and_check_existence_of_s3_path,
list_backups_in_s3_path,
upload_content_to_s3,
Expand All @@ -102,6 +107,10 @@ def is_unit_blocked(self) -> bool:
# to 0 if you are raising the major API version
LIBPATCH = 12

ANOTHER_S3_CLUSTER_REPOSITORY_ERROR_MESSAGE = "S3 repository claimed by another cluster"
MOVE_RESTORED_CLUSTER_TO_ANOTHER_S3_REPOSITORY_ERROR = (
"Move restored cluster to another S3 repository"
)

if typing.TYPE_CHECKING:
from charm import MySQLOperatorCharm
Expand All @@ -119,6 +128,13 @@ def __init__(self, charm: "MySQLOperatorCharm", s3_integrator: S3Requirer) -> No
self.framework.observe(self.charm.on.create_backup_action, self._on_create_backup)
self.framework.observe(self.charm.on.list_backups_action, self._on_list_backups)
self.framework.observe(self.charm.on.restore_action, self._on_restore)
self.framework.observe(
self.s3_integrator.on.credentials_changed, self._on_s3_credentials_changed
)
self.framework.observe(self.charm.on.leader_elected, self._on_s3_credentials_changed)
self.framework.observe(
self.s3_integrator.on.credentials_gone, self._on_s3_credentials_gone
)

# ------------------ Helpers ------------------
@property
Expand Down Expand Up @@ -235,18 +251,33 @@ def _on_list_backups(self, event: ActionEvent) -> None:

# ------------------ Create Backup ------------------

def _on_create_backup(self, event: ActionEvent) -> None:
"""Handle the create backup action."""
logger.info("A backup has been requested on unit")
def _pre_create_backup_checks(self, event: ActionEvent) -> bool:
"""Run some checks before creating the backup.

Returns: a boolean indicating whether operation should be run.
"""
if not self._s3_integrator_relation_exists:
logger.error("Backup failed: missing relation with S3 integrator charm")
event.fail("Missing relation with S3 integrator charm")
return
return False

if "s3-block-message" in self.charm.app_peer_data:
logger.error("Backup failed: S3 relation is blocked for write")
event.fail("S3 relation is blocked for write")
return False

if not self.charm._mysql.is_mysqld_running():
logger.error(f"Backup failed: process mysqld is not running on {self.charm.unit.name}")
event.fail("Process mysqld not running")
return False

return True

def _on_create_backup(self, event: ActionEvent) -> None:
"""Handle the create backup action."""
logger.info("A backup has been requested on unit")

if not self._pre_create_backup_checks(event):
return

datetime_backup_requested = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
Expand Down Expand Up @@ -519,14 +550,19 @@ def _on_restore(self, event: ActionEvent) -> None:
if not success:
logger.error(f"Restore failed: {error_message}")
event.fail(error_message)

if recoverable:
self._clean_data_dir_and_start_mysqld()
else:
self.charm.app_peer_data.update({
"s3-block-message": MOVE_RESTORED_CLUSTER_TO_ANOTHER_S3_REPOSITORY_ERROR,
})
self.charm.unit.status = BlockedStatus(error_message)

return

self.charm.app_peer_data.update({
"s3-block-message": MOVE_RESTORED_CLUSTER_TO_ANOTHER_S3_REPOSITORY_ERROR,
})

# Run post-restore operations
self.charm.unit.status = MaintenanceStatus("Running post-restore operations")
success, error_message = self._post_restore()
Expand Down Expand Up @@ -674,3 +710,41 @@ def _post_restore(self) -> Tuple[bool, str]:
return False, "Failed to rescan the cluster"

return True, ""

def _on_s3_credentials_changed(self, event: CredentialsChangedEvent) -> None:
if not self.charm.unit.is_leader():
logger.debug("Early exit on _on_s3_credentials_changed: unit is not a leader")
return

if not self._s3_integrator_relation_exists:
logger.debug(
"Early exit on _on_s3_credentials_changed: s3 integrator relation does not exist"
)
return

logger.info("Retrieving s3 parameters from the s3-integrator relation")
s3_parameters, missing_parameters = self._retrieve_s3_parameters()
if missing_parameters:
logger.error(f"Missing S3 parameters: {missing_parameters}")
return

logger.info("Ensuring compatibility with the provided S3 repository")
if ensure_s3_compatible_group_replication_id(
self.charm._mysql.get_current_group_replication_id(), s3_parameters
):
self.charm.app_peer_data.update({
"s3-block-message": "",
})
else:
self.charm.app_peer_data.update({
"s3-block-message": ANOTHER_S3_CLUSTER_REPOSITORY_ERROR_MESSAGE,
})

def _on_s3_credentials_gone(self, event: CredentialsGoneEvent) -> None:
if not self.charm.unit.is_leader():
logger.debug("Early exit on _on_s3_credentials_gone: unit is not a leader")
return

self.charm.app_peer_data.update({
"s3-block-message": "",
})
44 changes: 42 additions & 2 deletions lib/charms/mysql/v0/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def wait_until_mysql_connection(self) -> None:
# Increment this major API version when introducing breaking changes
LIBAPI = 0

LIBPATCH = 75
LIBPATCH = 76

UNIT_TEARDOWN_LOCKNAME = "unit-teardown"
UNIT_ADD_LOCKNAME = "unit-add"
Expand Down Expand Up @@ -419,6 +419,10 @@ class MySQLPluginInstallError(Error):
"""Exception raised when there is an issue installing a MySQL plugin."""


class MySQLGetGroupReplicationIDError(Error):
    """Exception raised when there is an issue acquiring the current group replication id."""


@dataclasses.dataclass
class RouterUser:
"""MySQL Router user."""
Expand Down Expand Up @@ -2474,9 +2478,22 @@ def stop_group_replication(self) -> None:
" session.run_sql('STOP GROUP_REPLICATION')",
)
try:
logger.debug("Stopping Group Replication for unit")
self._run_mysqlsh_script("\n".join(stop_gr_command))
except MySQLClientError:
logger.debug("Failed to stop Group Replication for unit")
logger.warning("Failed to stop Group Replication for unit")

def start_group_replication(self) -> None:
    """Start Group Replication on this instance.

    Best effort: a failure to start is logged as a warning, not raised.
    """
    script = "\n".join((
        f"shell.connect('{self.instance_def(self.server_config_user)}')",
        "session.run_sql('START GROUP_REPLICATION')",
    ))
    try:
        logger.debug("Starting Group Replication for unit")
        self._run_mysqlsh_script(script)
    except MySQLClientError:
        logger.warning("Failed to start Group Replication for unit")

def reboot_from_complete_outage(self) -> None:
"""Wrapper for reboot_cluster_from_complete_outage command."""
Expand Down Expand Up @@ -3098,6 +3115,29 @@ def strip_off_passwords(self, input_string: str) -> str:
stripped_input = stripped_input.replace(password, "xxxxxxxxxxxx")
return stripped_input

def get_current_group_replication_id(self) -> str:
    """Return the group replication id of the cluster this instance belongs to.

    Queries @@GLOBAL.group_replication_group_name through mysqlsh and parses
    it out of the script output via <ID>...</ID> sentinel markers.

    Raises:
        MySQLGetGroupReplicationIDError: when the query fails or the output
            cannot be parsed.
    """
    logger.debug("Getting current group replication id")

    script = "\n".join((
        f"shell.connect('{self.instance_def(self.server_config_user)}')",
        'result = session.run_sql("SELECT @@GLOBAL.group_replication_group_name")',
        'print(f"<ID>{result.fetch_one()[0]}</ID>")',
    ))

    try:
        output = self._run_mysqlsh_script(script)
    except MySQLClientError as e:
        logger.warning("Failed to get current group replication id", exc_info=e)
        raise MySQLGetGroupReplicationIDError(e.message)

    match = re.search(r"<ID>(.+)</ID>", output)
    if match is None:
        raise MySQLGetGroupReplicationIDError("Failed to get current group replication id")

    return match.group(1)

@abstractmethod
def is_mysqld_running(self) -> bool:
"""Returns whether mysqld is running."""
Expand Down
93 changes: 93 additions & 0 deletions lib/charms/mysql/v0/s3_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@

import base64
import logging
import pathlib
import tempfile
import time
from io import BytesIO
from typing import Dict, List, Tuple

import boto3
import botocore
import botocore.exceptions

logger = logging.getLogger(__name__)

Expand All @@ -34,6 +38,8 @@
# to 0 if you are raising the major API version
LIBPATCH = 9

S3_GROUP_REPLICATION_ID_FILE = "group_replication_id.txt"

# botocore/urllib3 clutter the logs when on debug
logging.getLogger("botocore").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
Expand Down Expand Up @@ -93,6 +99,68 @@ def upload_content_to_s3(content: str, content_path: str, s3_parameters: Dict) -
return True


def _read_content_from_s3(content_path: str, s3_parameters: dict) -> str | None:
    """Download an object from the configured S3 bucket and return it as text.

    Args:
        content_path: The S3 path from which download the content
        s3_parameters: A dictionary containing the S3 parameters
            The following are expected keys in the dictionary: bucket, region,
            endpoint, access-key and secret-key

    Returns:
        the object's content decoded as UTF-8, or None when the object does
        not exist or any error occurs during the download (errors are logged).
    """
    ca_file = None
    try:
        logger.info(f"Reading content from bucket={s3_parameters['bucket']}, path={content_path}")
        session = boto3.session.Session(
            aws_access_key_id=s3_parameters["access-key"],
            aws_secret_access_key=s3_parameters["secret-key"],
            region_name=s3_parameters["region"],
        )

        # Default TLS verification, unless a custom CA chain was supplied
        # (base64-encoded PEM blocks) — then verify against a temp bundle file.
        verify = True
        if ca_chain := s3_parameters.get("tls-ca-chain"):
            ca_file = tempfile.NamedTemporaryFile()
            ca_file.write("\n".join(base64.b64decode(s).decode() for s in ca_chain).encode())
            ca_file.flush()
            verify = ca_file.name

        bucket = session.resource(
            "s3",
            endpoint_url=s3_parameters["endpoint"],
            verify=verify,
        ).Bucket(s3_parameters["bucket"])

        with BytesIO() as buffer:
            bucket.download_fileobj(content_path, buffer)
            return buffer.getvalue().decode("utf-8")
    except botocore.exceptions.ClientError as e:
        # A missing object surfaces as a 404 ClientError: expected on first
        # use of a repository, so log at info level only.
        if e.response["Error"]["Code"] == "404":
            logger.info(
                f"No such object to read from S3 bucket={s3_parameters['bucket']}, path={content_path}"
            )
        else:
            logger.exception(
                f"Failed to read content from S3 bucket={s3_parameters['bucket']}, path={content_path}",
                exc_info=e,
            )
    except Exception as e:
        logger.exception(
            f"Failed to read content from S3 bucket={s3_parameters['bucket']}, path={content_path}",
            exc_info=e,
        )
    finally:
        # The temp CA bundle (if any) must outlive the S3 calls; close it
        # on every exit path.
        if ca_file:
            ca_file.close()

    return None


def _compile_backups_from_file_ids(
metadata_ids: List[str], md5_ids: List[str], log_ids: List[str]
) -> List[Tuple[str, str]]:
Expand Down Expand Up @@ -217,3 +285,28 @@ def fetch_and_check_existence_of_s3_path(path: str, s3_parameters: Dict[str, str
exc_info=e,
)
raise


def ensure_s3_compatible_group_replication_id(
    group_replication_id: str, s3_parameters: Dict[str, str]
) -> bool:
    """Checks if group replication id is equal to the one in the provided S3 repository.

    If S3 doesn't have this claim (so it's not initialized),
    then it will be populated automatically with the provided id.

    Args:
        group_replication_id: group replication id of the current cluster
        s3_parameters: A dictionary containing the S3 parameters
            The following are expected keys in the dictionary: bucket, region,
            endpoint, access-key and secret-key

    Returns:
        True when the repository is unclaimed or already claimed by this
        cluster; False when it is claimed by a different cluster.
    """
    s3_id_path = str(pathlib.Path(s3_parameters["path"]) / S3_GROUP_REPLICATION_ID_FILE)
    s3_id = _read_content_from_s3(s3_id_path, s3_parameters)
    if s3_id:
        if s3_id == group_replication_id:
            # Repository already claimed by this cluster — avoid re-uploading
            # the claim file on every invocation (this handler runs on each
            # credentials-changed / leader-elected event).
            return True
        logger.info(
            f"s3 repository is not compatible based on group replication id: {group_replication_id} != {s3_id}"
        )
        return False
    # Unclaimed (or unreadable/empty) repository: claim it with our id.
    upload_content_to_s3(group_replication_id, s3_id_path, s3_parameters)
    return True
4 changes: 4 additions & 0 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,10 @@ def _set_app_status(self) -> None:
if not primary_address:
return

if "s3-block-message" in self.app_peer_data:
self.app.status = BlockedStatus(self.app_peer_data["s3-block-message"])
return

# Set active status when primary is known
self.app.status = ActiveStatus()

Expand Down