Skip to content

Commit

Permalink
Add spooling session properties
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo committed Jan 21, 2025
1 parent d15a18d commit d856917
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
import static io.trino.server.protocol.spooling.SpooledBlock.SPOOLING_METADATA_SYMBOL;
import static io.trino.server.protocol.spooling.SpooledBlock.SPOOLING_METADATA_TYPE;
import static io.trino.server.protocol.spooling.SpooledBlock.createNonSpooledPage;
import static io.trino.server.protocol.spooling.SpoolingSessionProperties.getInitialSegmentSize;
import static io.trino.server.protocol.spooling.SpoolingSessionProperties.getMaxInlinedRows;
import static io.trino.server.protocol.spooling.SpoolingSessionProperties.getMaxInlinedSize;
import static io.trino.server.protocol.spooling.SpoolingSessionProperties.getMaxSegmentSize;
import static io.trino.server.protocol.spooling.SpoolingSessionProperties.isAllowInlining;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
Expand Down Expand Up @@ -160,11 +165,11 @@ public OutputSpoolingOperator(OperatorContext operatorContext, QueryDataEncoder
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.controller = new OutputSpoolingController(
spoolingConfig.isAllowInlining(),
spoolingConfig.getMaximumInlinedRows(),
spoolingConfig.getMaximumSegmentSize().toBytes(),
spoolingConfig.getInitialSegmentSize().toBytes(),
spoolingConfig.getMaximumSegmentSize().toBytes());
isAllowInlining(operatorContext.getSession()),
getMaxInlinedRows(operatorContext.getSession()),
getMaxInlinedSize(operatorContext.getSession()).toBytes(),
getInitialSegmentSize(operatorContext.getSession()).toBytes(),
getMaxSegmentSize(operatorContext.getSession()).toBytes());
this.userMemoryContext = operatorContext.newLocalUserMemoryContext(OutputSpoolingOperator.class.getSimpleName());
this.queryDataEncoder = requireNonNull(queryDataEncoder, "queryDataEncoder is null");
this.spoolingManager = requireNonNull(spoolingManager, "spoolingManager is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import com.google.inject.multibindings.OptionalBinder;
import com.google.inject.multibindings.ProvidesIntoSet;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.SystemSessionPropertiesProvider;
import io.trino.server.ServerConfig;
import io.trino.server.protocol.spooling.SpoolingConfig.SegmentRetrievalMode;
import io.trino.spi.spool.SpoolingManager;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.jaxrs.JaxrsBinder.jaxrsBinder;
import static io.trino.server.protocol.spooling.QueryDataEncoder.EncoderSelector.noEncoder;
Expand All @@ -49,6 +51,8 @@ protected void setup(Binder binder)
return;
}

newSetBinder(binder, SystemSessionPropertiesProvider.class).addBinding().to(SpoolingSessionProperties.class).in(Scopes.SINGLETON);

boolean isCoordinator = buildConfigObject(ServerConfig.class).isCoordinator();
SpoolingConfig spoolingConfig = buildConfigObject(SpoolingConfig.class);
binder.bind(QueryDataEncoder.EncoderSelector.class).to(PreferredQueryDataEncoderSelector.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.server.protocol.spooling;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.SystemSessionPropertiesProvider;
import io.trino.spi.TrinoException;
import io.trino.spi.session.PropertyMetadata;

import java.util.List;
import java.util.function.Consumer;

import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.trino.plugin.base.session.PropertyMetadataUtil.dataSizeProperty;
import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.longProperty;

public class SpoolingSessionProperties
implements SystemSessionPropertiesProvider
{
// Spooled segments
public static final String INITIAL_SEGMENT_SIZE = "spooling_initial_segment_size";
public static final String MAX_SEGMENT_SIZE = "spooling_max_segment_size";

// Inlined segments
public static final String ALLOW_INLINING = "spooling_inlining_enabled";
public static final String MAX_INLINED_SIZE = "spooling_max_inlined_size";
public static final String MAX_INLINED_ROWS = "spooling_max_inlined_rows";

private final List<PropertyMetadata<?>> sessionProperties;

@Inject
public SpoolingSessionProperties(SpoolingConfig spoolingConfig)
{
sessionProperties = ImmutableList.<PropertyMetadata<?>>builder()
.add(dataSizeProperty(
INITIAL_SEGMENT_SIZE,
"Initial size of a spooled segment",
spoolingConfig.getInitialSegmentSize(),
isDataSizeBetween(INITIAL_SEGMENT_SIZE, DataSize.of(1, KILOBYTE), DataSize.of(128, MEGABYTE)),
false))
.add(dataSizeProperty(
MAX_SEGMENT_SIZE,
"Maximum size of a spooled segment",
spoolingConfig.getMaximumSegmentSize(),
isDataSizeBetween(MAX_SEGMENT_SIZE, DataSize.of(1, KILOBYTE), DataSize.of(128, MEGABYTE)),
false))
.add(booleanProperty(
ALLOW_INLINING,
"Allow inlining initial rows",
spoolingConfig.isAllowInlining(),
false))
.add(dataSizeProperty(
MAX_INLINED_SIZE,
"Maximum size of inlined data",
spoolingConfig.getMaximumInlinedSize(),
isDataSizeBetween(MAX_INLINED_SIZE, DataSize.of(1, KILOBYTE), DataSize.of(1, MEGABYTE)),
false))
.add(longProperty(
MAX_INLINED_ROWS,
"Maximum number of rows that are allowed to be inlined per worker",
spoolingConfig.getMaximumInlinedRows(),
false))
.build();
}

private Consumer<DataSize> isDataSizeBetween(String property, DataSize min, DataSize max)
{
return value -> {
if (min.compareTo(value) > 0) {
throw new TrinoException(INVALID_SESSION_PROPERTY, "Session property '" + property + "' must be greater than " + min);
}

if (max.compareTo(value) < 0) {
throw new TrinoException(INVALID_SESSION_PROPERTY, "Session property '" + property + "' must be smaller than " + max);
}
};
}

public static DataSize getInitialSegmentSize(Session session)
{
return session.getSystemProperty(INITIAL_SEGMENT_SIZE, DataSize.class);
}

public static DataSize getMaxSegmentSize(Session session)
{
return session.getSystemProperty(MAX_SEGMENT_SIZE, DataSize.class);
}

public static boolean isAllowInlining(Session session)
{
return session.getSystemProperty(ALLOW_INLINING, Boolean.class);
}

public static DataSize getMaxInlinedSize(Session session)
{
return session.getSystemProperty(MAX_INLINED_SIZE, DataSize.class);
}

public static long getMaxInlinedRows(Session session)
{
return session.getSystemProperty(MAX_INLINED_ROWS, Long.class);
}

@Override
public List<PropertyMetadata<?>> getSessionProperties()
{
return sessionProperties;
}
}

0 comments on commit d856917

Please sign in to comment.