From 4e1a943563b95a0f1d70edf347c23404b908acd9 Mon Sep 17 00:00:00 2001
From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com>
Date: Thu, 12 Oct 2023 13:52:24 +0200
Subject: [PATCH] bulk decompression of text columns
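
Add bulk ("decompress_all") decompression of text columns compressed
with the array and dictionary algorithms. Both paths produce Arrow
arrays: the array algorithm builds an Arrow string array with a
validity bitmap, an (n + 1)-element uint32 offset buffer, and a body
buffer with the concatenated values; the dictionary algorithm builds a
dictionary-encoded array of int16 indexes over a child string array
that holds the distinct values. Nullable columns are not supported by
the bulk path yet.

tsl_get_decompress_all_function() now also takes the column type, so
that bulk decompression of these two algorithms is only offered for
text columns. The batch construction code learns to turn a row of such
an Arrow array back into a text datum, and the simple8b bounds check
is relaxed from < to <=, because a decompressed block may fill the
output buffer exactly.

For illustration only (not part of the patch), reading row i of the
returned string array goes roughly like this:

    const uint32 *offsets = (const uint32 *) arrow->buffers[1];
    const uint8 *bodies = (const uint8 *) arrow->buffers[2];
    /* Row i occupies the byte range bodies[offsets[i] .. offsets[i + 1]). */
    const uint32 start = offsets[i];
    const uint32 len = offsets[i + 1] - start;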
---
 tsl/src/compression/array.c                   | 89 ++++++++++++++++++
 tsl/src/compression/array.h                   |  7 ++
 tsl/src/compression/compression.c             | 30 +++++-
 tsl/src/compression/compression.h             |  9 ++--
 tsl/src/compression/decompress_test_impl.c    |  4 +-
 tsl/src/compression/dictionary.c              | 70 +++++++++++++
 tsl/src/compression/dictionary.h              |  4 ++
 .../compression/simple8b_rle_decompress_all.h |  5 ++-
 .../nodes/decompress_chunk/compressed_batch.c | 98 ++++++++++-------
 tsl/src/nodes/decompress_chunk/planner.c      | 21 ++++-
 10 files changed, 297 insertions(+), 40 deletions(-)

diff --git a/tsl/src/compression/array.c b/tsl/src/compression/array.c
index 58535be92e6..3c98cc4ead8 100644
--- a/tsl/src/compression/array.c
+++ b/tsl/src/compression/array.c
@@ -19,6 +19,8 @@
 #include "compression/simple8b_rle.h"
 #include "datum_serialize.h"
 
+#include "compression/arrow_c_data_interface.h"
+
 /* A "compressed" array
  *   uint8 has_nulls: 1 iff this has a nulls bitmap stored before the data
  *   Oid element_type: the element stored by this array
@@ -459,6 +461,93 @@ tsl_array_decompression_iterator_from_datum_reverse(Datum compressed_array, Oid
     return &iterator->base;
 }
 
+static uint64
+pad64(uint64 value)
+{
+    return ((value + 63) / 64) * 64;
+}
+
+#define ELEMENT_TYPE uint16
+#include "simple8b_rle_decompress_all.h"
+#undef ELEMENT_TYPE
+
+ArrowArray *
+tsl_text_array_decompress_all(Datum compressed_array, Oid element_type, MemoryContext dest_mctx)
+{
+    Assert(element_type == TEXTOID);
+    void *compressed_data = PG_DETOAST_DATUM(compressed_array);
+    StringInfoData si = { .data = compressed_data, .len = VARSIZE(compressed_data) };
+    ArrayCompressed *header = consumeCompressedData(&si, sizeof(ArrayCompressed));
+
+    Assert(header->compression_algorithm == COMPRESSION_ALGORITHM_ARRAY);
+    CheckCompressedData(header->element_type == TEXTOID);
+
+    return text_array_decompress_all_serialized_no_header(&si, header->has_nulls, dest_mctx);
+}
+
+ArrowArray *
+text_array_decompress_all_serialized_no_header(StringInfo si, bool has_nulls,
+                                               MemoryContext dest_mctx)
+{
+    Simple8bRleSerialized *nulls_serialized = NULL;
+    if (has_nulls)
+    {
+        /*
+         * Bulk decompression of nullable text columns is not implemented
+         * yet, so the planner must not choose this path for them.
+         */
+        Assert(false);
+        nulls_serialized = bytes_deserialize_simple8b_and_advance(si);
+    }
+    (void) nulls_serialized;
+
+    Simple8bRleSerialized *sizes_serialized = bytes_deserialize_simple8b_and_advance(si);
+
+    uint16 sizes[GLOBAL_MAX_ROWS_PER_COMPRESSION];
+    const uint16 n = simple8brle_decompress_all_buf_uint16(sizes_serialized,
+                                                           sizes,
+                                                           sizeof(sizes) / sizeof(sizes[0]));
+
+    uint32 *offsets =
+        (uint32 *) MemoryContextAllocZero(dest_mctx, pad64(sizeof(*offsets) * (n + 1)));
+    uint8 *arrow_bodies = (uint8 *) MemoryContextAllocZero(dest_mctx, pad64(si->len - si->cursor));
+
+    int offset = 0;
+    for (int i = 0; i < n; i++)
+    {
+        void *vardata = consumeCompressedData(si, sizes[i]);
+        /* TODO: validate sizes[i] against the varlena header of the datum we
+         * just consumed, mindful of the short varlena format. */
+        const int textlen = VARSIZE_ANY_EXHDR(vardata);
+        memcpy(&arrow_bodies[offset], VARDATA_ANY(vardata), textlen);
+
+        offsets[i] = offset;
+        offset += textlen;
+    }
+    offsets[n] = offset;
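+
+    /*
+     * Example of the layout built above: for the values 'foo' and 'quux',
+     * offsets = [0, 3, 7] and the body buffer is 'fooquux', i.e. value #i
+     * occupies the byte range [offsets[i], offsets[i + 1]).
+     */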
+
+    const int validity_bitmap_bytes = sizeof(uint64) * pad64(n);
+    uint64 *restrict validity_bitmap = MemoryContextAlloc(dest_mctx, validity_bitmap_bytes);
+    memset(validity_bitmap, 0xFF, validity_bitmap_bytes);
+
+    ArrowArray *result = MemoryContextAllocZero(dest_mctx, sizeof(ArrowArray) + sizeof(void *) * 3);
+    const void **buffers = (const void **) &result[1];
+    buffers[0] = validity_bitmap;
+    buffers[1] = offsets;
+    buffers[2] = arrow_bodies;
+    result->n_buffers = 3;
+    result->buffers = buffers;
+    result->length = n;
+    result->null_count = 0;
+    return result;
+}
+
 DecompressResult
 array_decompression_iterator_try_next_reverse(DecompressionIterator *base_iter)
 {
diff --git a/tsl/src/compression/array.h b/tsl/src/compression/array.h
index 9573eff955d..705fb10ef43 100644
--- a/tsl/src/compression/array.h
+++ b/tsl/src/compression/array.h
@@ -66,6 +66,12 @@ extern void array_compressed_send(CompressedDataHeader *header, StringInfo buffe
 extern Datum tsl_array_compressor_append(PG_FUNCTION_ARGS);
 extern Datum tsl_array_compressor_finish(PG_FUNCTION_ARGS);
 
+ArrowArray *tsl_text_array_decompress_all(Datum compressed_array, Oid element_type,
+                                          MemoryContext dest_mctx);
+
+ArrowArray *text_array_decompress_all_serialized_no_header(StringInfo si, bool has_nulls,
+                                                           MemoryContext dest_mctx);
+
 #define ARRAY_ALGORITHM_DEFINITION                                                                 \
 	{                                                                                              \
 		.iterator_init_forward = tsl_array_decompression_iterator_from_datum_forward,              \
@@ -74,6 +80,7 @@ extern Datum tsl_array_compressor_finish(PG_FUNCTION_ARGS);
 		.compressed_data_recv = array_compressed_recv,                                             \
 		.compressor_for_type = array_compressor_for_type,                                          \
 		.compressed_data_storage = TOAST_STORAGE_EXTENDED,                                         \
+		.decompress_all = tsl_text_array_decompress_all,                                           \
 	}
 
 #endif
diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c
index fcdf77417a2..91c252cd13a 100644
--- a/tsl/src/compression/compression.c
+++ b/tsl/src/compression/compression.c
@@ -126,11 +126,21 @@ DecompressionIterator *(*tsl_get_decompression_iterator_init(CompressionAlgorith
 }
 
 DecompressAllFunction
-tsl_get_decompress_all_function(CompressionAlgorithms algorithm)
+tsl_get_decompress_all_function(CompressionAlgorithms algorithm, Oid type)
 {
     if (algorithm >= _END_COMPRESSION_ALGORITHMS)
         elog(ERROR, "invalid compression algorithm %d", algorithm);
 
+    /*
+     * The bulk decompression paths for the array and dictionary algorithms
+     * are type-specific and currently only support text columns.
+     */
+    if (type != TEXTOID &&
+        (algorithm == COMPRESSION_ALGORITHM_DICTIONARY || algorithm == COMPRESSION_ALGORITHM_ARRAY))
+    {
+        return NULL;
+    }
+
     return definitions[algorithm].decompress_all;
 }
 
@@ -1758,6 +1768,24 @@ tsl_compressed_data_decompress_reverse(PG_FUNCTION_ARGS)
     ;
 }
 
+/*
+ * Debugging aid: report which compression algorithm was used for the given
+ * compressed datum.
+ */
+TS_FUNCTION_INFO_V1(tsl_compressed_data_info);
+
+Datum
+tsl_compressed_data_info(PG_FUNCTION_ARGS)
+{
+    StringInfoData buf = { 0 };
+    initStringInfo(&buf);
+
+    CompressedDataHeader *header = get_compressed_data_header(PG_GETARG_DATUM(0));
+
+    appendStringInfo(&buf, "algo: %d", header->compression_algorithm);
+
+    PG_RETURN_CSTRING(buf.data);
+}
+
 Datum
 tsl_compressed_data_send(PG_FUNCTION_ARGS)
 {
diff --git a/tsl/src/compression/compression.h b/tsl/src/compression/compression.h
index fbfd6e49b1d..d06ab55953d 100644
--- a/tsl/src/compression/compression.h
+++ b/tsl/src/compression/compression.h
@@ -316,7 +316,8 @@ extern void decompress_chunk(Oid in_table, Oid out_table);
 extern DecompressionIterator *(*tsl_get_decompression_iterator_init(
 	CompressionAlgorithms algorithm, bool reverse))(Datum, Oid element_type);
 
-extern DecompressAllFunction tsl_get_decompress_all_function(CompressionAlgorithms algorithm);
+extern DecompressAllFunction tsl_get_decompress_all_function(CompressionAlgorithms algorithm,
+                                                             Oid type);
 
 typedef struct Chunk Chunk;
 typedef struct ChunkInsertState ChunkInsertState;
@@ -374,9 +375,9 @@ extern RowDecompressor build_decompressor(Relation in_rel, Relation out_rel);
 #define CORRUPT_DATA_MESSAGE (errcode(ERRCODE_DATA_CORRUPTED))
 #endif
 
-#define CheckCompressedData(X)                                                                     \
-	if (unlikely(!(X)))                                                                            \
-		ereport(ERROR, CORRUPT_DATA_MESSAGE)
+#define CheckCompressedData(X)                                                                     \
+	if (unlikely(!(X)))                                                                            \
+		ereport(ERROR, CORRUPT_DATA_MESSAGE, errdetail(#X))
 
 inline static void *
 consumeCompressedData(StringInfo si, int bytes)
diff --git a/tsl/src/compression/decompress_test_impl.c b/tsl/src/compression/decompress_test_impl.c
index 69897d45b99..c452b173345 100644
--- a/tsl/src/compression/decompress_test_impl.c
+++ b/tsl/src/compression/decompress_test_impl.c
@@ -42,7 +42,7 @@ FUNCTION_NAME(ALGO, CTYPE)(const uint8 *Data, size_t Size, bool extra_checks)
 		 * For routine fuzzing, we only run bulk decompression to make it faster
 		 * and the coverage space smaller.
 		 */
-		DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo);
+		DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo, PGTYPE);
 		decompress_all(compressed_data, PGTYPE, CurrentMemoryContext);
 		return 0;
 	}
@@ -53,7 +53,7 @@ FUNCTION_NAME(ALGO, CTYPE)(const uint8 *Data, size_t Size, bool extra_checks)
 	 * the row-by-row is old and stable.
 	 */
 	ArrowArray *arrow = NULL;
-	DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo);
+	DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo, PGTYPE);
 	if (decompress_all)
 	{
 		arrow = decompress_all(compressed_data, PGTYPE, CurrentMemoryContext);
diff --git a/tsl/src/compression/dictionary.c b/tsl/src/compression/dictionary.c
index 06fa9c6d5ec..695a8303f9f 100644
--- a/tsl/src/compression/dictionary.c
+++ b/tsl/src/compression/dictionary.c
@@ -25,6 +25,7 @@
 #include "compression/array.h"
 #include "compression/dictionary_hash.h"
 #include "compression/datum_serialize.h"
+#include "compression/arrow_c_data_interface.h"
 
 /*
  * A compression bitmap is stored as
@@ -395,6 +396,75 @@ dictionary_decompression_iterator_init(DictionaryDecompressionIterator *iter, co
 	}
 	Assert(array_decompression_iterator_try_next_forward(dictionary_iterator).is_done);
 }
+
+static uint64
+pad64(uint64 value)
+{
+    return ((value + 63) / 64) * 64;
+}
+
+#define ELEMENT_TYPE int16
+#include "simple8b_rle_decompress_all.h"
+#undef ELEMENT_TYPE
+
+ArrowArray *
+tsl_text_dictionary_decompress_all(Datum compressed, Oid element_type, MemoryContext dest_mctx)
+{
+    Assert(element_type == TEXTOID);
+
+    compressed = PointerGetDatum(PG_DETOAST_DATUM(compressed));
+
+    StringInfoData si = { .data = DatumGetPointer(compressed), .len = VARSIZE(compressed) };
+
+    const DictionaryCompressed *header =
+        consumeCompressedData(&si, sizeof(DictionaryCompressed));
+
+    Assert(header->compression_algorithm == COMPRESSION_ALGORITHM_DICTIONARY);
+    CheckCompressedData(header->element_type == TEXTOID);
+
+    Simple8bRleSerialized *indices_serialized = bytes_deserialize_simple8b_and_advance(&si);
+    const uint16 n_padded = indices_serialized->num_elements + 63;
+    int16 *indices = MemoryContextAlloc(dest_mctx, sizeof(int16) * n_padded);
+    const uint16 n = simple8brle_decompress_all_buf_int16(indices_serialized, indices, n_padded);
+
+    if (header->has_nulls)
+    {
+        /*
+         * Bulk decompression of nullable text columns is not implemented
+         * yet, so the planner must not choose this path for them.
+         */
+        Assert(false);
+        Simple8bRleSerialized *nulls_serialized = bytes_deserialize_simple8b_and_advance(&si);
+        (void) nulls_serialized;
+    }
+
+    const int validity_bitmap_bytes = sizeof(uint64) * pad64(n);
+    uint64 *restrict validity_bitmap = MemoryContextAlloc(dest_mctx, validity_bitmap_bytes);
+    memset(validity_bitmap, 0xFF, validity_bitmap_bytes);
+
+    ArrowArray *dict =
+        text_array_decompress_all_serialized_no_header(&si, /* has_nulls = */ false, dest_mctx);
+    CheckCompressedData(header->num_distinct == dict->length);
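+
+    /*
+     * The result is a dictionary-encoded Arrow array: buffers[1] holds an
+     * int16 index into the dictionary for each row, and result->dictionary
+     * holds the distinct text values. For example, the indexes [0, 1, 0]
+     * with the dictionary ('red', 'green') decode to ('red', 'green', 'red').
+     */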
+
+    ArrowArray *result = MemoryContextAllocZero(dest_mctx, sizeof(ArrowArray) + sizeof(void *) * 2);
+    const void **buffers = (const void **) &result[1];
+    buffers[0] = validity_bitmap;
+    buffers[1] = indices;
+    result->n_buffers = 2;
+    result->buffers = buffers;
+    result->length = n;
+    result->null_count = 0;
+    result->dictionary = dict;
+    return result;
+}
+
 DecompressionIterator *
 tsl_dictionary_decompression_iterator_from_datum_forward(Datum dictionary_compressed,
                                                          Oid element_type)
diff --git a/tsl/src/compression/dictionary.h b/tsl/src/compression/dictionary.h
index 081bf578b8b..a316824af26 100644
--- a/tsl/src/compression/dictionary.h
+++ b/tsl/src/compression/dictionary.h
@@ -47,6 +47,9 @@ extern Datum dictionary_compressed_recv(StringInfo buf);
 extern Datum tsl_dictionary_compressor_append(PG_FUNCTION_ARGS);
 extern Datum tsl_dictionary_compressor_finish(PG_FUNCTION_ARGS);
 
+ArrowArray *tsl_text_dictionary_decompress_all(Datum compressed, Oid element_type,
+                                               MemoryContext dest_mctx);
+
 #define DICTIONARY_ALGORITHM_DEFINITION                                                            \
 	{                                                                                              \
 		.iterator_init_forward = tsl_dictionary_decompression_iterator_from_datum_forward,         \
@@ -55,6 +58,7 @@ extern Datum tsl_dictionary_compressor_finish(PG_FUNCTION_ARGS);
 		.compressed_data_recv = dictionary_compressed_recv,                                        \
 		.compressor_for_type = dictionary_compressor_for_type,                                     \
 		.compressed_data_storage = TOAST_STORAGE_EXTENDED,                                         \
+		.decompress_all = tsl_text_dictionary_decompress_all,                                      \
 	}
 
 #endif
diff --git a/tsl/src/compression/simple8b_rle_decompress_all.h b/tsl/src/compression/simple8b_rle_decompress_all.h
index 48a168fb581..5466d1efc56 100644
--- a/tsl/src/compression/simple8b_rle_decompress_all.h
+++ b/tsl/src/compression/simple8b_rle_decompress_all.h
@@ -86,7 +86,10 @@ FUNCTION_NAME(simple8brle_decompress_all_buf,
 			 * might be incorrect.                                                                 \
 			 */                                                                                    \
 			const uint16 n_block_values = SIMPLE8B_NUM_ELEMENTS[X];                                \
-			CheckCompressedData(decompressed_index + n_block_values < n_buffer_elements);          \
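+			/* Note that a decompressed block may fill the output buffer up                        \
+			 * to exactly n_buffer_elements elements, so only a strictly                           \
+			 * greater index indicates corrupt data. */                                            \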
+			CheckCompressedData(decompressed_index + n_block_values <= n_buffer_elements);         \
 			                                                                                       \
 			const uint64 bitmask = simple8brle_selector_get_bitmask(X);                            \
 			                                                                                       \
diff --git a/tsl/src/nodes/decompress_chunk/compressed_batch.c b/tsl/src/nodes/decompress_chunk/compressed_batch.c
index 97492449189..60cfa61cf02 100644
--- a/tsl/src/nodes/decompress_chunk/compressed_batch.c
+++ b/tsl/src/nodes/decompress_chunk/compressed_batch.c
@@ -335,7 +335,8 @@ compressed_batch_set_compressed_tuple(DecompressChunkState *chunk_state,
 			}
 
 			DecompressAllFunction decompress_all =
-				tsl_get_decompress_all_function(header->compression_algorithm);
+				tsl_get_decompress_all_function(header->compression_algorithm,
+												column_description->typid);
 			Assert(decompress_all != NULL);
 
 			MemoryContext context_before_decompression =
@@ -434,6 +435,24 @@ compressed_batch_set_compressed_tuple(DecompressChunkState *chunk_state,
 	MemoryContextSwitchTo(old_context);
 }
 
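+/*
+ * Build a text datum for the given row of an Arrow string array: allocate a
+ * varlena of the proper size and copy the body bytes into it.
+ */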
+static Datum
+get_text_datum(ArrowArray *arrow, int arrow_row)
+{
+    Assert(arrow->dictionary == NULL);
+    const uint32 start = ((uint32 *) arrow->buffers[1])[arrow_row];
+    const int32 value_bytes = ((uint32 *) arrow->buffers[1])[arrow_row + 1] - start;
+    Assert(value_bytes >= 0);
+    Datum datum = PointerGetDatum(palloc0(value_bytes + VARHDRSZ));
+    SET_VARSIZE(datum, value_bytes + VARHDRSZ);
+    memcpy(VARDATA(datum), &((uint8 *) arrow->buffers[2])[start], value_bytes);
+
+    return datum;
+}
+
 /*
  * Construct the next tuple in the decompressed scan slot.
  * Doesn't check the quals.
@@ -473,39 +492,60 @@
 		}
 		else if (column_values.arrow_values != NULL)
 		{
-			const char *restrict src = column_values.arrow_values;
-			Assert(column_values.value_bytes > 0);
-
-			/*
-			 * The conversion of Datum to more narrow types will truncate
-			 * the higher bytes, so we don't care if we read some garbage
-			 * into them, and can always read 8 bytes. These are unaligned
-			 * reads, so technically we have to do memcpy.
-			 */
-			uint64 value;
-			memcpy(&value, &src[column_values.value_bytes * arrow_row], 8);
-
-#ifdef USE_FLOAT8_BYVAL
-			Datum datum = Int64GetDatum(value);
-#else
-			/*
-			 * On 32-bit systems, the data larger than 4 bytes go by
-			 * reference, so we have to jump through these hoops.
-			 */
-			Datum datum;
-			if (column_values.value_bytes <= 4)
+			const AttrNumber attr = AttrNumberGetAttrOffset(column_values.output_attno);
+			/* value_bytes == -1 means a varlena (here: text) column. */
+			if (column_values.value_bytes == -1)
 			{
-				datum = Int32GetDatum((uint32) value);
+				if (column_values.arrow->dictionary == NULL)
+				{
+					decompressed_scan_slot->tts_values[attr] =
+						get_text_datum(column_values.arrow, arrow_row);
+				}
+				else
+				{
+					const int16 index = ((int16 *) column_values.arrow->buffers[1])[arrow_row];
+					decompressed_scan_slot->tts_values[attr] =
+						get_text_datum(column_values.arrow->dictionary, index);
+				}
+
+				decompressed_scan_slot->tts_isnull[attr] =
+					!arrow_row_is_valid(column_values.arrow->buffers[0], arrow_row);
 			}
 			else
 			{
-				datum = Int64GetDatum(value);
-			}
+				Assert(column_values.value_bytes > 0);
+				const char *restrict src = column_values.arrow_values;
+
+				/*
+				 * The conversion of Datum to more narrow types will truncate
+				 * the higher bytes, so we don't care if we read some garbage
+				 * into them, and can always read 8 bytes. These are unaligned
+				 * reads, so technically we have to do memcpy.
+				 */
+				uint64 value;
+				memcpy(&value, &src[column_values.value_bytes * arrow_row], 8);
+
+#ifdef USE_FLOAT8_BYVAL
+				Datum datum = Int64GetDatum(value);
+#else
+				/*
+				 * On 32-bit systems, the data larger than 4 bytes go by
+				 * reference, so we have to jump through these hoops.
+				 */
+				Datum datum;
+				if (column_values.value_bytes <= 4)
+				{
+					datum = Int32GetDatum((uint32) value);
+				}
+				else
+				{
+					datum = Int64GetDatum(value);
+				}
 #endif
-			const AttrNumber attr = AttrNumberGetAttrOffset(column_values.output_attno);
-			decompressed_scan_slot->tts_values[attr] = datum;
-			decompressed_scan_slot->tts_isnull[attr] =
-				!arrow_row_is_valid(column_values.arrow_validity, arrow_row);
+				decompressed_scan_slot->tts_values[attr] = datum;
+				decompressed_scan_slot->tts_isnull[attr] =
+					!arrow_row_is_valid(column_values.arrow_validity, arrow_row);
+			}
 		}
 	}
diff --git a/tsl/src/nodes/decompress_chunk/planner.c b/tsl/src/nodes/decompress_chunk/planner.c
index bc83707e0b1..443c4345ea4 100644
--- a/tsl/src/nodes/decompress_chunk/planner.c
+++ b/tsl/src/nodes/decompress_chunk/planner.c
@@ -254,13 +254,28 @@ build_decompression_map(PlannerInfo *root, DecompressChunkPath *path, List *scan
 			lappend_int(path->is_segmentby_column,
 						compression_info && compression_info->segmentby_column_index != 0);
-		const bool bulk_decompression_possible =
-			destination_attno_in_uncompressed_chunk > 0 && compression_info &&
-			tsl_get_decompress_all_function(compression_info->algo_id) != NULL;
+		/*
+		 * Determine whether we can use bulk decompression for this column.
+		 */
+		bool bulk_decompression_possible = false;
+		if (destination_attno_in_uncompressed_chunk > 0 && compression_info)
+		{
+			Oid typid =
+				get_atttype(path->info->chunk_rte->relid, destination_attno_in_uncompressed_chunk);
+			Assert(OidIsValid(typid));
+			if (tsl_get_decompress_all_function(compression_info->algo_id, typid) != NULL)
+			{
+				bulk_decompression_possible = true;
+			}
+		}
 
 		path->have_bulk_decompression_columns |= bulk_decompression_possible;
 		path->bulk_decompression_column =
 			lappend_int(path->bulk_decompression_column, bulk_decompression_possible);
 
+		/*
+		 * Save information about the decompressed columns of the uncompressed
+		 * chunk, for the planning of vectorized filters.
+		 */
 		if (destination_attno_in_uncompressed_chunk > 0)
 		{
 			path->uncompressed_chunk_attno_to_compression_info