Skip to content

Commit

Permalink
Reduce WAL activity by freezing tuples immediately
Browse files Browse the repository at this point in the history
When we compress a chunk, we create a new compressed chunk for storing
the compressed data. So far, the tuples were just inserted into the
compressed chunk and frozen by a later vacuum run.

However, this deferred freezing causes additional WAL activity. It can be
avoided because the compressed chunk is created in the same transaction as
the tuples.
This patch reduces the WAL activity by storing these tuples directly as
frozen and preventing a freeze operation in the future. This approach is
similar to PostgreSQL's COPY FREEZE.
  • Loading branch information
jnidzwetzki committed Oct 25, 2023
1 parent dc91938 commit 8767de6
Show file tree
Hide file tree
Showing 35 changed files with 8,604 additions and 787 deletions.
1 change: 1 addition & 0 deletions .unreleased/feature_5890
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #5890 Reduce WAL activity by freezing compressed tuples immediately
1 change: 1 addition & 0 deletions sql/pre_install/tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ CREATE TABLE _timescaledb_catalog.compression_chunk_size (
compressed_index_size bigint NOT NULL,
numrows_pre_compression bigint,
numrows_post_compression bigint,
numrows_frozen_immediately bigint,
-- table constraints
CONSTRAINT compression_chunk_size_pkey PRIMARY KEY (chunk_id),
CONSTRAINT compression_chunk_size_chunk_id_fkey FOREIGN KEY (chunk_id) REFERENCES _timescaledb_catalog.chunk (id) ON DELETE CASCADE,
Expand Down
50 changes: 50 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,53 @@ DROP TABLE _timescaledb_internal.tmp_chunk_seq_value;
GRANT SELECT ON _timescaledb_catalog.chunk_id_seq TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.chunk TO PUBLIC;
-- end recreate _timescaledb_catalog.chunk table --

--
-- Rebuild the catalog table `_timescaledb_catalog.compression_chunk_size` to
-- add the new column `numrows_frozen_immediately`
--

-- Keep a copy of the current catalog contents so they can be restored
-- into the rebuilt table below.
CREATE TABLE _timescaledb_internal.compression_chunk_size_tmp AS
    SELECT * FROM _timescaledb_catalog.compression_chunk_size;

-- Drop dependent views
-- We assume that '_timescaledb_internal.compressed_chunk_stats' was already dropped in this update
-- (see above)

-- Detach the catalog table from the extension, then drop it
ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.compression_chunk_size;
DROP TABLE _timescaledb_catalog.compression_chunk_size;

CREATE TABLE _timescaledb_catalog.compression_chunk_size (
    chunk_id integer NOT NULL,
    compressed_chunk_id integer NOT NULL,
    uncompressed_heap_size bigint NOT NULL,
    uncompressed_toast_size bigint NOT NULL,
    uncompressed_index_size bigint NOT NULL,
    compressed_heap_size bigint NOT NULL,
    compressed_toast_size bigint NOT NULL,
    compressed_index_size bigint NOT NULL,
    numrows_pre_compression bigint,
    numrows_post_compression bigint,
    -- new in this release: number of tuples written frozen during compression
    numrows_frozen_immediately bigint,
    -- table constraints
    CONSTRAINT compression_chunk_size_pkey PRIMARY KEY (chunk_id),
    CONSTRAINT compression_chunk_size_chunk_id_fkey FOREIGN KEY (chunk_id) REFERENCES _timescaledb_catalog.chunk (id) ON DELETE CASCADE,
    CONSTRAINT compression_chunk_size_compressed_chunk_id_fkey FOREIGN KEY (compressed_chunk_id) REFERENCES _timescaledb_catalog.chunk (id) ON DELETE CASCADE
);

-- Restore the saved rows. Chunks compressed before this release have no
-- immediately-frozen tuples, so the new column is backfilled with 0.
INSERT INTO _timescaledb_catalog.compression_chunk_size (
    chunk_id,
    compressed_chunk_id,
    uncompressed_heap_size,
    uncompressed_toast_size,
    uncompressed_index_size,
    compressed_heap_size,
    compressed_toast_size,
    compressed_index_size,
    numrows_pre_compression,
    numrows_post_compression,
    numrows_frozen_immediately
)
SELECT
    chunk_id,
    compressed_chunk_id,
    uncompressed_heap_size,
    uncompressed_toast_size,
    uncompressed_index_size,
    compressed_heap_size,
    compressed_toast_size,
    compressed_index_size,
    numrows_pre_compression,
    numrows_post_compression,
    0
FROM _timescaledb_internal.compression_chunk_size_tmp;

DROP TABLE _timescaledb_internal.compression_chunk_size_tmp;

-- Re-register the rebuilt table as extension configuration so that
-- pg_dump includes its contents.
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_chunk_size', '');

GRANT SELECT ON _timescaledb_catalog.compression_chunk_size TO PUBLIC;

-- End modify `_timescaledb_catalog.compression_chunk_size`
50 changes: 50 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,53 @@ GRANT SELECT ON _timescaledb_catalog.chunk_id_seq TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.chunk TO PUBLIC;

-- end recreate _timescaledb_catalog.chunk table --


--
-- Rebuild the catalog table `_timescaledb_catalog.compression_chunk_size` to
-- remove column `numrows_frozen_immediately`
--

-- Keep a copy of the current catalog contents so they can be restored
-- into the rebuilt table below.
CREATE TABLE _timescaledb_internal.compression_chunk_size_tmp AS
    SELECT * FROM _timescaledb_catalog.compression_chunk_size;

-- Drop dependent views
-- We assume that '_timescaledb_internal.compressed_chunk_stats' was already dropped in this update
-- (see above)

-- Detach the catalog table from the extension, then drop it
ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.compression_chunk_size;
DROP TABLE _timescaledb_catalog.compression_chunk_size;

CREATE TABLE _timescaledb_catalog.compression_chunk_size (
    chunk_id integer NOT NULL,
    compressed_chunk_id integer NOT NULL,
    uncompressed_heap_size bigint NOT NULL,
    uncompressed_toast_size bigint NOT NULL,
    uncompressed_index_size bigint NOT NULL,
    compressed_heap_size bigint NOT NULL,
    compressed_toast_size bigint NOT NULL,
    compressed_index_size bigint NOT NULL,
    numrows_pre_compression bigint,
    numrows_post_compression bigint,
    -- table constraints
    CONSTRAINT compression_chunk_size_pkey PRIMARY KEY (chunk_id),
    CONSTRAINT compression_chunk_size_chunk_id_fkey FOREIGN KEY (chunk_id) REFERENCES _timescaledb_catalog.chunk (id) ON DELETE CASCADE,
    CONSTRAINT compression_chunk_size_compressed_chunk_id_fkey FOREIGN KEY (compressed_chunk_id) REFERENCES _timescaledb_catalog.chunk (id) ON DELETE CASCADE
);

-- Restore the saved rows; the dropped `numrows_frozen_immediately` value
-- is simply discarded on downgrade.
INSERT INTO _timescaledb_catalog.compression_chunk_size (
    chunk_id,
    compressed_chunk_id,
    uncompressed_heap_size,
    uncompressed_toast_size,
    uncompressed_index_size,
    compressed_heap_size,
    compressed_toast_size,
    compressed_index_size,
    numrows_pre_compression,
    numrows_post_compression
)
SELECT
    chunk_id,
    compressed_chunk_id,
    uncompressed_heap_size,
    uncompressed_toast_size,
    uncompressed_index_size,
    compressed_heap_size,
    compressed_toast_size,
    compressed_index_size,
    numrows_pre_compression,
    numrows_post_compression
FROM _timescaledb_internal.compression_chunk_size_tmp;

DROP TABLE _timescaledb_internal.compression_chunk_size_tmp;

-- Re-register the rebuilt table as extension configuration so that
-- pg_dump includes its contents.
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_chunk_size', '');

GRANT SELECT ON _timescaledb_catalog.compression_chunk_size TO PUBLIC;

-- End modify `_timescaledb_catalog.compression_chunk_size`
1 change: 1 addition & 0 deletions src/telemetry/stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ add_chunk_stats(HyperStats *stats, Form_pg_class class, const Chunk *chunk,
stats->uncompressed_toast_size += fd_compr->uncompressed_toast_size;
stats->uncompressed_row_count += fd_compr->numrows_pre_compression;
stats->compressed_row_count += fd_compr->numrows_post_compression;
stats->compressed_row_frozen_immediately_count += fd_compr->numrows_frozen_immediately;

/* Also add compressed sizes to total number for entire table */
stats->storage.relsize.heap_size += fd_compr->compressed_heap_size;
Expand Down
1 change: 1 addition & 0 deletions src/telemetry/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ typedef struct HyperStats
int64 compressed_indexes_size;
int64 compressed_toast_size;
int64 compressed_row_count;
int64 compressed_row_frozen_immediately_count;
int64 uncompressed_heap_size;
int64 uncompressed_indexes_size;
int64 uncompressed_toast_size;
Expand Down
4 changes: 4 additions & 0 deletions src/telemetry/telemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ format_iso8601(Datum value)
#define REQ_RELKIND_COMPRESSED_TOAST_SIZE "compressed_toast_size"
#define REQ_RELKIND_COMPRESSED_INDEXES_SIZE "compressed_indexes_size"
#define REQ_RELKIND_COMPRESSED_ROWCOUNT "compressed_row_count"
#define REQ_RELKIND_COMPRESSED_ROWCOUNT_FROZEN_IMMEDIATELY "compressed_row_count_frozen_immediately"

#define REQ_RELKIND_CAGG_ON_DISTRIBUTED_HYPERTABLE_COUNT "num_caggs_on_distributed_hypertables"
#define REQ_RELKIND_CAGG_USES_REAL_TIME_AGGREGATION_COUNT "num_caggs_using_real_time_aggregation"
Expand Down Expand Up @@ -639,6 +640,9 @@ add_compression_stats_object(JsonbParseState *parse_state, StatsRelType reltype,
ts_jsonb_add_int64(parse_state,
REQ_RELKIND_COMPRESSED_INDEXES_SIZE,
hs->compressed_indexes_size);
ts_jsonb_add_int64(parse_state,
REQ_RELKIND_COMPRESSED_ROWCOUNT_FROZEN_IMMEDIATELY,
hs->compressed_row_frozen_immediately_count);
ts_jsonb_add_int64(parse_state, REQ_RELKIND_UNCOMPRESSED_ROWCOUNT, hs->uncompressed_row_count);
ts_jsonb_add_int64(parse_state, REQ_RELKIND_UNCOMPRESSED_HEAP_SIZE, hs->uncompressed_heap_size);
ts_jsonb_add_int64(parse_state,
Expand Down
2 changes: 2 additions & 0 deletions src/ts_catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -1289,6 +1289,7 @@ typedef enum Anum_compression_chunk_size
Anum_compression_chunk_size_compressed_index_size,
Anum_compression_chunk_size_numrows_pre_compression,
Anum_compression_chunk_size_numrows_post_compression,
Anum_compression_chunk_size_numrows_frozen_immediately,
_Anum_compression_chunk_size_max,
} Anum_compression_chunk_size;

Expand All @@ -1306,6 +1307,7 @@ typedef struct FormData_compression_chunk_size
int64 compressed_index_size;
int64 numrows_pre_compression;
int64 numrows_post_compression;
int64 numrows_frozen_immediately;
} FormData_compression_chunk_size;

typedef FormData_compression_chunk_size *Form_compression_chunk_size;
Expand Down
1 change: 1 addition & 0 deletions tsl/src/chunk_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ chunk_copy_get_source_compressed_chunk_stats(ChunkCopy *cc)
cc->fd_ccs.compressed_index_size = atoll(PQgetvalue(res, 0, 5));
cc->fd_ccs.numrows_pre_compression = atoll(PQgetvalue(res, 0, 6));
cc->fd_ccs.numrows_post_compression = atoll(PQgetvalue(res, 0, 7));
cc->fd_ccs.numrows_frozen_immediately = 0;

ts_dist_cmd_close_response(dist_res);
}
Expand Down
41 changes: 35 additions & 6 deletions tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ typedef struct CompressChunkCxt
static void
compression_chunk_size_catalog_insert(int32 src_chunk_id, const RelationSize *src_size,
int32 compress_chunk_id, const RelationSize *compress_size,
int64 rowcnt_pre_compression, int64 rowcnt_post_compression)
int64 rowcnt_pre_compression, int64 rowcnt_post_compression,
int64 rowcnt_frozen)
{
Catalog *catalog = ts_catalog_get();
Relation rel;
Expand Down Expand Up @@ -93,6 +94,8 @@ compression_chunk_size_catalog_insert(int32 src_chunk_id, const RelationSize *sr
Int64GetDatum(rowcnt_pre_compression);
values[AttrNumberGetAttrOffset(Anum_compression_chunk_size_numrows_post_compression)] =
Int64GetDatum(rowcnt_post_compression);
values[AttrNumberGetAttrOffset(Anum_compression_chunk_size_numrows_frozen_immediately)] =
Int64GetDatum(rowcnt_frozen);

ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_catalog_insert_values(rel, desc, values, nulls);
Expand Down Expand Up @@ -487,6 +490,27 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
compress_ht_chunk = ts_chunk_get_by_id(mergable_chunk->fd.compressed_chunk_id, true);
result_chunk_id = mergable_chunk->table_id;
}

/* Since the compressed relation is created in the same transaction as the tuples that will be
* written by the compressor, we can insert the tuple directly in frozen state. This is the same
* logic as performed by PostgreSQL's COPY FREEZE.
*
* Note: Tuples inserted with HEAP_INSERT_FROZEN become immediately visible to all transactions
* (they violate the MVCC pattern). So, this flag can only be used when creating the compressed
* chunk in the same transaction as the compressed tuples are inserted.
*
* If this isn't the case, then tuples can be seen multiple times by parallel readers - once in
* the uncompressed part of the hypertable (since they are not deleted in the transaction) and
* once in the compressed part of the hypertable since the MVCC semantic is violated due to the
* flag.
*
* In contrast, when the compressed chunk part is created in the same transaction as the tuples
* are written, the compressed chunk (i.e., the catalog entry) becomes visible to other
transactions only after the transaction that performs the compression is committed and
* the uncompressed chunk is truncated.
*/
int insert_options = new_compressed_chunk ? HEAP_INSERT_FROZEN : 0;

/* convert list to array of pointers for compress_chunk */
colinfo_array = palloc(sizeof(ColumnCompressionInfo *) * htcols_listlen);
foreach (lc, htcols_list)
Expand All @@ -498,7 +522,8 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
cstat = compress_chunk(cxt.srcht_chunk->table_id,
compress_ht_chunk->table_id,
colinfo_array,
htcols_listlen);
htcols_listlen,
insert_options);

/* Drop all FK constraints on the uncompressed chunk. This is needed to allow
* cascading deleted data in FK-referenced tables, while blocking deleting data
Expand All @@ -514,7 +539,8 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
compress_ht_chunk->fd.id,
&after_size,
cstat.rowcnt_pre_compression,
cstat.rowcnt_post_compression);
cstat.rowcnt_post_compression,
cstat.rowcnt_frozen);

/* Copy chunk constraints (including fkey) to compressed chunk.
* Do this after compressing the chunk to avoid holding strong, unnecessary locks on the
Expand Down Expand Up @@ -811,7 +837,8 @@ tsl_create_compressed_chunk(PG_FUNCTION_ARGS)
compress_ht_chunk->fd.id,
&compressed_size,
numrows_pre_compression,
numrows_post_compression);
numrows_post_compression,
0);

chunk_was_compressed = ts_chunk_is_compressed(cxt.srcht_chunk);
ts_chunk_set_compressed_chunk(cxt.srcht_chunk, compress_ht_chunk->fd.id);
Expand Down Expand Up @@ -1071,7 +1098,8 @@ tsl_get_compressed_chunk_index_for_recompression(PG_FUNCTION_ARGS)
in_column_offsets,
compressed_rel_tupdesc->natts,
true /*need_bistate*/,
true /*reset_sequence*/);
true /*reset_sequence*/,
0 /*insert options*/);

/*
* Keep the ExclusiveLock on the compressed chunk. This lock will be requested
Expand Down Expand Up @@ -1372,7 +1400,8 @@ tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS)
in_column_offsets,
compressed_rel_tupdesc->natts,
true /*need_bistate*/,
true /*reset_sequence*/);
true /*reset_sequence*/,
0 /*insert options*/);

/* create an array of the segmentby column offsets in the compressed chunk */
int16 *segmentby_column_offsets_compressed =
Expand Down
20 changes: 16 additions & 4 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <utils/fmgroids.h>
#include <utils/lsyscache.h>
#include <utils/memutils.h>
#include <utils/portal.h>
#include <utils/rel.h>
#include <utils/relcache.h>
#include <utils/snapmgr.h>
Expand All @@ -53,6 +54,7 @@
#include "create.h"
#include "custom_type_cache.h"
#include "arrow_c_data_interface.h"
#include "debug_assert.h"
#include "debug_point.h"
#include "deltadelta.h"
#include "dictionary.h"
Expand Down Expand Up @@ -223,7 +225,7 @@ truncate_relation(Oid table_oid)

CompressionStats
compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column_compression_info,
int num_compression_infos)
int num_compression_infos, int insert_options)
{
int n_keys;
ListCell *lc;
Expand Down Expand Up @@ -399,7 +401,8 @@ compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column
in_column_offsets,
out_desc->natts,
true /*need_bistate*/,
false /*reset_sequence*/);
false /*reset_sequence*/,
insert_options);

if (matched_index_rel != NULL)
{
Expand Down Expand Up @@ -441,12 +444,19 @@ compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column
}

row_compressor_finish(&row_compressor);
DEBUG_WAITPOINT("compression_done_before_truncate_uncompressed");
truncate_relation(in_table);

table_close(out_rel, NoLock);
table_close(in_rel, NoLock);
cstat.rowcnt_pre_compression = row_compressor.rowcnt_pre_compression;
cstat.rowcnt_post_compression = row_compressor.num_compressed_rows;

if ((insert_options & HEAP_INSERT_FROZEN) == HEAP_INSERT_FROZEN)
cstat.rowcnt_frozen = row_compressor.num_compressed_rows;
else
cstat.rowcnt_frozen = 0;

return cstat;
}

Expand Down Expand Up @@ -836,7 +846,8 @@ void
row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_desc,
Relation compressed_table, int num_compression_infos,
const ColumnCompressionInfo **column_compression_info, int16 *in_column_offsets,
int16 num_columns_in_compressed_table, bool need_bistate, bool reset_sequence)
int16 num_columns_in_compressed_table, bool need_bistate, bool reset_sequence,
int insert_options)
{
TupleDesc out_desc = RelationGetDescr(compressed_table);
int col;
Expand Down Expand Up @@ -883,6 +894,7 @@ row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_
.sequence_num = SEQUENCE_NUM_GAP,
.reset_sequence = reset_sequence,
.first_iteration = true,
.insert_options = insert_options,
};

memset(row_compressor->compressed_is_null, 1, sizeof(bool) * num_columns_in_compressed_table);
Expand Down Expand Up @@ -1214,7 +1226,7 @@ row_compressor_flush(RowCompressor *row_compressor, CommandId mycid, bool change
heap_insert(row_compressor->compressed_table,
compressed_tuple,
mycid,
0 /*=options*/,
row_compressor->insert_options /*=options*/,
row_compressor->bistate);
if (row_compressor->resultRelInfo->ri_NumIndices > 0)
{
Expand Down
7 changes: 5 additions & 2 deletions tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ typedef struct CompressionStats
{
int64 rowcnt_pre_compression;
int64 rowcnt_post_compression;
int64 rowcnt_frozen;
} CompressionStats;

typedef struct PerColumn
Expand Down Expand Up @@ -265,6 +266,8 @@ typedef struct RowCompressor
bool reset_sequence;
/* flag for checking if we are working on the first tuple */
bool first_iteration;
/* the heap insert options */
int insert_options;
} RowCompressor;

/* SegmentFilter is used for filtering segments based on qualifiers */
Expand Down Expand Up @@ -313,7 +316,7 @@ pg_attribute_unused() assert_num_compression_algorithms_sane(void)
extern CompressionStorage compression_get_toast_storage(CompressionAlgorithms algo);
extern CompressionStats compress_chunk(Oid in_table, Oid out_table,
const ColumnCompressionInfo **column_compression_info,
int num_compression_infos);
int num_compression_infos, int insert_options);
extern void decompress_chunk(Oid in_table, Oid out_table);

extern DecompressionIterator *(*tsl_get_decompression_iterator_init(
Expand Down Expand Up @@ -355,7 +358,7 @@ extern void row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompr
Relation compressed_table, int num_compression_infos,
const ColumnCompressionInfo **column_compression_info,
int16 *column_offsets, int16 num_columns_in_compressed_table,
bool need_bistate, bool reset_sequence);
bool need_bistate, bool reset_sequence, int insert_options);
extern void row_compressor_finish(RowCompressor *row_compressor);
extern void populate_per_compressed_columns_from_data(PerCompressedColumn *per_compressed_cols,
int16 num_cols, Datum *compressed_datums,
Expand Down
File renamed without changes.
Loading

0 comments on commit 8767de6

Please sign in to comment.