diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/Query.java b/core/trino-main/src/main/java/io/trino/server/protocol/Query.java index c8f245488d43..8fdb3bfeb6b1 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/Query.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/Query.java @@ -51,7 +51,14 @@ import io.trino.spi.ErrorCode; import io.trino.spi.Page; import io.trino.spi.QueryId; +import io.trino.spi.block.ArrayBlock; +import io.trino.spi.block.Block; import io.trino.spi.block.BlockEncodingSerde; +import io.trino.spi.block.DictionaryBlock; +import io.trino.spi.block.MapBlock; +import io.trino.spi.block.RowBlock; +import io.trino.spi.block.RunLengthEncodedBlock; +import io.trino.spi.block.ValueBlock; import io.trino.spi.exchange.ExchangeId; import io.trino.spi.security.SelectedRole; import io.trino.spi.type.Type; @@ -562,7 +569,9 @@ private synchronized QueryResultRows removePagesFromExchange(ResultQueryInfo que } Page page = deserializer.deserialize(serializedPage); - bytes += page.getLogicalSizeInBytes(); + // page should already be loaded since it was just deserialized + page = page.getLoadedPage(); + bytes += estimateJsonSize(page); resultBuilder.addPage(page); } if (exchangeDataSource.isFinished()) { @@ -577,6 +586,38 @@ private synchronized QueryResultRows removePagesFromExchange(ResultQueryInfo que return resultBuilder.build(); } + private static long estimateJsonSize(Page page) + { + long estimatedSize = 0; + for (int i = 0; i < page.getChannelCount(); i++) { + estimatedSize += estimateJsonSize(page.getBlock(i)); + } + return estimatedSize; + } + + private static long estimateJsonSize(Block block) + { + switch (block) { + case RunLengthEncodedBlock rleBlock: + return estimateJsonSize(rleBlock.getValue()) * rleBlock.getPositionCount(); + case DictionaryBlock dictionaryBlock: + ValueBlock dictionary = dictionaryBlock.getDictionary(); + double averageSizePerEntry = (double) estimateJsonSize(dictionary) / dictionary.getPositionCount(); + return (long) (averageSizePerEntry * block.getPositionCount()); + case RowBlock rowBlock: + return rowBlock.getFieldBlocks().stream() + .mapToLong(Query::estimateJsonSize) + .sum(); + case ArrayBlock arrayBlock: + return estimateJsonSize(arrayBlock.getElementsBlock()); + case MapBlock mapBlock: + return estimateJsonSize(mapBlock.getKeyBlock()) + + estimateJsonSize(mapBlock.getValueBlock()); + default: + return block.getSizeInBytes(); + } + } + private void closeExchangeIfNecessary(ResultQueryInfo queryInfo) { if (queryInfo.state() != FAILED && queryInfo.outputStage().isPresent()) { diff --git a/core/trino-main/src/test/java/io/trino/block/TestDictionaryBlock.java b/core/trino-main/src/test/java/io/trino/block/TestDictionaryBlock.java index 81d44cf104af..dc054c8f7222 100644 --- a/core/trino-main/src/test/java/io/trino/block/TestDictionaryBlock.java +++ b/core/trino-main/src/test/java/io/trino/block/TestDictionaryBlock.java @@ -84,31 +84,6 @@ public void testSizeInBytes() assertThat(dictionaryBlock.getSizeInBytes()).isEqualTo(dictionaryBlock.getDictionary().getSizeInBytes() + (100 * SIZE_OF_INT)); } - @Test - public void testLogicalSizeInBytes() - { - // The 10 Slices in the array will be of lengths 0 to 9. - Slice[] expectedValues = createExpectedValues(10); - - // The dictionary within the dictionary block is expected to be a VariableWidthBlock of size 95 bytes. - // 45 bytes for the expectedValues Slices (sum of seq(0,9)) and 50 bytes for the position and isNull array (total 10 positions). - DictionaryBlock dictionaryBlock = createDictionaryBlock(expectedValues, 100); - assertThat(dictionaryBlock.getDictionary().getLogicalSizeInBytes()).isEqualTo(95); - - // The 100 positions in the dictionary block index to 10 positions in the underlying dictionary (10 each). - // Logical size calculation accounts for 4 bytes of offset and 1 byte of isNull. Therefore the expected unoptimized - // size is 10 times the size of the underlying dictionary (VariableWidthBlock). - assertThat(dictionaryBlock.getLogicalSizeInBytes()).isEqualTo(95 * 10); - - // With alternating nulls, we have 21 positions, with the same size calculation as above. - dictionaryBlock = createDictionaryBlock(alternatingNullValues(expectedValues), 210); - assertThat(dictionaryBlock.getDictionary().getPositionCount()).isEqualTo(21); - assertThat(dictionaryBlock.getDictionary().getLogicalSizeInBytes()).isEqualTo(150); - - // The null positions should be included in the logical size. - assertThat(dictionaryBlock.getLogicalSizeInBytes()).isEqualTo(150 * 10); - } - @Test public void testCopyRegionCreatesCompactBlock() { diff --git a/core/trino-spi/pom.xml b/core/trino-spi/pom.xml index c9bbaeba1fbe..3d276d25d7f4 100644 --- a/core/trino-spi/pom.xml +++ b/core/trino-spi/pom.xml @@ -265,6 +265,26 @@ interface io.trino.spi.protocol.SpoolingManagerFactory Spooling SPI marked as experimental + + true + java.method.removed + method long io.trino.spi.Page::getLogicalSizeInBytes() + + + true + java.method.removed + method long io.trino.spi.block.Block::getLogicalSizeInBytes() + + + true + java.method.removed + method long io.trino.spi.block.DictionaryBlock::getLogicalSizeInBytes() + + + true + java.method.removed + method long io.trino.spi.block.RunLengthEncodedBlock::getLogicalSizeInBytes() + diff --git a/core/trino-spi/src/main/java/io/trino/spi/Page.java b/core/trino-spi/src/main/java/io/trino/spi/Page.java index 7f4a3ce59b72..12f83ccb047f 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/Page.java +++ b/core/trino-spi/src/main/java/io/trino/spi/Page.java @@ -51,7 +51,6 @@ static Page wrapBlocksWithoutCopy(int positionCount, Block[] blocks) private final int positionCount; private volatile long sizeInBytes = -1; private volatile long retainedSizeInBytes = -1; - private volatile long logicalSizeInBytes = -1; public Page(Block... blocks) { @@ -78,7 +77,6 @@ private Page(boolean blocksCopyRequired, int positionCount, Block[] blocks) if (blocks.length == 0) { this.blocks = EMPTY_BLOCKS; this.sizeInBytes = 0; - this.logicalSizeInBytes = 0; // Empty blocks are not considered "retained" by any particular page this.retainedSizeInBytes = INSTANCE_SIZE; } @@ -114,19 +112,6 @@ public long getSizeInBytes() return sizeInBytes; } - public long getLogicalSizeInBytes() - { - long logicalSizeInBytes = this.logicalSizeInBytes; - if (logicalSizeInBytes < 0) { - logicalSizeInBytes = 0; - for (Block block : blocks) { - logicalSizeInBytes += block.getLogicalSizeInBytes(); - } - this.logicalSizeInBytes = logicalSizeInBytes; - } - return logicalSizeInBytes; - } - public long getRetainedSizeInBytes() { long retainedSizeInBytes = this.retainedSizeInBytes; diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/Block.java b/core/trino-spi/src/main/java/io/trino/spi/block/Block.java index 97946961ecb7..63cc3afc7ab4 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/Block.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/Block.java @@ -46,20 +46,6 @@ public sealed interface Block */ long getSizeInBytes(); - /** - * Returns the size of the block contents, regardless of internal representation. - * The same logical data values should always have the same size, no matter - * what block type is used or how they are represented within a specific block. - *

- * This can differ substantially from {@link #getSizeInBytes} for certain block - * types. For RLE, it will be {@code N} times larger. For dictionary, it will be - * larger based on how many times dictionary entries are reused. - */ - default long getLogicalSizeInBytes() - { - return getSizeInBytes(); - } - /** * Returns the size of {@code block.getRegion(position, length)}. * The method can be expensive. Do not use it outside an implementation of Block. diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/DictionaryBlock.java b/core/trino-spi/src/main/java/io/trino/spi/block/DictionaryBlock.java index 0a95b107b44a..61425cdb21a6 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/DictionaryBlock.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/DictionaryBlock.java @@ -42,7 +42,6 @@ public final class DictionaryBlock private final int[] ids; private final long retainedSizeInBytes; private volatile long sizeInBytes = -1; - private volatile long logicalSizeInBytes = -1; private volatile int uniqueIds = -1; // isSequentialIds is only valid when uniqueIds is computed private volatile boolean isSequentialIds; @@ -192,36 +191,6 @@ private void calculateCompactSize() this.isSequentialIds = isSequentialIds; } - @Override - public long getLogicalSizeInBytes() - { - if (logicalSizeInBytes >= 0) { - return logicalSizeInBytes; - } - - OptionalInt dictionarySizePerPosition = dictionary.fixedSizeInBytesPerPosition(); - if (dictionarySizePerPosition.isPresent()) { - logicalSizeInBytes = dictionarySizePerPosition.getAsInt() * (long) getPositionCount(); - return logicalSizeInBytes; - } - - // Calculation of logical size can be performed as part of calculateCompactSize() with minor modifications. - // Keeping this calculation separate as this is a little more expensive and may not be called as often. - long sizeInBytes = 0; - long[] seenSizes = new long[dictionary.getPositionCount()]; - Arrays.fill(seenSizes, -1L); - for (int i = 0; i < getPositionCount(); i++) { - int position = getId(i); - if (seenSizes[position] < 0) { - seenSizes[position] = dictionary.getRegionSizeInBytes(position, 1); - } - sizeInBytes += seenSizes[position]; - } - - logicalSizeInBytes = sizeInBytes; - return sizeInBytes; - } - @Override public long getRegionSizeInBytes(int positionOffset, int length) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/RunLengthEncodedBlock.java b/core/trino-spi/src/main/java/io/trino/spi/block/RunLengthEncodedBlock.java index cfb0530c4959..22393bea9fbc 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/RunLengthEncodedBlock.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/RunLengthEncodedBlock.java @@ -118,12 +118,6 @@ public long getSizeInBytes() return value.getSizeInBytes(); } - @Override - public long getLogicalSizeInBytes() - { - return positionCount * value.getLogicalSizeInBytes(); - } - @Override public long getRetainedSizeInBytes() { diff --git a/core/trino-spi/src/test/java/io/trino/spi/TestPage.java b/core/trino-spi/src/test/java/io/trino/spi/TestPage.java index 382cc46f7fad..b8d01fd855b1 100644 --- a/core/trino-spi/src/test/java/io/trino/spi/TestPage.java +++ b/core/trino-spi/src/test/java/io/trino/spi/TestPage.java @@ -68,8 +68,6 @@ public void testSizesForNoColumnPage() { Page page = new Page(100); assertThat(page.getSizeInBytes()).isEqualTo(0); - assertThat(page.getLogicalSizeInBytes()).isEqualTo(0); - assertThat(page.getRetainedSizeInBytes()).isEqualTo(Page.INSTANCE_SIZE); // does not include the blocks array } @Test diff --git a/lib/trino-orc/src/main/java/io/trino/orc/OrcWriter.java b/lib/trino-orc/src/main/java/io/trino/orc/OrcWriter.java index 73b3d4bca307..c4bedba75083 100644 --- a/lib/trino-orc/src/main/java/io/trino/orc/OrcWriter.java +++ b/lib/trino-orc/src/main/java/io/trino/orc/OrcWriter.java @@ -100,7 +100,7 @@ public final class OrcWriter private final List types; private final CompressionKind compression; private final int stripeMaxBytes; - private final int chunkMaxLogicalBytes; + private final int chunkMaxBytes; private final int stripeMaxRowCount; private final int rowGroupMaxRowCount; private final int maxCompressionBufferSize; @@ -153,7 +153,7 @@ public OrcWriter( checkArgument(options.getStripeMaxSize().compareTo(options.getStripeMinSize()) >= 0, "stripeMaxSize must be greater than or equal to stripeMinSize"); int stripeMinBytes = toIntExact(requireNonNull(options.getStripeMinSize(), "stripeMinSize is null").toBytes()); this.stripeMaxBytes = toIntExact(requireNonNull(options.getStripeMaxSize(), "stripeMaxSize is null").toBytes()); - this.chunkMaxLogicalBytes = Math.max(1, stripeMaxBytes / 2); + this.chunkMaxBytes = Math.max(1, stripeMaxBytes / 2); this.stripeMaxRowCount = options.getStripeMaxRowCount(); this.rowGroupMaxRowCount = options.getRowGroupMaxRowCount(); recordValidation(validation -> validation.setRowGroupMaxRowCount(rowGroupMaxRowCount)); @@ -254,6 +254,8 @@ public void write(Page page) } checkArgument(page.getChannelCount() == columnWriters.size()); + // page should already be loaded, but double check + page = page.getLoadedPage(); if (validationBuilder != null) { validationBuilder.addPage(page); @@ -264,8 +266,8 @@ public void write(Page page) // align page to row group boundaries Page chunk = page.getRegion(writeOffset, min(page.getPositionCount() - writeOffset, min(rowGroupMaxRowCount - rowGroupRowCount, stripeMaxRowCount - stripeRowCount))); - // avoid chunk with huge logical size - while (chunk.getPositionCount() > 1 && chunk.getLogicalSizeInBytes() > chunkMaxLogicalBytes) { + // avoid chunk with huge size + while (chunk.getPositionCount() > 1 && chunk.getSizeInBytes() > chunkMaxBytes) { chunk = page.getRegion(writeOffset, chunk.getPositionCount() / 2); } diff --git a/lib/trino-orc/src/main/java/io/trino/orc/writer/SliceDictionaryColumnWriter.java b/lib/trino-orc/src/main/java/io/trino/orc/writer/SliceDictionaryColumnWriter.java index 65dc7ab4cfd7..90934321ed37 100644 --- a/lib/trino-orc/src/main/java/io/trino/orc/writer/SliceDictionaryColumnWriter.java +++ b/lib/trino-orc/src/main/java/io/trino/orc/writer/SliceDictionaryColumnWriter.java @@ -69,7 +69,7 @@ public class SliceDictionaryColumnWriter implements ColumnWriter, DictionaryColumn { private static final int INSTANCE_SIZE = instanceSize(SliceDictionaryColumnWriter.class); - private static final int DIRECT_CONVERSION_CHUNK_MAX_LOGICAL_BYTES = toIntExact(DataSize.of(32, MEGABYTE).toBytes()); + private static final int DIRECT_CONVERSION_CHUNK_MAX_BYTES = toIntExact(DataSize.of(32, MEGABYTE).toBytes()); private final OrcColumnId columnId; private final Type type; @@ -224,7 +224,7 @@ private boolean writeDictionaryRowGroup(Block dictionary, int valueCount, IntBig Block chunk = block.getRegion(0, chunkPositionCount); // avoid chunk with huge logical size - while (chunkPositionCount > 1 && chunk.getLogicalSizeInBytes() > DIRECT_CONVERSION_CHUNK_MAX_LOGICAL_BYTES) { + while (chunkPositionCount > 1 && chunk.getSizeInBytes() > DIRECT_CONVERSION_CHUNK_MAX_BYTES) { chunkPositionCount /= 2; chunk = chunk.getRegion(0, chunkPositionCount); } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java index 651d86040ef5..1eed8ba73ef1 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java @@ -100,7 +100,7 @@ public class ParquetWriter private final OutputStreamSliceOutput outputStream; private final ParquetWriterOptions writerOption; private final MessageType messageType; - private final int chunkMaxLogicalBytes; + private final int chunkMaxBytes; private final Map, Type> primitiveTypes; private final CompressionCodec compressionCodec; private final Optional parquetTimeZone; @@ -142,7 +142,7 @@ public ParquetWriter( recordValidation(validation -> validation.setColumns(messageType.getColumns())); recordValidation(validation -> validation.setCreatedBy(createdBy)); initColumnWriters(); - this.chunkMaxLogicalBytes = max(1, writerOption.getMaxRowGroupSize() / 2); + this.chunkMaxBytes = max(1, writerOption.getMaxRowGroupSize() / 2); } public long getWrittenBytes() @@ -174,6 +174,9 @@ public void write(Page page) checkArgument(page.getChannelCount() == columnWriters.size()); + // page should already be loaded, but double check + page = page.getLoadedPage(); + Page validationPage = page; recordValidation(validation -> validation.addPage(validationPage)); @@ -182,7 +185,7 @@ public void write(Page page) Page chunk = page.getRegion(writeOffset, min(page.getPositionCount() - writeOffset, writerOption.getBatchSize())); // avoid chunk with huge logical size - while (chunk.getPositionCount() > 1 && chunk.getLogicalSizeInBytes() > chunkMaxLogicalBytes) { + while (chunk.getPositionCount() > 1 && chunk.getSizeInBytes() > chunkMaxBytes) { chunk = page.getRegion(writeOffset, chunk.getPositionCount() / 2); }