Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect relation-level stats during compression #7520

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions src/ts_catalog/chunk_column_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,8 @@ ts_chunk_column_stats_lookup(int32 hypertable_id, int32 chunk_id, const char *co
* updated.
*/
int
ts_chunk_column_stats_calculate(const Hypertable *ht, const Chunk *chunk)
ts_chunk_column_stats_calculate(const Hypertable *ht, const Chunk *chunk,
ChunkColumnStats **statsarray)
{
Size i = 0;
ChunkRangeSpace *rs = ht->range_space;
Expand All @@ -755,22 +756,38 @@ ts_chunk_column_stats_calculate(const Hypertable *ht, const Chunk *chunk)

for (int range_index = 0; range_index < rs->num_range_cols; range_index++)
{
Datum minmax[2];
const Form_chunk_column_stats form = &rs->range_cols[range_index];
const ChunkColumnStats *colstats = NULL;
ChunkColumnStats computed_stats;

AttrNumber attno;
char *col_name = NameStr(rs->range_cols[range_index].column_name);
const char *col_name = NameStr(form->column_name);
Oid col_type;

/* Get the attribute number in the HT for this column, and map to the chunk */
/* TODO: fix unnecessary mapping */
attno = get_attnum(ht->main_table_relid, col_name);
attno = ts_map_attno(ht->main_table_relid, chunk->table_id, attno);
col_type = get_atttype(ht->main_table_relid, attno);

if (statsarray)
colstats = statsarray[AttrNumberGetAttrOffset(attno)];

if (NULL == colstats && ts_chunk_get_minmax(chunk->table_id,
col_type,
attno,
"column range",
computed_stats.minmax))
{
colstats = &computed_stats;
}

/* calculate the min/max range for this column on this chunk */
if (ts_chunk_get_minmax(chunk->table_id, col_type, attno, "column range", minmax))
if (colstats)
{
Form_chunk_column_stats range;
int64 min = ts_time_value_to_internal(minmax[0], col_type);
int64 max = ts_time_value_to_internal(minmax[1], col_type);
int64 min = ts_time_value_to_internal(colstats->minmax[0], col_type);
int64 max = ts_time_value_to_internal(colstats->minmax[1], col_type);

/* The end value is exclusive to the range, so incr by 1 */
if (max != DIMENSION_SLICE_MAXVALUE)
Expand Down Expand Up @@ -821,7 +838,8 @@ ts_chunk_column_stats_calculate(const Hypertable *ht, const Chunk *chunk)
}
}
else
ereport(WARNING, errmsg("unable to calculate min/max values for column ranges"));
ereport(WARNING,
errmsg("unable to calculate min/max column range for \"%s\"", col_name));
}

MemoryContextSwitchTo(orig_mcxt);
Expand Down
10 changes: 9 additions & 1 deletion src/ts_catalog/chunk_column_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ typedef struct ChunkRangeSpace
FormData_chunk_column_stats range_cols[FLEXIBLE_ARRAY_MEMBER];
} ChunkRangeSpace;

/*
 * Relation-level (chunk-wide) min/max statistics for a single column,
 * collected, e.g., during compression and used to update the chunk's
 * column range entries for chunk skipping.
 */
typedef struct ChunkColumnStats
{
/* Min and max, in that order */
Datum minmax[2];
/* Null flags for the corresponding minmax entries; true when the
 * respective Datum is not a valid value */
bool isnull[2];
} ChunkColumnStats;

#define CHUNKRANGESPACE_SIZE(num_columns) \
(sizeof(ChunkRangeSpace) + (sizeof(NameData) * (num_columns)))

Expand All @@ -35,7 +42,8 @@ extern int ts_chunk_column_stats_update_by_id(int32 chunk_column_stats_id,
extern Form_chunk_column_stats ts_chunk_column_stats_lookup(int32 hypertable_id, int32 chunk_id,
const char *col_name);

extern TSDLLEXPORT int ts_chunk_column_stats_calculate(const Hypertable *ht, const Chunk *chunk);
extern TSDLLEXPORT int ts_chunk_column_stats_calculate(const Hypertable *ht, const Chunk *chunk,
ChunkColumnStats **statsarray);
extern int ts_chunk_column_stats_insert(const Hypertable *ht, const Chunk *chunk);

extern void ts_chunk_column_stats_drop(const Hypertable *ht, const char *col_name, bool *dropped);
Expand Down
11 changes: 8 additions & 3 deletions tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -525,10 +525,15 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
* In the future, we can look at computing min/max entries in the compressed chunk
* using the batch metadata and then recompute the range to handle DELETE cases.
*/
if (cxt.srcht->range_space)
ts_chunk_column_stats_calculate(cxt.srcht, cxt.srcht_chunk);

cstat = compress_chunk(cxt.srcht_chunk->table_id, compress_ht_chunk->table_id, insert_options);

if (cxt.srcht->range_space && cstat.colstats)
{
ts_chunk_column_stats_calculate(cxt.srcht, cxt.srcht_chunk, cstat.colstats);
pfree(cstat.colstats);
}

after_size = ts_relation_size_impl(compress_ht_chunk->table_id);

if (new_compressed_chunk)
Expand Down Expand Up @@ -1370,7 +1375,7 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
*/
Hypertable *ht = ts_hypertable_get_by_id(uncompressed_chunk->fd.hypertable_id);
if (ht->range_space)
ts_chunk_column_stats_calculate(ht, uncompressed_chunk);
ts_chunk_column_stats_calculate(ht, uncompressed_chunk, NULL);

/*************** tuplesort state *************************/
Tuplesortstate *segment_tuplesortstate;
Expand Down
104 changes: 76 additions & 28 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <libpq/pqformat.h>
#include <storage/predicate.h>
#include <utils/datum.h>
#include <utils/palloc.h>
#include <utils/snapmgr.h>
#include <utils/syscache.h>
#include <utils/typcache.h>
Expand All @@ -34,6 +35,7 @@
#include "segment_meta.h"
#include "ts_catalog/array_utils.h"
#include "ts_catalog/catalog.h"
#include "ts_catalog/chunk_column_stats.h"
#include "ts_catalog/compression_chunk_size.h"
#include "ts_catalog/compression_settings.h"

Expand Down Expand Up @@ -287,6 +289,7 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options)

TupleDesc in_desc = RelationGetDescr(in_rel);
TupleDesc out_desc = RelationGetDescr(out_rel);

/* Before calling row compressor relation should be segmented and sorted as configured
* by compress_segmentby and compress_orderby.
* Cost of sorting can be mitigated if we find an existing BTREE index defined for
Expand Down Expand Up @@ -494,7 +497,8 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options)
tuplesort_end(sorted_rel);
}

row_compressor_close(&row_compressor);
cstat.colstats = row_compressor_close(&row_compressor);

if (!ts_guc_enable_delete_after_compression)
{
DEBUG_WAITPOINT("compression_done_before_truncate_uncompressed");
Expand Down Expand Up @@ -720,6 +724,17 @@ build_column_map(CompressionSettings *settings, Relation uncompressed_table,
bool is_segmentby = ts_array_is_member(settings->fd.segmentby, NameStr(attr->attname));
bool is_orderby = ts_array_is_member(settings->fd.orderby, NameStr(attr->attname));

SegmentMetaMinMaxBuilder *segment_min_max_builder = NULL;
TypeCacheEntry *type = lookup_type_cache(attr->atttypid, TYPECACHE_LT_OPR);

if (OidIsValid(type->lt_opr))
{
/* Always run the min-max builder if the type allows. It is
* useful to collect, e.g., column stats for chunk skipping. */
segment_min_max_builder =
segment_meta_min_max_builder_create(attr->atttypid, attr->attcollation);
}

if (!is_segmentby)
{
if (compressed_column_attr->atttypid != compressed_data_type_oid)
Expand All @@ -742,18 +757,6 @@ build_column_map(CompressionSettings *settings, Relation uncompressed_table,
int16 segment_min_attr_offset = segment_min_attr_number - 1;
int16 segment_max_attr_offset = segment_max_attr_number - 1;

SegmentMetaMinMaxBuilder *segment_min_max_builder = NULL;
if (segment_min_attr_number != InvalidAttrNumber ||
segment_max_attr_number != InvalidAttrNumber)
{
Ensure(segment_min_attr_number != InvalidAttrNumber,
"could not find the min metadata column");
Ensure(segment_max_attr_number != InvalidAttrNumber,
"could not find the min metadata column");
segment_min_max_builder =
segment_meta_min_max_builder_create(attr->atttypid, attr->attcollation);
}

Ensure(!is_orderby || segment_min_max_builder != NULL,
"orderby columns must have minmax metadata");

Expand All @@ -777,6 +780,7 @@ build_column_map(CompressionSettings *settings, Relation uncompressed_table,
.segmentby_column_index = index,
.min_metadata_attr_offset = -1,
.max_metadata_attr_offset = -1,
.min_max_metadata_builder = segment_min_max_builder,
};
}
}
Expand Down Expand Up @@ -965,7 +969,9 @@ row_compressor_append_row(RowCompressor *row_compressor, TupleTableSlot *row)
bool is_null;
Datum val;

/* if there is no compressor, this must be a segmenter, so just skip */
/* if there is no compressor, this must be a segmenter, so just
* skip. Note that, for segmentby columns, min/max stats are updated
* per segment (on flush) instead of per row. */
if (compressor == NULL)
continue;

Expand Down Expand Up @@ -1024,11 +1030,9 @@ row_compressor_flush(RowCompressor *row_compressor, CommandId mycid, bool change
row_compressor->compressed_values[compressed_col] =
PointerGetDatum(compressed_data);

if (column->min_max_metadata_builder != NULL)
if (column->min_max_metadata_builder != NULL && column->min_metadata_attr_offset >= 0 &&
column->max_metadata_attr_offset >= 0)
{
Assert(column->min_metadata_attr_offset >= 0);
Assert(column->max_metadata_attr_offset >= 0);

if (!segment_meta_min_max_builder_empty(column->min_max_metadata_builder))
{
Assert(compressed_data != NULL);
Expand All @@ -1050,6 +1054,17 @@ row_compressor_flush(RowCompressor *row_compressor, CommandId mycid, bool change
}
else if (column->segment_info != NULL)
{
/* Update min/max for segmentby column. It is done here on flush
* instead of per row since for the segment the value is always
* the same. */
if (column->min_max_metadata_builder != NULL)
{
if (column->segment_info->is_null)
segment_meta_min_max_builder_update_null(column->min_max_metadata_builder);
else
segment_meta_min_max_builder_update_val(column->min_max_metadata_builder,
column->segment_info->val);
}
row_compressor->compressed_values[compressed_col] = column->segment_info->val;
row_compressor->compressed_is_null[compressed_col] = column->segment_info->is_null;
}
Expand Down Expand Up @@ -1091,23 +1106,31 @@ row_compressor_flush(RowCompressor *row_compressor, CommandId mycid, bool change

/* don't free the segment-bys if we've overflowed the row, we still need them */
if (column->segment_info != NULL && !changed_groups)
{
/* Still need to reset the min/max builder to save per-column
* min/max based on per-segment min/max. */
segment_meta_min_max_builder_reset(column->min_max_metadata_builder);
continue;
}

if (column->compressor != NULL || !column->segment_info->typ_by_val)
pfree(DatumGetPointer(row_compressor->compressed_values[compressed_col]));

if (column->min_max_metadata_builder != NULL)
{
/* segment_meta_min_max_builder_reset will free the values, so clear here */
if (!row_compressor->compressed_is_null[column->min_metadata_attr_offset])
/* segment_meta_min_max_builder_reset will free the values, so clear here */
if (column->min_metadata_attr_offset > 0 && column->max_metadata_attr_offset > 0)
{
row_compressor->compressed_values[column->min_metadata_attr_offset] = 0;
row_compressor->compressed_is_null[column->min_metadata_attr_offset] = true;
}
if (!row_compressor->compressed_is_null[column->max_metadata_attr_offset])
{
row_compressor->compressed_values[column->max_metadata_attr_offset] = 0;
row_compressor->compressed_is_null[column->max_metadata_attr_offset] = true;
if (!row_compressor->compressed_is_null[column->min_metadata_attr_offset])
{
row_compressor->compressed_values[column->min_metadata_attr_offset] = 0;
row_compressor->compressed_is_null[column->min_metadata_attr_offset] = true;
}
if (!row_compressor->compressed_is_null[column->max_metadata_attr_offset])
{
row_compressor->compressed_values[column->max_metadata_attr_offset] = 0;
row_compressor->compressed_is_null[column->max_metadata_attr_offset] = true;
}
}
segment_meta_min_max_builder_reset(column->min_max_metadata_builder);
}
Expand All @@ -1133,12 +1156,37 @@ row_compressor_reset(RowCompressor *row_compressor)
row_compressor->first_iteration = true;
}

void
/*
 * Finish a row compression session and release its resources.
 *
 * Frees the bulk-insert state (if one was allocated) and closes the catalog
 * index handles used while inserting compressed tuples.
 *
 * Returns a palloc'd array with one ChunkColumnStats pointer per input
 * column, indexed by attribute offset. An entry is non-NULL only when that
 * column's min/max builder collected relation-level stats during
 * compression. The array (and its entries) are allocated in the current
 * memory context; the caller is responsible for freeing them.
 */
ChunkColumnStats **
row_compressor_close(RowCompressor *row_compressor)
{
if (row_compressor->bistate)
FreeBulkInsertState(row_compressor->bistate);
CatalogCloseIndexes(row_compressor->resultRelInfo);

ChunkColumnStats **colstats =
palloc(sizeof(ChunkColumnStats *) * row_compressor->n_input_columns);

/* Get any relation-level stats (min and max) collected during compression
 * and return it to caller */
for (int i = 0; i < row_compressor->n_input_columns; i++)
{
const PerColumn *column = &row_compressor->per_column[i];
SegmentMetaMinMaxBuilder *builder = column->min_max_metadata_builder;

if (builder && segment_meta_has_relation_stats(builder))
{
/* NOTE(review): colstat->isnull is left uninitialized here —
 * confirm that consumers only read minmax[] for non-NULL entries */
ChunkColumnStats *colstat = palloc(sizeof(ChunkColumnStats));
colstat->minmax[0] = segment_meta_min_max_builder_relation_min(builder);
colstat->minmax[1] = segment_meta_min_max_builder_relation_max(builder);
colstats[i] = colstat;
}
else
{
/* No usable stats for this column (no builder, or nothing collected) */
colstats[i] = NULL;
}
}

return colstats;
}

/******************
Expand Down
6 changes: 5 additions & 1 deletion tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <fmgr.h>
#include <lib/stringinfo.h>
#include <nodes/execnodes.h>
#include <utils/palloc.h>
#include <utils/relcache.h>

typedef struct BulkInsertStateData *BulkInsertState;
Expand Down Expand Up @@ -200,11 +201,14 @@ typedef enum CompressionAlgorithm
_MAX_NUM_COMPRESSION_ALGORITHMS = 128,
} CompressionAlgorithm;

typedef struct ChunkColumnStats ChunkColumnStats;

/*
 * Summary statistics produced by compressing a chunk.
 */
typedef struct CompressionStats
{
/* Number of rows in the uncompressed input */
int64 rowcnt_pre_compression;
/* Number of compressed (batch) rows written */
int64 rowcnt_post_compression;
/* Number of rows frozen during compression */
int64 rowcnt_frozen;
/* Optional per-column relation-level min/max stats collected during
 * compression, indexed by attribute offset; individual entries (and the
 * array itself) may be NULL when no stats were collected */
ChunkColumnStats **colstats;
} CompressionStats;

typedef struct PerColumn
Expand Down Expand Up @@ -368,7 +372,7 @@ extern void row_compressor_init(CompressionSettings *settings, RowCompressor *ro
int16 num_columns_in_compressed_table, bool need_bistate,
int insert_options);
extern void row_compressor_reset(RowCompressor *row_compressor);
extern void row_compressor_close(RowCompressor *row_compressor);
extern struct ChunkColumnStats **row_compressor_close(RowCompressor *row_compressor);
extern void row_compressor_append_sorted_rows(RowCompressor *row_compressor,
Tuplesortstate *sorted_rel, TupleDesc sorted_desc,
Relation in_rel);
Expand Down
Loading
Loading