From 332bbdb6e7a03080f19516ca3c51b4a43dea1ac7 Mon Sep 17 00:00:00 2001
From: Sven Klemm
Date: Sun, 8 Oct 2023 15:39:05 +0200
Subject: [PATCH] Show batches/tuples decompressed in EXPLAIN output

This patch adds tracking of the number of batches and tuples that needed
to be decompressed as part of DML operations on compressed hypertables.
These counts will be visible in EXPLAIN ANALYZE output like so:

QUERY PLAN
 Custom Scan (HypertableModify) (actual rows=0 loops=1)
   Batches decompressed: 2
   Tuples decompressed: 25
   ->  Insert on decompress_tracking (actual rows=0 loops=1)
         ->  Custom Scan (ChunkDispatch) (actual rows=2 loops=1)
               ->  Values Scan on "*VALUES*" (actual rows=2 loops=1)
(6 rows)
---
 .unreleased/pr_6178                           |   1 +
 src/cross_module_fn.h                         |   3 +-
 src/nodes/chunk_dispatch/chunk_dispatch.h     |   4 +-
 src/nodes/chunk_dispatch/chunk_insert_state.c |   1 +
 src/nodes/chunk_dispatch/chunk_insert_state.h |   2 +
 src/nodes/hypertable_modify.c                 |  28 ++++-
 src/nodes/hypertable_modify.h                 |   2 +
 tsl/src/compression/compression.c             |  46 ++++++--
 tsl/src/compression/compression.h             |   5 +-
 .../shared/expected/decompress_tracking.out   | 100 ++++++++++++++++++
 tsl/test/shared/sql/CMakeLists.txt            |   3 +-
 tsl/test/shared/sql/decompress_tracking.sql   |  28 +++++
 12 files changed, 207 insertions(+), 16 deletions(-)
 create mode 100644 .unreleased/pr_6178
 create mode 100644 tsl/test/shared/expected/decompress_tracking.out
 create mode 100644 tsl/test/shared/sql/decompress_tracking.sql

diff --git a/.unreleased/pr_6178 b/.unreleased/pr_6178
new file mode 100644
index 00000000000..15e1037f6bf
--- /dev/null
+++ b/.unreleased/pr_6178
@@ -0,0 +1 @@
+Implements: #6178 Show batches/tuples decompressed during DML operations in EXPLAIN output
diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h
index e31ec3e203a..eade22adfc5 100644
--- a/src/cross_module_fn.h
+++ b/src/cross_module_fn.h
@@ -35,6 +35,7 @@ typedef struct Hypertable Hypertable;
 typedef struct Chunk Chunk;
 typedef struct ChunkInsertState ChunkInsertState;
 typedef struct CopyChunkState CopyChunkState;
+typedef struct HypertableModifyState HypertableModifyState;

 typedef struct CrossModuleFunctions
 {
@@ -139,7 +140,7 @@ typedef struct CrossModuleFunctions
 	PGFunction decompress_chunk;
 	void (*decompress_batches_for_insert)(ChunkInsertState *state, Chunk *chunk,
 										  TupleTableSlot *slot);
-	bool (*decompress_target_segments)(ModifyTableState *ps);
+	bool (*decompress_target_segments)(HypertableModifyState *ht_state);
 	/* The compression functions below are not installed in SQL as part of create extension;
 	 *  They are installed and tested during testing scripts. They are exposed in cross-module
 	 *  functions because they may be very useful for debugging customer problems if the sql
diff --git a/src/nodes/chunk_dispatch/chunk_dispatch.h b/src/nodes/chunk_dispatch/chunk_dispatch.h
index c7c849fe8be..85bed5d6978 100644
--- a/src/nodes/chunk_dispatch/chunk_dispatch.h
+++ b/src/nodes/chunk_dispatch/chunk_dispatch.h
@@ -27,7 +27,7 @@ typedef struct ChunkDispatch
 {
 	/* Link to the executor state for INSERTs. This is not set for COPY path.
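+	 * The const qualifier is dropped below so that ChunkInsertState can
+	 * update the decompression counters through this pointer.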
*/ - const struct ChunkDispatchState *dispatch_state; + struct ChunkDispatchState *dispatch_state; Hypertable *hypertable; SubspaceStore *cache; EState *estate; @@ -74,6 +74,8 @@ typedef struct ChunkDispatchState ResultRelInfo *rri; /* flag to represent dropped attributes */ bool is_dropped_attr_exists; + int64 batches_decompressed; + int64 tuples_decompressed; } ChunkDispatchState; extern TSDLLEXPORT bool ts_is_chunk_dispatch_state(PlanState *state); diff --git a/src/nodes/chunk_dispatch/chunk_insert_state.c b/src/nodes/chunk_dispatch/chunk_insert_state.c index c44779a6531..a65096b0a50 100644 --- a/src/nodes/chunk_dispatch/chunk_insert_state.c +++ b/src/nodes/chunk_dispatch/chunk_insert_state.c @@ -595,6 +595,7 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch) CheckValidResultRel(relinfo, chunk_dispatch_get_cmd_type(dispatch)); state = palloc0(sizeof(ChunkInsertState)); + state->cds = dispatch->dispatch_state; state->mctx = cis_context; state->rel = rel; state->result_relation_info = relinfo; diff --git a/src/nodes/chunk_dispatch/chunk_insert_state.h b/src/nodes/chunk_dispatch/chunk_insert_state.h index fe3cf131690..d3428c314d8 100644 --- a/src/nodes/chunk_dispatch/chunk_insert_state.h +++ b/src/nodes/chunk_dispatch/chunk_insert_state.h @@ -15,6 +15,7 @@ #include "cross_module_fn.h" typedef struct TSCopyMultiInsertBuffer TSCopyMultiInsertBuffer; +typedef struct ChunkDispatchState ChunkDispatchState; typedef struct ChunkInsertState { @@ -22,6 +23,7 @@ typedef struct ChunkInsertState ResultRelInfo *result_relation_info; /* Per-chunk arbiter indexes for ON CONFLICT handling */ List *arbiter_indexes; + ChunkDispatchState *cds; /* When the tuple descriptors for the main hypertable (root) and a chunk * differs, it is necessary to convert tuples to chunk format before diff --git a/src/nodes/hypertable_modify.c b/src/nodes/hypertable_modify.c index afc5414567b..77135e3affc 100644 --- a/src/nodes/hypertable_modify.c +++ b/src/nodes/hypertable_modify.c @@ -242,6 +242,32 @@ hypertable_modify_explain(CustomScanState *node, List *ancestors, ExplainState * mtstate->ps.instrument = node->ss.ps.instrument; #endif + /* + * For INSERT we have to read the number of decompressed batches and + * tuples from the ChunkDispatchState below the ModifyTable. 
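+	 * Each ChunkDispatch accumulates these counters as it decompresses
+	 * batches while routing tuples, so we sum them over all the
+	 * ChunkDispatchState nodes found below the ModifyTable.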
+ */ + if ((mtstate->operation == CMD_INSERT +#if PG15_GE + || mtstate->operation == CMD_MERGE +#endif + ) && + outerPlanState(mtstate)) + { + List *chunk_dispatch_states = get_chunk_dispatch_states(outerPlanState(mtstate)); + ListCell *lc; + + foreach (lc, chunk_dispatch_states) + { + ChunkDispatchState *cds = (ChunkDispatchState *) lfirst(lc); + state->batches_decompressed += cds->batches_decompressed; + state->tuples_decompressed += cds->tuples_decompressed; + } + } + if (state->batches_decompressed > 0) + ExplainPropertyInteger("Batches decompressed", NULL, state->batches_decompressed, es); + if (state->tuples_decompressed > 0) + ExplainPropertyInteger("Tuples decompressed", NULL, state->tuples_decompressed, es); + if (NULL != state->fdwroutine) { appendStringInfo(es->str, "Insert on distributed hypertable"); @@ -793,7 +819,7 @@ ExecModifyTable(CustomScanState *cs_node, PlanState *pstate) { if (ts_cm_functions->decompress_target_segments) { - ts_cm_functions->decompress_target_segments(node); + ts_cm_functions->decompress_target_segments(ht_state); ht_state->comp_chunks_processed = true; /* * save snapshot set during ExecutorStart(), since this is the same diff --git a/src/nodes/hypertable_modify.h b/src/nodes/hypertable_modify.h index e69df42a472..581c45ed831 100644 --- a/src/nodes/hypertable_modify.h +++ b/src/nodes/hypertable_modify.h @@ -30,6 +30,8 @@ typedef struct HypertableModifyState bool comp_chunks_processed; Snapshot snapshot; FdwRoutine *fdwroutine; + int64 tuples_decompressed; + int64 batches_decompressed; } HypertableModifyState; extern void ts_hypertable_modify_fixup_tlist(Plan *plan); diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c index fcdf77417a2..5fa698b28b1 100644 --- a/tsl/src/compression/compression.c +++ b/tsl/src/compression/compression.c @@ -59,6 +59,7 @@ #include "gorilla.h" #include "guc.h" #include "nodes/chunk_dispatch/chunk_insert_state.h" +#include "nodes/hypertable_modify.h" #include "indexing.h" #include "segment_meta.h" #include "ts_catalog/compression_chunk_size.h" @@ -1556,6 +1557,7 @@ row_decompressor_decompress_row(RowDecompressor *decompressor, Tuplesortstate *t decompressor->compressed_datums, decompressor->compressed_is_nulls); + decompressor->batches_decompressed++; do { /* we're done if all the decompressors return NULL */ @@ -1578,6 +1580,7 @@ row_decompressor_decompress_row(RowDecompressor *decompressor, Tuplesortstate *t decompressor->decompressed_datums, decompressor->decompressed_is_nulls); TupleTableSlot *slot = MakeSingleTupleTableSlot(decompressor->out_desc, &TTSOpsVirtual); + decompressor->tuples_decompressed++; if (tuplesortstate == NULL) { @@ -2097,6 +2100,8 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo &tmfd, false); Assert(result == TM_Ok); + cis->cds->batches_decompressed += decompressor.batches_decompressed; + cis->cds->tuples_decompressed += decompressor.tuples_decompressed; } table_endscan(heapScan); @@ -3199,7 +3204,8 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel * 4. Update catalog table to change status of moved chunk. 
*/ static void -decompress_batches_for_update_delete(Chunk *chunk, List *predicates, EState *estate) +decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chunk, + List *predicates, EState *estate) { /* process each chunk with its corresponding predicates */ @@ -3292,6 +3298,8 @@ decompress_batches_for_update_delete(Chunk *chunk, List *predicates, EState *est filter = lfirst(lc); pfree(filter); } + ht_state->batches_decompressed += decompressor.batches_decompressed; + ht_state->tuples_decompressed += decompressor.tuples_decompressed; } /* @@ -3299,19 +3307,32 @@ decompress_batches_for_update_delete(Chunk *chunk, List *predicates, EState *est * Once Scan node is found check if chunk is compressed, if so then * decompress those segments which match the filter conditions if present. */ -static bool decompress_chunk_walker(PlanState *ps, List *relids); + +struct decompress_chunk_context +{ + List *relids; + HypertableModifyState *ht_state; +}; + +static bool decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx); bool -decompress_target_segments(ModifyTableState *ps) +decompress_target_segments(HypertableModifyState *ht_state) { - List *relids = castNode(ModifyTable, ps->ps.plan)->resultRelations; - Assert(relids); + ModifyTableState *ps = + linitial_node(ModifyTableState, castNode(CustomScanState, ht_state)->custom_ps); + + struct decompress_chunk_context ctx = { + .ht_state = ht_state, + .relids = castNode(ModifyTable, ps->ps.plan)->resultRelations, + }; + Assert(ctx.relids); - return decompress_chunk_walker(&ps->ps, relids); + return decompress_chunk_walker(&ps->ps, &ctx); } static bool -decompress_chunk_walker(PlanState *ps, List *relids) +decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx) { RangeTblEntry *rte = NULL; bool needs_decompression = false; @@ -3330,7 +3351,7 @@ decompress_chunk_walker(PlanState *ps, List *relids) case T_IndexScanState: { /* Get the index quals on the original table and also include - * any filters that are used to for filtering heap tuples + * any filters that are used for filtering heap tuples */ predicates = list_union(((IndexScan *) ps->plan)->indexqualorig, ps->plan->qual); needs_decompression = true; @@ -3362,7 +3383,7 @@ decompress_chunk_walker(PlanState *ps, List *relids) * even when it is a self join */ int scanrelid = ((Scan *) ps->plan)->scanrelid; - if (list_member_int(relids, scanrelid)) + if (list_member_int(ctx->relids, scanrelid)) { rte = rt_fetch(scanrelid, ps->state->es_range_table); current_chunk = ts_chunk_get_by_relid(rte->relid, false); @@ -3374,7 +3395,10 @@ decompress_chunk_walker(PlanState *ps, List *relids) errmsg("UPDATE/DELETE is disabled on compressed chunks"), errhint("Set timescaledb.enable_dml_decompression to TRUE."))); - decompress_batches_for_update_delete(current_chunk, predicates, ps->state); + decompress_batches_for_update_delete(ctx->ht_state, + current_chunk, + predicates, + ps->state); /* This is a workaround specifically for bitmap heap scans: * during node initialization, initialize the scan state with the active snapshot @@ -3400,7 +3424,7 @@ decompress_chunk_walker(PlanState *ps, List *relids) if (predicates) pfree(predicates); - return planstate_tree_walker(ps, decompress_chunk_walker, relids); + return planstate_tree_walker(ps, decompress_chunk_walker, ctx); } #endif diff --git a/tsl/src/compression/compression.h b/tsl/src/compression/compression.h index fbfd6e49b1d..db850d40583 100644 --- a/tsl/src/compression/compression.h +++ 
b/tsl/src/compression/compression.h @@ -150,6 +150,8 @@ typedef struct RowDecompressor bool *decompressed_is_nulls; MemoryContext per_compressed_row_ctx; + int64 batches_decompressed; + int64 tuples_decompressed; } RowDecompressor; /* @@ -323,7 +325,8 @@ typedef struct ChunkInsertState ChunkInsertState; extern void decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlot *slot); #if PG14_GE -extern bool decompress_target_segments(ModifyTableState *ps); +typedef struct HypertableModifyState HypertableModifyState; +extern bool decompress_target_segments(HypertableModifyState *ht_state); #endif /* CompressSingleRowState methods */ struct CompressSingleRowState; diff --git a/tsl/test/shared/expected/decompress_tracking.out b/tsl/test/shared/expected/decompress_tracking.out new file mode 100644 index 00000000000..86132c9ce88 --- /dev/null +++ b/tsl/test/shared/expected/decompress_tracking.out @@ -0,0 +1,100 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set EXPLAIN 'EXPLAIN (costs off,timing off,summary off)' +\set EXPLAIN_ANALYZE 'EXPLAIN (analyze,costs off,timing off,summary off)' +CREATE TABLE decompress_tracking(time timestamptz not null, device text, value float, primary key(time, device)); +SELECT table_name FROM create_hypertable('decompress_tracking','time'); + table_name + decompress_tracking +(1 row) + +ALTER TABLE decompress_tracking SET (timescaledb.compress, timescaledb.compress_segmentby='device'); +INSERT INTO decompress_tracking SELECT '2020-01-01'::timestamptz + format('%s hour', g)::interval, 'd1', random() FROM generate_series(1,10) g; +INSERT INTO decompress_tracking SELECT '2020-01-01'::timestamptz + format('%s hour', g)::interval, 'd2', random() FROM generate_series(1,20) g; +INSERT INTO decompress_tracking SELECT '2020-01-01'::timestamptz + format('%s hour', g)::interval, 'd3', random() FROM generate_series(1,30) g; +SELECT count(compress_chunk(ch)) FROM show_chunks('decompress_tracking') ch; + count + 2 +(1 row) + +-- no tracking without analyze +:EXPLAIN UPDATE decompress_tracking SET value = value + 3; +QUERY PLAN + Custom Scan (HypertableModify) + -> Update on decompress_tracking + Update on _hyper_X_X_chunk decompress_tracking_1 + Update on _hyper_X_X_chunk decompress_tracking_2 + -> Result + -> Append + -> Seq Scan on _hyper_X_X_chunk decompress_tracking_1 + -> Seq Scan on _hyper_X_X_chunk decompress_tracking_2 +(8 rows) + +BEGIN; :EXPLAIN_ANALYZE UPDATE decompress_tracking SET value = value + 3; ROLLBACK; +QUERY PLAN + Custom Scan (HypertableModify) (actual rows=0 loops=1) + Batches decompressed: 5 + Tuples decompressed: 60 + -> Update on decompress_tracking (actual rows=0 loops=1) + Update on _hyper_X_X_chunk decompress_tracking_1 + Update on _hyper_X_X_chunk decompress_tracking_2 + -> Result (actual rows=60 loops=1) + -> Append (actual rows=60 loops=1) + -> Seq Scan on _hyper_X_X_chunk decompress_tracking_1 (actual rows=40 loops=1) + -> Seq Scan on _hyper_X_X_chunk decompress_tracking_2 (actual rows=20 loops=1) +(10 rows) + +BEGIN; :EXPLAIN_ANALYZE DELETE FROM decompress_tracking; ROLLBACK; +QUERY PLAN + Custom Scan (HypertableModify) (actual rows=0 loops=1) + Batches decompressed: 5 + Tuples decompressed: 60 + -> Delete on decompress_tracking (actual rows=0 loops=1) + Delete on _hyper_X_X_chunk decompress_tracking_1 + Delete on _hyper_X_X_chunk decompress_tracking_2 + -> Append (actual rows=60 
loops=1) + -> Seq Scan on _hyper_X_X_chunk decompress_tracking_1 (actual rows=40 loops=1) + -> Seq Scan on _hyper_X_X_chunk decompress_tracking_2 (actual rows=20 loops=1) +(9 rows) + +BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking SELECT '2020-01-01 1:30','d1',random(); ROLLBACK; +QUERY PLAN + Custom Scan (HypertableModify) (actual rows=0 loops=1) + Batches decompressed: 1 + Tuples decompressed: 10 + -> Insert on decompress_tracking (actual rows=0 loops=1) + -> Custom Scan (ChunkDispatch) (actual rows=1 loops=1) + -> Subquery Scan on "*SELECT*" (actual rows=1 loops=1) + -> Result (actual rows=1 loops=1) +(7 rows) + +BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking SELECT '2020-01-01','d2',random(); ROLLBACK; +QUERY PLAN + Custom Scan (HypertableModify) (actual rows=0 loops=1) + -> Insert on decompress_tracking (actual rows=0 loops=1) + -> Custom Scan (ChunkDispatch) (actual rows=1 loops=1) + -> Subquery Scan on "*SELECT*" (actual rows=1 loops=1) + -> Result (actual rows=1 loops=1) +(5 rows) + +BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking SELECT '2020-01-01','d4',random(); ROLLBACK; +QUERY PLAN + Custom Scan (HypertableModify) (actual rows=0 loops=1) + -> Insert on decompress_tracking (actual rows=0 loops=1) + -> Custom Scan (ChunkDispatch) (actual rows=1 loops=1) + -> Subquery Scan on "*SELECT*" (actual rows=1 loops=1) + -> Result (actual rows=1 loops=1) +(5 rows) + +BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking (VALUES ('2020-01-01 1:30','d1',random()),('2020-01-01 1:30','d2',random())); ROLLBACK; +QUERY PLAN + Custom Scan (HypertableModify) (actual rows=0 loops=1) + Batches decompressed: 2 + Tuples decompressed: 25 + -> Insert on decompress_tracking (actual rows=0 loops=1) + -> Custom Scan (ChunkDispatch) (actual rows=2 loops=1) + -> Values Scan on "*VALUES*" (actual rows=2 loops=1) +(6 rows) + +DROP TABLE decompress_tracking; diff --git a/tsl/test/shared/sql/CMakeLists.txt b/tsl/test/shared/sql/CMakeLists.txt index 3b3d6fbc4ae..6c6ca32042d 100644 --- a/tsl/test/shared/sql/CMakeLists.txt +++ b/tsl/test/shared/sql/CMakeLists.txt @@ -15,7 +15,8 @@ set(TEST_TEMPLATES_SHARED transparent_decompress_chunk.sql.in space_constraint.sql.in) if((${PG_VERSION_MAJOR} GREATER_EQUAL "14")) - list(APPEND TEST_FILES_SHARED compression_dml.sql memoize.sql) + list(APPEND TEST_FILES_SHARED compression_dml.sql decompress_tracking.sql + memoize.sql) endif() # this test was changing the contents of tables in shared_setup.sql thus causing diff --git a/tsl/test/shared/sql/decompress_tracking.sql b/tsl/test/shared/sql/decompress_tracking.sql new file mode 100644 index 00000000000..ab09d8ac9ad --- /dev/null +++ b/tsl/test/shared/sql/decompress_tracking.sql @@ -0,0 +1,28 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. 
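+
+-- Check that EXPLAIN ANALYZE reports the number of batches and tuples
+-- decompressed by DML operations on compressed chunks.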
+ +\set EXPLAIN 'EXPLAIN (costs off,timing off,summary off)' +\set EXPLAIN_ANALYZE 'EXPLAIN (analyze,costs off,timing off,summary off)' + +CREATE TABLE decompress_tracking(time timestamptz not null, device text, value float, primary key(time, device)); +SELECT table_name FROM create_hypertable('decompress_tracking','time'); +ALTER TABLE decompress_tracking SET (timescaledb.compress, timescaledb.compress_segmentby='device'); + +INSERT INTO decompress_tracking SELECT '2020-01-01'::timestamptz + format('%s hour', g)::interval, 'd1', random() FROM generate_series(1,10) g; +INSERT INTO decompress_tracking SELECT '2020-01-01'::timestamptz + format('%s hour', g)::interval, 'd2', random() FROM generate_series(1,20) g; +INSERT INTO decompress_tracking SELECT '2020-01-01'::timestamptz + format('%s hour', g)::interval, 'd3', random() FROM generate_series(1,30) g; + +SELECT count(compress_chunk(ch)) FROM show_chunks('decompress_tracking') ch; + +-- no tracking without analyze +:EXPLAIN UPDATE decompress_tracking SET value = value + 3; + +BEGIN; :EXPLAIN_ANALYZE UPDATE decompress_tracking SET value = value + 3; ROLLBACK; +BEGIN; :EXPLAIN_ANALYZE DELETE FROM decompress_tracking; ROLLBACK; +BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking SELECT '2020-01-01 1:30','d1',random(); ROLLBACK; +BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking SELECT '2020-01-01','d2',random(); ROLLBACK; +BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking SELECT '2020-01-01','d4',random(); ROLLBACK; +BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking (VALUES ('2020-01-01 1:30','d1',random()),('2020-01-01 1:30','d2',random())); ROLLBACK; + +DROP TABLE decompress_tracking;