Create compressed chunks at insert time
To avoid lock contention during the compression operation, we now create
the compressed chunk together with the uncompressed chunk at insert time.
A compressed chunk now lives for as long as its uncompressed chunk: instead
of removing it during decompression, we truncate it. As a result, lock
contention on the compressed hypertable coincides with lock contention on
the uncompressed hypertable.
sb230132 committed Jul 4, 2023
1 parent 9bbf521 commit da6e542
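A hedged sketch of the user-visible effect (the table, columns, and settings below are illustrative, not part of this commit): with compression enabled, the first insert into a new chunk now creates its compressed chunk as well.

CREATE TABLE conditions (time timestamptz NOT NULL, device int, temp float);
SELECT create_hypertable('conditions', 'time');
ALTER TABLE conditions SET (timescaledb.compress, timescaledb.compress_segmentby = 'device');
-- This insert creates the uncompressed chunk and, with this commit, its
-- compressed counterpart at the same time.
INSERT INTO conditions VALUES (now(), 1, 20.0);
-- The pairing is visible in the catalog before any data is compressed:
SELECT id, table_name, compressed_chunk_id, status FROM _timescaledb_catalog.chunk;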
Showing 74 changed files with 6,578 additions and 6,824 deletions.
1 change: 1 addition & 0 deletions .unreleased/feature_5489
@@ -0,0 +1 @@
Implements: #5489 Create compressed chunks at insert time
6 changes: 3 additions & 3 deletions sql/size_utils.sql
@@ -505,10 +505,10 @@ SELECT
srcht.table_name AS hypertable_name,
srcch.schema_name AS chunk_schema,
srcch.table_name AS chunk_name,
CASE WHEN srcch.compressed_chunk_id IS NULL THEN
'Uncompressed'::text
ELSE
CASE WHEN srcch.status & 1 > 0 THEN
'Compressed'::text
ELSE
'Uncompressed'::text
END AS compression_status,
map.uncompressed_heap_size,
map.uncompressed_index_size,
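The view now derives compression status from the chunk's status bitmap instead of from compressed_chunk_id, which after this commit is assigned at chunk creation. A minimal sketch of the same check, assuming as in the diff above that bit 1 marks compressed data:

SELECT table_name,
       CASE WHEN status & 1 > 0 THEN 'Compressed' ELSE 'Uncompressed' END AS compression_status
FROM _timescaledb_catalog.chunk;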
74 changes: 70 additions & 4 deletions src/chunk.c
@@ -1201,6 +1201,8 @@ chunk_create_from_hypercube_after_lock(const Hypertable *ht, Hypercube *cube,
}
}
#endif
Chunk *compressed_chunk = NULL;

/* Insert any new dimension slices into metadata */
ts_dimension_slice_insert_multi(cube->slices, cube->num_slices);

@@ -1211,6 +1213,49 @@
prefix,
get_next_chunk_id());

/* If we have compression enabled, we create the compressed chunk here
* to reduce locking contention, since we already take heavy locks to attach
* chunks to the hypertable.
*/
if (ht->fd.compressed_hypertable_id != 0)
{
Hypertable *compress_ht = ts_hypertable_get_by_id(ht->fd.compressed_hypertable_id);
int32 chunk_id = get_next_chunk_id();
NameData *compress_chunk_table_name = palloc0(sizeof(*compress_chunk_table_name));

int namelen = snprintf(NameStr(*compress_chunk_table_name),
NAMEDATALEN,
"compress%s_%d_chunk",
NameStr(compress_ht->fd.associated_table_prefix),
chunk_id);

if (namelen >= NAMEDATALEN)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("invalid name \"%s\" for compressed chunk",
NameStr(*compress_chunk_table_name)),
errdetail("The associated table prefix is too long.")));
Hypercube *c_cube = ts_hypercube_alloc(0);

compressed_chunk = chunk_create_only_table_after_lock(compress_ht,
c_cube,
schema_name,
NameStr(*compress_chunk_table_name),
NULL,
chunk_id);

compressed_chunk->constraints = ts_chunk_constraints_alloc(1, CurrentMemoryContext);

chunk_add_constraints(compressed_chunk);
chunk_insert_into_metadata_after_lock(compressed_chunk);
chunk_create_table_constraints(compress_ht, compressed_chunk);
pfree(c_cube);
pfree(compress_chunk_table_name);
}

if (compressed_chunk)
chunk->fd.compressed_chunk_id = compressed_chunk->fd.id;

chunk_add_constraints(chunk);
chunk_insert_into_metadata_after_lock(chunk);
chunk_create_table_constraints(ht, chunk);
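For reference, the name assembled above follows a compress<prefix>_<chunk_id>_chunk pattern; with a hypothetical associated table prefix "_hyper_2" and chunk id 3 it evaluates to:

SELECT format('compress%s_%s_chunk', '_hyper_2', 3);
-- compress_hyper_2_3_chunk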
@@ -1458,6 +1503,7 @@ Chunk *
ts_chunk_create_for_point(const Hypertable *ht, const Point *p, bool *found, const char *schema,
const char *prefix)
{
Hypertable *compress_ht = NULL;
/*
* We're going to have to resurrect or create the chunk.
* Serialize chunk creation around a lock on the "main table" to avoid
@@ -1466,6 +1512,15 @@ ts_chunk_create_for_point(const Hypertable *ht, const Point *p, bool *found, con
* conflicts with itself. The lock needs to be held until transaction end.
*/
LockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);
/* If a compressed hypertable is set, lock the compressed hypertable as
* well so we can create the compressed chunk
*/
if (ht->fd.compressed_hypertable_id != 0)
{
compress_ht = ts_hypertable_get_by_id(ht->fd.compressed_hypertable_id);
Assert(compress_ht);
LockRelationOid(compress_ht->main_table_relid, ShareUpdateExclusiveLock);
}

DEBUG_WAITPOINT("chunk_create_for_point");

@@ -1486,6 +1541,8 @@ ts_chunk_create_for_point(const Hypertable *ht, const Point *p, bool *found, con
* release the lock early.
*/
UnlockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);
if (compress_ht)
UnlockRelationOid(compress_ht->main_table_relid, ShareUpdateExclusiveLock);
if (found)
*found = true;
return chunk;
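The hunks above take, and later release, ShareUpdateExclusiveLock on both the hypertable and its compressed hypertable. A sketch of one way to observe the paired locks from a second session while such an insert is in flight, using the standard pg_locks view:

SELECT relation::regclass AS rel, mode, granted
FROM pg_locks
WHERE mode = 'ShareUpdateExclusiveLock' AND relation IS NOT NULL;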
@@ -3168,14 +3225,18 @@ ts_chunk_exists_with_compression(int32 hypertable_id)
ts_scanner_foreach(&iterator)
{
bool isnull_dropped;
bool isnull_status;
bool isnull_chunk_id =
slot_attisnull(ts_scan_iterator_slot(&iterator), Anum_chunk_compressed_chunk_id);
bool dropped = DatumGetBool(
slot_getattr(ts_scan_iterator_slot(&iterator), Anum_chunk_dropped, &isnull_dropped));
/* dropped is not NULLABLE */
Assert(!isnull_dropped);
int status = DatumGetInt32(
slot_getattr(ts_scan_iterator_slot(&iterator), Anum_chunk_status, &isnull_status));
Assert(!isnull_status);

if (!isnull_chunk_id && !dropped)
if (!isnull_chunk_id && !dropped && (status & CHUNK_STATUS_COMPRESSED) > 0)
{
found = true;
break;
@@ -3637,7 +3698,6 @@ chunk_change_compressed_status_in_tuple(TupleInfo *ti, int32 compressed_chunk_id
}
else
{
form.compressed_chunk_id = INVALID_CHUNK_ID;
form.status =
ts_clear_flags_32(form.status,
CHUNK_STATUS_COMPRESSED | CHUNK_STATUS_COMPRESSED_UNORDERED |
@@ -4460,21 +4520,27 @@ ts_chunk_validate_chunk_status_for_operation(const Chunk *chunk, ChunkOperation
case CHUNK_COMPRESS:
{
if (ts_flags_are_set_32(chunk_status, CHUNK_STATUS_COMPRESSED))
{
ereport((throw_error ? ERROR : NOTICE),
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("chunk \"%s\" is already compressed",
get_rel_name(chunk_relid))));
return false;
return false;
}
break;
}
/* Only compressed chunks can be decompressed */
case CHUNK_DECOMPRESS:
{
if (!ts_flags_are_set_32(chunk_status, CHUNK_STATUS_COMPRESSED))
{
ereport((throw_error ? ERROR : NOTICE),
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("chunk \"%s\" is already decompressed",
get_rel_name(chunk_relid))));
return false;
return false;
}
break;
}
default:
break;
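This validation is what turns a repeated compress or decompress into a NOTICE rather than an ERROR when the caller passes if_not_compressed/if_compressed. A hedged example (chunk name illustrative):

SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk');
SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk', if_not_compressed => true);
-- NOTICE:  chunk "_hyper_1_1_chunk" is already compressed
SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk');
-- ERROR:  chunk "_hyper_1_1_chunk" is already compressed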
60 changes: 28 additions & 32 deletions tsl/src/compression/api.c
@@ -354,7 +354,7 @@ find_chunk_to_merge_into(Hypertable *ht, Chunk *current_chunk)
/* If there is no previous adjacent chunk along the time dimension or
* if it hasn't been compressed yet, we can't merge.
*/
if (!previous_chunk || !OidIsValid(previous_chunk->fd.compressed_chunk_id))
if (!previous_chunk || !ts_chunk_is_compressed(previous_chunk))
return NULL;

Assert(previous_chunk->cube->num_slices > 0);
@@ -476,9 +476,18 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
mergable_chunk = find_chunk_to_merge_into(cxt.srcht, cxt.srcht_chunk);
if (!mergable_chunk)
{
/* create compressed chunk and a new table */
compress_ht_chunk = create_compress_chunk(cxt.compress_ht, cxt.srcht_chunk, InvalidOid);
new_compressed_chunk = true;
compress_ht_chunk = ts_chunk_get_by_id(cxt.srcht_chunk->fd.compressed_chunk_id, false);

/*
* Compressed chunks should be created at insert time.
* If one happens not to exist, create it here to preserve the previous
* behavior, which always created the compressed chunk at this point.
*/
if (!compress_ht_chunk)
{
compress_ht_chunk = create_compress_chunk(cxt.compress_ht, cxt.srcht_chunk, InvalidOid);
new_compressed_chunk = true;
}
}
else
{
@@ -506,7 +515,7 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
ts_chunk_drop_fks(cxt.srcht_chunk);
after_size = ts_relation_size_impl(compress_ht_chunk->table_id);

if (new_compressed_chunk)
if (!mergable_chunk)
{
compression_chunk_size_catalog_insert(cxt.srcht_chunk->fd.id,
&before_size,
@@ -515,12 +524,16 @@
cstat.rowcnt_pre_compression,
cstat.rowcnt_post_compression);

/* Copy chunk constraints (including fkey) to compressed chunk.
* Do this after compressing the chunk to avoid holding strong, unnecessary locks on the
* referenced table during compression.
*/
ts_chunk_constraints_create(cxt.compress_ht, compress_ht_chunk);
ts_trigger_create_all_on_chunk(compress_ht_chunk);
/* If we made a new compressed chunk, we should copy the constraints */
if (new_compressed_chunk)
{
/* Copy chunk constraints (including fkey) to compressed chunk.
* Do this after compressing the chunk to avoid holding strong, unnecessary locks on the
* referenced table during compression.
*/
ts_chunk_constraints_create(cxt.compress_ht, compress_ht_chunk);
ts_trigger_create_all_on_chunk(compress_ht_chunk);
}
ts_chunk_set_compressed_chunk(cxt.srcht_chunk, compress_ht_chunk->fd.id);
}
else
@@ -583,12 +596,11 @@ decompress_chunk_impl(Oid uncompressed_hypertable_relid, Oid uncompressed_chunk_
if (uncompressed_chunk->fd.hypertable_id != uncompressed_hypertable->fd.id)
elog(ERROR, "hypertable and chunk do not match");

if (uncompressed_chunk->fd.compressed_chunk_id == INVALID_CHUNK_ID)
if (!ts_chunk_validate_chunk_status_for_operation(uncompressed_chunk,
CHUNK_DECOMPRESS,
!if_compressed))
{
ts_cache_release(hcache);
ereport((if_compressed ? NOTICE : ERROR),
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("chunk \"%s\" is not compressed", get_rel_name(uncompressed_chunk_relid))));
return false;
}

Expand Down Expand Up @@ -644,17 +656,6 @@ decompress_chunk_impl(Oid uncompressed_hypertable_relid, Oid uncompressed_chunk_
ts_compression_chunk_size_delete(uncompressed_chunk->fd.id);
ts_chunk_clear_compressed_chunk(uncompressed_chunk);

/*
* Lock the compressed chunk that is going to be deleted. At this point,
* the reference to the compressed chunk is already removed from the
* catalog. So, new readers do not include it in their operations.
*
* Note: Calling performMultipleDeletions in chunk_index_tuple_delete
* also requests an AccessExclusiveLock on the compressed_chunk. However,
* this call makes the lock on the chunk explicit.
*/
LockRelationOid(compressed_chunk->table_id, AccessExclusiveLock);
ts_chunk_drop(compressed_chunk, DROP_RESTRICT, -1);
ts_cache_release(hcache);
return true;
}
@@ -667,13 +668,8 @@ decompress_chunk_impl(Oid uncompressed_hypertable_relid, Oid uncompressed_chunk_
Oid
tsl_compress_chunk_wrapper(Chunk *chunk, bool if_not_compressed)
{
if (chunk->fd.compressed_chunk_id != INVALID_CHUNK_ID)
{
ereport((if_not_compressed ? NOTICE : ERROR),
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("chunk \"%s\" is already compressed", get_rel_name(chunk->table_id))));
if (!ts_chunk_validate_chunk_status_for_operation(chunk, CHUNK_COMPRESS, !if_not_compressed))
return chunk->table_id;
}

return compress_chunk_impl(chunk->hypertable_relid, chunk->table_id);
}
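Net effect of the api.c changes, sketched with illustrative chunk names: compress_chunk() now reuses the compressed chunk created at insert time, and since chunk_change_compressed_status_in_tuple() no longer resets compressed_chunk_id (see the chunk.c hunk above), the pairing stays stable across compress/decompress cycles.

SELECT compressed_chunk_id FROM _timescaledb_catalog.chunk WHERE table_name = '_hyper_1_1_chunk';
SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk');
SELECT decompress_chunk('_timescaledb_internal._hyper_1_1_chunk');
-- compressed_chunk_id is unchanged; only the status flags were cleared.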
1 change: 1 addition & 0 deletions tsl/src/compression/compression.c
@@ -1414,6 +1414,7 @@ decompress_chunk(Oid in_table, Oid out_table)
FreeBulkInsertState(decompressor.bistate);
MemoryContextDelete(decompressor.per_compressed_row_ctx);
ts_catalog_close_indexes(decompressor.indexstate);
truncate_relation(in_table);

table_close(out_rel, NoLock);
table_close(in_rel, NoLock);
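Because decompression now ends by truncating the compressed chunk instead of dropping it, the relation survives the round trip. A sketch of the observable effect, with illustrative names:

SELECT decompress_chunk('_timescaledb_internal._hyper_1_1_chunk');
-- The compressed chunk still exists but holds no rows:
SELECT count(*) FROM _timescaledb_internal.compress_hyper_2_1_chunk;
-- count = 0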
10 changes: 7 additions & 3 deletions tsl/src/compression/create.c
@@ -978,9 +978,13 @@ drop_existing_compression_table(Hypertable *ht)
" compressed hypertable could not be found.",
NameStr(ht->fd.table_name))));

/* need to drop the old compressed hypertable in case the segment by columns changed (and
* thus the column types of compressed hypertable need to change) */
ts_hypertable_drop(compressed, DROP_RESTRICT);
/* Need to drop the old compressed hypertable in case the segmentby columns
* changed (and thus the column types of the compressed hypertable need to
* change).
*
* We need to cascade the delete, since chunks are no longer removed during
* decompression.
*/
ts_hypertable_drop(compressed, DROP_CASCADE);
ts_hypertable_compression_delete_by_hypertable_id(ht->fd.id);
ts_hypertable_unset_compressed(ht);
}
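Since pre-created compressed chunks are no longer removed on decompression, dropping the compressed hypertable has to cascade to them; this is also why the updated test outputs below gain cascade NOTICEs. A hedged sketch, assuming all chunks were decompressed first:

ALTER TABLE conditions SET (timescaledb.compress = 'false');
-- NOTICE:  drop cascades to table _timescaledb_internal.compress_hyper_2_1_chunk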
4 changes: 2 additions & 2 deletions tsl/src/planner.c
@@ -129,7 +129,7 @@ tsl_set_rel_pathlist_query(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeT
{
Chunk *chunk = ts_chunk_get_by_relid(rte->relid, true);

if (chunk->fd.compressed_chunk_id != INVALID_CHUNK_ID)
if (ts_chunk_is_compressed(chunk))
ts_decompress_chunk_generate_paths(root, rel, ht, chunk);
}
}
@@ -163,7 +163,7 @@ tsl_set_rel_pathlist_dml(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTbl
{
ListCell *lc;
Chunk *chunk = ts_chunk_get_by_relid(rte->relid, true);
if (chunk->fd.compressed_chunk_id != INVALID_CHUNK_ID)
if (ts_chunk_is_compressed(chunk))
{
foreach (lc, rel->pathlist)
{
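With compressed_chunk_id assigned at chunk creation, its presence no longer implies that a chunk actually holds compressed data, so the planner consults the status flags through ts_chunk_is_compressed(). A sketch of the catalog state that motivates this:

-- Chunks paired with a compressed chunk but not yet compressed:
SELECT table_name, compressed_chunk_id, status
FROM _timescaledb_catalog.chunk
WHERE compressed_chunk_id IS NOT NULL AND status & 1 = 0;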
12 changes: 6 additions & 6 deletions tsl/test/expected/bgw_custom.out
@@ -437,8 +437,8 @@ SELECT * FROM _timescaledb_internal.compressed_chunk_stats ORDER BY chunk_name;
hypertable_schema | hypertable_name | chunk_schema | chunk_name | compression_status | uncompressed_heap_size | uncompressed_index_size | uncompressed_toast_size | uncompressed_total_size | compressed_heap_size | compressed_index_size | compressed_toast_size | compressed_total_size
-------------------+-----------------+-----------------------+------------------+--------------------+------------------------+-------------------------+-------------------------+-------------------------+----------------------+-----------------------+-----------------------+-----------------------
public | conditions | _timescaledb_internal | _hyper_1_1_chunk | Uncompressed | | | | | | | |
public | conditions | _timescaledb_internal | _hyper_1_2_chunk | Uncompressed | | | | | | | |
public | conditions | _timescaledb_internal | _hyper_1_3_chunk | Uncompressed | | | | | | | |
public | conditions | _timescaledb_internal | _hyper_1_5_chunk | Uncompressed | | | | | | | |
(3 rows)

-- Compression policy
@@ -454,8 +454,8 @@ SELECT * FROM _timescaledb_internal.compressed_chunk_stats ORDER BY chunk_name;
hypertable_schema | hypertable_name | chunk_schema | chunk_name | compression_status | uncompressed_heap_size | uncompressed_index_size | uncompressed_toast_size | uncompressed_total_size | compressed_heap_size | compressed_index_size | compressed_toast_size | compressed_total_size
-------------------+-----------------+-----------------------+------------------+--------------------+------------------------+-------------------------+-------------------------+-------------------------+----------------------+-----------------------+-----------------------+-----------------------
public | conditions | _timescaledb_internal | _hyper_1_1_chunk | Compressed | 8192 | 16384 | 8192 | 32768 | 8192 | 16384 | 8192 | 32768
public | conditions | _timescaledb_internal | _hyper_1_2_chunk | Compressed | 8192 | 16384 | 8192 | 32768 | 8192 | 16384 | 8192 | 32768
public | conditions | _timescaledb_internal | _hyper_1_3_chunk | Compressed | 8192 | 16384 | 8192 | 32768 | 8192 | 16384 | 8192 | 32768
public | conditions | _timescaledb_internal | _hyper_1_5_chunk | Compressed | 8192 | 16384 | 8192 | 32768 | 8192 | 16384 | 8192 | 32768
(3 rows)

--TEST compression job after inserting data into previously compressed chunk
@@ -468,8 +468,8 @@ order by id;
id | table_name | status
----+------------------+--------
1 | _hyper_1_1_chunk | 9
2 | _hyper_1_2_chunk | 9
3 | _hyper_1_3_chunk | 9
5 | _hyper_1_5_chunk | 9
(3 rows)

--running job second time, wait for it to complete
@@ -492,8 +492,8 @@ order by id;
id | table_name | status
----+------------------+--------
1 | _hyper_1_1_chunk | 1
2 | _hyper_1_2_chunk | 1
3 | _hyper_1_3_chunk | 1
5 | _hyper_1_5_chunk | 1
(3 rows)

-- Drop the compression job
@@ -508,8 +508,8 @@ SELECT decompress_chunk(c) FROM show_chunks('conditions') c;
decompress_chunk
----------------------------------------
_timescaledb_internal._hyper_1_1_chunk
_timescaledb_internal._hyper_1_2_chunk
_timescaledb_internal._hyper_1_3_chunk
_timescaledb_internal._hyper_1_5_chunk
(3 rows)

-- TEST Continuous Aggregate job
@@ -572,7 +572,7 @@ FROM _timescaledb_config.bgw_job WHERE id = :job_id_5;

--verify that job is dropped when cagg is dropped
DROP MATERIALIZED VIEW conditions_summary_daily;
NOTICE: drop cascades to table _timescaledb_internal._hyper_3_10_chunk
NOTICE: drop cascades to table _timescaledb_internal._hyper_3_7_chunk
SELECT id, proc_name, hypertable_id
FROM _timescaledb_config.bgw_job WHERE id = :job_id_5;
id | proc_name | hypertable_id
1 change: 1 addition & 0 deletions tsl/test/expected/cagg_errors.out
@@ -600,6 +600,7 @@ SELECT count(*) FROM (select decompress_chunk(ch) FROM show_chunks('i2980_cagg2'
(1 row)

ALTER MATERIALIZED VIEW i2980_cagg2 SET (timescaledb.compress = 'false');
NOTICE: drop cascades to table _timescaledb_internal.compress_hyper_16_4_chunk
SELECT compress_chunk(ch) FROM show_chunks('i2980_cagg2') ch;
ERROR: compression not enabled on "i2980_cagg2"
-- test error handling when trying to create cagg on internal hypertable
1 change: 1 addition & 0 deletions tsl/test/expected/cagg_errors_deprecated.out
@@ -684,6 +684,7 @@ SELECT count(*) FROM (select decompress_chunk(ch) FROM show_chunks('i2980_cagg2'
(1 row)

ALTER MATERIALIZED VIEW i2980_cagg2 SET (timescaledb.compress = 'false');
NOTICE: drop cascades to table _timescaledb_internal.compress_hyper_17_4_chunk
SELECT compress_chunk(ch) FROM show_chunks('i2980_cagg2') ch;
ERROR: compression not enabled on "i2980_cagg2"
-- test error handling when trying to create cagg on internal hypertable