Skip to content

Commit

Permalink
Collect relation-level stats during compression
Browse files Browse the repository at this point in the history
During compression, column min/max stats are collected on a
per-segment basis for orderby columns and those that have indexes.

This change uses the same mechanism to collect relation-level min/max
stats to be used by chunk skipping. This avoids, in worst case, an
extra full table scan to gather these chunk column stats.

For simplicity, stats gathering is enabled for all columns that can
support it, even though a column might use neither segment-level stats
nor relation-level (chunk column) stats. The overhead of collecting
min/max values should be negligible.
  • Loading branch information
erimatnor committed Dec 5, 2024
1 parent 396caf6 commit 25d61ff
Show file tree
Hide file tree
Showing 10 changed files with 229 additions and 85 deletions.
32 changes: 25 additions & 7 deletions src/ts_catalog/chunk_column_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,8 @@ ts_chunk_column_stats_lookup(int32 hypertable_id, int32 chunk_id, const char *co
* updated.
*/
int
ts_chunk_column_stats_calculate(const Hypertable *ht, const Chunk *chunk)
ts_chunk_column_stats_calculate(const Hypertable *ht, const Chunk *chunk,
ChunkColumnStats **statsarray)
{
Size i = 0;
ChunkRangeSpace *rs = ht->range_space;
Expand All @@ -755,22 +756,38 @@ ts_chunk_column_stats_calculate(const Hypertable *ht, const Chunk *chunk)

for (int range_index = 0; range_index < rs->num_range_cols; range_index++)
{
Datum minmax[2];
const Form_chunk_column_stats form = &rs->range_cols[range_index];
const ChunkColumnStats *colstats = NULL;
ChunkColumnStats computed_stats;

AttrNumber attno;
char *col_name = NameStr(rs->range_cols[range_index].column_name);
const char *col_name = NameStr(form->column_name);
Oid col_type;

/* Get the attribute number in the HT for this column, and map to the chunk */
/* TODO: fix unnecessary mapping */
attno = get_attnum(ht->main_table_relid, col_name);
attno = ts_map_attno(ht->main_table_relid, chunk->table_id, attno);
col_type = get_atttype(ht->main_table_relid, attno);

if (statsarray)
colstats = statsarray[AttrNumberGetAttrOffset(attno)];

if (NULL == colstats && ts_chunk_get_minmax(chunk->table_id,
col_type,
attno,
"column range",
computed_stats.minmax))
{
colstats = &computed_stats;
}

/* calculate the min/max range for this column on this chunk */
if (ts_chunk_get_minmax(chunk->table_id, col_type, attno, "column range", minmax))
if (colstats)
{
Form_chunk_column_stats range;
int64 min = ts_time_value_to_internal(minmax[0], col_type);
int64 max = ts_time_value_to_internal(minmax[1], col_type);
int64 min = ts_time_value_to_internal(colstats->minmax[0], col_type);
int64 max = ts_time_value_to_internal(colstats->minmax[1], col_type);

/* The end value is exclusive to the range, so incr by 1 */
if (max != DIMENSION_SLICE_MAXVALUE)
Expand Down Expand Up @@ -821,7 +838,8 @@ ts_chunk_column_stats_calculate(const Hypertable *ht, const Chunk *chunk)
}
}
else
ereport(WARNING, errmsg("unable to calculate min/max values for column ranges"));
ereport(WARNING,
errmsg("unable to calculate min/max column range for \"%s\"", col_name));
}

MemoryContextSwitchTo(orig_mcxt);
Expand Down
10 changes: 9 additions & 1 deletion src/ts_catalog/chunk_column_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ typedef struct ChunkRangeSpace
FormData_chunk_column_stats range_cols[FLEXIBLE_ARRAY_MEMBER];
} ChunkRangeSpace;

typedef struct ChunkColumnStats
{
/* Min and max, in that order */
Datum minmax[2];
bool isnull[2];
} ChunkColumnStats;

#define CHUNKRANGESPACE_SIZE(num_columns) \
(sizeof(ChunkRangeSpace) + (sizeof(NameData) * (num_columns)))

Expand All @@ -35,7 +42,8 @@ extern int ts_chunk_column_stats_update_by_id(int32 chunk_column_stats_id,
extern Form_chunk_column_stats ts_chunk_column_stats_lookup(int32 hypertable_id, int32 chunk_id,
const char *col_name);

extern TSDLLEXPORT int ts_chunk_column_stats_calculate(const Hypertable *ht, const Chunk *chunk);
extern TSDLLEXPORT int ts_chunk_column_stats_calculate(const Hypertable *ht, const Chunk *chunk,
ChunkColumnStats **statsarray);
extern int ts_chunk_column_stats_insert(const Hypertable *ht, const Chunk *chunk);

extern void ts_chunk_column_stats_drop(const Hypertable *ht, const char *col_name, bool *dropped);
Expand Down
11 changes: 8 additions & 3 deletions tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -525,10 +525,15 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
* In the future, we can look at computing min/max entries in the compressed chunk
* using the batch metadata and then recompute the range to handle DELETE cases.
*/
if (cxt.srcht->range_space)
ts_chunk_column_stats_calculate(cxt.srcht, cxt.srcht_chunk);

cstat = compress_chunk(cxt.srcht_chunk->table_id, compress_ht_chunk->table_id, insert_options);

if (cxt.srcht->range_space && cstat.colstats)
{
ts_chunk_column_stats_calculate(cxt.srcht, cxt.srcht_chunk, cstat.colstats);
pfree(cstat.colstats);
}

after_size = ts_relation_size_impl(compress_ht_chunk->table_id);

if (new_compressed_chunk)
Expand Down Expand Up @@ -1370,7 +1375,7 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
*/
Hypertable *ht = ts_hypertable_get_by_id(uncompressed_chunk->fd.hypertable_id);
if (ht->range_space)
ts_chunk_column_stats_calculate(ht, uncompressed_chunk);
ts_chunk_column_stats_calculate(ht, uncompressed_chunk, NULL);

/*************** tuplesort state *************************/
Tuplesortstate *segment_tuplesortstate;
Expand Down
104 changes: 76 additions & 28 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <libpq/pqformat.h>
#include <storage/predicate.h>
#include <utils/datum.h>
#include <utils/palloc.h>
#include <utils/snapmgr.h>
#include <utils/syscache.h>
#include <utils/typcache.h>
Expand All @@ -34,6 +35,7 @@
#include "segment_meta.h"
#include "ts_catalog/array_utils.h"
#include "ts_catalog/catalog.h"
#include "ts_catalog/chunk_column_stats.h"
#include "ts_catalog/compression_chunk_size.h"
#include "ts_catalog/compression_settings.h"

Expand Down Expand Up @@ -287,6 +289,7 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options)

TupleDesc in_desc = RelationGetDescr(in_rel);
TupleDesc out_desc = RelationGetDescr(out_rel);

/* Before calling row compressor relation should be segmented and sorted as configured
* by compress_segmentby and compress_orderby.
* Cost of sorting can be mitigated if we find an existing BTREE index defined for
Expand Down Expand Up @@ -494,7 +497,8 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options)
tuplesort_end(sorted_rel);
}

row_compressor_close(&row_compressor);
cstat.colstats = row_compressor_close(&row_compressor);

if (!ts_guc_enable_delete_after_compression)
{
DEBUG_WAITPOINT("compression_done_before_truncate_uncompressed");
Expand Down Expand Up @@ -720,6 +724,17 @@ build_column_map(CompressionSettings *settings, Relation uncompressed_table,
bool is_segmentby = ts_array_is_member(settings->fd.segmentby, NameStr(attr->attname));
bool is_orderby = ts_array_is_member(settings->fd.orderby, NameStr(attr->attname));

SegmentMetaMinMaxBuilder *segment_min_max_builder = NULL;
TypeCacheEntry *type = lookup_type_cache(attr->atttypid, TYPECACHE_LT_OPR);

if (OidIsValid(type->lt_opr))
{
/* Always run the min-max builder if the type allows. It is
* useful to collect, e.g., column stats for chunk skipping. */
segment_min_max_builder =
segment_meta_min_max_builder_create(attr->atttypid, attr->attcollation);
}

if (!is_segmentby)
{
if (compressed_column_attr->atttypid != compressed_data_type_oid)
Expand All @@ -742,18 +757,6 @@ build_column_map(CompressionSettings *settings, Relation uncompressed_table,
int16 segment_min_attr_offset = segment_min_attr_number - 1;
int16 segment_max_attr_offset = segment_max_attr_number - 1;

SegmentMetaMinMaxBuilder *segment_min_max_builder = NULL;
if (segment_min_attr_number != InvalidAttrNumber ||
segment_max_attr_number != InvalidAttrNumber)
{
Ensure(segment_min_attr_number != InvalidAttrNumber,
"could not find the min metadata column");
Ensure(segment_max_attr_number != InvalidAttrNumber,
"could not find the min metadata column");
segment_min_max_builder =
segment_meta_min_max_builder_create(attr->atttypid, attr->attcollation);
}

Ensure(!is_orderby || segment_min_max_builder != NULL,
"orderby columns must have minmax metadata");

Expand All @@ -777,6 +780,7 @@ build_column_map(CompressionSettings *settings, Relation uncompressed_table,
.segmentby_column_index = index,
.min_metadata_attr_offset = -1,
.max_metadata_attr_offset = -1,
.min_max_metadata_builder = segment_min_max_builder,
};
}
}
Expand Down Expand Up @@ -965,7 +969,9 @@ row_compressor_append_row(RowCompressor *row_compressor, TupleTableSlot *row)
bool is_null;
Datum val;

/* if there is no compressor, this must be a segmenter, so just skip */
/* if there is no compressor, this must be a segmenter, so just
* skip. Note that, for segmentby columns, min/max stats are updated
* per segment (on flush) for instead of per row. */
if (compressor == NULL)
continue;

Expand Down Expand Up @@ -1024,11 +1030,9 @@ row_compressor_flush(RowCompressor *row_compressor, CommandId mycid, bool change
row_compressor->compressed_values[compressed_col] =
PointerGetDatum(compressed_data);

if (column->min_max_metadata_builder != NULL)
if (column->min_max_metadata_builder != NULL && column->min_metadata_attr_offset >= 0 &&
column->max_metadata_attr_offset >= 0)
{
Assert(column->min_metadata_attr_offset >= 0);
Assert(column->max_metadata_attr_offset >= 0);

if (!segment_meta_min_max_builder_empty(column->min_max_metadata_builder))
{
Assert(compressed_data != NULL);
Expand All @@ -1050,6 +1054,17 @@ row_compressor_flush(RowCompressor *row_compressor, CommandId mycid, bool change
}
else if (column->segment_info != NULL)
{
/* Update min/max for segmentby column. It is done here on flush
* instead of per row since for the segment the value is always
* the same. */
if (column->min_max_metadata_builder != NULL)
{
if (column->segment_info->is_null)
segment_meta_min_max_builder_update_null(column->min_max_metadata_builder);
else
segment_meta_min_max_builder_update_val(column->min_max_metadata_builder,
column->segment_info->val);
}
row_compressor->compressed_values[compressed_col] = column->segment_info->val;
row_compressor->compressed_is_null[compressed_col] = column->segment_info->is_null;
}
Expand Down Expand Up @@ -1091,23 +1106,31 @@ row_compressor_flush(RowCompressor *row_compressor, CommandId mycid, bool change

/* don't free the segment-bys if we've overflowed the row, we still need them */
if (column->segment_info != NULL && !changed_groups)
{
/* Still need to reset the min/max builder to save per-column
* min/max based on per-segment min/max. */
segment_meta_min_max_builder_reset(column->min_max_metadata_builder);
continue;
}

if (column->compressor != NULL || !column->segment_info->typ_by_val)
pfree(DatumGetPointer(row_compressor->compressed_values[compressed_col]));

if (column->min_max_metadata_builder != NULL)
{
/* segment_meta_min_max_builder_reset will free the values, so clear here */
if (!row_compressor->compressed_is_null[column->min_metadata_attr_offset])
/* segment_meta_min_max_builder_reset will free the values, so clear here */
if (column->min_metadata_attr_offset > 0 && column->max_metadata_attr_offset > 0)
{
row_compressor->compressed_values[column->min_metadata_attr_offset] = 0;
row_compressor->compressed_is_null[column->min_metadata_attr_offset] = true;
}
if (!row_compressor->compressed_is_null[column->max_metadata_attr_offset])
{
row_compressor->compressed_values[column->max_metadata_attr_offset] = 0;
row_compressor->compressed_is_null[column->max_metadata_attr_offset] = true;
if (!row_compressor->compressed_is_null[column->min_metadata_attr_offset])
{
row_compressor->compressed_values[column->min_metadata_attr_offset] = 0;
row_compressor->compressed_is_null[column->min_metadata_attr_offset] = true;
}
if (!row_compressor->compressed_is_null[column->max_metadata_attr_offset])
{
row_compressor->compressed_values[column->max_metadata_attr_offset] = 0;
row_compressor->compressed_is_null[column->max_metadata_attr_offset] = true;
}
}
segment_meta_min_max_builder_reset(column->min_max_metadata_builder);
}
Expand All @@ -1133,12 +1156,37 @@ row_compressor_reset(RowCompressor *row_compressor)
row_compressor->first_iteration = true;
}

void
ChunkColumnStats **
row_compressor_close(RowCompressor *row_compressor)
{
if (row_compressor->bistate)
FreeBulkInsertState(row_compressor->bistate);
CatalogCloseIndexes(row_compressor->resultRelInfo);

ChunkColumnStats **colstats =
palloc(sizeof(ChunkColumnStats *) * row_compressor->n_input_columns);

/* Get any relation-level stats (min and max) collected during compression
* and return it to caller */
for (int i = 0; i < row_compressor->n_input_columns; i++)
{
const PerColumn *column = &row_compressor->per_column[i];
SegmentMetaMinMaxBuilder *builder = column->min_max_metadata_builder;

if (builder && segment_meta_has_relation_stats(builder))
{
ChunkColumnStats *colstat = palloc(sizeof(ChunkColumnStats));
colstat->minmax[0] = segment_meta_min_max_builder_relation_min(builder);
colstat->minmax[1] = segment_meta_min_max_builder_relation_max(builder);
colstats[i] = colstat;
}
else
{
colstats[i] = NULL;
}
}

return colstats;
}

/******************
Expand Down
6 changes: 5 additions & 1 deletion tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <fmgr.h>
#include <lib/stringinfo.h>
#include <nodes/execnodes.h>
#include <utils/palloc.h>
#include <utils/relcache.h>

typedef struct BulkInsertStateData *BulkInsertState;
Expand Down Expand Up @@ -200,11 +201,14 @@ typedef enum CompressionAlgorithm
_MAX_NUM_COMPRESSION_ALGORITHMS = 128,
} CompressionAlgorithm;

typedef struct ChunkColumnStats ChunkColumnStats;

typedef struct CompressionStats
{
int64 rowcnt_pre_compression;
int64 rowcnt_post_compression;
int64 rowcnt_frozen;
ChunkColumnStats **colstats;
} CompressionStats;

typedef struct PerColumn
Expand Down Expand Up @@ -368,7 +372,7 @@ extern void row_compressor_init(CompressionSettings *settings, RowCompressor *ro
int16 num_columns_in_compressed_table, bool need_bistate,
int insert_options);
extern void row_compressor_reset(RowCompressor *row_compressor);
extern void row_compressor_close(RowCompressor *row_compressor);
extern struct ChunkColumnStats **row_compressor_close(RowCompressor *row_compressor);
extern void row_compressor_append_sorted_rows(RowCompressor *row_compressor,
Tuplesortstate *sorted_rel, TupleDesc sorted_desc,
Relation in_rel);
Expand Down
Loading

0 comments on commit 25d61ff

Please sign in to comment.