Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Use dedicated protocol to render data in local timezone #24759

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ public enum ClientCapabilities
// When this capability is not set, the server returns datetime types with precision = 3
PARAMETRIC_DATETIME,
// Whether clients support the session authorization set/reset feature
SESSION_AUTHORIZATION;
SESSION_AUTHORIZATION,
USE_SESSION_TIME_ZONE,
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public final class ProtocolHeaders
private final String requestResourceEstimate;
private final String requestExtraCredential;
private final String requestQueryDataEncoding;
private final String requestRenderTimestampInLocalZone;
private final String responseSetCatalog;
private final String responseSetSchema;
private final String responseSetPath;
Expand Down Expand Up @@ -91,6 +92,7 @@ private ProtocolHeaders(String name)
requestResourceEstimate = prefix + "Resource-Estimate";
requestExtraCredential = prefix + "Extra-Credential";
requestQueryDataEncoding = prefix + "Query-Data-Encoding";
requestRenderTimestampInLocalZone = prefix + "Timestamp-In-Local-Zone";
responseSetCatalog = prefix + "Set-Catalog";
responseSetSchema = prefix + "Set-Schema";
responseSetPath = prefix + "Set-Path";
Expand Down Expand Up @@ -205,6 +207,11 @@ public String requestQueryDataEncoding()
return requestQueryDataEncoding;
}

public String getRequestRenderTimestampInLocalZone()
{
return requestRenderTimestampInLocalZone;
}

public String responseSetCatalog()
{
return responseSetCatalog;
Expand Down
33 changes: 27 additions & 6 deletions core/trino-main/src/main/java/io/trino/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public final class Session
private final ProtocolHeaders protocolHeaders;
private final Optional<Slice> exchangeEncryptionKey;
private final Optional<String> queryDataEncoding;
private final boolean renderTimestampInLocalTimeZone;

public Session(
QueryId queryId,
Expand Down Expand Up @@ -120,7 +121,8 @@ public Session(
Map<String, String> preparedStatements,
ProtocolHeaders protocolHeaders,
Optional<Slice> exchangeEncryptionKey,
Optional<String> queryDataEncoding)
Optional<String> queryDataEncoding,
boolean renderTimestampInLocalTimeZone)
{
this.queryId = requireNonNull(queryId, "queryId is null");
this.querySpan = requireNonNull(querySpan, "querySpan is null");
Expand Down Expand Up @@ -157,6 +159,7 @@ public Session(
this.catalogProperties = catalogPropertiesBuilder.buildOrThrow();

checkArgument(catalog.isPresent() || schema.isEmpty(), "schema is set but catalog is not");
this.renderTimestampInLocalTimeZone = renderTimestampInLocalTimeZone;
}

public QueryId getQueryId()
Expand Down Expand Up @@ -265,6 +268,11 @@ public TransactionId getRequiredTransactionId()
return transactionId.orElseThrow(NotInTransactionException::new);
}

public boolean isRenderTimestampInLocalTimeZone()
{
return renderTimestampInLocalTimeZone;
}

public boolean isClientTransactionSupport()
{
return clientTransactionSupport;
Expand Down Expand Up @@ -393,7 +401,8 @@ public Session beginTransactionId(TransactionId transactionId, TransactionManage
preparedStatements,
protocolHeaders,
exchangeEncryptionKey,
queryDataEncoding);
queryDataEncoding,
renderTimestampInLocalTimeZone);
}

public Session withDefaultProperties(Map<String, String> systemPropertyDefaults, Map<String, Map<String, String>> catalogPropertyDefaults, AccessControl accessControl)
Expand Down Expand Up @@ -443,7 +452,8 @@ public Session withDefaultProperties(Map<String, String> systemPropertyDefaults,
preparedStatements,
protocolHeaders,
exchangeEncryptionKey,
queryDataEncoding);
queryDataEncoding,
renderTimestampInLocalTimeZone);
}

public Session withExchangeEncryption(Slice encryptionKey)
Expand Down Expand Up @@ -476,7 +486,8 @@ public Session withExchangeEncryption(Slice encryptionKey)
preparedStatements,
protocolHeaders,
Optional.of(encryptionKey),
queryDataEncoding);
queryDataEncoding,
renderTimestampInLocalTimeZone);
}

public ConnectorSession toConnectorSession()
Expand Down Expand Up @@ -530,7 +541,8 @@ public SessionRepresentation toSessionRepresentation()
identity.getCatalogRoles(),
preparedStatements,
protocolHeaders.getProtocolName(),
queryDataEncoding);
queryDataEncoding,
renderTimestampInLocalTimeZone);
}

@Override
Expand All @@ -556,6 +568,7 @@ public String toString()
.add("clientCapabilities", clientCapabilities)
.add("resourceEstimates", resourceEstimates)
.add("start", start)
.add("renderTimestampInLocalTimeZone", renderTimestampInLocalTimeZone)
.omitNullValues()
.toString();
}
Expand Down Expand Up @@ -663,6 +676,7 @@ public static class SessionBuilder
private Optional<String> queryDataEncoding = Optional.empty();
private ResourceEstimates resourceEstimates;
private Instant start = Instant.now();
private boolean renderTimestampInLocalTimeZone;
private final Map<String, String> systemProperties = new HashMap<>();
private final Map<String, Map<String, String>> catalogSessionProperties = new HashMap<>();
private final SessionPropertyManager sessionPropertyManager;
Expand Down Expand Up @@ -950,6 +964,12 @@ public SessionBuilder setQueryDataEncoding(Optional<String> value)
return this;
}

public SessionBuilder setRenderTimestampInLocalTimeZone(boolean renderTimestampInLocalTimeZone)
{
this.renderTimestampInLocalTimeZone = renderTimestampInLocalTimeZone;
return this;
}

public Session build()
{
return new Session(
Expand Down Expand Up @@ -979,7 +999,8 @@ public Session build()
preparedStatements,
protocolHeaders,
Optional.empty(),
queryDataEncoding);
queryDataEncoding,
renderTimestampInLocalTimeZone);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public final class SessionRepresentation
private final Map<String, String> preparedStatements;
private final String protocolName;
private final Optional<String> queryDataEncoding;
private final boolean renderTimestampInLocalTimeZone;

@JsonCreator
public SessionRepresentation(
Expand Down Expand Up @@ -104,7 +105,8 @@ public SessionRepresentation(
@JsonProperty("catalogRoles") Map<String, SelectedRole> catalogRoles,
@JsonProperty("preparedStatements") Map<String, String> preparedStatements,
@JsonProperty("protocolName") String protocolName,
@JsonProperty("queryDataEncoding") Optional<String> queryDataEncoding)
@JsonProperty("queryDataEncoding") Optional<String> queryDataEncoding,
@JsonProperty("renderTimestampInLocalTimeZone") boolean renderTimestampInLocalTimeZone)
{
this.queryId = requireNonNull(queryId, "queryId is null");
this.querySpan = requireNonNull(querySpan, "querySpan is null");
Expand Down Expand Up @@ -141,6 +143,7 @@ public SessionRepresentation(
catalogPropertiesBuilder.put(entry.getKey(), ImmutableMap.copyOf(entry.getValue()));
}
this.catalogProperties = catalogPropertiesBuilder.buildOrThrow();
this.renderTimestampInLocalTimeZone = renderTimestampInLocalTimeZone;
}

@JsonProperty
Expand Down Expand Up @@ -388,6 +391,7 @@ public Session toSession(SessionPropertyManager sessionPropertyManager, Map<Stri
preparedStatements,
createProtocolHeaders(protocolName),
exchangeEncryptionKey,
queryDataEncoding);
queryDataEncoding,
renderTimestampInLocalTimeZone);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.trino.spi.type.SqlTimestampWithTimeZone;
import io.trino.spi.type.TimeType;
import io.trino.spi.type.TimeWithTimeZoneType;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
Expand Down Expand Up @@ -59,7 +60,9 @@ public class JsonArrayResultsIterator
private final ImmutableList<Page> pages;
private final List<OutputColumn> columns;
private final boolean supportsParametricDateTime;
private final boolean supportsUseSessionTimeZone;
private final Consumer<TrinoException> exceptionConsumer;
private final TimeZoneKey sessionTimeZoneKey;

private Page currentPage;
private int rowPosition = -1;
Expand All @@ -72,7 +75,9 @@ public JsonArrayResultsIterator(Session session, List<Page> pages, List<OutputCo
this.session = requireNonNull(session, "session is null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
this.supportsParametricDateTime = session.getClientCapabilities().contains(ClientCapabilities.PARAMETRIC_DATETIME.toString());
this.supportsUseSessionTimeZone = session.isRenderTimestampInLocalTimeZone();
this.exceptionConsumer = requireNonNull(exceptionConsumer, "exceptionConsumer is null");
this.sessionTimeZoneKey = session.getTimeZoneKey();
this.currentPage = queue.pollFirst();
}

Expand Down Expand Up @@ -120,6 +125,9 @@ private List<Object> getRowValues()
if (!supportsParametricDateTime) {
value = getLegacyValue(value, type);
}
if (supportsUseSessionTimeZone) {
value = useSessionTimeZone(value, type);
}
row.add(value);
}
catch (Throwable throwable) {
Expand All @@ -131,6 +139,56 @@ private List<Object> getRowValues()
return unmodifiableList(row);
}

private Object useSessionTimeZone(Object value, Type type)
{
if (value == null) {
return null;
}

if (type instanceof TimestampWithTimeZoneType) {
SqlTimestampWithTimeZone originalValue = ((SqlTimestampWithTimeZone) value);
return SqlTimestampWithTimeZone.newInstance(originalValue.getPrecision(), originalValue.getEpochMillis(), originalValue.getPicosOfMilli(), sessionTimeZoneKey);
}

if (type instanceof ArrayType) {
Type elementType = ((ArrayType) type).getElementType();

if (!(elementType instanceof TimestampWithTimeZoneType)) {
return value;
}

List<Object> listValue = (List<Object>) value;
List<Object> updateValues = new ArrayList<>(listValue.size());
for (Object element : listValue) {
updateValues.add(useSessionTimeZone(element, elementType));
}

return unmodifiableList(updateValues);
}

if (type instanceof MapType) {
Type keyType = ((MapType) type).getKeyType();
Type valueType = ((MapType) type).getValueType();

Map<Object, Object> mapValue = (Map<Object, Object>) value;
Map<Object, Object> result = Maps.newHashMapWithExpectedSize(mapValue.size());
mapValue.forEach((key, val) -> result.put(useSessionTimeZone(key, keyType), useSessionTimeZone(val, valueType)));
return unmodifiableMap(result);
}

if (type instanceof RowType) {
List<RowType.Field> fields = ((RowType) type).getFields();
List<Object> values = (List<Object>) value;

List<Object> result = new ArrayList<>(values.size());
for (int i = 0; i < values.size(); i++) {
result.add(useSessionTimeZone(values.get(i), fields.get(i).getType()));
}
return unmodifiableList(result);
}
return value;
}

private Object getLegacyValue(Object value, Type type)
{
if (value == null) {
Expand Down
Loading