Skip to content

Commit

Permalink
Reduce decompression for INSERT with unique constraints
Browse files Browse the repository at this point in the history
On INSERT into compressed chunks with unique constraints we can
check for conflict without decompressing when no ON CONFLICT clause
is present and we only have one unique constraint. With ON CONFLICT
clause with DO NOTHING we can just skip the INSERT if we detect conflict
and return early. Only for ON CONFLICT DO UPDATE/UPSERT do we need
to decompress when there is a constraint conflict.
Doing the optimization in the presence of multiple constraints is
also possible but not part of this patch.
  • Loading branch information
svenklemm committed Jul 13, 2024
1 parent 2c651f8 commit 47efc2f
Show file tree
Hide file tree
Showing 14 changed files with 228 additions and 77 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7108
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7108 Reduce decompressions for INSERTs with UNIQUE constraints
2 changes: 1 addition & 1 deletion src/nodes/chunk_dispatch/chunk_dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ ts_chunk_dispatch_decompress_batches_for_insert(ChunkDispatch *dispatch, ChunkIn
if (ts_cm_functions->decompress_batches_for_insert)
{
ts_cm_functions->decompress_batches_for_insert(cis, slot);
OnConflictAction onconflict_action = chunk_dispatch_get_on_conflict_action(dispatch);
OnConflictAction onconflict_action = ts_chunk_dispatch_get_on_conflict_action(dispatch);
/* mark rows visible */
if (onconflict_action == ONCONFLICT_UPDATE)
dispatch->estate->es_output_cid = GetCurrentCommandId(true);
Expand Down
3 changes: 3 additions & 0 deletions src/nodes/chunk_dispatch/chunk_dispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ typedef struct ChunkDispatchState
bool is_dropped_attr_exists;
int64 batches_decompressed;
int64 tuples_decompressed;

/* Should this INSERT be skipped due to ON CONFLICT DO NOTHING */
bool skip_current_tuple;
} ChunkDispatchState;

extern TSDLLEXPORT bool ts_is_chunk_dispatch_state(PlanState *state);
Expand Down
8 changes: 4 additions & 4 deletions src/nodes/chunk_dispatch/chunk_insert_state.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ chunk_dispatch_get_returning_clauses(const ChunkDispatch *dispatch)
}

OnConflictAction
chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch)
ts_chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch)
{
if (!dispatch->dispatch_state || !dispatch->dispatch_state->mtstate)
return ONCONFLICT_NONE;
Expand Down Expand Up @@ -265,7 +265,7 @@ setup_on_conflict_state(ChunkInsertState *state, const ChunkDispatch *dispatch,
ModifyTableState *mtstate = castNode(ModifyTableState, dispatch->dispatch_state->mtstate);
ModifyTable *mt = castNode(ModifyTable, mtstate->ps.plan);

Assert(chunk_dispatch_get_on_conflict_action(dispatch) == ONCONFLICT_UPDATE);
Assert(ts_chunk_dispatch_get_on_conflict_action(dispatch) == ONCONFLICT_UPDATE);

OnConflictSetState *onconfl = makeNode(OnConflictSetState);
memcpy(onconfl, hyper_rri->ri_onConflict, sizeof(OnConflictSetState));
Expand Down Expand Up @@ -433,7 +433,7 @@ adjust_projections(ChunkInsertState *cis, const ChunkDispatch *dispatch, Oid row
Relation hyper_rel = dispatch->hypertable_result_rel_info->ri_RelationDesc;
Relation chunk_rel = cis->rel;
TupleConversionMap *chunk_map = NULL;
OnConflictAction onconflict_action = chunk_dispatch_get_on_conflict_action(dispatch);
OnConflictAction onconflict_action = ts_chunk_dispatch_get_on_conflict_action(dispatch);

if (chunk_dispatch_has_returning(dispatch))
{
Expand Down Expand Up @@ -479,7 +479,7 @@ ts_chunk_insert_state_create(Oid chunk_relid, const ChunkDispatch *dispatch)
MemoryContext cis_context = AllocSetContextCreate(dispatch->estate->es_query_cxt,
"chunk insert state memory context",
ALLOCSET_DEFAULT_SIZES);
OnConflictAction onconflict_action = chunk_dispatch_get_on_conflict_action(dispatch);
OnConflictAction onconflict_action = ts_chunk_dispatch_get_on_conflict_action(dispatch);
ResultRelInfo *relinfo;
const Chunk *chunk;

Expand Down
3 changes: 2 additions & 1 deletion src/nodes/chunk_dispatch/chunk_insert_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,6 @@ extern ChunkInsertState *ts_chunk_insert_state_create(Oid chunk_relid,
const ChunkDispatch *dispatch);
extern void ts_chunk_insert_state_destroy(ChunkInsertState *state);

OnConflictAction chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch);
TSDLLEXPORT OnConflictAction
ts_chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch);
void ts_set_compression_status(ChunkInsertState *state, const Chunk *chunk);
8 changes: 8 additions & 0 deletions src/nodes/hypertable_modify.c
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,14 @@ ExecModifyTable(CustomScanState *cs_node, PlanState *pstate)

context.planSlot = ExecProcNode(subplanstate);

if (cds && cds->rri && operation == CMD_INSERT && cds->skip_current_tuple)
{
cds->skip_current_tuple = false;
if (node->ps.instrument)
node->ps.instrument->ntuples2++;
return NULL;
}

/* No more tuples to process? */
if (TupIsNull(context.planSlot))
break;
Expand Down
Loading

0 comments on commit 47efc2f

Please sign in to comment.