DPE-3547 mitigations for container restart (#377)
* optimizations for container churn mitigation

* set floor for max_connections to 100
* add function retries
* flush logs in a single call
* add test coverage

* fix floor value

* switch order

* fix typo in constant

* use peek to get secret content
paulomach authored Feb 15, 2024
1 parent 2194db9 commit e5b0f94
Showing 11 changed files with 167 additions and 80 deletions.
91 changes: 53 additions & 38 deletions lib/charms/mysql/v0/mysql.py
@@ -111,7 +111,7 @@ def wait_until_mysql_connection(self) -> None:

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 54
LIBPATCH = 55

UNIT_TEARDOWN_LOCKNAME = "unit-teardown"
UNIT_ADD_LOCKNAME = "unit-add"
@@ -122,6 +122,7 @@ def wait_until_mysql_connection(self) -> None:
BYTES_1MiB = 1048576 # 1 mebibyte
RECOVERY_CHECK_TIME = 10 # seconds
GET_MEMBER_STATE_TIME = 10 # seconds
MIN_MAX_CONNECTIONS = 100

SECRET_INTERNAL_LABEL = "secret-id"
SECRET_DELETED_LABEL = "None"
@@ -710,7 +711,7 @@ def render_mysqld_configuration(
innodb_buffer_pool_size = 20 * BYTES_1MiB
innodb_buffer_pool_chunk_size = 1 * BYTES_1MiB
group_replication_message_cache_size = 128 * BYTES_1MiB
max_connections = 20
max_connections = MIN_MAX_CONNECTIONS
performance_schema_instrument = "'memory/%=OFF'"
else:
available_memory = self.get_available_memory()
@@ -723,7 +724,7 @@
innodb_buffer_pool_chunk_size,
group_replication_message_cache_size,
) = self.get_innodb_buffer_pool_parameters(available_memory)
max_connections = self.get_max_connections(available_memory)
max_connections = max(self.get_max_connections(available_memory), MIN_MAX_CONNECTIONS)
if available_memory < 2 * BYTES_1GiB:
# disable memory instruments if we have less than 2GiB of RAM
performance_schema_instrument = "'memory/%=OFF'"
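
The change above clamps the memory-derived value so it never drops below MIN_MAX_CONNECTIONS. A minimal standalone sketch of the floor; only MIN_MAX_CONNECTIONS = 100 and BYTES_1MiB come from this diff, and the estimator below is a hypothetical stand-in for get_max_connections():

MIN_MAX_CONNECTIONS = 100
BYTES_1MiB = 1048576


def estimate_max_connections(available_memory: int) -> int:
    # Hypothetical estimator: roughly one connection per 12 MiB of available RAM.
    return available_memory // (12 * BYTES_1MiB)


def effective_max_connections(available_memory: int) -> int:
    # The floor keeps small containers from being configured below 100 connections.
    return max(estimate_max_connections(available_memory), MIN_MAX_CONNECTIONS)


print(effective_max_connections(512 * BYTES_1MiB))   # 100: the floor applies
print(effective_max_connections(8192 * BYTES_1MiB))  # 682: the estimate wins
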
@@ -1209,7 +1210,11 @@ def initialize_juju_units_operations_table(self) -> None:
raise MySQLInitializeJujuOperationsTableError(e.message)

def add_instance_to_cluster(
self, instance_address: str, instance_unit_label: str, from_instance: Optional[str] = None
self,
instance_address: str,
instance_unit_label: str,
from_instance: Optional[str] = None,
method: str = "auto",
) -> None:
"""Add an instance to the InnoDB cluster.
@@ -1223,6 +1228,7 @@ def add_instance_to_cluster(
instance_address: address of the instance to add to the cluster
instance_unit_label: the label/name of the unit
from_instance: address of the adding instance, e.g. primary
method: recovery method to use, either "auto" or "clone"
"""
options = {
"password": self.cluster_admin_password,
@@ -1243,39 +1249,37 @@ def add_instance_to_cluster(
"shell.options['dba.restartWaitTimeout'] = 3600",
)

for recovery_method in ["auto", "clone"]:
# Prefer "auto" recovery method, but if it fails, try "clone"
try:
options["recoveryMethod"] = recovery_method
add_instance_command = (
f"cluster.add_instance('{self.cluster_admin_user}@{instance_address}', {json.dumps(options)})",
)
# Prefer "auto" recovery method, but if it fails, try "clone"
try:
options["recoveryMethod"] = method
add_instance_command = (
f"cluster.add_instance('{self.cluster_admin_user}@{instance_address}', {options})",
)

logger.debug(
f"Adding instance {instance_address}/{instance_unit_label} to cluster {self.cluster_name} with recovery method {recovery_method}"
)
self._run_mysqlsh_script("\n".join(connect_commands + add_instance_command))
logger.info(
f"Adding instance {instance_address}/{instance_unit_label} to {self.cluster_name=}"
f"with recovery {method=}"
)
self._run_mysqlsh_script("\n".join(connect_commands + add_instance_command))

break
except MySQLClientError as e:
if recovery_method == "clone":
logger.exception(
f"Failed to add instance {instance_address} to cluster {self.cluster_name} on {self.instance_address}",
exc_info=e,
)
self._release_lock(
from_instance or self.instance_address,
instance_unit_label,
UNIT_ADD_LOCKNAME,
)
raise MySQLAddInstanceToClusterError(e.message)

logger.debug(
f"Failed to add instance {instance_address} to cluster {self.cluster_name} with recovery method 'auto'. Trying method 'clone'"
except MySQLClientError:
if method == "clone":
logger.exception(
f"Failed to add {instance_address=} to {self.cluster_name=} on {self.instance_address=}",
)
self._release_lock(
from_instance or self.instance_address, instance_unit_label, UNIT_ADD_LOCKNAME
)
raise MySQLAddInstanceToClusterError

logger.debug(
f"Cannot add {instance_address=} to {self.cluster_name=} with recovery {method=}. Trying method 'clone'"
)
self.add_instance_to_cluster(
instance_address, instance_unit_label, from_instance, method="clone"
)
finally:
# always release the lock
self._release_lock(
from_instance or self.instance_address, instance_unit_label, UNIT_ADD_LOCKNAME
)

def is_instance_configured_for_innodb(
self, instance_address: str, instance_unit_label: str
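
The rewritten method above replaces the in-method loop with a single attempt plus a recursive fallback to "clone", releasing the unit-add lock in a finally block. A simplified, self-contained sketch of that control flow; every name below (run_add_instance, release_lock, ClientError) is a hypothetical stand-in for the charm library's helpers:

class ClientError(Exception):
    """Stand-in for MySQLClientError."""


def run_add_instance(address: str, recovery_method: str) -> None:
    # Stand-in for the mysqlsh cluster.add_instance() call; pretend incremental
    # recovery is impossible so the fallback path is exercised.
    if recovery_method == "auto":
        raise ClientError("incremental recovery not possible")


def release_lock(address: str) -> None:
    print(f"released unit-add lock while adding {address}")


def add_instance(address: str, method: str = "auto") -> None:
    # Prefer incremental ("auto") recovery; on failure retry once with "clone".
    # The lock is released in finally, so both the failed and the retried
    # attempt clean up after themselves (the real lock release is idempotent).
    try:
        run_add_instance(address, recovery_method=method)
    except ClientError:
        if method == "clone":
            raise
        add_instance(address, method="clone")
    finally:
        release_lock(address)


add_instance("10.0.0.7")  # auto fails, clone succeeds, lock released on both attempts
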
@@ -1398,6 +1402,11 @@ def is_instance_in_cluster(self, unit_label: str) -> bool:
)
return False

@retry(
wait=wait_fixed(2),
stop=stop_after_attempt(3),
retry=retry_if_exception_type(TimeoutError),
)
def get_cluster_status(self, extended: Optional[bool] = False) -> Optional[dict]:
"""Get the cluster status.
Expand Down Expand Up @@ -2505,13 +2514,19 @@ def get_pid_of_port_3306(self) -> Optional[str]:
except MySQLExecError:
return None

def flush_mysql_logs(self, logs_type: MySQLTextLogs) -> None:
def flush_mysql_logs(self, logs_type: Union[MySQLTextLogs, list[MySQLTextLogs]]) -> None:
"""Flushes the specified logs_type logs."""
flush_logs_commands = (
flush_logs_commands = [
f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')",
'session.run_sql("SET sql_log_bin = 0")',
f'session.run_sql("FLUSH {logs_type.value}")',
)
]

if type(logs_type) is list:
flush_logs_commands.extend(
[f"session.run_sql('FLUSH {log.value}')" for log in logs_type]
)
else:
flush_logs_commands.append(f'session.run_sql("FLUSH {logs_type.value}")') # type: ignore

try:
self._run_mysqlsh_script("\n".join(flush_logs_commands))
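
With the widened signature above, callers can flush several log types in one mysqlsh round trip instead of one call per log. A rough sketch of the command-building logic under a simplified stand-in enum (the real values live in MySQLTextLogs):

from enum import Enum


class TextLogs(Enum):
    # Simplified stand-in for MySQLTextLogs; the real enum lives in mysql.py.
    ERROR = "ERROR LOGS"
    GENERAL = "GENERAL LOGS"
    SLOW = "SLOW LOGS"


def build_flush_commands(logs) -> list[str]:
    # Mirror of the branching above: accept one enum member or a list of them
    # and emit every FLUSH statement inside a single script.
    commands = ["session.run_sql('SET sql_log_bin = 0')"]
    if isinstance(logs, list):
        commands.extend(f"session.run_sql('FLUSH {log.value}')" for log in logs)
    else:
        commands.append(f"session.run_sql('FLUSH {logs.value}')")
    return commands


print(build_flush_commands(list(TextLogs)))    # three FLUSH statements, one round trip
print(build_flush_commands(TextLogs.ERROR))    # single-log form still works
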
11 changes: 7 additions & 4 deletions poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -10,7 +10,7 @@ authors = []

[tool.poetry.dependencies]
python = "^3.10"
ops = "^2.5.0"
ops = "^2.7.0"
lightkube = "^0.14.0"
tenacity = "^8.2.2"
boto3 = "^1.28.22"
3 changes: 1 addition & 2 deletions src/charm.py
@@ -481,8 +481,7 @@ def _open_ports(self) -> None:
"""
if ops.JujuVersion.from_environ().supports_open_port_on_k8s:
try:
self.unit.open_port("tcp", 3306)
self.unit.open_port("tcp", 33060)
self.unit.set_ports(3306, 33060)
except ops.ModelError:
logger.exception("failed to open port")

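
unit.set_ports() is the declarative replacement for the two open_port() calls and is the reason for the ops pin bump above (set_ports landed in ops 2.7). A minimal sketch in a hypothetical charm, not the charm in this repository:

import ops


class ExampleCharm(ops.CharmBase):
    """Sketch only, not the charm in this repository."""

    def __init__(self, *args):
        super().__init__(*args)
        self.framework.observe(self.on.start, self._on_start)

    def _on_start(self, event: ops.StartEvent) -> None:
        if ops.JujuVersion.from_environ().supports_open_port_on_k8s:
            # Declarative: after this call exactly these ports are open.
            self.unit.set_ports(3306, 33060)
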
11 changes: 7 additions & 4 deletions src/k8s_helpers.py
@@ -5,15 +5,15 @@

import logging
import socket
import typing
from typing import Dict, List, Optional, Tuple

from lightkube import Client
from lightkube.core.client import Client
from lightkube.core.exceptions import ApiError
from lightkube.models.core_v1 import ServicePort, ServiceSpec
from lightkube.models.meta_v1 import ObjectMeta
from lightkube.resources.apps_v1 import StatefulSet
from lightkube.resources.core_v1 import Node, Pod, Service
from ops.charm import CharmBase
from tenacity import retry, stop_after_attempt, wait_fixed

from utils import any_memory_to_bytes
@@ -24,6 +24,9 @@
logging.getLogger("httpcore").setLevel(logging.ERROR)
logging.getLogger("httpx").setLevel(logging.ERROR)

if typing.TYPE_CHECKING:
from charm import MySQLOperatorCharm


class KubernetesClientError(Exception):
"""Exception raised when client can't execute."""
@@ -32,7 +35,7 @@ class KubernetesClientError(Exception):
class KubernetesHelpers:
"""Kubernetes helpers for service exposure."""

def __init__(self, charm: CharmBase):
def __init__(self, charm: "MySQLOperatorCharm"):
"""Initialize Kubernetes helpers.
Args:
@@ -42,7 +45,7 @@ def __init__(self, charm: CharmBase):
self.namespace = charm.model.name
self.app_name = charm.model.app.name
self.cluster_name = charm.app_peer_data.get("cluster-name")
self.client = Client()
self.client = Client() # type: ignore

def create_endpoint_services(self, roles: List[str]) -> None:
"""Create kubernetes service for endpoints.
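
The typing.TYPE_CHECKING import above lets k8s_helpers annotate the charm argument with the concrete MySQLOperatorCharm class without importing charm.py at runtime. A minimal sketch of the pattern; ExampleHelpers is illustrative, not the class in this file:

import typing

if typing.TYPE_CHECKING:
    # Resolved by type checkers only; never imported at runtime, so charm.py and
    # the helpers module cannot form a circular import.
    from charm import MySQLOperatorCharm


class ExampleHelpers:
    def __init__(self, charm: "MySQLOperatorCharm") -> None:
        # The string annotation keeps the precise hint without a runtime dependency.
        self.app_name = charm.model.app.name
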
27 changes: 16 additions & 11 deletions src/mysql_k8s_helpers.py
@@ -20,7 +20,7 @@
MySQLStopMySQLDError,
)
from ops.model import Container
from ops.pebble import ChangeError, ExecError
from ops.pebble import ChangeError, ExecError, PathError
from tenacity import (
retry,
retry_if_exception_type,
@@ -195,14 +195,13 @@ def fix_data_dir(self, container: Container) -> None:
logger.debug(f"Changing ownership to {MYSQL_SYSTEM_USER}:{MYSQL_SYSTEM_GROUP}")
try:
container.exec(
f"chown -R {MYSQL_SYSTEM_USER}:{MYSQL_SYSTEM_GROUP} {MYSQL_DATA_DIR}".split(
" "
)
["chown", "-R", f"{MYSQL_SYSTEM_USER}:{MYSQL_SYSTEM_GROUP}", MYSQL_DATA_DIR]
)
except ExecError as e:
logger.error(f"Exited with code {e.exit_code}. Stderr:\n{e.stderr}")
raise MySQLInitialiseMySQLDError(e.stderr or "")

@retry(reraise=True, stop=stop_after_delay(30), wait=wait_fixed(5))
def initialise_mysqld(self) -> None:
"""Execute instance first run.
@@ -217,13 +216,11 @@ def initialise_mysqld(self) -> None:
user=MYSQL_SYSTEM_USER,
group=MYSQL_SYSTEM_GROUP,
)
process.wait_output()
except ExecError as e:
logger.error("Exited with code %d. Stderr:", e.exit_code)
if e.stderr:
for line in e.stderr.splitlines():
logger.error(" %s", line)
raise MySQLInitialiseMySQLDError(e.stderr if e.stderr else "")
process.wait()
except (ExecError, ChangeError, PathError, TimeoutError):
logger.exception("Failed to initialise MySQL data directory")
self.reset_data_dir()
raise MySQLInitialiseMySQLDError

@retry(reraise=True, stop=stop_after_delay(30), wait=wait_fixed(5))
def wait_until_mysql_connection(self, check_port: bool = True) -> None:
@@ -756,6 +753,14 @@ def remove_file(self, path: str) -> None:
if self.container.exists(path):
self.container.remove_path(path)

def reset_data_dir(self) -> None:
"""Remove all files from the data directory."""
content = self.container.list_files(MYSQL_DATA_DIR)
content_set = {item.name for item in content}
logger.debug("Resetting MySQL data directory.")
for item in content_set:
self.container.remove_path(f"{MYSQL_DATA_DIR}/{item}", recursive=True)

def check_if_mysqld_process_stopped(self) -> bool:
"""Checks if the mysqld process is stopped on the container."""
command = ["ps", "-eo", "comm,stat"]
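
initialise_mysqld() is now retried for up to 30 seconds, and a failed attempt wipes the data directory first so mysqld can re-initialise cleanly. A self-contained sketch of that retry-and-reset shape; the path and the stand-in initialiser below are hypothetical, only the tenacity parameters mirror the diff:

import shutil
from pathlib import Path

from tenacity import retry, stop_after_delay, wait_fixed

DATA_DIR = Path("/tmp/example-mysql-data")  # hypothetical path, not the charm's MYSQL_DATA_DIR
DATA_DIR.mkdir(parents=True, exist_ok=True)


def run_mysqld_initialize() -> None:
    # Stand-in for the `mysqld --initialize-insecure` exec run through Pebble.
    (DATA_DIR / "ibdata1").touch()


def reset_data_dir() -> None:
    # Remove whatever a failed initialisation left behind so the next attempt
    # starts from an empty directory.
    for item in DATA_DIR.iterdir():
        if item.is_dir():
            shutil.rmtree(item)
        else:
            item.unlink()


@retry(reraise=True, stop=stop_after_delay(30), wait=wait_fixed(5))
def initialise_mysqld() -> None:
    try:
        run_mysqld_initialize()
    except OSError:
        reset_data_dir()
        raise  # tenacity retries every 5 seconds until 30 seconds have elapsed


initialise_mysqld()
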
12 changes: 5 additions & 7 deletions src/rotate_mysql_logs.py
@@ -6,7 +6,7 @@
import logging
import typing

from charms.mysql.v0.mysql import MySQLExecError, MySQLTextLogs
from charms.mysql.v0.mysql import MySQLClientError, MySQLExecError, MySQLTextLogs
from ops.charm import CharmEvents
from ops.framework import EventBase, EventSource, Object

@@ -48,10 +48,8 @@ def _rotate_mysql_logs(self, _) -> None:

try:
self.charm._mysql._execute_commands(["logrotate", "-f", LOG_ROTATE_CONFIG_FILE])
self.charm._mysql.flush_mysql_logs(list(MySQLTextLogs))
except MySQLExecError:
logger.exception("Failed to rotate mysql logs")
return

self.charm._mysql.flush_mysql_logs(MySQLTextLogs.ERROR)
self.charm._mysql.flush_mysql_logs(MySQLTextLogs.GENERAL)
self.charm._mysql.flush_mysql_logs(MySQLTextLogs.SLOW)
logger.warning("Failed to rotate MySQL logs")
except MySQLClientError:
logger.warning("Failed to flush MySQL logs")
2 changes: 1 addition & 1 deletion tests/integration/test_charm.py
@@ -325,7 +325,7 @@ async def test_custom_variables(ops_test: OpsTest) -> None:
application = ops_test.model.applications[APP_NAME]

custom_vars = {}
custom_vars["max_connections"] = 20
custom_vars["max_connections"] = 100
custom_vars["innodb_buffer_pool_size"] = 20971520
custom_vars["innodb_buffer_pool_chunk_size"] = 1048576
custom_vars["group_replication_message_cache_size"] = 134217728