Skip to content

Commit

Permalink
Add request logging mechanism
Browse files Browse the repository at this point in the history
TODO - add complete telemetry system

Adds controller that logs requests and allows for eventual admin
API that can return recent request info

Relates to #25
  • Loading branch information
Randgalt committed Jun 27, 2024
1 parent c237f58 commit e661ccc
Show file tree
Hide file tree
Showing 12 changed files with 334 additions and 20 deletions.
6 changes: 6 additions & 0 deletions trino-s3-proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@
<artifactId>utils</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log-manager</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.s3.proxy.server.credentials.CredentialsController;
import io.trino.s3.proxy.server.remote.RemoteS3Facade;
import io.trino.s3.proxy.server.remote.VirtualHostStyleRemoteS3Facade;
import io.trino.s3.proxy.server.rest.RequestLoggerController;
import io.trino.s3.proxy.server.rest.TrinoS3ProxyClient;
import io.trino.s3.proxy.server.rest.TrinoS3ProxyClient.ForProxyClient;
import io.trino.s3.proxy.server.rest.TrinoS3ProxyConfig;
Expand Down Expand Up @@ -68,6 +69,7 @@ protected void setup(Binder binder)
binder.bind(SigningController.class).to(InternalSigningController.class).in(Scopes.SINGLETON);
binder.bind(CredentialsController.class).in(Scopes.SINGLETON);
binder.bind(SecurityController.class).in(Scopes.SINGLETON);
binder.bind(RequestLoggerController.class).in(Scopes.SINGLETON);

// TODO config, etc.
httpClientBinder(binder).bindHttpClient("ProxyClient", ForProxyClient.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.s3.proxy.server.rest;

import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import io.trino.s3.proxy.spi.rest.Request;
import io.trino.s3.proxy.spi.signing.SigningServiceType;
import jakarta.annotation.PreDestroy;
import jakarta.ws.rs.WebApplicationException;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import static com.google.common.base.Preconditions.checkState;
import static io.trino.s3.proxy.spi.rest.RequestContent.ContentType.EMPTY;

public class RequestLoggerController
{
private static final Logger log = Logger.get(RequestLoggerController.class);

private static final LoggerProc debugLogger = new LoggerProc()
{
@Override
public void log(String format, Object... args)
{
log.debug(format, args);
}

@Override
public boolean isEnabled()
{
return log.isDebugEnabled();
}
};

private static final LoggerProc infoLogger = new LoggerProc()
{
@Override
public void log(String format, Object... args)
{
log.info(format, args);
}

@Override
public boolean isEnabled()
{
return log.isInfoEnabled();
}
};

private volatile LoggerProc loggerProc = debugLogger;
private final Map<UUID, RequestLoggingSession> sessions = new ConcurrentHashMap<>();

private interface LoggerProc
{
void log(String format, Object... args);

boolean isEnabled();
}

@PreDestroy
public void verifyState()
{
checkState(sessions.isEmpty(), "Some logging sessions were not closed: " + sessions);
}

// TODO - allow levels to be set for only certain users, IPs, etc.

public void setLevelInfo()
{
loggerProc = infoLogger;
}

public void setLevelDebug()
{
loggerProc = debugLogger;
}

public RequestLoggingSession newRequestSession(Request request, SigningServiceType serviceType)
{
return sessions.compute(request.requestId(), (requestId, current) -> {
checkState(current == null, "There is already a logging session for the request: " + requestId);
return internalNewRequestSession(request, serviceType);
});
}

public Optional<RequestLoggingSession> currentRequestSession(UUID requestId, SigningServiceType serviceType)
{
return Optional.ofNullable(sessions.get(requestId));
}

private RequestLoggingSession internalNewRequestSession(Request request, SigningServiceType serviceType)
{
if (!loggerProc.isEnabled()) {
return () -> {};
}

// TODO introduce telemetry/spans
UUID requestId = UUID.randomUUID();

Map<String, String> entries = new ConcurrentHashMap<>();

Map<String, String> properties = new ConcurrentHashMap<>();

Map<String, String> errors = new ConcurrentHashMap<>();

Map<String, Object> requestDetails = ImmutableMap.of(
"request.id", requestId,
"request.type", serviceType,
"request.uri", request.requestUri(),
"request.http.method", request.httpVerb(),
"request.http.entity", request.requestContent().contentType() != EMPTY);

addAll(entries, requestDetails);

logAndClear("RequestStart", entries);

return new RequestLoggingSession()
{
private final Stopwatch stopwatch = Stopwatch.createStarted();
private volatile boolean closed;

@Override
public void logProperty(String name, Object value)
{
properties.put(name, String.valueOf(value));
}

@Override
public void logError(String name, Object value)
{
errors.put(name, String.valueOf(value));
}

@SuppressWarnings({"ThrowableNotThrown", "SwitchStatementWithTooFewBranches"})
@Override
public void logException(Throwable e)
{
switch (Throwables.getRootCause(e)) {
case WebApplicationException webApplicationException -> {
errors.put("webException.status", Integer.toString(webApplicationException.getResponse().getStatus()));
errors.put("webException.message", webApplicationException.getMessage());
}

default -> {
errors.put("exception.type", e.getClass().getName());
errors.put("exception.message", e.getMessage());
}
}
}

@SuppressWarnings("resource")
@Override
public void close()
{
if (closed) {
return;
}
closed = true;

try {
addAll(entries, requestDetails);
add(entries, "request.elapsed.ms", stopwatch.elapsed().toMillis());
add(entries, "request.properties", properties);
add(entries, "request.errors", errors);

logAndClear("RequestEnd", entries);
}
finally {
sessions.remove(request.requestId());
}
}
};
}

private void addAll(Map<String, String> entries, Map<String, Object> values)
{
values.forEach((key, value) -> add(entries, key, value));
}

private void add(Map<String, String> entries, String key, Object value)
{
entries.put(key, String.valueOf(value));
}

private void logAndClear(String message, Map<String, String> entries)
{
// TODO - keep a stck of recent entries, etc.

Map<String, String> copy = ImmutableMap.copyOf(entries);
entries.clear();

loggerProc.log("%s: %s", message, copy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.s3.proxy.server.rest;

import java.io.Closeable;

public interface RequestLoggingSession
extends Closeable
{
default void logProperty(String name, Object value)
{
// NOP
}

default void logError(String name, Object value)
{
// NOP
}

default void logException(Throwable e)
{
// NOP
}

@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,21 @@ class StreamingResponseHandler
implements ResponseHandler<Void, RuntimeException>
{
private final AsyncResponse asyncResponse;
private final RequestLoggingSession requestLoggingSession;

StreamingResponseHandler(AsyncResponse asyncResponse)
StreamingResponseHandler(AsyncResponse asyncResponse, RequestLoggingSession requestLoggingSession)
{
this.asyncResponse = requireNonNull(asyncResponse, "asyncResponse is null");
this.requestLoggingSession = requireNonNull(requestLoggingSession, "requestLoggingSession is null");
}

@Override
public Void handleException(Request request, Exception exception)
throws RuntimeException
{
requestLoggingSession.logException(exception);
requestLoggingSession.close();

throw propagate(request, exception);
}

Expand All @@ -56,15 +61,21 @@ public Void handle(Request request, Response response)
output.flush();
};

jakarta.ws.rs.core.Response.ResponseBuilder responseBuilder = jakarta.ws.rs.core.Response.status(response.getStatusCode()).entity(streamingOutput);
response.getHeaders()
.keySet()
.stream()
.map(HeaderName::toString)
.forEach(name -> response.getHeaders(name).forEach(value -> responseBuilder.header(name, value)));
try (requestLoggingSession) {
jakarta.ws.rs.core.Response.ResponseBuilder responseBuilder = jakarta.ws.rs.core.Response.status(response.getStatusCode()).entity(streamingOutput);
response.getHeaders()
.keySet()
.stream()
.map(HeaderName::toString)
.forEach(name -> response.getHeaders(name).forEach(value -> responseBuilder.header(name, value)));

requestLoggingSession.logProperty("response.status", response.getStatusCode());
requestLoggingSession.logProperty("response.headers", response.getHeaders());

// this will block until StreamingOutput completes

// this will block until StreamingOutput completes
asyncResponse.resume(responseBuilder.build());
asyncResponse.resume(responseBuilder.build());
}

return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,17 @@ public void shutDown()
}
}

public void proxyRequest(SigningMetadata signingMetadata, ParsedS3Request request, AsyncResponse asyncResponse)
public void proxyRequest(SigningMetadata signingMetadata, ParsedS3Request request, AsyncResponse asyncResponse, RequestLoggingSession requestLoggingSession)
{
URI remoteUri = remoteS3Facade.buildEndpoint(uriBuilder(request.queryParameters()), request.rawPath(), request.bucketName(), request.requestAuthorization().region());

SecurityResponse securityResponse = securityController.apply(request);
if (!securityResponse.canProceed()) {
log.debug("SecurityController check failed. AccessKey: %s, Request: %s, SecurityResponse: %s", signingMetadata.credentials().emulated().accessKey(), request, securityResponse);
requestLoggingSession.logError("request.security.fail.credentials", signingMetadata.credentials().emulated());
requestLoggingSession.logError("request.security.fail.request", request);
requestLoggingSession.logError("request.security.fail.response", securityResponse);

throw new WebApplicationException(Response.Status.UNAUTHORIZED);
}

Expand Down Expand Up @@ -151,9 +155,11 @@ public void proxyRequest(SigningMetadata signingMetadata, ParsedS3Request reques

executorService.submit(() -> {
try {
httpClient.execute(remoteRequest, new StreamingResponseHandler(asyncResponse));
httpClient.execute(remoteRequest, new StreamingResponseHandler(asyncResponse, requestLoggingSession));
}
catch (Throwable e) {
requestLoggingSession.logException(e);
requestLoggingSession.close();
asyncResponse.resume(e);
}
});
Expand Down
Loading

0 comments on commit e661ccc

Please sign in to comment.