Skip to content

Commit

Permalink
bulk decompression of text columns
Browse files Browse the repository at this point in the history
  • Loading branch information
akuzm committed Oct 12, 2023
1 parent 6f038ec commit 4e1a943
Show file tree
Hide file tree
Showing 10 changed files with 278 additions and 40 deletions.
83 changes: 83 additions & 0 deletions tsl/src/compression/array.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "compression/simple8b_rle.h"
#include "datum_serialize.h"

#include "compression/arrow_c_data_interface.h"

/* A "compressed" array
* uint8 has_nulls: 1 iff this has a nulls bitmap stored before the data
* Oid element_type: the element stored by this array
Expand Down Expand Up @@ -459,6 +461,87 @@ tsl_array_decompression_iterator_from_datum_reverse(Datum compressed_array, Oid
return &iterator->base;
}

static uint64
pad64(uint64 value)
{
return ((value + 63) / 64) * 64;
}

#define ELEMENT_TYPE uint16
#include "simple8b_rle_decompress_all.h"
#undef ELEMENT_TYPE

ArrowArray *
tsl_text_array_decompress_all(Datum compressed_array, Oid element_type, MemoryContext dest_mctx)
{
Assert(element_type == TEXTOID);
void *compressed_data = PG_DETOAST_DATUM(compressed_array);
StringInfoData si = { .data = compressed_data, .len = VARSIZE(compressed_data) };
ArrayCompressed *header = consumeCompressedData(&si, sizeof(ArrayCompressed));

Assert(header->compression_algorithm == COMPRESSION_ALGORITHM_ARRAY);
CheckCompressedData(header->element_type == TEXTOID);

return text_array_decompress_all_serialized_no_header(&si, header->has_nulls, dest_mctx);
}

ArrowArray *
text_array_decompress_all_serialized_no_header(StringInfo si, bool has_nulls,
MemoryContext dest_mctx)
{
Simple8bRleSerialized *nulls_serialized = NULL;
if (has_nulls)
{
Assert(false);
nulls_serialized = bytes_deserialize_simple8b_and_advance(si);
}
(void) nulls_serialized;

Simple8bRleSerialized *sizes_serialized = bytes_deserialize_simple8b_and_advance(si);

uint16 sizes[GLOBAL_MAX_ROWS_PER_COMPRESSION];
const uint16 n = simple8brle_decompress_all_buf_uint16(sizes_serialized,
sizes,
sizeof(sizes) / sizeof(sizes[0]));

uint32 *offsets =
(uint32 *) MemoryContextAllocZero(dest_mctx, pad64(sizeof(*offsets) * (n + 1)));
uint8 *arrow_bodies = (uint8 *) MemoryContextAllocZero(dest_mctx, pad64(si->len - si->cursor));

int offset = 0;
for (int i = 0; i < n; i++)
{
void *vardata = consumeCompressedData(si, sizes[i]);
// CheckCompressedData(VARSIZE_ANY(vardata) == sizes[i]);
// CheckCompressedData(sizes[i] > VARHDRSZ);
const int textlen = VARSIZE_ANY_EXHDR(vardata);
memcpy(&arrow_bodies[offset], VARDATA_ANY(vardata), textlen);

// fprintf(stderr, "%d: copied: '%s' len %d varsize %d result %.*s\n",
// i, text_to_cstring(vardata), textlen, (int) VARSIZE_ANY(vardata), textlen,
//&arrow_bodies[offset]);

offsets[i] = offset;
offset += textlen;
}
offsets[n] = offset;

const int validity_bitmap_bytes = sizeof(uint64) * pad64(n);
uint64 *restrict validity_bitmap = MemoryContextAlloc(dest_mctx, validity_bitmap_bytes);
memset(validity_bitmap, 0xFF, validity_bitmap_bytes);

ArrowArray *result = MemoryContextAllocZero(dest_mctx, sizeof(ArrowArray) + sizeof(void *) * 3);
const void **buffers = (const void **) &result[1];
buffers[0] = validity_bitmap;
buffers[1] = offsets;
buffers[2] = arrow_bodies;
result->n_buffers = 3;
result->buffers = buffers;
result->length = n;
result->null_count = 0;
return result;
}

DecompressResult
array_decompression_iterator_try_next_reverse(DecompressionIterator *base_iter)
{
Expand Down
7 changes: 7 additions & 0 deletions tsl/src/compression/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ extern void array_compressed_send(CompressedDataHeader *header, StringInfo buffe
extern Datum tsl_array_compressor_append(PG_FUNCTION_ARGS);
extern Datum tsl_array_compressor_finish(PG_FUNCTION_ARGS);

ArrowArray *tsl_text_array_decompress_all(Datum compressed_array, Oid element_type,
MemoryContext dest_mctx);

ArrowArray *text_array_decompress_all_serialized_no_header(StringInfo si, bool has_nulls,
MemoryContext dest_mctx);

#define ARRAY_ALGORITHM_DEFINITION \
{ \
.iterator_init_forward = tsl_array_decompression_iterator_from_datum_forward, \
Expand All @@ -74,6 +80,7 @@ extern Datum tsl_array_compressor_finish(PG_FUNCTION_ARGS);
.compressed_data_recv = array_compressed_recv, \
.compressor_for_type = array_compressor_for_type, \
.compressed_data_storage = TOAST_STORAGE_EXTENDED, \
.decompress_all = tsl_text_array_decompress_all, \
}

#endif
23 changes: 22 additions & 1 deletion tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,17 @@ DecompressionIterator *(*tsl_get_decompression_iterator_init(CompressionAlgorith
}

DecompressAllFunction
tsl_get_decompress_all_function(CompressionAlgorithms algorithm)
tsl_get_decompress_all_function(CompressionAlgorithms algorithm, Oid type)
{
if (algorithm >= _END_COMPRESSION_ALGORITHMS)
elog(ERROR, "invalid compression algorithm %d", algorithm);

if (type != TEXTOID &&
(algorithm == COMPRESSION_ALGORITHM_DICTIONARY || algorithm == COMPRESSION_ALGORITHM_ARRAY))
{
return NULL;
}

return definitions[algorithm].decompress_all;
}

Expand Down Expand Up @@ -1758,6 +1764,21 @@ tsl_compressed_data_decompress_reverse(PG_FUNCTION_ARGS)
;
}

TS_FUNCTION_INFO_V1(tsl_compressed_data_info);

Datum
tsl_compressed_data_info(PG_FUNCTION_ARGS)
{
StringInfoData buf = { 0 };
initStringInfo(&buf);

CompressedDataHeader *header = get_compressed_data_header(PG_GETARG_DATUM(0));

appendStringInfo(&buf, "algo: %d", header->compression_algorithm);

PG_RETURN_CSTRING(buf.data);
}

Datum
tsl_compressed_data_send(PG_FUNCTION_ARGS)
{
Expand Down
13 changes: 9 additions & 4 deletions tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,8 @@ extern void decompress_chunk(Oid in_table, Oid out_table);
extern DecompressionIterator *(*tsl_get_decompression_iterator_init(
CompressionAlgorithms algorithm, bool reverse))(Datum, Oid element_type);

extern DecompressAllFunction tsl_get_decompress_all_function(CompressionAlgorithms algorithm);
extern DecompressAllFunction tsl_get_decompress_all_function(CompressionAlgorithms algorithm,
Oid type);

typedef struct Chunk Chunk;
typedef struct ChunkInsertState ChunkInsertState;
Expand Down Expand Up @@ -374,9 +375,13 @@ extern RowDecompressor build_decompressor(Relation in_rel, Relation out_rel);
#define CORRUPT_DATA_MESSAGE (errcode(ERRCODE_DATA_CORRUPTED))
#endif

#define CheckCompressedData(X) \
if (unlikely(!(X))) \
ereport(ERROR, CORRUPT_DATA_MESSAGE)
#define CDSTR(X) #X
#define CDSTR2(X) CDSTR(X)

#define CheckCompressedData Assert
// #define CheckCompressedData(X) \
// if (unlikely(!(X))) \
// ereport(ERROR, CORRUPT_DATA_MESSAGE, errdetail(#X))

inline static void *
consumeCompressedData(StringInfo si, int bytes)
Expand Down
4 changes: 2 additions & 2 deletions tsl/src/compression/decompress_test_impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ FUNCTION_NAME(ALGO, CTYPE)(const uint8 *Data, size_t Size, bool extra_checks)
* For routine fuzzing, we only run bulk decompression to make it faster
* and the coverage space smaller.
*/
DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo);
DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo, PGTYPE);
decompress_all(compressed_data, PGTYPE, CurrentMemoryContext);
return 0;
}
Expand All @@ -53,7 +53,7 @@ FUNCTION_NAME(ALGO, CTYPE)(const uint8 *Data, size_t Size, bool extra_checks)
* the row-by-row is old and stable.
*/
ArrowArray *arrow = NULL;
DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo);
DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo, PGTYPE);
if (decompress_all)
{
arrow = decompress_all(compressed_data, PGTYPE, CurrentMemoryContext);
Expand Down
65 changes: 65 additions & 0 deletions tsl/src/compression/dictionary.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "compression/array.h"
#include "compression/dictionary_hash.h"
#include "compression/datum_serialize.h"
#include "compression/arrow_c_data_interface.h"

/*
* A compression bitmap is stored as
Expand Down Expand Up @@ -334,6 +335,13 @@ dictionary_compressor_finish(DictionaryCompressor *compressor)
average_element_size = sizes.dictionary_size / sizes.num_distinct;
expected_array_size = average_element_size * sizes.dictionary_compressed_indexes->num_elements;
compressed = dictionary_compressed_from_serialization_info(sizes, compressor->type);
fprintf(stderr,
"dict size %ld, distinct %ld, avg element size %ld, easize %ld, totalsize %ld\n",
sizes.dictionary_size,
(uint64) sizes.num_distinct,
average_element_size,
expected_array_size,
sizes.total_size);
if (expected_array_size < sizes.total_size)
return dictionary_compressed_to_array_compressed(compressed);

Expand Down Expand Up @@ -395,6 +403,63 @@ dictionary_decompression_iterator_init(DictionaryDecompressionIterator *iter, co
}
Assert(array_decompression_iterator_try_next_forward(dictionary_iterator).is_done);
}

static uint64
pad64(uint64 value)
{
return ((value + 63) / 64) * 64;
}

#define ELEMENT_TYPE int16
#include "simple8b_rle_decompress_all.h"
#undef ELEMENT_TYPE

ArrowArray *
tsl_text_dictionary_decompress_all(Datum compressed, Oid element_type, MemoryContext dest_mctx)
{
Assert(element_type == TEXTOID);

compressed = PointerGetDatum(PG_DETOAST_DATUM(compressed));

StringInfoData si = { .data = DatumGetPointer(compressed), .len = VARSIZE(compressed) };

const DictionaryCompressed *header = consumeCompressedData(&si, sizeof(DictionaryCompressed));

Assert(header->compression_algorithm == COMPRESSION_ALGORITHM_DICTIONARY);
CheckCompressedData(header->element_type == TEXTOID);

Simple8bRleSerialized *indices_serialized = bytes_deserialize_simple8b_and_advance(&si);
const uint16 n_padded = indices_serialized->num_elements + 63;
int16 *indices = MemoryContextAlloc(dest_mctx, sizeof(int16) * n_padded);
const uint16 n = simple8brle_decompress_all_buf_int16(indices_serialized, indices, n_padded);

if (header->has_nulls)
{
Assert(false);
Simple8bRleSerialized *nulls_serialized = bytes_deserialize_simple8b_and_advance(&si);
(void) nulls_serialized;
}

const int validity_bitmap_bytes = sizeof(uint64) * pad64(n);
uint64 *restrict validity_bitmap = MemoryContextAlloc(dest_mctx, validity_bitmap_bytes);
memset(validity_bitmap, 0xFF, validity_bitmap_bytes);

ArrowArray *dict =
text_array_decompress_all_serialized_no_header(&si, /* has_nulls = */ false, dest_mctx);
CheckCompressedData(header->num_distinct == dict->length);

ArrowArray *result = MemoryContextAllocZero(dest_mctx, sizeof(ArrowArray) + sizeof(void *) * 2);
const void **buffers = (const void **) &result[1];
buffers[0] = validity_bitmap;
buffers[1] = indices;
result->n_buffers = 2;
result->buffers = buffers;
result->length = n;
result->null_count = 0;
result->dictionary = dict;
return result;
}

DecompressionIterator *
tsl_dictionary_decompression_iterator_from_datum_forward(Datum dictionary_compressed,
Oid element_type)
Expand Down
7 changes: 7 additions & 0 deletions tsl/src/compression/dictionary.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ extern Datum dictionary_compressed_recv(StringInfo buf);
extern Datum tsl_dictionary_compressor_append(PG_FUNCTION_ARGS);
extern Datum tsl_dictionary_compressor_finish(PG_FUNCTION_ARGS);

ArrowArray *tsl_text_array_decompress_all(Datum compressed_array, Oid element_type,
MemoryContext dest_mctx);

ArrowArray *tsl_text_dictionary_decompress_all(Datum compressed, Oid element_type,
MemoryContext dest_mctx);

#define DICTIONARY_ALGORITHM_DEFINITION \
{ \
.iterator_init_forward = tsl_dictionary_decompression_iterator_from_datum_forward, \
Expand All @@ -55,6 +61,7 @@ extern Datum tsl_dictionary_compressor_finish(PG_FUNCTION_ARGS);
.compressed_data_recv = dictionary_compressed_recv, \
.compressor_for_type = dictionary_compressor_for_type, \
.compressed_data_storage = TOAST_STORAGE_EXTENDED, \
.decompress_all = tsl_text_dictionary_decompress_all, \
}

#endif
2 changes: 1 addition & 1 deletion tsl/src/compression/simple8b_rle_decompress_all.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ FUNCTION_NAME(simple8brle_decompress_all_buf,
* might be incorrect. \
*/ \
const uint16 n_block_values = SIMPLE8B_NUM_ELEMENTS[X]; \
CheckCompressedData(decompressed_index + n_block_values < n_buffer_elements); \
CheckCompressedData(decompressed_index + n_block_values <= n_buffer_elements); \
\
const uint64 bitmask = simple8brle_selector_get_bitmask(X); \
\
Expand Down
Loading

0 comments on commit 4e1a943

Please sign in to comment.