Skip to content

Commit

Permalink
Merge branch 'main' into datetimez
Browse files: browse the repository at this point in the history
  • Loading branch information
jcjones authored Feb 29, 2024
2 parents 3f7d2ac + 552118a commit e428d0f
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 74 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ jobs:
run: |
python -m pylint -E partitionmanager
- name: Checking format with Ruff
run: |
python -m ruff format .
- name: Lint Python code with Ruff
run: |
python -m ruff check --output-format=github
- name: Checking format with Ruff
run: |
python -m ruff format --check .
- name: Checking pyproject
run: |
validate-pyproject pyproject.toml
Expand Down
11 changes: 9 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,18 @@ repos:
- id: requirements-txt-fixer
- id: trailing-whitespace

- repo: https://github.com/codespell-project/codespell
rev: v2.2.6
hooks:
- id: codespell
additional_dependencies:
- tomli

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.2.2
rev: v0.3.0
hooks:
- id: ruff
# - id: ruff-format
- id: ruff-format

- repo: https://github.com/PyCQA/pylint
rev: v3.1.0
Expand Down
4 changes: 2 additions & 2 deletions partitionmanager/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ def do_stats(conf, metrics=partitionmanager.stats.PrometheusMetrics()):
)
metrics.describe(
"age_of_retained_partitions",
help_text="The age in seconds of the first partition for the table, indicating the "
"retention of data in the table.",
help_text="The age in seconds of the first partition for the table, "
"indicating the retention of data in the table.",
type_name="gauge",
)
metrics.describe(
Expand Down
48 changes: 24 additions & 24 deletions partitionmanager/cli_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,10 @@ def test_partition_period_seven_days(self):
[
"INFO:partition:Evaluating Table partitioned_last_week "
"(duration=7 days, 0:00:00)",
"DEBUG:partition:Table partitioned_last_week has no pending SQL updates.",
"DEBUG:partition:Table partitioned_last_week has no pending SQL updates.", # noqa: E501
"INFO:partition:Evaluating Table partitioned_yesterday "
"(duration=7 days, 0:00:00)",
"DEBUG:partition:Table partitioned_yesterday has no pending SQL updates.",
"DEBUG:partition:Table partitioned_yesterday has no pending SQL updates.", # noqa: E501
]
),
)
Expand Down Expand Up @@ -494,26 +494,26 @@ def test_migrate_cmd_in(self):
"partitioned_yesterday": [
"DROP TABLE IF EXISTS partitioned_yesterday_new_20210421;",
"CREATE TABLE partitioned_yesterday_new_20210421 "
+ "LIKE partitioned_yesterday;",
"LIKE partitioned_yesterday;",
"ALTER TABLE partitioned_yesterday_new_20210421 "
+ "REMOVE PARTITIONING;",
"REMOVE PARTITIONING;",
"ALTER TABLE partitioned_yesterday_new_20210421 "
+ "PARTITION BY RANGE (id) (",
"PARTITION BY RANGE (id) (",
"\tPARTITION p_assumed VALUES LESS THAN MAXVALUE",
");",
"ALTER TABLE `partitioned_yesterday_new_20210421` WAIT 6 "
+ "REORGANIZE PARTITION `p_assumed` INTO (PARTITION "
+ "`p_20210421` VALUES LESS THAN (150), PARTITION "
+ "`p_20210521` VALUES LESS THAN (300), PARTITION "
+ "`p_20210620` VALUES LESS THAN MAXVALUE);",
"REORGANIZE PARTITION `p_assumed` INTO (PARTITION "
"`p_20210421` VALUES LESS THAN (150), PARTITION "
"`p_20210521` VALUES LESS THAN (300), PARTITION "
"`p_20210620` VALUES LESS THAN MAXVALUE);",
"CREATE OR REPLACE TRIGGER copy_inserts_from_"
+ "partitioned_yesterday_to_partitioned_yesterday",
"partitioned_yesterday_to_partitioned_yesterday",
"\tAFTER INSERT ON partitioned_yesterday FOR EACH ROW",
"\t\tINSERT INTO partitioned_yesterday_new_20210421 SET",
"\t\t\t`id` = NEW.`id`,",
"\t\t\t`serial` = NEW.`serial`;",
"CREATE OR REPLACE TRIGGER copy_updates_from_"
+ "partitioned_yesterday_to_partitioned_yesterday",
"partitioned_yesterday_to_partitioned_yesterday",
"\tAFTER UPDATE ON partitioned_yesterday FOR EACH ROW",
"\t\tUPDATE partitioned_yesterday_new_20210421 SET",
"\t\t\t`serial` = NEW.`serial`",
Expand All @@ -527,16 +527,16 @@ def test_migrate_cmd_in(self):
"\tPARTITION p_assumed VALUES LESS THAN MAXVALUE",
");",
"ALTER TABLE `two_new_20210421` WAIT 6 REORGANIZE PARTITION "
+ "`p_assumed` INTO (PARTITION `p_20210421` VALUES "
+ "LESS THAN (150), PARTITION `p_20210521` VALUES LESS "
+ "THAN (375), PARTITION `p_20210620` VALUES LESS THAN "
+ "MAXVALUE);",
"CREATE OR REPLACE TRIGGER copy_inserts_from_two_to_two_new_20210421",
"`p_assumed` INTO (PARTITION `p_20210421` VALUES "
"LESS THAN (150), PARTITION `p_20210521` VALUES LESS "
"THAN (375), PARTITION `p_20210620` VALUES LESS THAN "
"MAXVALUE);",
"CREATE OR REPLACE TRIGGER copy_inserts_from_two_to_two_new_20210421", # noqa: E501
"\tAFTER INSERT ON two FOR EACH ROW",
"\t\tINSERT INTO two_new_20210421 SET",
"\t\t\t`id` = NEW.`id`,",
"\t\t\t`serial` = NEW.`serial`;",
"CREATE OR REPLACE TRIGGER copy_updates_from_two_to_two_new_20210421",
"CREATE OR REPLACE TRIGGER copy_updates_from_two_to_two_new_20210421", # noqa: E501
"\tAFTER UPDATE ON two FOR EACH ROW",
"\t\tUPDATE two_new_20210421 SET",
"\t\t\t`serial` = NEW.`serial`",
Expand Down Expand Up @@ -583,22 +583,22 @@ def test_migrate_cmd_in_unpartitioned_with_override(self):
"DROP TABLE IF EXISTS unpartitioned_new_20210421;",
"CREATE TABLE unpartitioned_new_20210421 LIKE unpartitioned;",
"ALTER TABLE unpartitioned_new_20210421 REMOVE PARTITIONING;",
"ALTER TABLE unpartitioned_new_20210421 PARTITION BY RANGE (id) (",
"ALTER TABLE unpartitioned_new_20210421 PARTITION BY RANGE (id) (", # noqa: E501
"\tPARTITION p_assumed VALUES LESS THAN MAXVALUE",
");",
"ALTER TABLE `unpartitioned_new_20210421` WAIT 6 REORGANIZE "
+ "PARTITION `p_assumed` INTO (PARTITION `p_20210421` "
+ "VALUES LESS THAN (150), PARTITION `p_20210521` VALUES "
+ "LESS THAN (300), PARTITION `p_20210620` VALUES LESS "
+ "THAN MAXVALUE);",
"PARTITION `p_assumed` INTO (PARTITION `p_20210421` "
"VALUES LESS THAN (150), PARTITION `p_20210521` VALUES "
"LESS THAN (300), PARTITION `p_20210620` VALUES LESS "
"THAN MAXVALUE);",
"CREATE OR REPLACE TRIGGER copy_inserts_from_"
+ "unpartitioned_to_unpartitioned_new_20210421",
"unpartitioned_to_unpartitioned_new_20210421",
"\tAFTER INSERT ON unpartitioned FOR EACH ROW",
"\t\tINSERT INTO unpartitioned_new_20210421 SET",
"\t\t\t`id` = NEW.`id`,",
"\t\t\t`serial` = NEW.`serial`;",
"CREATE OR REPLACE TRIGGER copy_updates_from_"
+ "unpartitioned_to_unpartitioned_new_20210421",
"unpartitioned_to_unpartitioned_new_20210421",
"\tAFTER UPDATE ON unpartitioned FOR EACH ROW",
"\t\tUPDATE unpartitioned_new_20210421 SET",
"\t\t\t`serial` = NEW.`serial`",
Expand Down
5 changes: 3 additions & 2 deletions partitionmanager/dropper.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def _drop_statement(table, partition_list):

partitions = ",".join(map(lambda x: f"`{x.name}`", partition_list))

alter_cmd = f"ALTER TABLE `{table.name}` " f"DROP PARTITION IF EXISTS {partitions};"
alter_cmd = f"ALTER TABLE `{table.name}` DROP PARTITION IF EXISTS {partitions};"

log.debug("Yielding %s", alter_cmd)

Expand Down Expand Up @@ -92,7 +92,8 @@ def get_droppable_partitions(
droppable.append(partition)
except partitionmanager.types.NoExactTimeException:
log.warning(
"Couldn't determine exact times for %s.%s, it is probably droppable too.",
"Couldn't determine exact times for %s.%s, it is probably droppable "
"too.",
table,
partition,
)
Expand Down
4 changes: 2 additions & 2 deletions partitionmanager/dropper_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,6 @@ def test_get_droppable_partitions_no_exact_times(caplog):
database, partitions, current_position, current_timestamp, table
)
assert (
"Couldn't determine exact times for Table burgers.1: (100), it is probably droppable too."
in caplog.messages
"Couldn't determine exact times for Table burgers.1: (100), it is probably "
"droppable too." in caplog.messages
)
10 changes: 7 additions & 3 deletions partitionmanager/migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,10 @@ def _generate_sql_copy_commands(
yield f"DROP TABLE IF EXISTS {new_table.name};"
yield f"CREATE TABLE {new_table.name} LIKE {existing_table.name};"
yield f"ALTER TABLE {new_table.name} REMOVE PARTITIONING;"
yield f"ALTER TABLE {new_table.name} PARTITION BY {range_cols_string} ({range_id_string}) ("
yield (
f"ALTER TABLE {new_table.name} PARTITION BY {range_cols_string} "
f"({range_id_string}) ("
)
yield f"\tPARTITION {max_val_part.name} VALUES LESS THAN {max_val_string}"
yield ");"

Expand Down Expand Up @@ -275,8 +278,9 @@ def calculate_sql_alters_from_state_info(conf, in_fp):
raise Exception("Unexpected part?")

log.info(
f"{table}, {time_delta:0.1f} hours, {ordered_prior_pos} - {ordered_current_pos}, "
f"{delta_positions} pos_change, {rate_of_change}/hour"
f"{table}, {time_delta:0.1f} hours, {ordered_prior_pos} - "
f"{ordered_current_pos}, {delta_positions} pos_change, "
f"{rate_of_change}/hour"
)

part_duration = conf.partition_period
Expand Down
24 changes: 12 additions & 12 deletions partitionmanager/migrate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,16 @@ def test_read_state_info(self):
"ALTER TABLE test_new_20210303 PARTITION BY RANGE (id) (",
"\tPARTITION p_start VALUES LESS THAN MAXVALUE",
");",
"ALTER TABLE `test_new_20210303` WAIT 6 REORGANIZE PARTITION `p_start` "
+ "INTO (PARTITION `p_20210303` VALUES LESS THAN (156), "
+ "PARTITION `p_20210402` VALUES LESS THAN (2406), PARTITION "
+ "`p_20210502` VALUES LESS THAN MAXVALUE);",
"CREATE OR REPLACE TRIGGER copy_inserts_from_test_to_test_new_20210303",
"ALTER TABLE `test_new_20210303` WAIT 6 REORGANIZE PARTITION `p_start` " # noqa: E501
"INTO (PARTITION `p_20210303` VALUES LESS THAN (156), "
"PARTITION `p_20210402` VALUES LESS THAN (2406), PARTITION "
"`p_20210502` VALUES LESS THAN MAXVALUE);",
"CREATE OR REPLACE TRIGGER copy_inserts_from_test_to_test_new_20210303", # noqa: E501
"\tAFTER INSERT ON test FOR EACH ROW",
"\t\tINSERT INTO test_new_20210303 SET",
"\t\t\t`id` = NEW.`id`,",
"\t\t\t`serial` = NEW.`serial`;",
"CREATE OR REPLACE TRIGGER copy_updates_from_test_to_test_new_20210303",
"CREATE OR REPLACE TRIGGER copy_updates_from_test_to_test_new_20210303", # noqa: E501
"\tAFTER UPDATE ON test FOR EACH ROW",
"\t\tUPDATE test_new_20210303 SET",
"\t\t\t`serial` = NEW.`serial`",
Expand Down Expand Up @@ -184,16 +184,16 @@ def test_read_state_info_map_table(self):
"CREATE TABLE map_table_new_20210303 LIKE map_table;",
"ALTER TABLE map_table_new_20210303 REMOVE PARTITIONING;",
"ALTER TABLE map_table_new_20210303 PARTITION BY RANGE "
+ "COLUMNS (orderID, authzID) (",
"COLUMNS (orderID, authzID) (",
"\tPARTITION p_assumed VALUES LESS THAN (MAXVALUE, MAXVALUE)",
");",
"ALTER TABLE `map_table_new_20210303` WAIT 6 REORGANIZE PARTITION "
+ "`p_assumed` INTO (PARTITION `p_20210303` VALUES LESS THAN "
+ "(11, 22), PARTITION `p_20210402` VALUES LESS THAN "
+ "(41, 82), PARTITION `p_20210502` VALUES LESS THAN "
+ "(MAXVALUE, MAXVALUE));",
"`p_assumed` INTO (PARTITION `p_20210303` VALUES LESS THAN "
"(11, 22), PARTITION `p_20210402` VALUES LESS THAN "
"(41, 82), PARTITION `p_20210502` VALUES LESS THAN "
"(MAXVALUE, MAXVALUE));",
"CREATE OR REPLACE TRIGGER copy_inserts_from_map_table_"
+ "to_map_table_new_20210303",
"to_map_table_new_20210303",
"\tAFTER INSERT ON map_table FOR EACH ROW",
"\t\tINSERT INTO map_table_new_20210303 SET",
"\t\t\t`authzID` = NEW.`authzID`,",
Expand Down
4 changes: 2 additions & 2 deletions partitionmanager/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def get_statistics(partitions, current_timestamp, table):
if not partitionmanager.types.is_partition_type(p):
log.warning(
f"{table} get_statistics called with a partition list "
+ f"that included a non-Partition entry: {p}"
f"that included a non-Partition entry: {p}"
)
raise partitionmanager.types.UnexpectedPartitionException(p)

Expand All @@ -78,7 +78,7 @@ def get_statistics(partitions, current_timestamp, table):
if not isinstance(tail_part, partitionmanager.types.MaxValuePartition):
log.warning(
f"{table} get_statistics called with a partition list tail "
+ f"that wasn't a MaxValuePartition: {tail_part}"
f"that wasn't a MaxValuePartition: {tail_part}"
)
raise partitionmanager.types.UnexpectedPartitionException(tail_part)

Expand Down
27 changes: 16 additions & 11 deletions partitionmanager/table_append_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def get_table_compatibility_problems(database, table):

sql_cmd = (
"SELECT CREATE_OPTIONS FROM INFORMATION_SCHEMA.TABLES "
+ f"WHERE TABLE_SCHEMA='{db_name}' and TABLE_NAME='{table.name}';"
f"WHERE TABLE_SCHEMA='{db_name}' and TABLE_NAME='{table.name}';"
).strip()
return _get_table_information_schema_problems(database.run(sql_cmd), table.name)

Expand Down Expand Up @@ -84,8 +84,8 @@ def _parse_partition_map(rows):
The "range_cols" is the ordered list of what columns are used as the
range identifiers for the partitions.
The "partitions" is a list of the Partition objects representing each
defined partition. There will be at least one partitionmanager.types.MaxValuePartition.
The "partitions" is a list of the Partition objects representing each defined
partition. There will be at least one partitionmanager.types.MaxValuePartition.
"""
log = logging.getLogger("parse_partition_map")

Expand Down Expand Up @@ -128,7 +128,8 @@ def _parse_partition_map(rows):

if len(part_vals) != len(range_cols):
log.error(
f"Partition columns {part_vals} don't match the partition range {range_cols}"
f"Partition columns {part_vals} don't match the partition range "
f"{range_cols}"
)
raise partitionmanager.types.MismatchedIdException(
"Partition columns mismatch"
Expand Down Expand Up @@ -241,7 +242,8 @@ def _get_position_increase_per_day(p1, p2):
return list()
if p1.timestamp() >= p2.timestamp():
log.warning(
f"Skipping rate of change between p1 {p1} and p2 {p2} as they are out-of-order"
f"Skipping rate of change between p1 {p1} and p2 {p2} as they are "
"out-of-order"
)
return list()

Expand Down Expand Up @@ -597,7 +599,7 @@ def _plan_partition_changes(
continue

log.debug(
f"{partition} has a conflict for its timestamp, increasing by 1 day."
f"{partition} has a conflict for its timestamp, increasing by 1 day"
)
partition.set_timestamp(partition.timestamp() + timedelta(days=1))
conflict_found = True
Expand Down Expand Up @@ -626,9 +628,12 @@ def _should_run_changes(table, altered_partitions):
log.debug(f"{p} is new")
return True

if isinstance(p, partitionmanager.types.ChangePlannedPartition) and p.important():
log.debug(f"{p} is marked important")
return True
if (
isinstance(p, partitionmanager.types.ChangePlannedPartition)
and p.important()
):
log.debug(f"{p} is marked important")
return True
return False


Expand Down Expand Up @@ -693,8 +698,8 @@ def generate_sql_reorganize_partition_commands(table, changes):
partition_update = ", ".join(partition_strings)

alter_cmd = (
f"ALTER TABLE `{table.name}` WAIT 6 "
f"REORGANIZE PARTITION `{modified_partition.old.name}` INTO ({partition_update});"
f"ALTER TABLE `{table.name}` WAIT 6 REORGANIZE "
f"PARTITION `{modified_partition.old.name}` INTO ({partition_update});"
)

log.debug(f"Yielding {alter_cmd}")
Expand Down
Loading

0 comments on commit e428d0f

Please sign in to comment.