Show batches/tuples decompressed in EXPLAIN output
This patch adds tracking of the number of batches and tuples that had
to be decompressed as part of DML operations on compressed hypertables.
These counts are visible in EXPLAIN ANALYZE output like so:

QUERY PLAN
 Custom Scan (HypertableModify) (actual rows=0 loops=1)
   Batches decompressed: 2
   Tuples decompressed: 25
   ->  Insert on decompress_tracking (actual rows=0 loops=1)
         ->  Custom Scan (ChunkDispatch) (actual rows=2 loops=1)
               ->  Values Scan on "*VALUES*" (actual rows=2 loops=1)
(6 rows)
svenklemm committed Oct 14, 2023
1 parent f12ed00 commit 332bbdb
Showing 12 changed files with 207 additions and 16 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_6178
@@ -0,0 +1 @@
Implements: #6178 Show batches/tuples decompressed during DML operations in EXPLAIN output
3 changes: 2 additions & 1 deletion src/cross_module_fn.h
@@ -35,6 +35,7 @@ typedef struct Hypertable Hypertable;
 typedef struct Chunk Chunk;
 typedef struct ChunkInsertState ChunkInsertState;
 typedef struct CopyChunkState CopyChunkState;
+typedef struct HypertableModifyState HypertableModifyState;
 
 typedef struct CrossModuleFunctions
 {
@@ -139,7 +140,7 @@ typedef struct CrossModuleFunctions
 	PGFunction decompress_chunk;
 	void (*decompress_batches_for_insert)(ChunkInsertState *state, Chunk *chunk,
 										  TupleTableSlot *slot);
-	bool (*decompress_target_segments)(ModifyTableState *ps);
+	bool (*decompress_target_segments)(HypertableModifyState *ht_state);
 	/* The compression functions below are not installed in SQL as part of create extension;
 	 * They are installed and tested during testing scripts. They are exposed in cross-module
 	 * functions because they may be very useful for debugging customer problems if the sql
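The signature change above is what makes the new accounting possible: by receiving the HypertableModifyState rather than the bare ModifyTableState, the TSL-side implementation gets a natural place to accumulate the batch and tuple counters that the EXPLAIN hook later reads. A minimal sketch of the call pattern, with the NULL check mirroring the ExecModifyTable hunk further down (the wrapper function itself is illustrative, not part of the commit):

/* Cross-module dispatch: the Apache-licensed core reaches TSL code
 * only through the ts_cm_functions pointer table; the entry may be
 * unavailable when the TSL module is not loaded, hence the check. */
static void
example_call_site(HypertableModifyState *ht_state)
{
	if (ts_cm_functions->decompress_target_segments)
		ts_cm_functions->decompress_target_segments(ht_state);
}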
4 changes: 3 additions & 1 deletion src/nodes/chunk_dispatch/chunk_dispatch.h
@@ -27,7 +27,7 @@
 typedef struct ChunkDispatch
 {
 	/* Link to the executor state for INSERTs. This is not set for COPY path. */
-	const struct ChunkDispatchState *dispatch_state;
+	struct ChunkDispatchState *dispatch_state;
 	Hypertable *hypertable;
 	SubspaceStore *cache;
 	EState *estate;
@@ -74,6 +74,8 @@ typedef struct ChunkDispatchState
 	ResultRelInfo *rri;
 	/* flag to represent dropped attributes */
 	bool is_dropped_attr_exists;
+	int64 batches_decompressed;
+	int64 tuples_decompressed;
 } ChunkDispatchState;
 
 extern TSDLLEXPORT bool ts_is_chunk_dispatch_state(PlanState *state);
1 change: 1 addition & 0 deletions src/nodes/chunk_dispatch/chunk_insert_state.c
@@ -595,6 +595,7 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
 	CheckValidResultRel(relinfo, chunk_dispatch_get_cmd_type(dispatch));
 
 	state = palloc0(sizeof(ChunkInsertState));
+	state->cds = dispatch->dispatch_state;
 	state->mctx = cis_context;
 	state->rel = rel;
 	state->result_relation_info = relinfo;
2 changes: 2 additions & 0 deletions src/nodes/chunk_dispatch/chunk_insert_state.h
@@ -15,13 +15,15 @@
 #include "cross_module_fn.h"
 
 typedef struct TSCopyMultiInsertBuffer TSCopyMultiInsertBuffer;
+typedef struct ChunkDispatchState ChunkDispatchState;
 
 typedef struct ChunkInsertState
 {
 	Relation rel;
 	ResultRelInfo *result_relation_info;
 	/* Per-chunk arbiter indexes for ON CONFLICT handling */
 	List *arbiter_indexes;
+	ChunkDispatchState *cds;
 
 	/* When the tuple descriptors for the main hypertable (root) and a chunk
 	 * differs, it is necessary to convert tuples to chunk format before
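The new cds back-pointer closes a gap in the INSERT path: decompress_batches_for_insert only receives the ChunkInsertState, so without a link back to the ChunkDispatchState it would have nowhere to report its work. A condensed, illustrative view of the flow, stitched together from this commit's hunks (not a verbatim excerpt):

/* Illustrative flow for the INSERT path; see the compression.c and
 * hypertable_modify.c hunks below for the committed code. */
static void
insert_counter_flow(const Chunk *chunk, ChunkDispatch *dispatch,
					RowDecompressor *decompressor)
{
	/* chunk_insert_state.c: each ChunkInsertState is linked back to
	 * the dispatch state it was created under. */
	ChunkInsertState *cis = ts_chunk_insert_state_create(chunk, dispatch);

	/* compression.c: when an INSERT forces batches to be decompressed,
	 * the work is reported through that link. */
	cis->cds->batches_decompressed += decompressor->batches_decompressed;
	cis->cds->tuples_decompressed += decompressor->tuples_decompressed;

	/* hypertable_modify.c: at EXPLAIN time, the ChunkDispatchState
	 * totals are summed into the HypertableModifyState and printed. */
}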
28 changes: 27 additions & 1 deletion src/nodes/hypertable_modify.c
@@ -242,6 +242,32 @@ hypertable_modify_explain(CustomScanState *node, List *ancestors, ExplainState *
 	mtstate->ps.instrument = node->ss.ps.instrument;
 #endif
 
+	/*
+	 * For INSERT we have to read the number of decompressed batches and
+	 * tuples from the ChunkDispatchState below the ModifyTable.
+	 */
+	if ((mtstate->operation == CMD_INSERT
+#if PG15_GE
+		 || mtstate->operation == CMD_MERGE
+#endif
+		 ) &&
+		outerPlanState(mtstate))
+	{
+		List *chunk_dispatch_states = get_chunk_dispatch_states(outerPlanState(mtstate));
+		ListCell *lc;
+
+		foreach (lc, chunk_dispatch_states)
+		{
+			ChunkDispatchState *cds = (ChunkDispatchState *) lfirst(lc);
+			state->batches_decompressed += cds->batches_decompressed;
+			state->tuples_decompressed += cds->tuples_decompressed;
+		}
+	}
+	if (state->batches_decompressed > 0)
+		ExplainPropertyInteger("Batches decompressed", NULL, state->batches_decompressed, es);
+	if (state->tuples_decompressed > 0)
+		ExplainPropertyInteger("Tuples decompressed", NULL, state->tuples_decompressed, es);
+
 	if (NULL != state->fdwroutine)
 	{
 		appendStringInfo(es->str, "Insert on distributed hypertable");
@@ -793,7 +819,7 @@ ExecModifyTable(CustomScanState *cs_node, PlanState *pstate)
 	{
 		if (ts_cm_functions->decompress_target_segments)
 		{
-			ts_cm_functions->decompress_target_segments(node);
+			ts_cm_functions->decompress_target_segments(ht_state);
 			ht_state->comp_chunks_processed = true;
 			/*
 			 * save snapshot set during ExecutorStart(), since this is the same
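The helper get_chunk_dispatch_states called in the EXPLAIN hunk above is not part of this diff. A plausible sketch of such a helper, assuming it simply descends the outer plan tree and tests each node with the exported ts_is_chunk_dispatch_state (the committed implementation may also branch into multi-child nodes such as Append):

/* Hypothetical sketch: collect the ChunkDispatchState nodes sitting
 * below a ModifyTable. The traversal strategy is an assumption. */
static List *
get_chunk_dispatch_states(PlanState *substate)
{
	if (ts_is_chunk_dispatch_state(substate))
		return list_make1(substate);

	if (outerPlanState(substate))
		return get_chunk_dispatch_states(outerPlanState(substate));

	return NIL;
}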
2 changes: 2 additions & 0 deletions src/nodes/hypertable_modify.h
@@ -30,6 +30,8 @@ typedef struct HypertableModifyState
 	bool comp_chunks_processed;
 	Snapshot snapshot;
 	FdwRoutine *fdwroutine;
+	int64 tuples_decompressed;
+	int64 batches_decompressed;
 } HypertableModifyState;
 
 extern void ts_hypertable_modify_fixup_tlist(Plan *plan);
46 changes: 35 additions & 11 deletions tsl/src/compression/compression.c
@@ -59,6 +59,7 @@
 #include "gorilla.h"
 #include "guc.h"
 #include "nodes/chunk_dispatch/chunk_insert_state.h"
+#include "nodes/hypertable_modify.h"
 #include "indexing.h"
 #include "segment_meta.h"
 #include "ts_catalog/compression_chunk_size.h"
@@ -1556,6 +1557,7 @@ row_decompressor_decompress_row(RowDecompressor *decompressor, Tuplesortstate *t
 									decompressor->compressed_datums,
 									decompressor->compressed_is_nulls);
 
+	decompressor->batches_decompressed++;
 	do
 	{
 		/* we're done if all the decompressors return NULL */
@@ -1578,6 +1580,7 @@ row_decompressor_decompress_row(RowDecompressor *decompressor, Tuplesortstate *t
 											decompressor->decompressed_datums,
 											decompressor->decompressed_is_nulls);
 			TupleTableSlot *slot = MakeSingleTupleTableSlot(decompressor->out_desc, &TTSOpsVirtual);
+			decompressor->tuples_decompressed++;
 
 			if (tuplesortstate == NULL)
 			{
@@ -2097,6 +2100,8 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo
 								   &tmfd,
 								   false);
 		Assert(result == TM_Ok);
+		cis->cds->batches_decompressed += decompressor.batches_decompressed;
+		cis->cds->tuples_decompressed += decompressor.tuples_decompressed;
 	}
 
 	table_endscan(heapScan);
@@ -3199,7 +3204,8 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel
  * 4. Update catalog table to change status of moved chunk.
  */
 static void
-decompress_batches_for_update_delete(Chunk *chunk, List *predicates, EState *estate)
+decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chunk,
+									 List *predicates, EState *estate)
 {
 	/* process each chunk with its corresponding predicates */
 
@@ -3292,26 +3298,41 @@ decompress_batches_for_update_delete(Chunk *chunk, List *predicates, EState *est
 		filter = lfirst(lc);
 		pfree(filter);
 	}
+	ht_state->batches_decompressed += decompressor.batches_decompressed;
+	ht_state->tuples_decompressed += decompressor.tuples_decompressed;
 }
 
 /*
  * Traverse the plan tree to look for Scan nodes on uncompressed chunks.
  * Once Scan node is found check if chunk is compressed, if so then
  * decompress those segments which match the filter conditions if present.
  */
-static bool decompress_chunk_walker(PlanState *ps, List *relids);
+
+struct decompress_chunk_context
+{
+	List *relids;
+	HypertableModifyState *ht_state;
+};
+
+static bool decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx);
 
 bool
-decompress_target_segments(ModifyTableState *ps)
+decompress_target_segments(HypertableModifyState *ht_state)
 {
-	List *relids = castNode(ModifyTable, ps->ps.plan)->resultRelations;
-	Assert(relids);
+	ModifyTableState *ps =
+		linitial_node(ModifyTableState, castNode(CustomScanState, ht_state)->custom_ps);
+
+	struct decompress_chunk_context ctx = {
+		.ht_state = ht_state,
+		.relids = castNode(ModifyTable, ps->ps.plan)->resultRelations,
+	};
+	Assert(ctx.relids);
 
-	return decompress_chunk_walker(&ps->ps, relids);
+	return decompress_chunk_walker(&ps->ps, &ctx);
 }
 
 static bool
-decompress_chunk_walker(PlanState *ps, List *relids)
+decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx)
 {
 	RangeTblEntry *rte = NULL;
 	bool needs_decompression = false;
@@ -3330,7 +3351,7 @@ decompress_chunk_walker(PlanState *ps, List *relids)
 		case T_IndexScanState:
 		{
 			/* Get the index quals on the original table and also include
-			 * any filters that are used to for filtering heap tuples
+			 * any filters that are used for filtering heap tuples
 			 */
 			predicates = list_union(((IndexScan *) ps->plan)->indexqualorig, ps->plan->qual);
 			needs_decompression = true;
@@ -3362,7 +3383,7 @@ decompress_chunk_walker(PlanState *ps, List *relids)
 			 * even when it is a self join
 			 */
 			int scanrelid = ((Scan *) ps->plan)->scanrelid;
-			if (list_member_int(relids, scanrelid))
+			if (list_member_int(ctx->relids, scanrelid))
 			{
 				rte = rt_fetch(scanrelid, ps->state->es_range_table);
 				current_chunk = ts_chunk_get_by_relid(rte->relid, false);
@@ -3374,7 +3395,10 @@ decompress_chunk_walker(PlanState *ps, List *relids)
 							 errmsg("UPDATE/DELETE is disabled on compressed chunks"),
 							 errhint("Set timescaledb.enable_dml_decompression to TRUE.")));
 
-				decompress_batches_for_update_delete(current_chunk, predicates, ps->state);
+				decompress_batches_for_update_delete(ctx->ht_state,
+													 current_chunk,
+													 predicates,
+													 ps->state);
 
 				/* This is a workaround specifically for bitmap heap scans:
 				 * during node initialization, initialize the scan state with the active snapshot
@@ -3400,7 +3424,7 @@ decompress_chunk_walker(PlanState *ps, List *relids)
 	if (predicates)
 		pfree(predicates);
 
-	return planstate_tree_walker(ps, decompress_chunk_walker, relids);
+	return planstate_tree_walker(ps, decompress_chunk_walker, ctx);
 }
 
 #endif
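The refactor above follows the standard PostgreSQL pattern for threading extra state through planstate_tree_walker: wrap everything the walker needs in a context struct and pass its address as the opaque third argument. A generic, self-contained illustration of the same pattern (names are invented for the example):

/* Count SeqScan nodes in a plan-state tree; illustrative only. */
struct count_seqscans_context
{
	int nseqscans;
};

static bool
count_seqscans_walker(PlanState *ps, struct count_seqscans_context *ctx)
{
	if (IsA(ps, SeqScanState))
		ctx->nseqscans++;

	/* Returning true aborts the walk; recursing visits every child. */
	return planstate_tree_walker(ps, count_seqscans_walker, ctx);
}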
5 changes: 4 additions & 1 deletion tsl/src/compression/compression.h
@@ -150,6 +150,8 @@ typedef struct RowDecompressor
 	bool *decompressed_is_nulls;
 
 	MemoryContext per_compressed_row_ctx;
+	int64 batches_decompressed;
+	int64 tuples_decompressed;
 } RowDecompressor;
 
 /*
@@ -323,7 +325,8 @@ typedef struct ChunkInsertState ChunkInsertState;
 extern void decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk,
 										  TupleTableSlot *slot);
 #if PG14_GE
-extern bool decompress_target_segments(ModifyTableState *ps);
+typedef struct HypertableModifyState HypertableModifyState;
+extern bool decompress_target_segments(HypertableModifyState *ht_state);
 #endif
 /* CompressSingleRowState methods */
 struct CompressSingleRowState;
100 changes: 100 additions & 0 deletions tsl/test/shared/expected/decompress_tracking.out
@@ -0,0 +1,100 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\set EXPLAIN 'EXPLAIN (costs off,timing off,summary off)'
\set EXPLAIN_ANALYZE 'EXPLAIN (analyze,costs off,timing off,summary off)'
CREATE TABLE decompress_tracking(time timestamptz not null, device text, value float, primary key(time, device));
SELECT table_name FROM create_hypertable('decompress_tracking','time');
table_name
decompress_tracking
(1 row)

ALTER TABLE decompress_tracking SET (timescaledb.compress, timescaledb.compress_segmentby='device');
INSERT INTO decompress_tracking SELECT '2020-01-01'::timestamptz + format('%s hour', g)::interval, 'd1', random() FROM generate_series(1,10) g;
INSERT INTO decompress_tracking SELECT '2020-01-01'::timestamptz + format('%s hour', g)::interval, 'd2', random() FROM generate_series(1,20) g;
INSERT INTO decompress_tracking SELECT '2020-01-01'::timestamptz + format('%s hour', g)::interval, 'd3', random() FROM generate_series(1,30) g;
SELECT count(compress_chunk(ch)) FROM show_chunks('decompress_tracking') ch;
count
2
(1 row)

-- no tracking without analyze
:EXPLAIN UPDATE decompress_tracking SET value = value + 3;
QUERY PLAN
Custom Scan (HypertableModify)
-> Update on decompress_tracking
Update on _hyper_X_X_chunk decompress_tracking_1
Update on _hyper_X_X_chunk decompress_tracking_2
-> Result
-> Append
-> Seq Scan on _hyper_X_X_chunk decompress_tracking_1
-> Seq Scan on _hyper_X_X_chunk decompress_tracking_2
(8 rows)

BEGIN; :EXPLAIN_ANALYZE UPDATE decompress_tracking SET value = value + 3; ROLLBACK;
QUERY PLAN
Custom Scan (HypertableModify) (actual rows=0 loops=1)
Batches decompressed: 5
Tuples decompressed: 60
-> Update on decompress_tracking (actual rows=0 loops=1)
Update on _hyper_X_X_chunk decompress_tracking_1
Update on _hyper_X_X_chunk decompress_tracking_2
-> Result (actual rows=60 loops=1)
-> Append (actual rows=60 loops=1)
-> Seq Scan on _hyper_X_X_chunk decompress_tracking_1 (actual rows=40 loops=1)
-> Seq Scan on _hyper_X_X_chunk decompress_tracking_2 (actual rows=20 loops=1)
(10 rows)

BEGIN; :EXPLAIN_ANALYZE DELETE FROM decompress_tracking; ROLLBACK;
QUERY PLAN
Custom Scan (HypertableModify) (actual rows=0 loops=1)
Batches decompressed: 5
Tuples decompressed: 60
-> Delete on decompress_tracking (actual rows=0 loops=1)
Delete on _hyper_X_X_chunk decompress_tracking_1
Delete on _hyper_X_X_chunk decompress_tracking_2
-> Append (actual rows=60 loops=1)
-> Seq Scan on _hyper_X_X_chunk decompress_tracking_1 (actual rows=40 loops=1)
-> Seq Scan on _hyper_X_X_chunk decompress_tracking_2 (actual rows=20 loops=1)
(9 rows)

BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking SELECT '2020-01-01 1:30','d1',random(); ROLLBACK;
QUERY PLAN
Custom Scan (HypertableModify) (actual rows=0 loops=1)
Batches decompressed: 1
Tuples decompressed: 10
-> Insert on decompress_tracking (actual rows=0 loops=1)
-> Custom Scan (ChunkDispatch) (actual rows=1 loops=1)
-> Subquery Scan on "*SELECT*" (actual rows=1 loops=1)
-> Result (actual rows=1 loops=1)
(7 rows)

BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking SELECT '2020-01-01','d2',random(); ROLLBACK;
QUERY PLAN
Custom Scan (HypertableModify) (actual rows=0 loops=1)
-> Insert on decompress_tracking (actual rows=0 loops=1)
-> Custom Scan (ChunkDispatch) (actual rows=1 loops=1)
-> Subquery Scan on "*SELECT*" (actual rows=1 loops=1)
-> Result (actual rows=1 loops=1)
(5 rows)

BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking SELECT '2020-01-01','d4',random(); ROLLBACK;
QUERY PLAN
Custom Scan (HypertableModify) (actual rows=0 loops=1)
-> Insert on decompress_tracking (actual rows=0 loops=1)
-> Custom Scan (ChunkDispatch) (actual rows=1 loops=1)
-> Subquery Scan on "*SELECT*" (actual rows=1 loops=1)
-> Result (actual rows=1 loops=1)
(5 rows)

BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking (VALUES ('2020-01-01 1:30','d1',random()),('2020-01-01 1:30','d2',random())); ROLLBACK;
QUERY PLAN
Custom Scan (HypertableModify) (actual rows=0 loops=1)
Batches decompressed: 2
Tuples decompressed: 25
-> Insert on decompress_tracking (actual rows=0 loops=1)
-> Custom Scan (ChunkDispatch) (actual rows=2 loops=1)
-> Values Scan on "*VALUES*" (actual rows=2 loops=1)
(6 rows)

DROP TABLE decompress_tracking;
3 changes: 2 additions & 1 deletion tsl/test/shared/sql/CMakeLists.txt
@@ -15,7 +15,8 @@ set(TEST_TEMPLATES_SHARED
     transparent_decompress_chunk.sql.in space_constraint.sql.in)
 
 if((${PG_VERSION_MAJOR} GREATER_EQUAL "14"))
-  list(APPEND TEST_FILES_SHARED compression_dml.sql memoize.sql)
+  list(APPEND TEST_FILES_SHARED compression_dml.sql decompress_tracking.sql
+       memoize.sql)
 endif()
 
 # this test was changing the contents of tables in shared_setup.sql thus causing