Reduce decompressions for compressed UPDATE/DELETE
Only decompress batches for compressed UPDATE/DELETE when the batch
actually contains tuples that match the query constraints. This works
even for columns for which we have no metadata.
svenklemm committed Jul 5, 2024
1 parent c10fae7 commit 1e04331
Showing 9 changed files with 267 additions and 27 deletions.
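In outline, the change builds an extra set of in-memory scankeys from the query's predicates and tests each compressed batch against them before the batch is turned into uncompressed rows; batches with no matching rows stay compressed. Below is a minimal sketch of that control flow; CompressedBatch, batch_has_matching_row, and decompress_batch_for_dml are illustrative stand-ins, not the helpers this commit actually adds.

/*
 * Sketch of the batch-filtering flow introduced by this commit.
 * CompressedBatch, batch_has_matching_row and decompress_batch_for_dml
 * are illustrative stand-ins for the real TimescaleDB helpers.
 */
#include <stdbool.h>

typedef struct CompressedBatch CompressedBatch;
typedef struct ScanKeyData ScanKeyData;

extern bool batch_has_matching_row(CompressedBatch *batch, ScanKeyData *keys, int nkeys);
extern void decompress_batch_for_dml(CompressedBatch *batch);

static void
filter_and_decompress(CompressedBatch **batches, int nbatches,
                      ScanKeyData *mem_scankeys, int num_mem_scankeys)
{
    for (int i = 0; i < nbatches; i++)
    {
        /* New: cheap in-memory recheck before modifying the chunk. */
        if (num_mem_scankeys &&
            !batch_has_matching_row(batches[i], mem_scankeys, num_mem_scankeys))
            continue; /* no row can match; leave the batch compressed */

        /* Only batches with matching rows pay the decompression cost. */
        decompress_batch_for_dml(batches[i]);
    }
}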
1 change: 1 addition & 0 deletions .unreleased/pr_7101
@@ -0,0 +1 @@
Implements: #7101 Reduce decompressions for compressed UPDATE/DELETE
13 changes: 13 additions & 0 deletions src/guc.c
@@ -69,6 +69,7 @@ TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify = true;
TSDLLEXPORT int ts_guc_cagg_max_individual_materializations = 10;
bool ts_guc_enable_osm_reads = true;
TSDLLEXPORT bool ts_guc_enable_dml_decompression = true;
TSDLLEXPORT bool ts_guc_enable_dml_decompression_tuple_filtering = true;
TSDLLEXPORT int ts_guc_max_tuples_decompressed_per_dml = 100000;
TSDLLEXPORT bool ts_guc_enable_transparent_decompression = true;
TSDLLEXPORT bool ts_guc_enable_compression_wal_markers = false;
@@ -437,6 +438,18 @@ _guc_init(void)
NULL,
NULL);

DefineCustomBoolVariable(MAKE_EXTOPTION("enable_dml_decompression_tuple_filtering"),
"Enable DML decompression tuple filtering",
"Recheck tuples during DML decompression to only decompress batches "
"with matching tuples",
&ts_guc_enable_dml_decompression_tuple_filtering,
true,
PGC_USERSET,
0,
NULL,
NULL,
NULL);

DefineCustomIntVariable(MAKE_EXTOPTION("max_tuples_decompressed_per_dml_transaction"),
"The max number of tuples that can be decompressed during an "
"INSERT, UPDATE, or DELETE.",
1 change: 1 addition & 0 deletions src/guc.h
@@ -29,6 +29,7 @@ extern bool ts_guc_enable_now_constify;
extern TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify;
extern bool ts_guc_enable_osm_reads;
extern TSDLLEXPORT bool ts_guc_enable_dml_decompression;
extern TSDLLEXPORT bool ts_guc_enable_dml_decompression_tuple_filtering;
extern TSDLLEXPORT int ts_guc_max_tuples_decompressed_per_dml;
extern TSDLLEXPORT bool ts_guc_enable_transparent_decompression;
extern TSDLLEXPORT bool ts_guc_enable_compression_wal_markers;
69 changes: 56 additions & 13 deletions tsl/src/compression/compression_dml.c
@@ -42,6 +42,7 @@ decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,

static bool batch_matches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys);
static void process_predicates(Chunk *ch, CompressionSettings *settings, List *predicates,
ScanKeyData **mem_scankeys, int *num_mem_scankeys,
List **heap_filters, List **index_filters, List **is_null);
static Relation find_matching_index(Relation comp_chunk_rel, List **index_filters,
List **heap_filters);
@@ -89,13 +90,17 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
struct decompress_batches_stats stats;

/* the scan keys used for in memory tests of the decompressed tuples */
int num_mem_scankeys;
ScanKeyData *mem_scankeys = build_scankeys_for_uncompressed(cis->hypertable_relid,
settings,
out_rel,
key_columns,
slot,
&num_mem_scankeys);
int num_mem_scankeys = 0;
ScanKeyData *mem_scankeys = NULL;
if (ts_guc_enable_dml_decompression_tuple_filtering)
{
mem_scankeys = build_mem_scankeys_from_slot(cis->hypertable_relid,
settings,
out_rel,
key_columns,
slot,
&num_mem_scankeys);
}

int num_index_scankeys;
Relation index_rel = NULL;
@@ -211,11 +216,20 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
ScanKeyData *index_scankeys = NULL;
int num_index_scankeys = 0;
struct decompress_batches_stats stats;
int num_mem_scankeys = 0;
ScanKeyData *mem_scankeys = NULL;

comp_chunk = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true);
CompressionSettings *settings = ts_compression_settings_get(comp_chunk->table_id);

process_predicates(chunk, settings, predicates, &heap_filters, &index_filters, &is_null);
process_predicates(chunk,
settings,
predicates,
&mem_scankeys,
&num_mem_scankeys,
&heap_filters,
&index_filters,
&is_null);

chunk_rel = table_open(chunk->table_id, RowExclusiveLock);
comp_chunk_rel = table_open(comp_chunk->table_id, RowExclusiveLock);
@@ -244,8 +258,8 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
num_index_scankeys,
scankeys,
num_scankeys,
NULL,
0,
mem_scankeys,
num_mem_scankeys,
null_columns,
is_null);
/* close the selected index */
@@ -258,8 +272,8 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
GetTransactionSnapshot(),
scankeys,
num_scankeys,
NULL,
0,
mem_scankeys,
num_mem_scankeys,
null_columns,
is_null);
}
@@ -390,6 +404,12 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r
decompressor.compressed_datums,
decompressor.compressed_is_nulls);

if (num_mem_scankeys && !batch_matches(&decompressor, mem_scankeys, num_mem_scankeys))
{
row_decompressor_reset(&decompressor);
continue;
}

write_logical_replication_msg_decompression_start();
result = delete_compressed_tuple(&decompressor, snapshot, compressed_tuple);
/* skip reporting error if isolation level is < Repeatable Read
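batch_matches, whose definition sits outside this hunk, decides whether any row of the decompressed batch passes the in-memory scankeys. A plausible core for such a per-row test is PostgreSQL's HeapKeyTest macro from access/valid.h; the sketch below assumes each decompressed row is materialized as a HeapTuple, which may differ from the commit's actual implementation.

#include "postgres.h"
#include "access/htup_details.h"
#include "access/skey.h"
#include "access/tupdesc.h"
#include "access/valid.h"

/*
 * Hypothetical helper: returns true if one decompressed row satisfies
 * all in-memory scankeys. HeapKeyTest is PostgreSQL's standard macro
 * for evaluating an array of scankeys against a heap tuple.
 */
static bool
decompressed_row_matches(HeapTuple tuple, TupleDesc tupdesc,
                         ScanKeyData *mem_scankeys, int num_mem_scankeys)
{
    bool match;

    HeapKeyTest(tuple, tupdesc, num_mem_scankeys, mem_scankeys, match);
    return match;
}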
Expand Down Expand Up @@ -791,10 +811,17 @@ compressed_insert_key_columns(Relation relation)
* filters are put into heap_filters list.
*/
static void
process_predicates(Chunk *ch, CompressionSettings *settings, List *predicates, List **heap_filters,
process_predicates(Chunk *ch, CompressionSettings *settings, List *predicates,
ScanKeyData **mem_scankeys, int *num_mem_scankeys, List **heap_filters,
List **index_filters, List **is_null)
{
ListCell *lc;
if (ts_guc_enable_dml_decompression_tuple_filtering)
{
*mem_scankeys = palloc0(sizeof(ScanKeyData) * list_length(predicates));
}
*num_mem_scankeys = 0;

/*
* We don't want to forward boundParams from the execution state here
* as we don't want to constify join params in the predicates.
@@ -837,6 +864,7 @@ process_predicates(Chunk *ch, CompressionSettings *settings, List *predicates, L
column_name = get_attname(ch->table_id, var->varattno, false);
TypeCacheEntry *tce = lookup_type_cache(var->vartype, TYPECACHE_BTREE_OPFAMILY);
int op_strategy = get_op_opfamily_strategy(opno, tce->btree_opf);

if (ts_array_is_member(settings->fd.segmentby, column_name))
{
switch (op_strategy)
@@ -864,6 +892,21 @@ process_predicates(Chunk *ch, CompressionSettings *settings, List *predicates, L
continue;
}

/*
* Segmentby columns are checked as part of the batch scan, so there is no need to redo the check.
*/
if (ts_guc_enable_dml_decompression_tuple_filtering)
{
ScanKeyEntryInitialize(&(*mem_scankeys)[(*num_mem_scankeys)++],
arg_value->constisnull ? SK_ISNULL : 0,
var->varattno,
op_strategy,
arg_value->consttype,
arg_value->constcollid,
opcode,
arg_value->constisnull ? 0 : arg_value->constvalue);
}

int min_attno = compressed_column_metadata_attno(settings,
ch->table_id,
var->varattno,
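For reference, ScanKeyEntryInitialize above is PostgreSQL's standard initializer, taking flags, attribute number, strategy, subtype, collation, the comparison function's OID, and the argument datum. A self-contained sketch of the same pattern follows; init_mem_scankey is a hypothetical wrapper, and the OpExpr-derived inputs are assumed to be already extracted, as in process_predicates.

#include "postgres.h"
#include "access/skey.h"
#include "access/stratnum.h"
#include "nodes/primnodes.h"
#include "utils/lsyscache.h"

/*
 * Hypothetical wrapper: build one in-memory scankey from a
 * "column <op> constant" predicate, mirroring process_predicates.
 */
static void
init_mem_scankey(ScanKeyData *key, AttrNumber attno, StrategyNumber strategy,
                 Oid opno, const Const *arg_value)
{
    ScanKeyEntryInitialize(key,
                           arg_value->constisnull ? SK_ISNULL : 0,
                           attno,
                           strategy,
                           arg_value->consttype,   /* comparison subtype */
                           arg_value->constcollid, /* constant's collation */
                           get_opcode(opno),       /* operator's function OID */
                           arg_value->constisnull ? 0 : arg_value->constvalue);
}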
10 changes: 3 additions & 7 deletions tsl/src/compression/compression_dml.h
@@ -10,9 +10,9 @@

#include "ts_catalog/compression_settings.h"

ScanKeyData *build_scankeys_for_uncompressed(Oid ht_relid, CompressionSettings *settings,
Relation out_rel, Bitmapset *key_columns,
TupleTableSlot *slot, int *num_scankeys);
ScanKeyData *build_mem_scankeys_from_slot(Oid ht_relid, CompressionSettings *settings,
Relation out_rel, Bitmapset *key_columns,
TupleTableSlot *slot, int *num_scankeys);
ScanKeyData *build_index_scankeys(Relation index_rel, List *index_filters, int *num_scankeys);
ScanKeyData *build_index_scankeys_using_slot(Oid hypertable_relid, Relation in_rel,
Relation out_rel, Bitmapset *key_columns,
@@ -23,7 +23,3 @@ ScanKeyData *build_heap_scankeys(Oid hypertable_relid, Relation in_rel, Relation
Bitmapset **null_columns, TupleTableSlot *slot, int *num_scankeys);
ScanKeyData *build_update_delete_scankeys(Relation in_rel, List *heap_filters, int *num_scankeys,
Bitmapset **null_columns);
int create_segment_filter_scankey(Relation in_rel, char *segment_filter_col_name,
StrategyNumber strategy, Oid subtype, ScanKeyData *scankeys,
int num_scankeys, Bitmapset **null_columns, Datum value,
bool is_null_check, bool is_array_op);
16 changes: 11 additions & 5 deletions tsl/src/compression/compression_scankey.c
@@ -16,19 +16,25 @@
#include "ts_catalog/array_utils.h"

static Oid deduce_filter_subtype(BatchFilter *filter, Oid att_typoid);
static int create_segment_filter_scankey(Relation in_rel, char *segment_filter_col_name,
StrategyNumber strategy, Oid subtype,
ScanKeyData *scankeys, int num_scankeys,
Bitmapset **null_columns, Datum value, bool is_null_check,
bool is_array_op);

/*
* Build scankeys for the decompressed tuple to check if it is part of the batch.
*
* The key_columns are the columns of the uncompressed chunk.
*/
ScanKeyData *
build_scankeys_for_uncompressed(Oid ht_relid, CompressionSettings *settings, Relation out_rel,
Bitmapset *key_columns, TupleTableSlot *slot, int *num_scankeys)
build_mem_scankeys_from_slot(Oid ht_relid, CompressionSettings *settings, Relation out_rel,
Bitmapset *key_columns, TupleTableSlot *slot, int *num_scankeys)
{
ScanKeyData *scankeys = NULL;
int key_index = 0;
TupleDesc out_desc = RelationGetDescr(out_rel);
TupleDesc in_desc = slot->tts_tupleDescriptor;

if (bms_is_empty(key_columns))
{
@@ -93,8 +99,8 @@ build_scankeys_for_uncompressed(Oid ht_relid, CompressionSettings *settings, Rel
isnull ? SK_ISNULL | SK_SEARCHNULL : 0,
attno,
BTEqualStrategyNumber,
InvalidOid,
out_desc->attrs[AttrNumberGetAttrOffset(attno)].attcollation,
in_desc->attrs[AttrNumberGetAttrOffset(ht_attno)].atttypid,
in_desc->attrs[AttrNumberGetAttrOffset(ht_attno)].attcollation,
get_opcode(opr),
isnull ? 0 : value);
}
@@ -437,7 +443,7 @@ build_update_delete_scankeys(Relation in_rel, List *heap_filters, int *num_scank
return scankeys;
}

int
static int
create_segment_filter_scankey(Relation in_rel, char *segment_filter_col_name,
StrategyNumber strategy, Oid subtype, ScanKeyData *scankeys,
int num_scankeys, Bitmapset **null_columns, Datum value,
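The other fix in this file replaces a hard-coded InvalidOid subtype and the output relation's collation with the type and collation of the attribute in the slot's tuple descriptor, so the equality check runs under the hypertable column's own type and collation. A small sketch of reading those fields from a TupleDesc, assuming the caller has already resolved the attribute number:

#include "postgres.h"
#include "access/attnum.h"
#include "access/tupdesc.h"

/*
 * Sketch: fetch an attribute's type and collation from a tuple
 * descriptor, as the corrected ScanKeyEntryInitialize call now does.
 */
static void
attr_type_and_collation(TupleDesc desc, AttrNumber attno,
                        Oid *typid, Oid *collid)
{
    Form_pg_attribute attr = TupleDescAttr(desc, AttrNumberGetAttrOffset(attno));

    *typid = attr->atttypid;
    *collid = attr->attcollation;
}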
6 changes: 4 additions & 2 deletions tsl/test/expected/compression_update_delete.out
@@ -122,12 +122,13 @@ FROM compressed_chunk_info_view
WHERE hypertable_name = 'sample_table' ORDER BY chunk_name;
chunk_status | CHUNK_NAME
--------------+------------------
9 | _hyper_1_1_chunk
1 | _hyper_1_1_chunk
9 | _hyper_1_2_chunk
(2 rows)

-- recompress the partial chunks
SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk');
NOTICE: chunk "_hyper_1_1_chunk" is already compressed
compress_chunk
----------------------------------------
_timescaledb_internal._hyper_1_1_chunk
@@ -173,12 +174,13 @@ FROM compressed_chunk_info_view
WHERE hypertable_name = 'sample_table' ORDER BY chunk_name;
chunk_status | CHUNK_NAME
--------------+------------------
9 | _hyper_1_1_chunk
1 | _hyper_1_1_chunk
9 | _hyper_1_2_chunk
(2 rows)

-- recompress the partial chunks
SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk');
NOTICE: chunk "_hyper_1_1_chunk" is already compressed
compress_chunk
----------------------------------------
_timescaledb_internal._hyper_1_1_chunk