diff --git a/images/warehousekeeper/warehousekeeper.py b/images/warehousekeeper/warehousekeeper.py
index 2d627e2..1909ad7 100644
--- a/images/warehousekeeper/warehousekeeper.py
+++ b/images/warehousekeeper/warehousekeeper.py
@@ -3,7 +3,7 @@
 import boto3
 import click
-from deltalake import DeltaTable
+from deltalake import DeltaTable, WriterProperties
 from loguru import logger
 from pyspark.sql import SparkSession
@@ -159,10 +159,32 @@ def vacuum(
 @cli.command(cls=BaseCommand)
-def optimize(bucket_name: str, database_name_prefix: str):
-    """Run OPTIMIZE against all Delta tables in the given folder"""
+@click.option(
+    "--compression-level",
+    type=click.INT,
+    help="The compression level to use",
+    required=False,
+    default=9,
+)
+@click.option(
+    "--compression-type",
+    type=click.STRING,
+    help="The compression type to use",
+    required=False,
+    default="ZSTD",
+)
+def optimize(
+    bucket_name: str,
+    database_name_prefix: str,
+    compression_type: str,
+    compression_level: int,
+):
+    """Run OPTIMIZE against all Delta tables in the database"""
 
-    spark = spark_builder.getOrCreate()
+    wp = WriterProperties(
+        compression=compression_type,
+        compression_level=compression_level,
+    )
 
     for dt in list_tables(bucket_name=bucket_name, prefix=database_name_prefix):
         logger.info(
@@ -170,9 +192,8 @@ def optimize(bucket_name: str, database_name_prefix: str):
             table=dt.table_uri,
             metadata=dt.metadata(),
         )
-        optimize_query = f"OPTIMIZE delta.`{dt.table_uri}`"
-        logger.info(optimize_query)
-        spark.sql(optimize_query).show(truncate=False)
+        metrics = dt.optimize.compact(writer_properties=wp)
+        logger.info(metrics)
 
 
 @cli.command(cls=BaseCommand)
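
For context on the new loop body, here is a minimal, self-contained sketch of what dt.optimize.compact(writer_properties=wp) does for a single table, using the same deltalake (delta-rs) Python APIs the patch imports. The table URI is a hypothetical placeholder, not a path from this repo.

    # Sketch only: the new optimize path for one table, outside the CLI.
    from deltalake import DeltaTable, WriterProperties

    # Hypothetical table URI, for illustration only.
    dt = DeltaTable("s3://example-bucket/example_db/example_table")

    # Mirrors the new click defaults: ZSTD at level 9.
    wp = WriterProperties(compression="ZSTD", compression_level=9)

    # Bin-packing compaction: rewrites many small files into fewer, larger
    # ones and returns a metrics dict describing the files added and removed.
    metrics = dt.optimize.compact(writer_properties=wp)
    print(metrics)

The design shift worth noting: the old code issued OPTIMIZE through Spark SQL, which required a live SparkSession; compacting through delta-rs runs in-process, which is why the spark_builder.getOrCreate() call could be dropped from optimize.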