Reduce WAL activity by freezing tuples immediately
When we compress a chunk, we create a new compressed chunk for storing
the compressed data. So far, the tuples were just inserted into the
compressed chunk and frozen by a later vacuum run.

However, this can be optimized because the compressed chunk is created
in the same transaction as the tuples. This patch reduces the WAL
activity by storing these tuples as frozen and preventing a vacuum run
in the future (basically the same as implemented in COPY FREEZE).
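
As an illustration of the technique, here is a minimal, hypothetical sketch of how a heap insert into a relation created in the current transaction can be written as frozen, using the stock PostgreSQL heap_insert() API. The helper insert_rows_frozen() and its arguments are made up for this example and are not part of the patch:

#include "postgres.h"
#include "access/heapam.h"
#include "access/xact.h"
#include "utils/rel.h"

/* Insert tuples, writing them as already frozen when it is safe to do so. */
static void
insert_rows_frozen(Relation new_rel, HeapTuple *tuples, int ntuples)
{
	/*
	 * HEAP_INSERT_FROZEN is only safe if new_rel was created in the current
	 * (sub)transaction: frozen tuples are visible to all snapshots, but the
	 * still-uncommitted relation keeps them hidden from other sessions.
	 */
	int options = (new_rel->rd_createSubid != InvalidSubTransactionId) ? HEAP_INSERT_FROZEN : 0;
	BulkInsertState bistate = GetBulkInsertState();
	CommandId cid = GetCurrentCommandId(true);

	for (int i = 0; i < ntuples; i++)
		heap_insert(new_rel, tuples[i], cid, options, bistate);

	FreeBulkInsertState(bistate);
}

The diffs below thread an equivalent insert_options value from compress_chunk_impl() through compress_chunk() and row_compressor_init() down to the heap_insert() call in row_compressor_flush().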
jnidzwetzki committed Sep 29, 2023
1 parent 6bef59a commit f760963
Showing 21 changed files with 3,728 additions and 47 deletions.
1 change: 1 addition & 0 deletions .unreleased/feature_5890
@@ -0,0 +1 @@
Implements: #5890 Reduce WAL activity by freezing compressed tuples immediately
21 changes: 18 additions & 3 deletions tsl/src/compression/api.c
@@ -487,6 +487,18 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
compress_ht_chunk = ts_chunk_get_by_id(mergable_chunk->fd.compressed_chunk_id, true);
result_chunk_id = mergable_chunk->table_id;
}

/* Since the compressed relation is created in the same transaction as the tuples that will be
 * written by the compressor, we can insert the tuples directly in frozen state. This is the
 * same logic as performed in COPY FREEZE.
 *
 * Tuples written with HEAP_INSERT_FROZEN become immediately visible within all transactions.
 * There are some Ensures in compress_chunk() to verify that this flag is only passed when the
 * creation of the compressed relation is done in the same transaction.
 */
int insert_options = new_compressed_chunk ? HEAP_INSERT_FROZEN : 0;

/* convert list to array of pointers for compress_chunk */
colinfo_array = palloc(sizeof(ColumnCompressionInfo *) * htcols_listlen);
foreach (lc, htcols_list)
@@ -498,7 +510,8 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
cstat = compress_chunk(cxt.srcht_chunk->table_id,
compress_ht_chunk->table_id,
colinfo_array,
htcols_listlen);
htcols_listlen,
insert_options);

/* Drop all FK constraints on the uncompressed chunk. This is needed to allow
* cascading deleted data in FK-referenced tables, while blocking deleting data
@@ -1071,7 +1084,8 @@ tsl_get_compressed_chunk_index_for_recompression(PG_FUNCTION_ARGS)
in_column_offsets,
compressed_rel_tupdesc->natts,
true /*need_bistate*/,
true /*reset_sequence*/);
true /*reset_sequence*/,
0 /*insert options*/);

/*
* Keep the ExclusiveLock on the compressed chunk. This lock will be requested
@@ -1370,7 +1384,8 @@ tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS)
in_column_offsets,
compressed_rel_tupdesc->natts,
true /*need_bistate*/,
true /*reset_sequence*/);
true /*reset_sequence*/,
0 /*insert options*/);

/* create an array of the segmentby column offsets in the compressed chunk */
int16 *segmentby_column_offsets_compressed =
14 changes: 10 additions & 4 deletions tsl/src/compression/compression.c
@@ -38,6 +38,7 @@
#include <utils/fmgroids.h>
#include <utils/lsyscache.h>
#include <utils/memutils.h>
#include <utils/portal.h>
#include <utils/rel.h>
#include <utils/relcache.h>
#include <utils/snapmgr.h>
@@ -53,6 +54,7 @@
#include "create.h"
#include "custom_type_cache.h"
#include "arrow_c_data_interface.h"
#include "debug_assert.h"
#include "debug_point.h"
#include "deltadelta.h"
#include "dictionary.h"
@@ -222,7 +224,7 @@ truncate_relation(Oid table_oid)

CompressionStats
compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column_compression_info,
int num_compression_infos)
int num_compression_infos, int insert_options)
{
int n_keys;
ListCell *lc;
@@ -398,7 +400,8 @@ compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column
in_column_offsets,
out_desc->natts,
true /*need_bistate*/,
false /*reset_sequence*/);
false /*reset_sequence*/,
insert_options);

if (matched_index_rel != NULL)
{
@@ -440,6 +443,7 @@ compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column
}

row_compressor_finish(&row_compressor);
DEBUG_WAITPOINT("compression_done_before_truncate_uncompressed");
truncate_relation(in_table);

table_close(out_rel, NoLock);
@@ -835,7 +839,8 @@ void
row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_desc,
Relation compressed_table, int num_compression_infos,
const ColumnCompressionInfo **column_compression_info, int16 *in_column_offsets,
int16 num_columns_in_compressed_table, bool need_bistate, bool reset_sequence)
int16 num_columns_in_compressed_table, bool need_bistate, bool reset_sequence,
int insert_options)
{
TupleDesc out_desc = RelationGetDescr(compressed_table);
int col;
@@ -882,6 +887,7 @@ row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_
.sequence_num = SEQUENCE_NUM_GAP,
.reset_sequence = reset_sequence,
.first_iteration = true,
.insert_options = insert_options,
};

memset(row_compressor->compressed_is_null, 1, sizeof(bool) * num_columns_in_compressed_table);
@@ -1213,7 +1219,7 @@ row_compressor_flush(RowCompressor *row_compressor, CommandId mycid, bool change
heap_insert(row_compressor->compressed_table,
compressed_tuple,
mycid,
0 /*=options*/,
row_compressor->insert_options /*=options*/,
row_compressor->bistate);
if (row_compressor->resultRelInfo->ri_NumIndices > 0)
{
6 changes: 4 additions & 2 deletions tsl/src/compression/compression.h
@@ -262,6 +262,8 @@ typedef struct RowCompressor
bool reset_sequence;
/* flag for checking if we are working on the first tuple */
bool first_iteration;
/* the heap insert options */
int insert_options;
} RowCompressor;

/* SegmentFilter is used for filtering segments based on qualifiers */
@@ -310,7 +312,7 @@ pg_attribute_unused() assert_num_compression_algorithms_sane(void)
extern CompressionStorage compression_get_toast_storage(CompressionAlgorithms algo);
extern CompressionStats compress_chunk(Oid in_table, Oid out_table,
const ColumnCompressionInfo **column_compression_info,
int num_compression_infos);
int num_compression_infos, int insert_options);
extern void decompress_chunk(Oid in_table, Oid out_table);

extern DecompressionIterator *(*tsl_get_decompression_iterator_init(
@@ -351,7 +353,7 @@ extern void row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompr
Relation compressed_table, int num_compression_infos,
const ColumnCompressionInfo **column_compression_info,
int16 *column_offsets, int16 num_columns_in_compressed_table,
bool need_bistate, bool reset_sequence);
bool need_bistate, bool reset_sequence, int insert_options);
extern void row_compressor_finish(RowCompressor *row_compressor);
extern void populate_per_compressed_columns_from_data(PerCompressedColumn *per_compressed_cols,
int16 num_cols, Datum *compressed_datums,
File renamed without changes.