From ffea77360ee16604192a23c6a2e72c10664c4df4 Mon Sep 17 00:00:00 2001 From: Jordan Zimmerman Date: Fri, 21 Jun 2024 07:00:14 +0100 Subject: [PATCH 1/5] Rename TrinoS3ProxyResource to TrinoS3Resource --- .../io/trino/s3/proxy/server/TrinoS3ProxyServerModule.java | 6 +++--- .../{TrinoS3ProxyResource.java => TrinoS3Resource.java} | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) rename trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/{TrinoS3ProxyResource.java => TrinoS3Resource.java} (96%) diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/TrinoS3ProxyServerModule.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/TrinoS3ProxyServerModule.java index 9b122fac..56a27d83 100644 --- a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/TrinoS3ProxyServerModule.java +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/TrinoS3ProxyServerModule.java @@ -26,7 +26,7 @@ import io.trino.s3.proxy.server.rest.TrinoS3ProxyClient; import io.trino.s3.proxy.server.rest.TrinoS3ProxyClient.ForProxyClient; import io.trino.s3.proxy.server.rest.TrinoS3ProxyConfig; -import io.trino.s3.proxy.server.rest.TrinoS3ProxyResource; +import io.trino.s3.proxy.server.rest.TrinoS3Resource; import io.trino.s3.proxy.server.rest.TrinoStsResource; import io.trino.s3.proxy.server.security.SecurityController; import io.trino.s3.proxy.server.signing.InternalSigningController; @@ -60,8 +60,8 @@ protected void setup(Binder binder) configBinder(binder).bindConfig(SigningControllerConfig.class); TrinoS3ProxyConfig builtConfig = buildConfigObject(TrinoS3ProxyConfig.class); - jaxrsBinder(binder).bind(TrinoS3ProxyResource.class); - jaxrsBinder(binder).bindInstance(buildResourceAtPath(TrinoS3ProxyResource.class, builtConfig.getS3Path())); + jaxrsBinder(binder).bind(TrinoS3Resource.class); + jaxrsBinder(binder).bindInstance(buildResourceAtPath(TrinoS3Resource.class, builtConfig.getS3Path())); jaxrsBinder(binder).bind(TrinoStsResource.class); jaxrsBinder(binder).bindInstance(buildResourceAtPath(TrinoStsResource.class, builtConfig.getStsPath())); diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3ProxyResource.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3Resource.java similarity index 96% rename from trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3ProxyResource.java rename to trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3Resource.java index 9fda4c15..875df8af 100644 --- a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3ProxyResource.java +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3Resource.java @@ -37,7 +37,7 @@ import static io.trino.s3.proxy.server.rest.RequestBuilder.fromRequest; import static java.util.Objects.requireNonNull; -public class TrinoS3ProxyResource +public class TrinoS3Resource { private final SigningController signingController; private final TrinoS3ProxyClient proxyClient; @@ -45,7 +45,7 @@ public class TrinoS3ProxyResource private final String s3Path; @Inject - public TrinoS3ProxyResource(SigningController signingController, TrinoS3ProxyClient proxyClient, TrinoS3ProxyConfig trinoS3ProxyConfig) + public TrinoS3Resource(SigningController signingController, TrinoS3ProxyClient proxyClient, TrinoS3ProxyConfig trinoS3ProxyConfig) { this.signingController = requireNonNull(signingController, "signingController is null"); this.proxyClient = requireNonNull(proxyClient, "proxyClient is null"); From 9b21f8a560207341958c0d8511afaa6de6785535 Mon Sep 17 00:00:00 2001 From: Jordan Zimmerman Date: Fri, 21 Jun 2024 07:09:58 +0100 Subject: [PATCH 2/5] Add more debug logs --- .../server/credentials/CredentialsController.java | 10 +++++++++- .../s3/proxy/server/rest/TrinoS3ProxyClient.java | 12 +++++++++--- .../server/signing/InternalSigningController.java | 9 ++++++++- .../io/trino/s3/proxy/server/signing/Signer.java | 2 +- 4 files changed, 27 insertions(+), 6 deletions(-) diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/credentials/CredentialsController.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/credentials/CredentialsController.java index 3fb3880c..a8c3e840 100644 --- a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/credentials/CredentialsController.java +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/credentials/CredentialsController.java @@ -14,6 +14,7 @@ package io.trino.s3.proxy.server.credentials; import com.google.inject.Inject; +import io.airlift.log.Logger; import io.trino.s3.proxy.server.remote.RemoteS3Facade; import io.trino.s3.proxy.spi.credentials.Credential; import io.trino.s3.proxy.spi.credentials.Credentials; @@ -43,6 +44,8 @@ public class CredentialsController { + private static final Logger log = Logger.get(CredentialsController.class); + private final RemoteS3Facade remoteS3Facade; private final CredentialsProvider credentialsProvider; private final Map remoteSessions = new ConcurrentHashMap<>(); @@ -132,10 +135,15 @@ public void shutdown() @SuppressWarnings("resource") public Optional withCredentials(String emulatedAccessKey, Optional emulatedSessionToken, Function> credentialsConsumer) { - return credentialsProvider.credentials(emulatedAccessKey, emulatedSessionToken) + Optional result = credentialsProvider.credentials(emulatedAccessKey, emulatedSessionToken) .flatMap(credentials -> credentials.remoteSessionRole() .flatMap(remoteSessionRole -> internalRemoteSession(remoteSessionRole, credentials).withUsage(credentials, credentialsConsumer)) .or(() -> credentialsConsumer.apply(credentials))); + + result.ifPresentOrElse(_ -> log.debug("Credentials found. EmulatedAccessKey: %s", emulatedAccessKey), + () -> log.debug("Credentials not found. EmulatedAccessKey: %s", emulatedAccessKey)); + + return result; } private Session internalRemoteSession(RemoteSessionRole remoteSessionRole, Credentials credentials) diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3ProxyClient.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3ProxyClient.java index 89bfe6ec..af4c2870 100644 --- a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3ProxyClient.java +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3ProxyClient.java @@ -17,6 +17,7 @@ import com.google.inject.Inject; import io.airlift.http.client.HttpClient; import io.airlift.http.client.Request; +import io.airlift.log.Logger; import io.trino.s3.proxy.server.remote.RemoteS3Facade; import io.trino.s3.proxy.server.security.SecurityController; import io.trino.s3.proxy.spi.collections.ImmutableMultiMap; @@ -24,6 +25,7 @@ import io.trino.s3.proxy.spi.credentials.Credentials; import io.trino.s3.proxy.spi.rest.ParsedS3Request; import io.trino.s3.proxy.spi.rest.RequestContent; +import io.trino.s3.proxy.spi.security.SecurityResponse; import io.trino.s3.proxy.spi.signing.SigningContext; import io.trino.s3.proxy.spi.signing.SigningController; import io.trino.s3.proxy.spi.signing.SigningMetadata; @@ -52,6 +54,8 @@ public class TrinoS3ProxyClient { + private static final Logger log = Logger.get(TrinoS3ProxyClient.class); + private static final int CHUNK_SIZE = 8_192 * 8; private final HttpClient httpClient; @@ -78,7 +82,7 @@ public TrinoS3ProxyClient(@ForProxyClient HttpClient httpClient, SigningControll public void shutDown() { if (!shutdownAndAwaitTermination(executorService, Duration.ofSeconds(30))) { - // TODO add logging - check for false result + log.warn("Could not shutdown executor service"); } } @@ -86,8 +90,9 @@ public void proxyRequest(SigningMetadata signingMetadata, ParsedS3Request reques { URI remoteUri = remoteS3Facade.buildEndpoint(uriBuilder(request.queryParameters()), request.rawPath(), request.bucketName(), request.requestAuthorization().region()); - // TODO log/expose any securityController error - if (!securityController.apply(request).canProceed()) { + SecurityResponse securityResponse = securityController.apply(request); + if (!securityResponse.canProceed()) { + log.debug("SecurityController check failed. AccessKey: %s, Request: %s, SecurityResponse: %s", signingMetadata.credentials().emulated().accessKey(), request, securityResponse); throw new WebApplicationException(Response.Status.UNAUTHORIZED); } @@ -97,6 +102,7 @@ public void proxyRequest(SigningMetadata signingMetadata, ParsedS3Request reques .setFollowRedirects(true); if (remoteUri.getHost() == null) { + log.debug("RemoteURI missing host. AccessKey: %s, Request: %s", signingMetadata.credentials().emulated().accessKey(), request); throw new WebApplicationException(Response.Status.BAD_REQUEST); } diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/InternalSigningController.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/InternalSigningController.java index ddbd0422..7fed1ae9 100644 --- a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/InternalSigningController.java +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/InternalSigningController.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; +import io.airlift.log.Logger; import io.trino.s3.proxy.server.credentials.CredentialsController; import io.trino.s3.proxy.spi.collections.ImmutableMultiMap; import io.trino.s3.proxy.spi.collections.MultiMap; @@ -46,6 +47,8 @@ public class InternalSigningController implements SigningController { + private static final Logger log = Logger.get(SigningController.class); + private final Duration maxClockDrift; private final CredentialsController credentialsController; @@ -104,13 +107,17 @@ public String signRequest( public SigningMetadata validateAndParseAuthorization(Request request, SigningServiceType signingServiceType) { if (!request.requestAuthorization().isValid()) { + log.debug("Invalid requestAuthorization. Request: %s, SigningServiceType: %s", request, signingServiceType); throw new WebApplicationException(Response.Status.UNAUTHORIZED); } return credentialsController.withCredentials(request.requestAuthorization().accessKey(), request.requestAuthorization().securityToken(), credentials -> { SigningMetadata metadata = new SigningMetadata(signingServiceType, credentials, Optional.empty()); return isValidAuthorization(metadata, request, Credentials::emulated); - }).orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED)); + }).orElseThrow(() -> { + log.debug("ValidateAndParseAuthorization failed. Request: %s, SigningServiceType: %s", request, signingServiceType); + return new WebApplicationException(Response.Status.UNAUTHORIZED); + }); } private SigningContext internalSignRequest( diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/Signer.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/Signer.java index 458b8e2e..20042216 100644 --- a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/Signer.java +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/Signer.java @@ -146,7 +146,7 @@ private static SigningContext buildSigningContext(String authorization, byte[] s { RequestAuthorization requestAuthorization = RequestAuthorization.parse(authorization); if (!requestAuthorization.isValid()) { - // TODO logging, etc. + log.debug("Invalid RequestAuthorization. RequestAuthorization: %s", requestAuthorization); throw new WebApplicationException(Response.Status.BAD_REQUEST); } ChunkSigner chunkSigner = new ChunkSigner(requestDate, requestAuthorization.keyPath(), signingKey); From c237f58ec02e75e450109a1026dc43e5ebf62fa7 Mon Sep 17 00:00:00 2001 From: Jordan Zimmerman Date: Thu, 27 Jun 2024 09:24:41 +0100 Subject: [PATCH 3/5] Add a requestId to Request and ParsedS3Request --- .../trino/s3/proxy/spi/rest/ParsedS3Request.java | 3 +++ .../java/io/trino/s3/proxy/spi/rest/Request.java | 3 +++ .../s3/proxy/server/rest/RequestBuilder.java | 15 ++++++++++++++- 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/trino-s3-proxy-spi/src/main/java/io/trino/s3/proxy/spi/rest/ParsedS3Request.java b/trino-s3-proxy-spi/src/main/java/io/trino/s3/proxy/spi/rest/ParsedS3Request.java index 5078968d..d50d31de 100644 --- a/trino-s3-proxy-spi/src/main/java/io/trino/s3/proxy/spi/rest/ParsedS3Request.java +++ b/trino-s3-proxy-spi/src/main/java/io/trino/s3/proxy/spi/rest/ParsedS3Request.java @@ -18,10 +18,12 @@ import io.trino.s3.proxy.spi.signing.RequestAuthorization; import java.util.Optional; +import java.util.UUID; import static java.util.Objects.requireNonNull; public record ParsedS3Request( + UUID requestId, RequestAuthorization requestAuthorization, String requestDate, String bucketName, @@ -35,6 +37,7 @@ public record ParsedS3Request( { public ParsedS3Request { + requireNonNull(requestId, "requestId is null"); requireNonNull(requestAuthorization, "requestAuthorization is null"); requireNonNull(requestDate, "requestDate is null"); requireNonNull(bucketName, "bucketName is null"); diff --git a/trino-s3-proxy-spi/src/main/java/io/trino/s3/proxy/spi/rest/Request.java b/trino-s3-proxy-spi/src/main/java/io/trino/s3/proxy/spi/rest/Request.java index dfed07bd..2219bb10 100644 --- a/trino-s3-proxy-spi/src/main/java/io/trino/s3/proxy/spi/rest/Request.java +++ b/trino-s3-proxy-spi/src/main/java/io/trino/s3/proxy/spi/rest/Request.java @@ -18,10 +18,12 @@ import io.trino.s3.proxy.spi.signing.RequestAuthorization; import java.net.URI; +import java.util.UUID; import static java.util.Objects.requireNonNull; public record Request( + UUID requestId, RequestAuthorization requestAuthorization, String requestDate, URI requestUri, @@ -32,6 +34,7 @@ public record Request( { public Request { + requireNonNull(requestId, "requestId is null"); requireNonNull(requestAuthorization, "requestAuthorization is null"); requireNonNull(requestDate, "requestDate is null"); requireNonNull(requestUri, "requestUri is null"); diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/RequestBuilder.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/RequestBuilder.java index 7dbd2c86..03b5ee00 100644 --- a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/RequestBuilder.java +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/RequestBuilder.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Locale; import java.util.Optional; +import java.util.UUID; import java.util.function.Supplier; import static com.google.common.io.ByteStreams.toByteArray; @@ -56,6 +57,7 @@ static Request fromRequest(ContainerRequest request) Optional securityTokenHeader = requestHeaders.getFirst("x-amz-security-token"); RequestContent requestContent = request.hasEntity() ? buildRequestContent(request.getEntityStream(), getRequestContentTypeFromHeader(requestHeaders)) : RequestContent.EMPTY; return new Request( + UUID.randomUUID(), RequestAuthorization.parse(requestHeaders.getFirst("authorization").orElse(""), securityTokenHeader), xAmzDate, request.getRequestUri(), @@ -94,7 +96,18 @@ record BucketAndKey(String bucket, String rawKey) {} }); String keyInBucket = URLDecoder.decode(bucketAndKey.rawKey, StandardCharsets.UTF_8); - return new ParsedS3Request(request.requestAuthorization(), request.requestDate(), bucketAndKey.bucket, keyInBucket, headers, queryParameters, httpVerb, bucketAndKey.rawKey, rawQuery, requestContent); + return new ParsedS3Request( + request.requestId(), + request.requestAuthorization(), + request.requestDate(), + bucketAndKey.bucket, + keyInBucket, + headers, + queryParameters, + httpVerb, + bucketAndKey.rawKey, + rawQuery, + requestContent); } private static RequestContent buildRequestContent(InputStream requestEntityStream, String requestContentType) From 9a2d618f7d4978d23feba187cf907960c6908239 Mon Sep 17 00:00:00 2001 From: Jordan Zimmerman Date: Fri, 21 Jun 2024 07:36:47 +0100 Subject: [PATCH 4/5] Add request logging mechanism TODO - add complete telemetry system Adds controller that logs requests and allows for eventual admin API that can return recent request info Relates to #25 --- trino-s3-proxy/pom.xml | 6 + .../server/TrinoS3ProxyServerModule.java | 2 + .../server/rest/RequestLoggerController.java | 210 ++++++++++++++++++ .../server/rest/RequestLoggingSession.java | 38 ++++ .../server/rest/StreamingResponseHandler.java | 29 ++- .../proxy/server/rest/TrinoS3ProxyClient.java | 10 +- .../s3/proxy/server/rest/TrinoS3Resource.java | 24 +- .../signing/InternalSigningController.java | 19 +- .../s3/proxy/server/rest/HangingResource.java | 2 +- .../server/signing/TestSigningController.java | 5 +- .../testing/TestingTrinoS3ProxyServer.java | 4 + .../testing/containers/DockerAttachUtil.java | 6 + 12 files changed, 335 insertions(+), 20 deletions(-) create mode 100644 trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/RequestLoggerController.java create mode 100644 trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/RequestLoggingSession.java diff --git a/trino-s3-proxy/pom.xml b/trino-s3-proxy/pom.xml index fc200cc8..aaabf210 100644 --- a/trino-s3-proxy/pom.xml +++ b/trino-s3-proxy/pom.xml @@ -169,6 +169,12 @@ utils + + io.airlift + log-manager + runtime + + com.github.docker-java docker-java-api diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/TrinoS3ProxyServerModule.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/TrinoS3ProxyServerModule.java index 56a27d83..cf752918 100644 --- a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/TrinoS3ProxyServerModule.java +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/TrinoS3ProxyServerModule.java @@ -23,6 +23,7 @@ import io.trino.s3.proxy.server.credentials.CredentialsController; import io.trino.s3.proxy.server.remote.RemoteS3Facade; import io.trino.s3.proxy.server.remote.VirtualHostStyleRemoteS3Facade; +import io.trino.s3.proxy.server.rest.RequestLoggerController; import io.trino.s3.proxy.server.rest.TrinoS3ProxyClient; import io.trino.s3.proxy.server.rest.TrinoS3ProxyClient.ForProxyClient; import io.trino.s3.proxy.server.rest.TrinoS3ProxyConfig; @@ -68,6 +69,7 @@ protected void setup(Binder binder) binder.bind(SigningController.class).to(InternalSigningController.class).in(Scopes.SINGLETON); binder.bind(CredentialsController.class).in(Scopes.SINGLETON); binder.bind(SecurityController.class).in(Scopes.SINGLETON); + binder.bind(RequestLoggerController.class).in(Scopes.SINGLETON); // TODO config, etc. httpClientBinder(binder).bindHttpClient("ProxyClient", ForProxyClient.class); diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/RequestLoggerController.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/RequestLoggerController.java new file mode 100644 index 00000000..c53a1958 --- /dev/null +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/RequestLoggerController.java @@ -0,0 +1,210 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.s3.proxy.server.rest; + +import com.google.common.base.Stopwatch; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import io.airlift.log.Logger; +import io.trino.s3.proxy.spi.rest.Request; +import io.trino.s3.proxy.spi.signing.SigningServiceType; +import jakarta.annotation.PreDestroy; +import jakarta.ws.rs.WebApplicationException; + +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import static com.google.common.base.Preconditions.checkState; +import static io.trino.s3.proxy.spi.rest.RequestContent.ContentType.EMPTY; + +public class RequestLoggerController +{ + private static final Logger log = Logger.get(RequestLoggerController.class); + + private interface LoggerProc + { + void log(String format, Object... args); + + boolean isEnabled(); + } + + private static final LoggerProc debugLogger = new LoggerProc() + { + @Override + public void log(String format, Object... args) + { + log.debug(format, args); + } + + @Override + public boolean isEnabled() + { + return log.isDebugEnabled(); + } + }; + + private static final LoggerProc infoLogger = new LoggerProc() + { + @Override + public void log(String format, Object... args) + { + log.info(format, args); + } + + @Override + public boolean isEnabled() + { + return log.isInfoEnabled(); + } + }; + + private static final RequestLoggingSession NOP_REQUEST_LOGGING_SESSION = () -> {}; + + private volatile LoggerProc loggerProc = debugLogger; + private final Map sessions = new ConcurrentHashMap<>(); + + @PreDestroy + public void verifyState() + { + checkState(sessions.isEmpty(), "Some logging sessions were not closed: " + sessions); + } + + // TODO - allow levels to be set for only certain users, IPs, etc. + + public void setLevelInfo() + { + loggerProc = infoLogger; + } + + public void setLevelDebug() + { + loggerProc = debugLogger; + } + + public RequestLoggingSession newRequestSession(Request request, SigningServiceType serviceType) + { + return sessions.compute(request.requestId(), (requestId, current) -> { + checkState(current == null, "There is already a logging session for the request: " + requestId); + return internalNewRequestSession(request, serviceType); + }); + } + + public Optional currentRequestSession(UUID requestId) + { + return Optional.ofNullable(sessions.get(requestId)); + } + + private RequestLoggingSession internalNewRequestSession(Request request, SigningServiceType serviceType) + { + if (!loggerProc.isEnabled()) { + return NOP_REQUEST_LOGGING_SESSION; + } + + Map entries = new ConcurrentHashMap<>(); + + Map properties = new ConcurrentHashMap<>(); + + Map errors = new ConcurrentHashMap<>(); + + Map requestDetails = ImmutableMap.of( + "request.id", request.requestId(), + "request.type", serviceType, + "request.uri", request.requestUri(), + "request.http.method", request.httpVerb(), + "request.http.entity", request.requestContent().contentType() != EMPTY); + + addAll(entries, requestDetails); + + logAndClear("RequestStart", entries); + + return new RequestLoggingSession() + { + private final Stopwatch stopwatch = Stopwatch.createStarted(); + private volatile boolean closed; + + @Override + public void logProperty(String name, Object value) + { + properties.put(name, String.valueOf(value)); + } + + @Override + public void logError(String name, Object value) + { + errors.put(name, String.valueOf(value)); + } + + @SuppressWarnings({"ThrowableNotThrown", "SwitchStatementWithTooFewBranches"}) + @Override + public void logException(Throwable e) + { + switch (Throwables.getRootCause(e)) { + case WebApplicationException webApplicationException -> { + errors.put("webException.status", Integer.toString(webApplicationException.getResponse().getStatus())); + errors.put("webException.message", webApplicationException.getMessage()); + } + + default -> { + errors.put("exception.type", e.getClass().getName()); + errors.put("exception.message", e.getMessage()); + } + } + } + + @SuppressWarnings("resource") + @Override + public void close() + { + if (closed) { + return; + } + closed = true; + + try { + addAll(entries, requestDetails); + add(entries, "request.elapsed.ms", stopwatch.elapsed().toMillis()); + add(entries, "request.properties", properties); + add(entries, "request.errors", errors); + + logAndClear("RequestEnd", entries); + } + finally { + sessions.remove(request.requestId()); + } + } + }; + } + + private void addAll(Map entries, Map values) + { + values.forEach((key, value) -> add(entries, key, value)); + } + + private void add(Map entries, String key, Object value) + { + entries.put(key, String.valueOf(value)); + } + + private void logAndClear(String message, Map entries) + { + // TODO - keep a list of recent entries, etc. + + Map copy = ImmutableMap.copyOf(entries); + entries.clear(); + + loggerProc.log("%s: %s", message, copy); + } +} diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/RequestLoggingSession.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/RequestLoggingSession.java new file mode 100644 index 00000000..821da3e6 --- /dev/null +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/RequestLoggingSession.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.s3.proxy.server.rest; + +import java.io.Closeable; + +public interface RequestLoggingSession + extends Closeable +{ + default void logProperty(String name, Object value) + { + // NOP + } + + default void logError(String name, Object value) + { + // NOP + } + + default void logException(Throwable e) + { + // NOP + } + + @Override + void close(); +} diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/StreamingResponseHandler.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/StreamingResponseHandler.java index 576aa627..ba969688 100644 --- a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/StreamingResponseHandler.java +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/StreamingResponseHandler.java @@ -29,16 +29,21 @@ class StreamingResponseHandler implements ResponseHandler { private final AsyncResponse asyncResponse; + private final RequestLoggingSession requestLoggingSession; - StreamingResponseHandler(AsyncResponse asyncResponse) + StreamingResponseHandler(AsyncResponse asyncResponse, RequestLoggingSession requestLoggingSession) { this.asyncResponse = requireNonNull(asyncResponse, "asyncResponse is null"); + this.requestLoggingSession = requireNonNull(requestLoggingSession, "requestLoggingSession is null"); } @Override public Void handleException(Request request, Exception exception) throws RuntimeException { + requestLoggingSession.logException(exception); + requestLoggingSession.close(); + throw propagate(request, exception); } @@ -56,15 +61,21 @@ public Void handle(Request request, Response response) output.flush(); }; - jakarta.ws.rs.core.Response.ResponseBuilder responseBuilder = jakarta.ws.rs.core.Response.status(response.getStatusCode()).entity(streamingOutput); - response.getHeaders() - .keySet() - .stream() - .map(HeaderName::toString) - .forEach(name -> response.getHeaders(name).forEach(value -> responseBuilder.header(name, value))); + try (requestLoggingSession) { + jakarta.ws.rs.core.Response.ResponseBuilder responseBuilder = jakarta.ws.rs.core.Response.status(response.getStatusCode()).entity(streamingOutput); + response.getHeaders() + .keySet() + .stream() + .map(HeaderName::toString) + .forEach(name -> response.getHeaders(name).forEach(value -> responseBuilder.header(name, value))); + + requestLoggingSession.logProperty("response.status", response.getStatusCode()); + requestLoggingSession.logProperty("response.headers", response.getHeaders()); + + // this will block until StreamingOutput completes - // this will block until StreamingOutput completes - asyncResponse.resume(responseBuilder.build()); + asyncResponse.resume(responseBuilder.build()); + } return null; } diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3ProxyClient.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3ProxyClient.java index af4c2870..4ba4ab94 100644 --- a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3ProxyClient.java +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3ProxyClient.java @@ -86,13 +86,17 @@ public void shutDown() } } - public void proxyRequest(SigningMetadata signingMetadata, ParsedS3Request request, AsyncResponse asyncResponse) + public void proxyRequest(SigningMetadata signingMetadata, ParsedS3Request request, AsyncResponse asyncResponse, RequestLoggingSession requestLoggingSession) { URI remoteUri = remoteS3Facade.buildEndpoint(uriBuilder(request.queryParameters()), request.rawPath(), request.bucketName(), request.requestAuthorization().region()); SecurityResponse securityResponse = securityController.apply(request); if (!securityResponse.canProceed()) { log.debug("SecurityController check failed. AccessKey: %s, Request: %s, SecurityResponse: %s", signingMetadata.credentials().emulated().accessKey(), request, securityResponse); + requestLoggingSession.logError("request.security.fail.credentials", signingMetadata.credentials().emulated()); + requestLoggingSession.logError("request.security.fail.request", request); + requestLoggingSession.logError("request.security.fail.response", securityResponse); + throw new WebApplicationException(Response.Status.UNAUTHORIZED); } @@ -151,9 +155,11 @@ public void proxyRequest(SigningMetadata signingMetadata, ParsedS3Request reques executorService.submit(() -> { try { - httpClient.execute(remoteRequest, new StreamingResponseHandler(asyncResponse)); + httpClient.execute(remoteRequest, new StreamingResponseHandler(asyncResponse, requestLoggingSession)); } catch (Throwable e) { + requestLoggingSession.logException(e); + requestLoggingSession.close(); asyncResponse.resume(e); } }); diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3Resource.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3Resource.java index 875df8af..ee02c7bf 100644 --- a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3Resource.java +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3Resource.java @@ -43,13 +43,16 @@ public class TrinoS3Resource private final TrinoS3ProxyClient proxyClient; private final Optional serverHostName; private final String s3Path; + private final RequestLoggerController requestLoggerController; @Inject - public TrinoS3Resource(SigningController signingController, TrinoS3ProxyClient proxyClient, TrinoS3ProxyConfig trinoS3ProxyConfig) + public TrinoS3Resource(SigningController signingController, TrinoS3ProxyClient proxyClient, TrinoS3ProxyConfig trinoS3ProxyConfig, RequestLoggerController requestLoggerController) { this.signingController = requireNonNull(signingController, "signingController is null"); this.proxyClient = requireNonNull(proxyClient, "proxyClient is null"); this.serverHostName = trinoS3ProxyConfig.getS3HostName(); + this.requestLoggerController = requireNonNull(requestLoggerController, "requestLogger is null"); + s3Path = trinoS3ProxyConfig.getS3Path(); } @@ -121,10 +124,23 @@ public void s3DeleteWithPath(@Context ContainerRequest containerRequest, @Suspen private void handler(ContainerRequest containerRequest, AsyncResponse asyncResponse) { Request request = fromRequest(containerRequest); - ParsedS3Request parsedS3Request = parseRequest(request); - SigningMetadata signingMetadata = signingController.validateAndParseAuthorization(request, SigningServiceType.S3); + RequestLoggingSession requestLoggingSession = requestLoggerController.newRequestSession(request, SigningServiceType.S3); + try { + ParsedS3Request parsedS3Request = parseRequest(request); + + requestLoggingSession.logProperty("request.parsed.bucket", parsedS3Request.bucketName()); + requestLoggingSession.logProperty("request.parsed.key", parsedS3Request.keyInBucket()); + + SigningMetadata signingMetadata = signingController.validateAndParseAuthorization(request, SigningServiceType.S3); + requestLoggingSession.logProperty("request.emulated.key", signingMetadata.credentials().emulated().secretKey()); - proxyClient.proxyRequest(signingMetadata, parsedS3Request, asyncResponse); + proxyClient.proxyRequest(signingMetadata, parsedS3Request, asyncResponse, requestLoggingSession); + } + catch (Throwable e) { + requestLoggingSession.logException(e); + requestLoggingSession.close(); + throw e; + } } private ParsedS3Request parseRequest(Request request) diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/InternalSigningController.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/InternalSigningController.java index 7fed1ae9..4645f9c0 100644 --- a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/InternalSigningController.java +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/InternalSigningController.java @@ -13,10 +13,12 @@ */ package io.trino.s3.proxy.server.signing; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; import io.airlift.log.Logger; import io.trino.s3.proxy.server.credentials.CredentialsController; +import io.trino.s3.proxy.server.rest.RequestLoggerController; import io.trino.s3.proxy.spi.collections.ImmutableMultiMap; import io.trino.s3.proxy.spi.collections.MultiMap; import io.trino.s3.proxy.spi.credentials.Credential; @@ -50,14 +52,17 @@ public class InternalSigningController private static final Logger log = Logger.get(SigningController.class); private final Duration maxClockDrift; + private final RequestLoggerController requestLoggerController; private final CredentialsController credentialsController; private static final Set LOWERCASE_HEADERS = ImmutableSet.of("content-type"); @Inject - public InternalSigningController(CredentialsController credentialsController, SigningControllerConfig signingControllerConfig) + public InternalSigningController(CredentialsController credentialsController, SigningControllerConfig signingControllerConfig, RequestLoggerController requestLoggerController) { this.credentialsController = requireNonNull(credentialsController, "credentialsController is null"); + this.requestLoggerController = requireNonNull(requestLoggerController, "requestLoggerController is null"); + maxClockDrift = signingControllerConfig.getMaxClockDrift().toJavaTime(); } @@ -167,7 +172,17 @@ private Optional isValidAuthorization( signingHeaders, request.requestQueryParameters(), request.httpVerb()); - return request.requestAuthorization().authorization().equals(signingContext.signingAuthorization().authorization()) ? Stream.of(metadata.withSigningContext(signingContext)) : Stream.of(); + + String requestAuthorization = request.requestAuthorization().authorization(); + String generatedAuthorization = signingContext.signingAuthorization().authorization(); + boolean generatedMatchesRequest = requestAuthorization.equals(generatedAuthorization); + if (generatedMatchesRequest) { + return Stream.of(metadata.withSigningContext(signingContext)); + } + + requestLoggerController.currentRequestSession(request.requestId()) + .ifPresent(requestLoggingSession -> requestLoggingSession.logError("request.security.authorization.mismatch", ImmutableMap.of("request", requestAuthorization, "generated", generatedAuthorization))); + return Stream.of(); }).findFirst(); } diff --git a/trino-s3-proxy/src/test/java/io/trino/s3/proxy/server/rest/HangingResource.java b/trino-s3-proxy/src/test/java/io/trino/s3/proxy/server/rest/HangingResource.java index 5831102c..0045906a 100644 --- a/trino-s3-proxy/src/test/java/io/trino/s3/proxy/server/rest/HangingResource.java +++ b/trino-s3-proxy/src/test/java/io/trino/s3/proxy/server/rest/HangingResource.java @@ -62,7 +62,7 @@ public void callHangingRequest(@Context UriInfo uriInfo, @Suspended AsyncRespons { // simulate calling a remote request and streaming the result while the remote server hangs Request request = prepareGet().setUri(uriInfo.getBaseUri().resolve("hang")).build(); - httpClient.execute(request, new StreamingResponseHandler(asyncResponse)); + httpClient.execute(request, new StreamingResponseHandler(asyncResponse, () -> {})); } @GET diff --git a/trino-s3-proxy/src/test/java/io/trino/s3/proxy/server/signing/TestSigningController.java b/trino-s3-proxy/src/test/java/io/trino/s3/proxy/server/signing/TestSigningController.java index 89ded9a9..7b5b8352 100644 --- a/trino-s3-proxy/src/test/java/io/trino/s3/proxy/server/signing/TestSigningController.java +++ b/trino-s3-proxy/src/test/java/io/trino/s3/proxy/server/signing/TestSigningController.java @@ -15,6 +15,7 @@ import io.airlift.units.Duration; import io.trino.s3.proxy.server.credentials.CredentialsController; +import io.trino.s3.proxy.server.rest.RequestLoggerController; import io.trino.s3.proxy.server.testing.TestingRemoteS3Facade; import io.trino.s3.proxy.spi.collections.ImmutableMultiMap; import io.trino.s3.proxy.spi.credentials.Credential; @@ -39,7 +40,7 @@ public class TestSigningController private final CredentialsProvider credentialsProvider = (emulatedAccessKey, session) -> Optional.of(CREDENTIALS); private final CredentialsController credentialsController = new CredentialsController(new TestingRemoteS3Facade(), credentialsProvider); - private final SigningController signingController = new InternalSigningController(credentialsController, new SigningControllerConfig().setMaxClockDrift(new Duration(99999, TimeUnit.DAYS))); + private final SigningController signingController = new InternalSigningController(credentialsController, new SigningControllerConfig().setMaxClockDrift(new Duration(99999, TimeUnit.DAYS)), new RequestLoggerController()); @Test public void testRootLs() @@ -70,7 +71,7 @@ public void testRootLs() @Test public void testRootExpiredClock() { - SigningController signingController = new InternalSigningController(credentialsController, new SigningControllerConfig().setMaxClockDrift(new Duration(1, TimeUnit.MINUTES))); + SigningController signingController = new InternalSigningController(credentialsController, new SigningControllerConfig().setMaxClockDrift(new Duration(1, TimeUnit.MINUTES)), new RequestLoggerController()); // values discovered from an AWS CLI request sent to a dummy local HTTP server ImmutableMultiMap.Builder requestHeadersBuilder = ImmutableMultiMap.builder(false); diff --git a/trino-s3-proxy/src/test/java/io/trino/s3/proxy/server/testing/TestingTrinoS3ProxyServer.java b/trino-s3-proxy/src/test/java/io/trino/s3/proxy/server/testing/TestingTrinoS3ProxyServer.java index b57da3a2..ccf10b33 100644 --- a/trino-s3-proxy/src/test/java/io/trino/s3/proxy/server/testing/TestingTrinoS3ProxyServer.java +++ b/trino-s3-proxy/src/test/java/io/trino/s3/proxy/server/testing/TestingTrinoS3ProxyServer.java @@ -27,6 +27,8 @@ import io.airlift.http.server.testing.TestingHttpServerModule; import io.airlift.jaxrs.JaxrsModule; import io.airlift.json.JsonModule; +import io.airlift.log.Level; +import io.airlift.log.Logging; import io.airlift.node.testing.TestingNodeModule; import io.trino.s3.proxy.server.remote.RemoteS3Facade; import io.trino.s3.proxy.server.testing.TestingUtil.ForTesting; @@ -189,6 +191,8 @@ private static TestingTrinoS3ProxyServer start(Collection extraModules, Bootstrap app = new Bootstrap(modules.build()); Injector injector = app.setOptionalConfigurationProperties(properties).initialize(); + Logging.initialize().setLevel("io.trino.s3.proxy", Level.DEBUG); + return new TestingTrinoS3ProxyServer(injector); } } diff --git a/trino-s3-proxy/src/test/java/io/trino/s3/proxy/server/testing/containers/DockerAttachUtil.java b/trino-s3-proxy/src/test/java/io/trino/s3/proxy/server/testing/containers/DockerAttachUtil.java index 72184457..3aab9e2f 100644 --- a/trino-s3-proxy/src/test/java/io/trino/s3/proxy/server/testing/containers/DockerAttachUtil.java +++ b/trino-s3-proxy/src/test/java/io/trino/s3/proxy/server/testing/containers/DockerAttachUtil.java @@ -16,6 +16,7 @@ import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.async.ResultCallback; import com.github.dockerjava.api.model.Frame; +import io.airlift.log.Logger; import org.testcontainers.DockerClientFactory; import java.io.BufferedReader; @@ -34,6 +35,8 @@ public final class DockerAttachUtil { + private static final Logger log = Logger.get(DockerAttachUtil.class); + private DockerAttachUtil() {} public static InputStream inputToContainerStdin(String containerId, String command) @@ -54,6 +57,9 @@ public static void clearInputStreamAndClose(InputStream inputStream, Predicate Date: Wed, 26 Jun 2024 08:01:09 +0100 Subject: [PATCH 5/5] Improve exceptions/resumption in StreamingResponseHandler --- .../server/rest/StreamingResponseHandler.java | 36 ++++++++++++++++--- .../proxy/server/rest/TrinoS3ProxyClient.java | 7 ++-- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/StreamingResponseHandler.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/StreamingResponseHandler.java index ba969688..c207fd8f 100644 --- a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/StreamingResponseHandler.java +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/StreamingResponseHandler.java @@ -13,16 +13,21 @@ */ package io.trino.s3.proxy.server.rest; +import com.google.common.base.Throwables; import io.airlift.http.client.HeaderName; +import io.airlift.http.client.HttpStatus; import io.airlift.http.client.Request; import io.airlift.http.client.Response; import io.airlift.http.client.ResponseHandler; +import jakarta.ws.rs.WebApplicationException; import jakarta.ws.rs.container.AsyncResponse; import jakarta.ws.rs.core.StreamingOutput; import java.io.InputStream; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; -import static io.airlift.http.client.ResponseHandlerUtils.propagate; +import static jakarta.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; import static java.util.Objects.requireNonNull; class StreamingResponseHandler @@ -30,6 +35,7 @@ class StreamingResponseHandler { private final AsyncResponse asyncResponse; private final RequestLoggingSession requestLoggingSession; + private final AtomicBoolean hasBeenResumed = new AtomicBoolean(false); StreamingResponseHandler(AsyncResponse asyncResponse, RequestLoggingSession requestLoggingSession) { @@ -44,7 +50,8 @@ public Void handleException(Request request, Exception exception) requestLoggingSession.logException(exception); requestLoggingSession.close(); - throw propagate(request, exception); + resume(exception); + return null; } @Override @@ -62,7 +69,10 @@ public Void handle(Request request, Response response) }; try (requestLoggingSession) { - jakarta.ws.rs.core.Response.ResponseBuilder responseBuilder = jakarta.ws.rs.core.Response.status(response.getStatusCode()).entity(streamingOutput); + jakarta.ws.rs.core.Response.ResponseBuilder responseBuilder = jakarta.ws.rs.core.Response.status(response.getStatusCode()); + if (HttpStatus.familyForStatusCode(response.getStatusCode()) == HttpStatus.Family.SUCCESSFUL) { + responseBuilder.entity(streamingOutput); + } response.getHeaders() .keySet() .stream() @@ -74,9 +84,27 @@ public Void handle(Request request, Response response) // this will block until StreamingOutput completes - asyncResponse.resume(responseBuilder.build()); + resume(responseBuilder.build()); } return null; } + + @SuppressWarnings("ThrowableNotThrown") + private void resume(Object result) + { + switch (result) { + case WebApplicationException exception -> resume(exception.getResponse()); + case Throwable exception when Throwables.getRootCause(exception) instanceof WebApplicationException webApplicationException -> resume(webApplicationException.getResponse()); + case Throwable exception -> resume(jakarta.ws.rs.core.Response.status(INTERNAL_SERVER_ERROR.getStatusCode(), Optional.ofNullable(exception.getMessage()).orElse("Unknown error")).build()); + default -> { + if (hasBeenResumed.compareAndSet(false, true)) { + asyncResponse.resume(result); + } + else { + throw new WebApplicationException("Could not resume with response: " + result, INTERNAL_SERVER_ERROR); + } + } + } + } } diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3ProxyClient.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3ProxyClient.java index 4ba4ab94..4daff262 100644 --- a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3ProxyClient.java +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/TrinoS3ProxyClient.java @@ -154,13 +154,12 @@ public void proxyRequest(SigningMetadata signingMetadata, ParsedS3Request reques Request remoteRequest = remoteRequestBuilder.build(); executorService.submit(() -> { + StreamingResponseHandler responseHandler = new StreamingResponseHandler(asyncResponse, requestLoggingSession); try { - httpClient.execute(remoteRequest, new StreamingResponseHandler(asyncResponse, requestLoggingSession)); + httpClient.execute(remoteRequest, responseHandler); } catch (Throwable e) { - requestLoggingSession.logException(e); - requestLoggingSession.close(); - asyncResponse.resume(e); + responseHandler.handleException(remoteRequest, new RuntimeException(e)); } }); }