Skip to content

Commit

Permalink
more prewhere
Browse files Browse the repository at this point in the history
  • Loading branch information
akuzm committed Oct 18, 2023
1 parent 890505e commit 03ccd4d
Showing 1 changed file with 49 additions and 31 deletions.
80 changes: 49 additions & 31 deletions tsl/src/nodes/decompress_chunk/compressed_batch.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ get_max_element_bytes(ArrowArray *text_array)
}

static void
compressed_batch_decompress_column(DecompressChunkState *chunk_state,
DecompressBatchState *batch_state, int i)
decompress_column(DecompressChunkState *chunk_state, DecompressBatchState *batch_state, int i)
{
DecompressChunkColumnDescription *column_description = &chunk_state->template_columns[i];
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
Expand Down Expand Up @@ -147,8 +146,7 @@ compressed_batch_decompress_column(DecompressChunkState *chunk_state,
}

DecompressAllFunction decompress_all =
tsl_get_decompress_all_function(header->compression_algorithm,
column_description->typid);
tsl_get_decompress_all_function(header->compression_algorithm);
Assert(decompress_all != NULL);

MemoryContext context_before_decompression =
Expand Down Expand Up @@ -258,7 +256,7 @@ apply_vector_quals(DecompressChunkState *chunk_state, DecompressBatchState *batc
* skip decompressing some columns if the entire batch doesn't pass
* the quals.
*/
compressed_batch_decompress_column(chunk_state, batch_state, column_index);
decompress_column(chunk_state, batch_state, column_index);
Assert(column_values->value_bytes != 0);
}

Expand Down Expand Up @@ -448,8 +446,6 @@ compressed_batch_set_compressed_tuple(DecompressChunkState *chunk_state,
*/
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
column_values->value_bytes = 0;

// compressed_batch_decompress_column(chunk_state, batch_state, i);
break;
}
case SEGMENTBY_COLUMN:
Expand Down Expand Up @@ -514,14 +510,14 @@ compressed_batch_set_compressed_tuple(DecompressChunkState *chunk_state,
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
if (column_values->value_bytes == 0)
{
compressed_batch_decompress_column(chunk_state, batch_state, i);
decompress_column(chunk_state, batch_state, i);
Assert(column_values->value_bytes != 0);
}
}
}
else
{
//fprintf(stderr, "the entire batch didn't pass!!!\n");
// fprintf(stderr, "the entire batch didn't pass!!!\n");
}

MemoryContextSwitchTo(old_context);
Expand All @@ -532,8 +528,7 @@ compressed_batch_set_compressed_tuple(DecompressChunkState *chunk_state,
* Doesn't check the quals.
*/
static void
compressed_batch_make_next_tuple(DecompressChunkState *chunk_state,
DecompressBatchState *batch_state)
make_next_tuple(DecompressChunkState *chunk_state, DecompressBatchState *batch_state)
{
TupleTableSlot *decompressed_scan_slot = batch_state->decompressed_scan_slot;
Assert(decompressed_scan_slot != NULL);
Expand All @@ -549,25 +544,26 @@ compressed_batch_make_next_tuple(DecompressChunkState *chunk_state,
const int num_compressed_columns = chunk_state->num_compressed_columns;
for (int i = 0; i < num_compressed_columns; i++)
{
CompressedColumnValues column_values = batch_state->compressed_columns[i];

if (column_values.iterator != NULL)
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
Ensure(column_values->value_bytes != 0, "the column is not decompressed");
if (column_values->iterator != NULL)
{
DecompressResult result = column_values.iterator->try_next(column_values.iterator);
DecompressResult result = column_values->iterator->try_next(column_values->iterator);

if (result.is_done)
{
elog(ERROR, "compressed column out of sync with batch counter");
}

const AttrNumber attr = AttrNumberGetAttrOffset(column_values.output_attno);
const AttrNumber attr = AttrNumberGetAttrOffset(column_values->output_attno);
decompressed_scan_slot->tts_isnull[attr] = result.is_null;
decompressed_scan_slot->tts_values[attr] = result.val;
}
else if (column_values.arrow_values != NULL)
else if (column_values->arrow_values != NULL)
{
const char *restrict src = column_values.arrow_values;
Assert(column_values.value_bytes > 0);
const AttrNumber attr = AttrNumberGetAttrOffset(column_values->output_attno);
Assert(column_values->value_bytes > 0);
const char *restrict src = column_values->arrow_values;

/*
* The conversion of Datum to more narrow types will truncate
Expand All @@ -576,7 +572,7 @@ compressed_batch_make_next_tuple(DecompressChunkState *chunk_state,
* reads, so technically we have to do memcpy.
*/
uint64 value;
memcpy(&value, &src[column_values.value_bytes * arrow_row], 8);
memcpy(&value, &src[column_values->value_bytes * arrow_row], 8);

#ifdef USE_FLOAT8_BYVAL
Datum datum = Int64GetDatum(value);
Expand All @@ -586,7 +582,7 @@ compressed_batch_make_next_tuple(DecompressChunkState *chunk_state,
* reference, so we have to jump through these hoops.
*/
Datum datum;
if (column_values.value_bytes <= 4)
if (column_values->value_bytes <= 4)
{
datum = Int32GetDatum((uint32) value);
}
Expand All @@ -595,10 +591,9 @@ compressed_batch_make_next_tuple(DecompressChunkState *chunk_state,
datum = Int64GetDatum(value);
}
#endif
const AttrNumber attr = AttrNumberGetAttrOffset(column_values.output_attno);
decompressed_scan_slot->tts_values[attr] = datum;
decompressed_scan_slot->tts_isnull[attr] =
!arrow_row_is_valid(column_values.arrow_validity, arrow_row);
!arrow_row_is_valid(column_values->arrow_validity, arrow_row);
}
}

Expand All @@ -619,7 +614,7 @@ compressed_batch_make_next_tuple(DecompressChunkState *chunk_state,
}

static bool
compressed_batch_vector_qual(DecompressChunkState *chunk_state, DecompressBatchState *batch_state)
vector_qual(DecompressChunkState *chunk_state, DecompressBatchState *batch_state)
{
Assert(batch_state->total_batch_rows > 0);
Assert(batch_state->next_batch_row < batch_state->total_batch_rows);
Expand All @@ -637,7 +632,7 @@ compressed_batch_vector_qual(DecompressChunkState *chunk_state, DecompressBatchS
}

static bool
compressed_batch_postgres_qual(DecompressChunkState *chunk_state, DecompressBatchState *batch_state)
postgres_qual(DecompressChunkState *chunk_state, DecompressBatchState *batch_state)
{
TupleTableSlot *decompressed_scan_slot = batch_state->decompressed_scan_slot;
Assert(!TupIsNull(decompressed_scan_slot));
Expand Down Expand Up @@ -672,7 +667,7 @@ compressed_batch_advance(DecompressChunkState *chunk_state, DecompressBatchState
for (; batch_state->next_batch_row < batch_state->total_batch_rows;
batch_state->next_batch_row++)
{
if (!compressed_batch_vector_qual(chunk_state, batch_state))
if (!vector_qual(chunk_state, batch_state))
{
/*
* This row doesn't pass the vectorized quals. Advance the iterated
Expand All @@ -690,9 +685,9 @@ compressed_batch_advance(DecompressChunkState *chunk_state, DecompressBatchState
continue;
}

compressed_batch_make_next_tuple(chunk_state, batch_state);
make_next_tuple(chunk_state, batch_state);

if (!compressed_batch_postgres_qual(chunk_state, batch_state))
if (!postgres_qual(chunk_state, batch_state))
{
/*
* The tuple didn't pass the qual, fetch the next one in the next
Expand Down Expand Up @@ -743,11 +738,34 @@ compressed_batch_save_first_tuple(DecompressChunkState *chunk_state,
Assert(batch_state->total_batch_rows > 0);
Assert(TupIsNull(batch_state->decompressed_scan_slot));

compressed_batch_make_next_tuple(chunk_state, batch_state);
/*
* We might not have decompressed some columns if the vector quals didn't
* pass for the entire batch. Have to decompress them anyway if we're asked
* to save the first tuple. This doesn't actually happen yet, because the
* vectorized decompression is disabled with sorted merge, but we might want
* to enable it for some queries.
*/
const int num_compressed_columns = chunk_state->num_compressed_columns;
for (int i = 0; i < num_compressed_columns; i++)
{
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
if (column_values->value_bytes == 0)
{
decompress_column(chunk_state, batch_state, i);

Check warning on line 754 in tsl/src/nodes/decompress_chunk/compressed_batch.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/nodes/decompress_chunk/compressed_batch.c#L754

Added line #L754 was not covered by tests
Assert(column_values->value_bytes != 0);
}

Check warning on line 756 in tsl/src/nodes/decompress_chunk/compressed_batch.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/nodes/decompress_chunk/compressed_batch.c#L756

Added line #L756 was not covered by tests
}

/* Make the first tuple and save it. */
make_next_tuple(chunk_state, batch_state);
ExecCopySlot(first_tuple_slot, batch_state->decompressed_scan_slot);

const bool qual_passed = compressed_batch_vector_qual(chunk_state, batch_state) &&
compressed_batch_postgres_qual(chunk_state, batch_state);
/*
* Check the quals and advance, so that the batch is in the correct state
* for the subsequent calls (matching tuple is in decompressed scan slot).
*/
const bool qual_passed =
vector_qual(chunk_state, batch_state) && postgres_qual(chunk_state, batch_state);
batch_state->next_batch_row++;

if (!qual_passed)
Expand Down

0 comments on commit 03ccd4d

Please sign in to comment.