Skip to content

Commit

Permalink
Create compressed chunks at insert time
Browse files Browse the repository at this point in the history
To avoid lock contention during compression operation, we are moving
compressed chunk creation together with uncompressed chunk creation
during insert time. Now compressed chunks live while the uncompressed
chunk is alive, we don't remove them during decompression but rather
truncate them. This moves lock contention over compressed hypertable
to coincide with lock contention over uncompressed hypertable.
  • Loading branch information
antekresic committed Aug 24, 2023
1 parent 0da18a9 commit 4f33a4f
Show file tree
Hide file tree
Showing 106 changed files with 8,500 additions and 8,077 deletions.
1 change: 1 addition & 0 deletions .unreleased/PR_5849
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #5849 Create compressed chunks at insert time
4 changes: 4 additions & 0 deletions sql/maintenance_utils.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ CREATE OR REPLACE FUNCTION _timescaledb_internal.create_compressed_chunk(
numrows_post_compression BIGINT
) RETURNS REGCLASS AS '@MODULE_PATHNAME@', 'ts_create_compressed_chunk' LANGUAGE C STRICT VOLATILE;

-- Create compressed chunks for the given hypertable's existing chunks.
-- Implemented in C by ts_create_compressed_chunks_for_hypertable; returns
-- a boolean success indicator. STRICT: returns NULL on NULL input.
CREATE OR REPLACE FUNCTION _timescaledb_functions.create_compressed_chunks_for_hypertable(
hypertable REGCLASS
) RETURNS BOOL AS '$libdir/timescaledb-2.12.0-dev', 'ts_create_compressed_chunks_for_hypertable' LANGUAGE C STRICT VOLATILE;

CREATE OR REPLACE FUNCTION @[email protected]_chunk(
uncompressed_chunk REGCLASS,
if_not_compressed BOOLEAN = false
Expand Down
6 changes: 3 additions & 3 deletions sql/size_utils.sql
Original file line number Diff line number Diff line change
Expand Up @@ -515,10 +515,10 @@ SELECT
srcht.table_name AS hypertable_name,
srcch.schema_name AS chunk_schema,
srcch.table_name AS chunk_name,
CASE WHEN srcch.compressed_chunk_id IS NULL THEN
'Uncompressed'::text
ELSE
CASE WHEN srcch.status & 1 > 0 THEN
'Compressed'::text
ELSE
'Uncompressed'::text
END AS compression_status,
map.uncompressed_heap_size,
map.uncompressed_index_size,
Expand Down
3 changes: 3 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,6 @@ ALTER FUNCTION _timescaledb_internal.get_chunk_colstats(regclass) SET SCHEMA _ti

UPDATE _timescaledb_catalog.hypertable SET chunk_sizing_func_schema = '_timescaledb_functions' WHERE chunk_sizing_func_schema = '_timescaledb_internal' AND chunk_sizing_func_name = 'calculate_chunk_interval';

-- Update script: add the new C-backed helper that creates compressed chunks
-- for a hypertable's existing chunks (see sql/maintenance_utils.sql).
CREATE OR REPLACE FUNCTION _timescaledb_functions.create_compressed_chunks_for_hypertable(
hypertable REGCLASS
) RETURNS BOOL AS '$libdir/timescaledb-2.12.0-dev', 'ts_create_compressed_chunks_for_hypertable' LANGUAGE C STRICT VOLATILE;
1 change: 1 addition & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,4 @@ ALTER FUNCTION _timescaledb_functions.attach_osm_table_chunk(regclass, regclass)

UPDATE _timescaledb_catalog.hypertable SET chunk_sizing_func_schema = '_timescaledb_internal' WHERE chunk_sizing_func_schema = '_timescaledb_functions' AND chunk_sizing_func_name = 'calculate_chunk_interval';

DROP FUNCTION IF EXISTS _timescaledb_functions.create_compressed_chunks_for_hypertable(REGCLASS);
163 changes: 102 additions & 61 deletions src/chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ TS_FUNCTION_INFO_V1(ts_chunk_create);
TS_FUNCTION_INFO_V1(ts_chunk_status);

static bool ts_chunk_add_status(Chunk *chunk, int32 status);
static int32 create_compressed_chunk(int32 compressed_hypertable_id, const char *schema_name);

static const char *
DatumGetNameString(Datum datum)
Expand Down Expand Up @@ -1200,6 +1201,7 @@ chunk_create_from_hypercube_after_lock(const Hypertable *ht, Hypercube *cube,
}
}
#endif

/* Insert any new dimension slices into metadata */
ts_dimension_slice_insert_multi(cube->slices, cube->num_slices);

Expand All @@ -1210,13 +1212,62 @@ chunk_create_from_hypercube_after_lock(const Hypertable *ht, Hypercube *cube,
prefix,
get_next_chunk_id());

/* If we have compression enabled, we create the compressed chunk here
* to reduce locking contention since we already use heavy locks to attach
* chunks to the hypertable.
*/
if (ht->fd.compressed_hypertable_id != 0)
{
chunk->fd.compressed_chunk_id =
create_compressed_chunk(ht->fd.compressed_hypertable_id, schema_name);
}

chunk_add_constraints(chunk);
chunk_insert_into_metadata_after_lock(chunk);
chunk_create_table_constraints(ht, chunk);

return chunk;
}

/*
 * create_compressed_chunk - create the backing table and catalog entry for a
 * new compressed chunk on the given compressed hypertable.
 *
 * compressed_hypertable_id: id of the internal compressed hypertable.
 * schema_name: schema in which the compressed chunk table is created.
 *
 * Returns the catalog id of the newly created compressed chunk so the caller
 * can store it in the uncompressed chunk's compressed_chunk_id field.
 *
 * NOTE(review): assumes the caller already holds the locks taken when
 * attaching chunks to the hypertable (called from
 * chunk_create_from_hypercube_after_lock) — confirm for any new call site.
 */
static int32
create_compressed_chunk(int32 compressed_hypertable_id, const char *schema_name)
{
Hypertable *compress_ht = ts_hypertable_get_by_id(compressed_hypertable_id);
int32 chunk_id = get_next_chunk_id();
NameData *compress_chunk_table_name = palloc0(sizeof(*compress_chunk_table_name));

/* Build the table name "compress<prefix>_<id>_chunk". snprintf returns the
 * would-be length, so namelen >= NAMEDATALEN means the name was truncated. */
int namelen = snprintf(NameStr(*compress_chunk_table_name),
NAMEDATALEN,
"compress%s_%d_chunk",
NameStr(compress_ht->fd.associated_table_prefix),
chunk_id);

if (namelen >= NAMEDATALEN)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("invalid name \"%s\" for compressed chunk",
NameStr(*compress_chunk_table_name)),
errdetail("The associated table prefix is too long.")));
/* Compressed chunks carry no dimension slices: use an empty hypercube */
Hypercube *c_cube = ts_hypercube_alloc(0);

Chunk *compressed_chunk =
chunk_create_only_table_after_lock(compress_ht,
c_cube,
schema_name,
NameStr(*compress_chunk_table_name),
NULL,
chunk_id);

compressed_chunk->constraints = ts_chunk_constraints_alloc(1, CurrentMemoryContext);

/* Attach constraints and register the chunk in the catalog */
chunk_add_constraints(compressed_chunk);
chunk_insert_into_metadata_after_lock(compressed_chunk);
chunk_create_table_constraints(compress_ht, compressed_chunk);
pfree(c_cube);
pfree(compress_chunk_table_name);
return compressed_chunk->fd.id;
}

/*
* Make a chunk table inherit a hypertable.
*
Expand Down Expand Up @@ -1453,6 +1504,7 @@ Chunk *
ts_chunk_create_for_point(const Hypertable *ht, const Point *p, bool *found, const char *schema,
const char *prefix)
{
Hypertable *compress_ht = NULL;
/*
* We're going to have to resurrect or create the chunk.
* Serialize chunk creation around a lock on the "main table" to avoid
Expand All @@ -1461,6 +1513,15 @@ ts_chunk_create_for_point(const Hypertable *ht, const Point *p, bool *found, con
* conflicts with itself. The lock needs to be held until transaction end.
*/
LockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);
/* If we have a compressed hypertable set, lock up the compressed hypertable
* so we can create the compressed chunk too
*/
if (ht->fd.compressed_hypertable_id != 0)
{
compress_ht = ts_hypertable_get_by_id(ht->fd.compressed_hypertable_id);
Assert(compress_ht);
LockRelationOid(compress_ht->main_table_relid, ShareUpdateExclusiveLock);
}

DEBUG_WAITPOINT("chunk_create_for_point");

Expand All @@ -1481,6 +1542,8 @@ ts_chunk_create_for_point(const Hypertable *ht, const Point *p, bool *found, con
* release the lock early.
*/
UnlockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);
if (compress_ht)
UnlockRelationOid(compress_ht->main_table_relid, ShareUpdateExclusiveLock);
if (found)
*found = true;
return chunk;
Expand Down Expand Up @@ -1993,6 +2056,12 @@ chunk_resurrect(const Hypertable *ht, int chunk_id)
chunk->table_id = chunk_create_table(chunk, ht);
chunk_create_table_constraints(ht, chunk);

if (ht->fd.compressed_hypertable_id != 0)
{
chunk->fd.compressed_chunk_id = create_compressed_chunk(ht->fd.compressed_hypertable_id,
NameStr(ht->fd.schema_name));
}

/* Finally, update the chunk tuple to no longer be a tombstone */
chunk->fd.dropped = false;
new_tuple = chunk_formdata_make_tuple(&chunk->fd, ts_scan_iterator_tupledesc(&iterator));
Expand Down Expand Up @@ -3171,14 +3240,18 @@ ts_chunk_exists_with_compression(int32 hypertable_id)
ts_scanner_foreach(&iterator)
{
bool isnull_dropped;
bool isnull_status;
bool isnull_chunk_id =
slot_attisnull(ts_scan_iterator_slot(&iterator), Anum_chunk_compressed_chunk_id);
bool dropped = DatumGetBool(
slot_getattr(ts_scan_iterator_slot(&iterator), Anum_chunk_dropped, &isnull_dropped));
/* dropped is not NULLABLE */
Assert(!isnull_dropped);
int status = DatumGetInt32(
slot_getattr(ts_scan_iterator_slot(&iterator), Anum_chunk_status, &isnull_status));
Assert(!isnull_status);

if (!isnull_chunk_id && !dropped)
if (!isnull_chunk_id && !dropped && (status & CHUNK_STATUS_COMPRESSED) > 0)
{
found = true;
break;
Expand Down Expand Up @@ -3525,6 +3598,22 @@ ts_chunk_set_schema(Chunk *chunk, const char *newschema)
return chunk_update_form(&chunk->fd);
}

/*
 * Mark a chunk as uncompressed by clearing every compression-related status
 * flag in the catalog. The chunk must currently be compressed.
 * Returns true if the catalog row was updated.
 */
bool
ts_chunk_set_uncompressed(Chunk *chunk)
{
	int32 clear_flags = CHUNK_STATUS_COMPRESSED;

	clear_flags |= CHUNK_STATUS_COMPRESSED_UNORDERED;
	clear_flags |= CHUNK_STATUS_COMPRESSED_PARTIAL;

	Assert(ts_chunk_is_compressed(chunk));
	return ts_chunk_clear_status(chunk, clear_flags);
}

/*
 * Mark a chunk as compressed by setting the COMPRESSED status flag in the
 * catalog. The chunk must not already be compressed.
 * Returns true if the catalog row was updated.
 */
bool
ts_chunk_set_compressed(Chunk *chunk)
{
	Assert(!ts_chunk_is_compressed(chunk));

	const int32 compressed_flag = CHUNK_STATUS_COMPRESSED;

	return ts_chunk_add_status(chunk, compressed_flag);
}

bool
ts_chunk_set_unordered(Chunk *chunk)
{
Expand Down Expand Up @@ -3618,34 +3707,17 @@ ts_chunk_add_status(Chunk *chunk, int32 status)
return chunk_update_status(&chunk->fd);
}

/*
* Setting (INVALID_CHUNK_ID, true) is valid for an Access Node. It means
* the data nodes contain the actual compressed chunks, and the meta-chunk is
* marked as compressed in the Access Node.
* Setting (is_compressed => false) means that the chunk is uncompressed.
*/
static ScanTupleResult
chunk_change_compressed_status_in_tuple(TupleInfo *ti, int32 compressed_chunk_id,
bool is_compressed)
chunk_set_compressed_id_in_tuple(TupleInfo *ti, void *data)
{
int32 compressed_chunk_id = *((int32 *) data);

FormData_chunk form;
HeapTuple new_tuple;
CatalogSecurityContext sec_ctx;

ts_chunk_formdata_fill(&form, ti);
if (is_compressed)
{
form.compressed_chunk_id = compressed_chunk_id;
form.status = ts_set_flags_32(form.status, CHUNK_STATUS_COMPRESSED);
}
else
{
form.compressed_chunk_id = INVALID_CHUNK_ID;
form.status =
ts_clear_flags_32(form.status,
CHUNK_STATUS_COMPRESSED | CHUNK_STATUS_COMPRESSED_UNORDERED |
CHUNK_STATUS_COMPRESSED_PARTIAL);
}
form.compressed_chunk_id = compressed_chunk_id;
new_tuple = chunk_formdata_make_tuple(&form, ts_scanner_get_tupledesc(ti));

ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
Expand All @@ -3656,20 +3728,6 @@ chunk_change_compressed_status_in_tuple(TupleInfo *ti, int32 compressed_chunk_id
return SCAN_DONE;
}

/*
 * Scanner tuple callback: clear the compressed-chunk reference and all
 * compression status flags on the scanned chunk tuple. The data argument
 * is unused; the id is forced to INVALID_CHUNK_ID.
 */
static ScanTupleResult
chunk_clear_compressed_status_in_tuple(TupleInfo *ti, void *data)
{
return chunk_change_compressed_status_in_tuple(ti, INVALID_CHUNK_ID, false);
}

/*
 * Scanner tuple callback: mark the scanned chunk tuple as compressed and
 * point it at the compressed chunk whose id is passed via data (int32 *).
 */
static ScanTupleResult
chunk_set_compressed_id_in_tuple(TupleInfo *ti, void *data)
{
int32 compressed_chunk_id = *((int32 *) data);

return chunk_change_compressed_status_in_tuple(ti, compressed_chunk_id, true);
}

/*Assume permissions are already checked */
bool
ts_chunk_set_compressed_chunk(Chunk *chunk, int32 compressed_chunk_id)
Expand All @@ -3692,29 +3750,6 @@ ts_chunk_set_compressed_chunk(Chunk *chunk, int32 compressed_chunk_id)
CurrentMemoryContext) > 0;
}

/*
 * Clear the compressed-chunk association for the given chunk in the catalog.
 *
 * Scans the chunk catalog by this chunk's id and, via
 * chunk_clear_compressed_status_in_tuple, resets compressed_chunk_id and
 * clears the compression status flags. Returns true if a row was updated.
 *
 * Assumes permissions are already checked by the caller.
 */
bool
ts_chunk_clear_compressed_chunk(Chunk *chunk)
{
int32 compressed_chunk_id = INVALID_CHUNK_ID;
ScanKeyData scankey[1];
/* Match the catalog row by chunk id using the chunk-id index */
ScanKeyInit(&scankey[0],
Anum_chunk_idx_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(chunk->fd.id));
return chunk_scan_internal(CHUNK_ID_INDEX,
scankey,
1,
chunk_check_ignorearg_dropped_filter,
chunk_clear_compressed_status_in_tuple,
&compressed_chunk_id,
0,
ForwardScanDirection,
RowExclusiveLock,
CurrentMemoryContext) > 0;
}

/* Used as a tuple found function */
static ScanTupleResult
chunk_rename_schema_name(TupleInfo *ti, void *data)
Expand Down Expand Up @@ -4392,21 +4427,27 @@ ts_chunk_validate_chunk_status_for_operation(const Chunk *chunk, ChunkOperation
case CHUNK_COMPRESS:
{
if (ts_flags_are_set_32(chunk_status, CHUNK_STATUS_COMPRESSED))
{
ereport((throw_error ? ERROR : NOTICE),
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("chunk \"%s\" is already compressed",
get_rel_name(chunk_relid))));
return false;
return false;
}
break;
}
/* Only compressed chunks can be decompressed */
case CHUNK_DECOMPRESS:
{
if (!ts_flags_are_set_32(chunk_status, CHUNK_STATUS_COMPRESSED))
{
ereport((throw_error ? ERROR : NOTICE),
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("chunk \"%s\" is already decompressed",
get_rel_name(chunk_relid))));
return false;
return false;
}
break;
}
default:
break;
Expand Down
3 changes: 2 additions & 1 deletion src/chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,14 @@ extern TSDLLEXPORT List *ts_chunk_get_window(int32 dimension_id, int64 point, in
MemoryContext mctx);
extern void ts_chunks_rename_schema_name(char *old_schema, char *new_schema);

extern TSDLLEXPORT bool ts_chunk_set_uncompressed(Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_set_compressed(Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_set_partial(Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_set_unordered(Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_set_frozen(Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_unset_frozen(Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_is_frozen(Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_set_compressed_chunk(Chunk *chunk, int32 compressed_chunk_id);
extern TSDLLEXPORT bool ts_chunk_clear_compressed_chunk(Chunk *chunk);
extern TSDLLEXPORT void ts_chunk_drop(const Chunk *chunk, DropBehavior behavior, int32 log_level);
extern TSDLLEXPORT void ts_chunk_drop_preserve_catalog_row(const Chunk *chunk,
DropBehavior behavior, int32 log_level);
Expand Down
1 change: 1 addition & 0 deletions src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ CROSSMODULE_WRAPPER(dictionary_compressor_finish);
CROSSMODULE_WRAPPER(array_compressor_append);
CROSSMODULE_WRAPPER(array_compressor_finish);
CROSSMODULE_WRAPPER(create_compressed_chunk);
CROSSMODULE_WRAPPER(create_compressed_chunks_for_hypertable);
CROSSMODULE_WRAPPER(compress_chunk);
CROSSMODULE_WRAPPER(decompress_chunk);

Expand Down
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ typedef struct CrossModuleFunctions
void (*process_altertable_cmd)(Hypertable *ht, const AlterTableCmd *cmd);
void (*process_rename_cmd)(Oid relid, Cache *hcache, const RenameStmt *stmt);
PGFunction create_compressed_chunk;
PGFunction create_compressed_chunks_for_hypertable;
PGFunction compress_chunk;
PGFunction decompress_chunk;
void (*decompress_batches_for_insert)(ChunkInsertState *state, Chunk *chunk,
Expand Down
2 changes: 1 addition & 1 deletion src/planner/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -1318,7 +1318,7 @@ timescaledb_get_relation_info_hook(PlannerInfo *root, Oid relation_objectid, boo
RangeTblEntry *chunk_rte = planner_rt_fetch(rel->relid, root);
Chunk *chunk = ts_chunk_get_by_relid(chunk_rte->relid, true);

if (chunk->fd.compressed_chunk_id != INVALID_CHUNK_ID)
if (ts_chunk_is_compressed(chunk))
{
Relation uncompressed_chunk = table_open(relation_objectid, NoLock);

Expand Down
9 changes: 4 additions & 5 deletions test/sql/updates/post.compression.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ SELECT g, 'QW', g::text, 2, 0, (100,4)::custom_type_for_compression, false
FROM generate_series('2019-11-01 00:00'::timestamp, '2019-12-15 00:00'::timestamp, '1 day') g;

SELECT
count(compress_chunk(chunk.schema_name || '.' || chunk.table_name)) AS count_compressed
count(compress_chunk(chunks.schema_name || '.' || chunks.chunk_name)) AS count_compressed
FROM
_timescaledb_catalog.chunk chunk
INNER JOIN _timescaledb_catalog.hypertable hypertable ON (chunk.hypertable_id = hypertable.id)
timescaledb_information.chunks
WHERE
hypertable.table_name = 'compress'
AND chunk.compressed_chunk_id IS NULL;
hypertable_name = 'compress'
AND NOT is_compressed;

SELECT * FROM compress ORDER BY time DESC, small_cardinality, large_cardinality, some_double, some_int, some_custom, some_bool;

Expand Down
Loading

0 comments on commit 4f33a4f

Please sign in to comment.