Skip to content

Commit 7eed9a9

Browse files
ebyhrwendigo
andcommitted
Implement support for WITH SESSION clause
Co-Authored-By: Mateusz "Serafin" Gajewski <[email protected]>
1 parent ed48441 commit 7eed9a9

27 files changed

+504
-55
lines changed

core/trino-main/src/main/java/io/trino/Session.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import static com.google.common.base.MoreObjects.toStringHelper;
5656
import static com.google.common.base.Preconditions.checkArgument;
5757
import static com.google.common.base.Preconditions.checkState;
58+
import static io.trino.SystemSessionProperties.TIME_ZONE_ID;
5859
import static io.trino.client.ProtocolHeaders.TRINO_HEADERS;
5960
import static io.trino.spi.StandardErrorCode.CATALOG_NOT_FOUND;
6061
import static io.trino.spi.StandardErrorCode.NOT_FOUND;
@@ -416,6 +417,11 @@ public Session withDefaultProperties(Map<String, String> systemPropertyDefaults,
416417
.putAll(catalogEntry.getValue());
417418
}
418419

420+
return withProperties(systemProperties, catalogProperties);
421+
}
422+
423+
public Session withProperties(Map<String, String> systemProperties, Map<String, Map<String, String>> catalogProperties)
424+
{
419425
return new Session(
420426
queryId,
421427
querySpan,
@@ -428,7 +434,10 @@ public Session withDefaultProperties(Map<String, String> systemPropertyDefaults,
428434
schema,
429435
path,
430436
traceToken,
431-
timeZoneKey,
437+
// This is required to override a timezone using a WITH SESSION timezone
438+
Optional.ofNullable(systemProperties.get(TIME_ZONE_ID))
439+
.map(TimeZoneKey::getTimeZoneKey)
440+
.orElse(timeZoneKey),
432441
locale,
433442
remoteUserAddress,
434443
userAgent,

core/trino-main/src/main/java/io/trino/dispatcher/LocalDispatchQueryFactory.java

+5
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import io.trino.server.protocol.Slug;
3939
import io.trino.spi.TrinoException;
4040
import io.trino.spi.resourcegroups.ResourceGroupId;
41+
import io.trino.sql.SessionPropertyResolver;
4142
import io.trino.sql.tree.Statement;
4243
import io.trino.transaction.TransactionId;
4344
import io.trino.transaction.TransactionManager;
@@ -59,6 +60,7 @@ public class LocalDispatchQueryFactory
5960
private final TransactionManager transactionManager;
6061
private final AccessControl accessControl;
6162
private final Metadata metadata;
63+
private final SessionPropertyResolver sessionPropertyResolver;
6264
private final QueryMonitor queryMonitor;
6365
private final LocationFactory locationFactory;
6466

@@ -77,6 +79,7 @@ public LocalDispatchQueryFactory(
7779
QueryManager queryManager,
7880
QueryManagerConfig queryManagerConfig,
7981
TransactionManager transactionManager,
82+
SessionPropertyResolver sessionPropertyResolver,
8083
AccessControl accessControl,
8184
Metadata metadata,
8285
QueryMonitor queryMonitor,
@@ -92,6 +95,7 @@ public LocalDispatchQueryFactory(
9295
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
9396
this.accessControl = requireNonNull(accessControl, "accessControl is null");
9497
this.metadata = requireNonNull(metadata, "metadata is null");
98+
this.sessionPropertyResolver = requireNonNull(sessionPropertyResolver, "sessionPropertyInterpreter is null");
9599
this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null");
96100
this.locationFactory = requireNonNull(locationFactory, "locationFactory is null");
97101
this.executionFactories = requireNonNull(executionFactories, "executionFactories is null");
@@ -132,6 +136,7 @@ public DispatchQuery createDispatchQuery(
132136
planOptimizersStatsCollector,
133137
getQueryType(preparedQuery.getStatement()),
134138
faultTolerantExecutionExchangeEncryptionEnabled,
139+
Optional.of(sessionPropertyResolver.getSessionPropertiesApplier(preparedQuery)),
135140
version);
136141

137142
// It is important that `queryCreatedEvent` is called here. Moving it past the `executor.submit` below

core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java

+9
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import io.trino.spi.resourcegroups.ResourceGroupId;
5454
import io.trino.spi.security.SelectedRole;
5555
import io.trino.spi.type.Type;
56+
import io.trino.sql.SessionPropertyResolver.SessionPropertiesApplier;
5657
import io.trino.sql.analyzer.Output;
5758
import io.trino.sql.planner.PlanFragment;
5859
import io.trino.tracing.TrinoAttributes;
@@ -245,6 +246,7 @@ public static QueryStateMachine begin(
245246
PlanOptimizersStatsCollector queryStatsCollector,
246247
Optional<QueryType> queryType,
247248
boolean faultTolerantExecutionExchangeEncryptionEnabled,
249+
Optional<SessionPropertiesApplier> sessionPropertiesApplier,
248250
NodeVersion version)
249251
{
250252
return beginWithTicker(
@@ -264,6 +266,7 @@ public static QueryStateMachine begin(
264266
queryStatsCollector,
265267
queryType,
266268
faultTolerantExecutionExchangeEncryptionEnabled,
269+
sessionPropertiesApplier,
267270
version);
268271
}
269272

@@ -284,6 +287,7 @@ static QueryStateMachine beginWithTicker(
284287
PlanOptimizersStatsCollector queryStatsCollector,
285288
Optional<QueryType> queryType,
286289
boolean faultTolerantExecutionExchangeEncryptionEnabled,
290+
Optional<SessionPropertiesApplier> sessionPropertiesApplier,
287291
NodeVersion version)
288292
{
289293
// if there is an existing transaction, activate it
@@ -314,6 +318,11 @@ static QueryStateMachine beginWithTicker(
314318
session = session.withoutSpooling();
315319
}
316320

321+
// Apply WITH SESSION properties which require transaction to be started to resolve catalog handles
322+
if (sessionPropertiesApplier.isPresent()) {
323+
session = sessionPropertiesApplier.orElseThrow().apply(session);
324+
}
325+
317326
Span querySpan = session.getQuerySpan();
318327

319328
querySpan.setAttribute(TrinoAttributes.QUERY_TYPE, queryType.map(Enum::name).orElse("UNKNOWN"));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.execution;
15+
16+
import com.google.inject.Inject;
17+
import io.trino.Session;
18+
import io.trino.metadata.SessionPropertyManager;
19+
import io.trino.security.AccessControl;
20+
import io.trino.spi.TrinoException;
21+
import io.trino.spi.connector.CatalogHandle;
22+
import io.trino.spi.session.PropertyMetadata;
23+
import io.trino.spi.type.Type;
24+
import io.trino.sql.PlannerContext;
25+
import io.trino.sql.tree.Expression;
26+
import io.trino.sql.tree.NodeRef;
27+
import io.trino.sql.tree.Parameter;
28+
import io.trino.sql.tree.QualifiedName;
29+
30+
import java.util.List;
31+
import java.util.Map;
32+
33+
import static io.trino.metadata.MetadataUtil.getRequiredCatalogHandle;
34+
import static io.trino.metadata.SessionPropertyManager.evaluatePropertyValue;
35+
import static io.trino.metadata.SessionPropertyManager.serializeSessionProperty;
36+
import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
37+
import static io.trino.sql.analyzer.SemanticExceptions.semanticException;
38+
import static java.lang.String.format;
39+
import static java.util.Objects.requireNonNull;
40+
41+
public class SessionEvaluator
42+
{
43+
private final PlannerContext plannerContext;
44+
private final AccessControl accessControl;
45+
private final SessionPropertyManager sessionPropertyManager;
46+
47+
@Inject
48+
public SessionEvaluator(PlannerContext plannerContext, AccessControl accessControl, SessionPropertyManager sessionPropertyManager)
49+
{
50+
this.plannerContext = requireNonNull(plannerContext, "plannerContext is null");
51+
this.accessControl = requireNonNull(accessControl, "accessControl is null");
52+
this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
53+
}
54+
55+
public String evaluate(Session session, QualifiedName name, Expression expression, Map<NodeRef<Parameter>, Expression> parameters)
56+
{
57+
List<String> nameParts = name.getParts();
58+
if (nameParts.size() == 1) {
59+
PropertyMetadata<?> systemPropertyMetadata = sessionPropertyManager.getSystemSessionPropertyMetadata(nameParts.getFirst())
60+
.orElseThrow(() -> semanticException(INVALID_SESSION_PROPERTY, expression, "Session property %s does not exist", name));
61+
62+
return toSessionValue(session, name, expression, parameters, systemPropertyMetadata);
63+
}
64+
else if (nameParts.size() == 2) {
65+
String catalogName = nameParts.getFirst();
66+
String propertyName = nameParts.getLast();
67+
68+
CatalogHandle catalogHandle = getRequiredCatalogHandle(plannerContext.getMetadata(), session, expression, catalogName);
69+
PropertyMetadata<?> connectorPropertyMetadata = sessionPropertyManager.getConnectorSessionPropertyMetadata(catalogHandle, propertyName)
70+
.orElseThrow(() -> semanticException(INVALID_SESSION_PROPERTY, expression, "Session property %s does not exist", name));
71+
72+
return toSessionValue(session, name, expression, parameters, connectorPropertyMetadata);
73+
}
74+
throw semanticException(INVALID_SESSION_PROPERTY, expression, "Invalid session property '%s'", name);
75+
}
76+
77+
private String toSessionValue(Session session, QualifiedName name, Expression expression, Map<NodeRef<Parameter>, Expression> parameters, PropertyMetadata<?> propertyMetadata)
78+
{
79+
Type type = propertyMetadata.getSqlType();
80+
Object objectValue;
81+
82+
try {
83+
objectValue = evaluatePropertyValue(expression, type, session, plannerContext, accessControl, parameters);
84+
}
85+
catch (TrinoException e) {
86+
throw new TrinoException(
87+
INVALID_SESSION_PROPERTY,
88+
format("Unable to set session property '%s' to '%s': %s", name, expression, e.getRawMessage()));
89+
}
90+
91+
String value = serializeSessionProperty(type, objectValue);
92+
// verify the SQL value can be decoded by the property
93+
try {
94+
propertyMetadata.decode(objectValue);
95+
}
96+
catch (RuntimeException e) {
97+
throw semanticException(INVALID_SESSION_PROPERTY, expression, "%s", e.getMessage());
98+
}
99+
100+
return value;
101+
}
102+
}

core/trino-main/src/main/java/io/trino/execution/SetSessionTask.java

+12-51
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,9 @@
1717
import com.google.inject.Inject;
1818
import io.trino.Session;
1919
import io.trino.execution.warnings.WarningCollector;
20-
import io.trino.metadata.SessionPropertyManager;
2120
import io.trino.security.AccessControl;
2221
import io.trino.security.SecurityContext;
2322
import io.trino.spi.TrinoException;
24-
import io.trino.spi.connector.CatalogHandle;
25-
import io.trino.spi.session.PropertyMetadata;
26-
import io.trino.spi.type.Type;
27-
import io.trino.sql.PlannerContext;
2823
import io.trino.sql.tree.Expression;
2924
import io.trino.sql.tree.QualifiedName;
3025
import io.trino.sql.tree.SetSession;
@@ -33,27 +28,20 @@
3328

3429
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
3530
import static io.trino.execution.ParameterExtractor.bindParameters;
36-
import static io.trino.metadata.MetadataUtil.getRequiredCatalogHandle;
37-
import static io.trino.metadata.SessionPropertyManager.evaluatePropertyValue;
38-
import static io.trino.metadata.SessionPropertyManager.serializeSessionProperty;
3931
import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
40-
import static io.trino.sql.analyzer.SemanticExceptions.semanticException;
41-
import static java.lang.String.format;
4232
import static java.util.Objects.requireNonNull;
4333

4434
public class SetSessionTask
4535
implements DataDefinitionTask<SetSession>
4636
{
47-
private final PlannerContext plannerContext;
37+
private final SessionEvaluator sessionEvaluator;
4838
private final AccessControl accessControl;
49-
private final SessionPropertyManager sessionPropertyManager;
5039

5140
@Inject
52-
public SetSessionTask(PlannerContext plannerContext, AccessControl accessControl, SessionPropertyManager sessionPropertyManager)
41+
public SetSessionTask(SessionEvaluator sessionEvaluator, AccessControl accessControl)
5342
{
54-
this.plannerContext = requireNonNull(plannerContext, "plannerContext is null");
43+
this.sessionEvaluator = requireNonNull(sessionEvaluator, "sessionEvaluator is null");
5544
this.accessControl = requireNonNull(accessControl, "accessControl is null");
56-
this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
5745
}
5846

5947
@Override
@@ -69,50 +57,23 @@ public ListenableFuture<Void> execute(
6957
List<Expression> parameters,
7058
WarningCollector warningCollector)
7159
{
72-
Session session = stateMachine.getSession();
7360
QualifiedName propertyName = statement.getName();
7461
List<String> parts = propertyName.getParts();
75-
if (parts.size() > 2) {
76-
throw semanticException(INVALID_SESSION_PROPERTY, statement, "Invalid session property '%s'", propertyName);
77-
}
62+
Session session = stateMachine.getSession();
7863

79-
// validate the property name
80-
PropertyMetadata<?> propertyMetadata;
8164
if (parts.size() == 1) {
82-
accessControl.checkCanSetSystemSessionProperty(session.getIdentity(), session.getQueryId(), parts.get(0));
83-
propertyMetadata = sessionPropertyManager.getSystemSessionPropertyMetadata(parts.get(0))
84-
.orElseThrow(() -> semanticException(INVALID_SESSION_PROPERTY, statement, "Session property '%s' does not exist", statement.getName()));
85-
}
86-
else {
87-
CatalogHandle catalogHandle = getRequiredCatalogHandle(plannerContext.getMetadata(), stateMachine.getSession(), statement, parts.get(0));
88-
accessControl.checkCanSetCatalogSessionProperty(SecurityContext.of(session), parts.get(0), parts.get(1));
89-
propertyMetadata = sessionPropertyManager.getConnectorSessionPropertyMetadata(catalogHandle, parts.get(1))
90-
.orElseThrow(() -> semanticException(INVALID_SESSION_PROPERTY, statement, "Session property '%s' does not exist", statement.getName()));
91-
}
92-
93-
Type type = propertyMetadata.getSqlType();
94-
Object objectValue;
95-
96-
try {
97-
objectValue = evaluatePropertyValue(statement.getValue(), type, session, plannerContext, accessControl, bindParameters(statement, parameters));
98-
}
99-
catch (TrinoException e) {
100-
throw new TrinoException(
101-
INVALID_SESSION_PROPERTY,
102-
format("Unable to set session property '%s' to '%s': %s", propertyName, statement.getValue(), e.getRawMessage()));
65+
accessControl.checkCanSetSystemSessionProperty(session.getIdentity(), session.getQueryId(), parts.getFirst());
10366
}
104-
105-
String value = serializeSessionProperty(type, objectValue);
106-
107-
// verify the SQL value can be decoded by the property
108-
try {
109-
propertyMetadata.decode(objectValue);
67+
else if (parts.size() == 2) {
68+
accessControl.checkCanSetCatalogSessionProperty(SecurityContext.of(session), parts.getFirst(), parts.getLast());
11069
}
111-
catch (RuntimeException e) {
112-
throw semanticException(INVALID_SESSION_PROPERTY, statement, "%s", e.getMessage());
70+
else {
71+
throw new TrinoException(INVALID_SESSION_PROPERTY, "Invalid session property '%s'".formatted(propertyName));
11372
}
11473

115-
stateMachine.addSetSessionProperties(propertyName.toString(), value);
74+
stateMachine.addSetSessionProperties(
75+
statement.getName().toString(),
76+
sessionEvaluator.evaluate(stateMachine.getSession(), statement.getName(), statement.getValue(), bindParameters(statement, parameters)));
11677

11778
return immediateVoidFuture();
11879
}

core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java

+7
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import io.trino.execution.QueryPerformanceFetcher;
5959
import io.trino.execution.QueryPreparer;
6060
import io.trino.execution.RemoteTaskFactory;
61+
import io.trino.execution.SessionEvaluator;
6162
import io.trino.execution.SqlQueryManager;
6263
import io.trino.execution.StageInfo;
6364
import io.trino.execution.TaskInfo;
@@ -113,6 +114,7 @@
113114
import io.trino.server.ui.WorkerResource;
114115
import io.trino.spi.VersionEmbedder;
115116
import io.trino.sql.PlannerContext;
117+
import io.trino.sql.SessionPropertyResolver;
116118
import io.trino.sql.analyzer.AnalyzerFactory;
117119
import io.trino.sql.analyzer.QueryExplainerFactory;
118120
import io.trino.sql.planner.OptimizerStatsMBeanExporter;
@@ -211,6 +213,8 @@ protected void setup(Binder binder)
211213

212214
// dispatcher
213215
binder.bind(DispatchManager.class).in(Scopes.SINGLETON);
216+
// WITH SESSION interpreter
217+
binder.bind(SessionPropertyResolver.class).in(Scopes.SINGLETON);
214218
// export under the old name, for backwards compatibility
215219
newExporter(binder).export(DispatchManager.class).as(generator -> generator.generatedNameOf(QueryManager.class));
216220
binder.bind(FailedDispatchQueryFactory.class).in(Scopes.SINGLETON);
@@ -318,6 +322,9 @@ List<OutputStatsEstimatorFactory> getCompositeOutputDataSizeEstimatorDelegateFac
318322
// explain analyze
319323
binder.bind(ExplainAnalyzeContext.class).in(Scopes.SINGLETON);
320324

325+
// session evaluator
326+
binder.bind(SessionEvaluator.class).in(Scopes.SINGLETON);
327+
321328
// execution scheduler
322329
jsonCodecBinder(binder).bindJsonCodec(TaskInfo.class);
323330
jsonCodecBinder(binder).bindJsonCodec(TaskStatus.class);

0 commit comments

Comments
 (0)