From d9f05398d1279c6c9eca78dcd00ee4a26adefd3c Mon Sep 17 00:00:00 2001 From: Keyur Panchal Date: Tue, 3 Sep 2024 10:40:53 -0400 Subject: [PATCH] Refactor decompress_batches_scan functions (#7207) The code paths for `decompress_batches_indexscan` and `decompress_batches_seqscan` are mostly duplicated. This unifies the code path by refactoring it into a single function to make it easier to maintain. Closes #7144. --- .unreleased/pr_7207 | 1 + tsl/src/compression/compression_dml.c | 450 ++++++++++---------------- 2 files changed, 176 insertions(+), 275 deletions(-) create mode 100644 .unreleased/pr_7207 diff --git a/.unreleased/pr_7207 b/.unreleased/pr_7207 new file mode 100644 index 00000000000..5147094a238 --- /dev/null +++ b/.unreleased/pr_7207 @@ -0,0 +1 @@ +Implements: #7207 Refactor decompress_batches_scan functions diff --git a/tsl/src/compression/compression_dml.c b/tsl/src/compression/compression_dml.c index 54fa0978f3d..2130c94f8a7 100644 --- a/tsl/src/compression/compression_dml.c +++ b/tsl/src/compression/compression_dml.c @@ -5,6 +5,8 @@ */ #include #include +#include +#include #include #include #include @@ -30,16 +32,12 @@ #include static struct decompress_batches_stats -decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_rel, - Snapshot snapshot, ScanKeyData *index_scankeys, int num_index_scankeys, - ScanKeyData *heap_scankeys, int num_heap_scankeys, - ScanKeyData *mem_scankeys, int num_mem_scankeys, - tuple_filtering_constraints *constraints, bool *skip_current_tuple, - bool delete_only, Bitmapset *null_columns, List *is_nulls); -static struct decompress_batches_stats decompress_batches_seqscan( - Relation in_rel, Relation out_rel, Snapshot snapshot, ScanKeyData *scankeys, int num_scankeys, - ScanKeyData *mem_scankeys, int num_mem_scankeys, tuple_filtering_constraints *constraints, - bool *skip_current_tuple, bool delete_only, Bitmapset *null_columns, List *is_nulls); +decompress_batches_scan(Relation in_rel, Relation out_rel, Relation index_rel, Snapshot snapshot, + ScanKeyData *index_scankeys, int num_index_scankeys, + ScanKeyData *heap_scankeys, int num_heap_scankeys, + ScanKeyData *mem_scankeys, int num_mem_scankeys, + tuple_filtering_constraints *constraints, bool *skip_current_tuple, + bool delete_only, Bitmapset *null_columns, List *is_nulls); static bool batch_matches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys, tuple_filtering_constraints *constraints, bool *skip_current_tuple); @@ -113,6 +111,9 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot) int num_index_scankeys = 0; ScanKeyData *index_scankeys = NULL; Relation index_rel = NULL; + ScanKeyData *heap_scankeys = NULL; + int num_heap_scankeys = 0; + Bitmapset *key_columns = constraints->key_columns; if (ts_guc_enable_dml_decompression_tuple_filtering) { @@ -140,66 +141,45 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot) * Prepare the heap scan keys for all * key columns not found in the index */ - Bitmapset *key_columns = bms_difference(constraints->key_columns, index_columns); + key_columns = bms_difference(constraints->key_columns, index_columns); + } - int num_heap_scankeys; - ScanKeyData *heap_scankeys = build_heap_scankeys(cis->hypertable_relid, - in_rel, - out_rel, - settings, - key_columns, - &null_columns, - slot, - &num_heap_scankeys); + heap_scankeys = build_heap_scankeys(cis->hypertable_relid, + in_rel, + out_rel, + settings, + key_columns, + &null_columns, + slot, + 
&num_heap_scankeys); - /* - * Using latest snapshot to scan the heap since we are doing this to build - * the index on the uncompressed chunks in order to do speculative insertion - * which is always built from all tuples (even in higher levels of isolation). - */ - stats = decompress_batches_indexscan(in_rel, - out_rel, - index_rel, - GetLatestSnapshot(), - index_scankeys, - num_index_scankeys, - heap_scankeys, - num_heap_scankeys, - mem_scankeys, - num_mem_scankeys, - constraints, - &skip_current_tuple, - false, - NULL, /* no null column check for non-segmentby - columns */ - NIL); + /* no null column check for non-segmentby columns in case of index scan */ + if (index_rel) + null_columns = NULL; + + /* + * Using latest snapshot to scan the heap since we are doing this to build + * the index on the uncompressed chunks in order to do speculative insertion + * which is always built from all tuples (even in higher levels of isolation). + */ + stats = decompress_batches_scan(in_rel, + out_rel, + index_rel, + GetLatestSnapshot(), + index_scankeys, + num_index_scankeys, + heap_scankeys, + num_heap_scankeys, + mem_scankeys, + num_mem_scankeys, + constraints, + &skip_current_tuple, + false, + null_columns, /* no null column check for non-segmentby + columns */ + NIL); + if (index_rel) index_close(index_rel, AccessShareLock); - } - else - { - int num_heap_scankeys; - ScanKeyData *heap_scankeys = build_heap_scankeys(cis->hypertable_relid, - in_rel, - out_rel, - settings, - constraints->key_columns, - &null_columns, - slot, - &num_heap_scankeys); - - stats = decompress_batches_seqscan(in_rel, - out_rel, - GetLatestSnapshot(), - heap_scankeys, - num_heap_scankeys, - mem_scankeys, - num_mem_scankeys, - constraints, - &skip_current_tuple, - false, - null_columns, - NIL); - } Assert(cis->cds != NULL); if (skip_current_tuple) @@ -280,44 +260,33 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu &num_scankeys, &null_columns); } + if (matching_index_rel) { index_scankeys = build_index_scankeys(matching_index_rel, index_filters, &num_index_scankeys); - stats = decompress_batches_indexscan(comp_chunk_rel, - chunk_rel, - matching_index_rel, - GetTransactionSnapshot(), - index_scankeys, - num_index_scankeys, - scankeys, - num_scankeys, - mem_scankeys, - num_mem_scankeys, - NULL, - NULL, - delete_only, - null_columns, - is_null); - /* close the selected index */ - index_close(matching_index_rel, AccessShareLock); - } - else - { - stats = decompress_batches_seqscan(comp_chunk_rel, - chunk_rel, - GetTransactionSnapshot(), - scankeys, - num_scankeys, - mem_scankeys, - num_mem_scankeys, - NULL, - NULL, - delete_only, - null_columns, - is_null); } + stats = decompress_batches_scan(comp_chunk_rel, + chunk_rel, + matching_index_rel, + GetTransactionSnapshot(), + index_scankeys, + num_index_scankeys, + scankeys, + num_scankeys, + mem_scankeys, + num_mem_scankeys, + NULL, + NULL, + delete_only, + null_columns, + is_null); + + /* close the selected index */ + if (matching_index_rel) + index_close(matching_index_rel, AccessShareLock); + /* * tuples from compressed chunk has been decompressed and moved * to staging area, thus mark this chunk as partially compressed @@ -346,44 +315,119 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu return stats.batches_decompressed > 0; } +typedef struct DecompressBatchScanData +{ + TableScanDesc scan; + IndexScanDesc index_scan; +} DecompressBatchScanData; + +typedef struct DecompressBatchScanData 
*DecompressBatchScanDesc; + +static DecompressBatchScanDesc +decompress_batch_beginscan(Relation in_rel, Relation index_rel, Snapshot snapshot, int num_scankeys, + ScanKeyData *scankeys) +{ + DecompressBatchScanDesc scan; + scan = (DecompressBatchScanDesc) palloc(sizeof(DecompressBatchScanData)); + + if (index_rel) + { + scan->index_scan = index_beginscan(in_rel, index_rel, snapshot, num_scankeys, 0); + index_rescan(scan->index_scan, scankeys, num_scankeys, NULL, 0); + scan->scan = NULL; + } + else + { + scan->scan = table_beginscan(in_rel, snapshot, num_scankeys, scankeys); + scan->index_scan = NULL; + } + + return scan; +} + +static bool +decompress_batch_scan_getnext_slot(DecompressBatchScanDesc scan, ScanDirection direction, + struct TupleTableSlot *slot) +{ + if (scan->index_scan) + { + return index_getnext_slot(scan->index_scan, direction, slot); + } + else + { + return table_scan_getnextslot(scan->scan, direction, slot); + } +} + +static void +decompress_batch_endscan(DecompressBatchScanDesc scan) +{ + if (scan->index_scan) + { + index_endscan(scan->index_scan); + } + else + { + table_endscan(scan->scan); + } + + pfree(scan); +} + /* * This method will: - * 1.Scan the index created with SEGMENT BY columns. + * 1.Scan the index created with SEGMENT BY columns or the entire compressed chunk * 2.Fetch matching rows and decompress the row - * 3.insert decompressed rows to uncompressed chunk - * 4.delete this row from compressed chunk + * 3.Delete this row from compressed chunk + * 4.Insert decompressed rows to uncompressed chunk * * Returns whether we decompressed anything. + * */ static struct decompress_batches_stats -decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_rel, - Snapshot snapshot, ScanKeyData *index_scankeys, int num_index_scankeys, - ScanKeyData *heap_scankeys, int num_heap_scankeys, - ScanKeyData *mem_scankeys, int num_mem_scankeys, - tuple_filtering_constraints *constraints, bool *skip_current_tuple, - bool delete_only, Bitmapset *null_columns, List *is_nulls) +decompress_batches_scan(Relation in_rel, Relation out_rel, Relation index_rel, Snapshot snapshot, + ScanKeyData *index_scankeys, int num_index_scankeys, + ScanKeyData *heap_scankeys, int num_heap_scankeys, + ScanKeyData *mem_scankeys, int num_mem_scankeys, + tuple_filtering_constraints *constraints, bool *skip_current_tuple, + bool delete_only, Bitmapset *null_columns, List *is_nulls) { HeapTuple compressed_tuple; RowDecompressor decompressor; bool decompressor_initialized = false; bool valid = false; - int num_segmentby_filtered_rows = 0; - int num_heap_filtered_rows = 0; + int num_scanned_rows = 0; + int num_filtered_rows = 0; + TM_Result result; + DecompressBatchScanDesc scan = NULL; struct decompress_batches_stats stats = { 0 }; /* TODO: Optimization by reusing the index scan while working on a single chunk */ - IndexScanDesc scan = index_beginscan(in_rel, index_rel, snapshot, num_index_scankeys, 0); + + if (index_rel) + { + scan = decompress_batch_beginscan(in_rel, + index_rel, + snapshot, + num_index_scankeys, + index_scankeys); + } + else + { + scan = decompress_batch_beginscan(in_rel, NULL, snapshot, num_heap_scankeys, heap_scankeys); + } TupleTableSlot *slot = table_slot_create(in_rel, NULL); - index_rescan(scan, index_scankeys, num_index_scankeys, NULL, 0); - while (index_getnext_slot(scan, ForwardScanDirection, slot)) + + while (decompress_batch_scan_getnext_slot(scan, ForwardScanDirection, slot)) { - TM_Result result; + num_scanned_rows++; + /* Deconstruct the tuple */ 
Assert(slot->tts_ops->get_heap_tuple); compressed_tuple = slot->tts_ops->get_heap_tuple(slot); - num_segmentby_filtered_rows++; - if (num_heap_scankeys) + + if (index_rel && num_heap_scankeys) { /* filter tuple based on compress_orderby columns */ valid = false; @@ -401,7 +445,7 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r #endif if (!valid) { - num_heap_filtered_rows++; + num_filtered_rows++; continue; } } @@ -411,6 +455,10 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r bool is_null_condition = 0; bool seg_col_is_null = false; valid = true; + /* + * Since the heap scan API does not support SK_SEARCHNULL we have to check + * for NULL values manually when those are part of the constraints. + */ for (; attrno >= 0; attrno = bms_next_member(null_columns, attrno)) { is_null_condition = is_nulls && list_nth_int(is_nulls, pos); @@ -427,9 +475,10 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r } pos++; } + if (!valid) { - num_heap_filtered_rows++; + num_filtered_rows++; continue; } @@ -437,7 +486,6 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r { decompressor = build_decompressor(in_rel, out_rel); decompressor.delete_only = delete_only; - decompressor_initialized = true; } @@ -460,12 +508,10 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r if (skip_current_tuple && *skip_current_tuple) { row_decompressor_close(&decompressor); - index_endscan(scan); - index_close(index_rel, AccessShareLock); + decompress_batch_endscan(scan); ExecDropSingleTupleTableSlot(slot); return stats; } - write_logical_replication_msg_decompression_start(); result = delete_compressed_tuple(&decompressor, snapshot, compressed_tuple); /* skip reporting error if isolation level is < Repeatable Read @@ -482,8 +528,7 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r { write_logical_replication_msg_decompression_end(); row_decompressor_close(&decompressor); - index_endscan(scan); - index_close(index_rel, AccessShareLock); + decompress_batch_endscan(scan); report_error(result); return stats; } @@ -498,156 +543,8 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r } write_logical_replication_msg_decompression_end(); } - - if (ts_guc_debug_compression_path_info) - { - elog(INFO, - "Number of compressed rows fetched from index: %d. " - "Number of compressed rows filtered by heap filters: %d.", - num_segmentby_filtered_rows, - num_heap_filtered_rows); - } - ExecDropSingleTupleTableSlot(slot); - index_endscan(scan); - if (decompressor_initialized) - { - row_decompressor_close(&decompressor); - } - CommandCounterIncrement(); - return stats; -} - -/* - * This method will: - * 1.scan compressed chunk - * 2.decompress the row - * 3.delete this row from compressed chunk - * 4.insert decompressed rows to uncompressed chunk - * - * Return value: - * if all 4 steps defined above pass return whether we decompressed anything. - * if step 4 fails return false. Step 3 will fail if there are conflicting concurrent operations on - * same chunk. 
- */ -static struct decompress_batches_stats -decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot, - ScanKeyData *scankeys, int num_scankeys, ScanKeyData *mem_scankeys, - int num_mem_scankeys, tuple_filtering_constraints *constraints, - bool *skip_current_tuple, bool delete_only, Bitmapset *null_columns, - List *is_nulls) -{ - RowDecompressor decompressor; - bool decompressor_initialized = false; - - TupleTableSlot *slot = table_slot_create(in_rel, NULL); - TableScanDesc scan = table_beginscan(in_rel, snapshot, num_scankeys, scankeys); - int num_scanned_rows = 0; - int num_filtered_rows = 0; - struct decompress_batches_stats stats = { 0 }; - - while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) - { - num_scanned_rows++; - bool skip_tuple = false; - int attrno = bms_next_member(null_columns, -1); - int pos = 0; - bool is_null_condition = 0; - bool seg_col_is_null = false; - /* - * Since the heap scan API does not support SK_SEARCHNULL we have to check - * for NULL values manually when those are part of the constraints. - */ - for (; attrno >= 0; attrno = bms_next_member(null_columns, attrno)) - { - /* Treat all conditions as IS NULL if the list is empty */ - is_null_condition = is_nulls == NIL || list_nth_int(is_nulls, pos); - seg_col_is_null = slot_attisnull(slot, attrno); - if ((seg_col_is_null && !is_null_condition) || (!seg_col_is_null && is_null_condition)) - { - /* - * if segment by column in the scanned tuple has non null value - * and IS NULL is specified, OR segment by column has null value - * and IS NOT NULL is specified then skip this tuple - */ - skip_tuple = true; - break; - } - pos++; - } - if (skip_tuple) - { - num_filtered_rows++; - continue; - } - - TM_Result result; - Assert(slot->tts_ops->get_heap_tuple); - HeapTuple compressed_tuple = slot->tts_ops->get_heap_tuple(slot); - - if (!decompressor_initialized) - { - decompressor = build_decompressor(in_rel, out_rel); - decompressor.delete_only = delete_only; - decompressor_initialized = true; - } - - heap_deform_tuple(compressed_tuple, - decompressor.in_desc, - decompressor.compressed_datums, - decompressor.compressed_is_nulls); - - if (num_mem_scankeys && !batch_matches(&decompressor, - mem_scankeys, - num_mem_scankeys, - constraints, - skip_current_tuple)) - { - row_decompressor_reset(&decompressor); - stats.batches_filtered++; - continue; - } - - if (skip_current_tuple && *skip_current_tuple) - { - row_decompressor_close(&decompressor); - ExecDropSingleTupleTableSlot(slot); - table_endscan(scan); - return stats; - } - - write_logical_replication_msg_decompression_start(); - result = delete_compressed_tuple(&decompressor, snapshot, compressed_tuple); - /* skip reporting error if isolation level is < Repeatable Read - * since somebody decompressed the data concurrently, we need to take - * that data into account as well when in Read Committed level - */ - if (result == TM_Deleted && !IsolationUsesXactSnapshot()) - { - write_logical_replication_msg_decompression_end(); - stats.batches_decompressed++; - continue; - } - if (result != TM_Ok) - { - table_endscan(scan); - report_error(result); - } - if (decompressor.delete_only) - { - stats.batches_deleted++; - } - else - { - stats.tuples_decompressed += row_decompressor_decompress_row_to_table(&decompressor); - stats.batches_decompressed++; - } - write_logical_replication_msg_decompression_end(); - } - if (scankeys) - pfree(scankeys); - ExecDropSingleTupleTableSlot(slot); - table_endscan(scan); + decompress_batch_endscan(scan); if 
(decompressor_initialized) { row_decompressor_close(&decompressor); @@ -656,11 +553,14 @@ decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot, if (ts_guc_debug_compression_path_info) { elog(INFO, - "Number of compressed rows fetched from table scan: %d. " - "Number of compressed rows filtered: %d.", + "Number of compressed rows fetched from %s: %d. " + "Number of compressed rows filtered%s: %d.", + index_rel ? "index" : "table scan", num_scanned_rows, + index_rel ? " by heap filters" : "", num_filtered_rows); } + return stats; }
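
For context, here is a minimal sketch (not part of the patch) of how the new begin/getnext/end helpers introduced above are driven: the same loop works whether the batch lookup goes through a segmentby index or a sequential scan of the compressed chunk. The relation, snapshot, and scan-key setup is assumed to already exist, and process_compressed_tuple() is a hypothetical placeholder for the filtering and decompression logic shown in decompress_batches_scan.

/*
 * Illustrative sketch only: a condensed form of the scan loop this patch
 * introduces. Error handling, batch filtering, and decompression are elided;
 * process_compressed_tuple() is a hypothetical placeholder.
 */
#include <postgres.h>
#include <access/genam.h>
#include <access/tableam.h>
#include <executor/tuptable.h>

static void
scan_compressed_batches(Relation in_rel, Relation index_rel, Snapshot snapshot,
						ScanKeyData *scankeys, int num_scankeys)
{
	/* Picks an index scan when an index is supplied, a seq scan otherwise. */
	DecompressBatchScanDesc scan =
		decompress_batch_beginscan(in_rel, index_rel, snapshot, num_scankeys, scankeys);
	TupleTableSlot *slot = table_slot_create(in_rel, NULL);

	/* The caller no longer cares which access method is underneath. */
	while (decompress_batch_scan_getnext_slot(scan, ForwardScanDirection, slot))
	{
		process_compressed_tuple(slot); /* hypothetical: filter and decompress */
	}

	ExecDropSingleTupleTableSlot(slot);
	decompress_batch_endscan(scan);
}

The design choice mirrors PostgreSQL's own scan descriptor pattern: by hiding the index-vs-heap distinction behind DecompressBatchScanDesc, the filtering, deletion, and decompression logic only has to exist once in decompress_batches_scan.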