Skip to content

Commit

Permalink
Add request logging mechanism
Browse files Browse the repository at this point in the history
TODO - add complete telemetry system

Adds controller that logs requests and allows for eventual admin
API that can return recent request info

Relates to #25
  • Loading branch information
Randgalt committed Jun 25, 2024
1 parent cce9969 commit 059b727
Show file tree
Hide file tree
Showing 10 changed files with 289 additions and 18 deletions.
6 changes: 6 additions & 0 deletions trino-s3-proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@
<artifactId>utils</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log-manager</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
import io.trino.s3.proxy.server.credentials.CredentialsController;
import io.trino.s3.proxy.server.remote.RemoteS3Facade;
import io.trino.s3.proxy.server.remote.VirtualHostStyleRemoteS3Facade;
import io.trino.s3.proxy.server.rest.TrinoS3Resource;
import io.trino.s3.proxy.server.rest.RequestLogger;
import io.trino.s3.proxy.server.rest.TrinoS3ProxyClient;
import io.trino.s3.proxy.server.rest.TrinoS3ProxyClient.ForProxyClient;
import io.trino.s3.proxy.server.rest.TrinoS3ProxyConfig;
import io.trino.s3.proxy.server.rest.TrinoS3Resource;
import io.trino.s3.proxy.server.rest.TrinoStsResource;
import io.trino.s3.proxy.server.security.SecurityController;
import io.trino.s3.proxy.server.signing.InternalSigningController;
Expand Down Expand Up @@ -68,6 +69,7 @@ protected void setup(Binder binder)
binder.bind(SigningController.class).to(InternalSigningController.class).in(Scopes.SINGLETON);
binder.bind(CredentialsController.class).in(Scopes.SINGLETON);
binder.bind(SecurityController.class).in(Scopes.SINGLETON);
binder.bind(RequestLogger.class).in(Scopes.SINGLETON);

// TODO config, etc.
httpClientBinder(binder).bindHttpClient("ProxyClient", ForProxyClient.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.s3.proxy.server.rest;

import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import io.trino.s3.proxy.spi.signing.SigningServiceType;
import jakarta.ws.rs.WebApplicationException;
import org.glassfish.jersey.server.ContainerRequest;

import java.net.URI;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class RequestLogger
{
private static final Logger log = Logger.get(RequestLogger.class);

private static final LoggerProc debugLogger = new LoggerProc()
{
@Override
public void log(String format, Object... args)
{
log.debug(format, args);
}

@Override
public boolean isEnabled()
{
return log.isDebugEnabled();
}
};

private static final LoggerProc infoLogger = new LoggerProc()
{
@Override
public void log(String format, Object... args)
{
log.info(format, args);
}

@Override
public boolean isEnabled()
{
return log.isInfoEnabled();
}
};

private volatile LoggerProc loggerProc = debugLogger;

private interface LoggerProc
{
void log(String format, Object... args);

boolean isEnabled();
}

// TODO - allow levels to be set for only certain users, IPs, etc.

public void setLevelInfo()
{
loggerProc = infoLogger;
}

public void setLevelDebug()
{
loggerProc = debugLogger;
}

public RequestLoggingSession requestSession(SigningServiceType serviceType, ContainerRequest request)
{
if (!loggerProc.isEnabled()) {
return () -> {};
}

// TODO introduce telemetry/spans
UUID requestId = UUID.randomUUID();

Map<String, String> entries = new ConcurrentHashMap<>();

Map<String, String> properties = new ConcurrentHashMap<>();

Map<String, String> errors = new ConcurrentHashMap<>();

String requestMethod = request.getMethod();
URI requestUri = request.getRequestUri();
boolean requestHasEntity = request.hasEntity();

add(entries, "request.id", requestId);
add(entries, "request.type", serviceType);
add(entries, "request.http.method", requestMethod);
add(entries, "request.http.uri", requestUri);
add(entries, "request.http.entity", requestHasEntity);
push("RequestStart", entries);

return new RequestLoggingSession()
{
private final Stopwatch stopwatch = Stopwatch.createStarted();
private volatile boolean closed;

@Override
public void logProperty(String name, Object value)
{
properties.put(name, String.valueOf(value));
}

@Override
public void logError(String name, Object value)
{
errors.put(name, String.valueOf(value));
}

@SuppressWarnings({"ThrowableNotThrown", "SwitchStatementWithTooFewBranches"})
@Override
public void logException(Throwable e)
{
switch (Throwables.getRootCause(e)) {
case WebApplicationException webApplicationException -> {
errors.put("webException.status", Integer.toString(webApplicationException.getResponse().getStatus()));
errors.put("webException.message", webApplicationException.getMessage());
}

default -> {
errors.put("exception.type", e.getClass().getName());
errors.put("exception.message", e.getMessage());
}
}
}

@Override
public void close()
{
if (closed) {
return;
}
closed = true;

add(entries, "request.id", requestId);
add(entries, "request.type", serviceType);
add(entries, "request.http.method", requestMethod);
add(entries, "request.http.uri", requestUri);
add(entries, "request.http.entity", requestHasEntity);
add(entries, "request.elapsed.ms", stopwatch.elapsed().toMillis());
add(entries, "request.properties", properties);
add(entries, "request.errors", errors);
push("RequestEnd", entries);
}
};
}

private void add(Map<String, String> entries, String key, Object value)
{
entries.put(key, String.valueOf(value));
}

private void push(String message, Map<String, String> entries)
{
// TODO - keep a stck of recent entries, etc.

Map<String, String> copy = ImmutableMap.copyOf(entries);
entries.clear();

loggerProc.log("%s: %s", message, copy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.s3.proxy.server.rest;

import java.io.Closeable;

public interface RequestLoggingSession
extends Closeable
{
default void logProperty(String name, Object value)
{
// NOP
}

default void logError(String name, Object value)
{
// NOP
}

default void logException(Throwable e)
{
// NOP
}

@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,21 @@ class StreamingResponseHandler
implements ResponseHandler<Void, RuntimeException>
{
private final AsyncResponse asyncResponse;
private final RequestLoggingSession requestLoggingSession;

StreamingResponseHandler(AsyncResponse asyncResponse)
StreamingResponseHandler(AsyncResponse asyncResponse, RequestLoggingSession requestLoggingSession)
{
this.asyncResponse = requireNonNull(asyncResponse, "asyncResponse is null");
this.requestLoggingSession = requireNonNull(requestLoggingSession, "requestLoggingSession is null");
}

@Override
public Void handleException(Request request, Exception exception)
throws RuntimeException
{
requestLoggingSession.logException(exception);
requestLoggingSession.close();

throw propagate(request, exception);
}

Expand All @@ -56,15 +61,24 @@ public Void handle(Request request, Response response)
output.flush();
};

jakarta.ws.rs.core.Response.ResponseBuilder responseBuilder = jakarta.ws.rs.core.Response.status(response.getStatusCode()).entity(streamingOutput);
response.getHeaders()
.keySet()
.stream()
.map(HeaderName::toString)
.forEach(name -> response.getHeaders(name).forEach(value -> responseBuilder.header(name, value)));
try {
jakarta.ws.rs.core.Response.ResponseBuilder responseBuilder = jakarta.ws.rs.core.Response.status(response.getStatusCode()).entity(streamingOutput);
response.getHeaders()
.keySet()
.stream()
.map(HeaderName::toString)
.forEach(name -> response.getHeaders(name).forEach(value -> responseBuilder.header(name, value)));

requestLoggingSession.logProperty("response.status", response.getStatusCode());
requestLoggingSession.logProperty("response.headers", response.getHeaders());

// this will block until StreamingOutput completes

// this will block until StreamingOutput completes
asyncResponse.resume(responseBuilder.build());
asyncResponse.resume(responseBuilder.build());
}
finally {
requestLoggingSession.close();
}

return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,17 @@ public void shutDown()
}
}

public void proxyRequest(SigningMetadata signingMetadata, ParsedS3Request request, AsyncResponse asyncResponse)
public void proxyRequest(SigningMetadata signingMetadata, ParsedS3Request request, AsyncResponse asyncResponse, RequestLoggingSession requestLoggingSession)
{
URI remoteUri = remoteS3Facade.buildEndpoint(uriBuilder(request.queryParameters()), request.rawPath(), request.bucketName(), request.requestAuthorization().region());

SecurityResponse securityResponse = securityController.apply(request);
if (!securityResponse.canProceed()) {
log.debug("SecurityController check failed. AccessKey: %s, Request: %s, SecurityResponse: %s", signingMetadata.credentials().emulated().accessKey(), request, securityResponse);
requestLoggingSession.logError("security.fail.credentials", signingMetadata.credentials().emulated());
requestLoggingSession.logError("security.fail.request", request);
requestLoggingSession.logError("security.fail.response", securityResponse);

throw new WebApplicationException(Response.Status.UNAUTHORIZED);
}

Expand Down Expand Up @@ -151,9 +155,11 @@ public void proxyRequest(SigningMetadata signingMetadata, ParsedS3Request reques

executorService.submit(() -> {
try {
httpClient.execute(remoteRequest, new StreamingResponseHandler(asyncResponse));
httpClient.execute(remoteRequest, new StreamingResponseHandler(asyncResponse, requestLoggingSession));
}
catch (Throwable e) {
requestLoggingSession.logException(e);
requestLoggingSession.close();
asyncResponse.resume(e);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,16 @@ public class TrinoS3Resource
private final TrinoS3ProxyClient proxyClient;
private final Optional<String> serverHostName;
private final String s3Path;
private final RequestLogger requestLogger;

@Inject
public TrinoS3Resource(SigningController signingController, TrinoS3ProxyClient proxyClient, TrinoS3ProxyConfig trinoS3ProxyConfig)
public TrinoS3Resource(SigningController signingController, TrinoS3ProxyClient proxyClient, TrinoS3ProxyConfig trinoS3ProxyConfig, RequestLogger requestLogger)
{
this.signingController = requireNonNull(signingController, "signingController is null");
this.proxyClient = requireNonNull(proxyClient, "proxyClient is null");
this.serverHostName = trinoS3ProxyConfig.getS3HostName();
this.requestLogger = requireNonNull(requestLogger, "requestLogger is null");

s3Path = trinoS3ProxyConfig.getS3Path();
}

Expand Down Expand Up @@ -120,11 +123,24 @@ public void s3DeleteWithPath(@Context ContainerRequest containerRequest, @Suspen

private void handler(ContainerRequest containerRequest, AsyncResponse asyncResponse)
{
Request request = fromRequest(containerRequest);
ParsedS3Request parsedS3Request = parseRequest(request);
SigningMetadata signingMetadata = signingController.validateAndParseAuthorization(request, SigningServiceType.S3);
RequestLoggingSession requestLoggingSession = requestLogger.requestSession(SigningServiceType.S3, containerRequest);
try {
Request request = fromRequest(containerRequest);
ParsedS3Request parsedS3Request = parseRequest(request);

requestLoggingSession.logProperty("parsed.bucket", parsedS3Request.bucketName());
requestLoggingSession.logProperty("parsed.key", parsedS3Request.keyInBucket());

SigningMetadata signingMetadata = signingController.validateAndParseAuthorization(request, SigningServiceType.S3);
requestLoggingSession.logProperty("emulated.key", signingMetadata.credentials().emulated().secretKey());

proxyClient.proxyRequest(signingMetadata, parsedS3Request, asyncResponse);
proxyClient.proxyRequest(signingMetadata, parsedS3Request, asyncResponse, requestLoggingSession);
}
catch (Throwable e) {
requestLoggingSession.logException(e);
requestLoggingSession.close();
throw e;
}
}

private ParsedS3Request parseRequest(Request request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void callHangingRequest(@Context UriInfo uriInfo, @Suspended AsyncRespons
{
// simulate calling a remote request and streaming the result while the remote server hangs
Request request = prepareGet().setUri(uriInfo.getBaseUri().resolve("hang")).build();
httpClient.execute(request, new StreamingResponseHandler(asyncResponse));
httpClient.execute(request, new StreamingResponseHandler(asyncResponse, () -> {}));
}

@GET
Expand Down
Loading

0 comments on commit 059b727

Please sign in to comment.