Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

01 - Add request logging mechanism #82

Merged
merged 5 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import io.trino.s3.proxy.spi.signing.RequestAuthorization;

import java.util.Optional;
import java.util.UUID;

import static java.util.Objects.requireNonNull;

public record ParsedS3Request(
UUID requestId,
RequestAuthorization requestAuthorization,
String requestDate,
String bucketName,
Expand All @@ -35,6 +37,7 @@ public record ParsedS3Request(
{
public ParsedS3Request
{
requireNonNull(requestId, "requestId is null");
requireNonNull(requestAuthorization, "requestAuthorization is null");
requireNonNull(requestDate, "requestDate is null");
requireNonNull(bucketName, "bucketName is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import io.trino.s3.proxy.spi.signing.RequestAuthorization;

import java.net.URI;
import java.util.UUID;

import static java.util.Objects.requireNonNull;

public record Request(
UUID requestId,
RequestAuthorization requestAuthorization,
String requestDate,
URI requestUri,
Expand All @@ -32,6 +34,7 @@ public record Request(
{
public Request
{
requireNonNull(requestId, "requestId is null");
requireNonNull(requestAuthorization, "requestAuthorization is null");
requireNonNull(requestDate, "requestDate is null");
requireNonNull(requestUri, "requestUri is null");
Expand Down
6 changes: 6 additions & 0 deletions trino-s3-proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@
<artifactId>utils</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log-manager</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
import io.trino.s3.proxy.server.credentials.CredentialsController;
import io.trino.s3.proxy.server.remote.RemoteS3Facade;
import io.trino.s3.proxy.server.remote.VirtualHostStyleRemoteS3Facade;
import io.trino.s3.proxy.server.rest.RequestLoggerController;
import io.trino.s3.proxy.server.rest.TrinoS3ProxyClient;
import io.trino.s3.proxy.server.rest.TrinoS3ProxyClient.ForProxyClient;
import io.trino.s3.proxy.server.rest.TrinoS3ProxyConfig;
import io.trino.s3.proxy.server.rest.TrinoS3ProxyResource;
import io.trino.s3.proxy.server.rest.TrinoS3Resource;
import io.trino.s3.proxy.server.rest.TrinoStsResource;
import io.trino.s3.proxy.server.security.SecurityController;
import io.trino.s3.proxy.server.signing.InternalSigningController;
Expand Down Expand Up @@ -60,14 +61,15 @@ protected void setup(Binder binder)
configBinder(binder).bindConfig(SigningControllerConfig.class);
TrinoS3ProxyConfig builtConfig = buildConfigObject(TrinoS3ProxyConfig.class);

jaxrsBinder(binder).bind(TrinoS3ProxyResource.class);
jaxrsBinder(binder).bindInstance(buildResourceAtPath(TrinoS3ProxyResource.class, builtConfig.getS3Path()));
jaxrsBinder(binder).bind(TrinoS3Resource.class);
jaxrsBinder(binder).bindInstance(buildResourceAtPath(TrinoS3Resource.class, builtConfig.getS3Path()));
jaxrsBinder(binder).bind(TrinoStsResource.class);
jaxrsBinder(binder).bindInstance(buildResourceAtPath(TrinoStsResource.class, builtConfig.getStsPath()));

binder.bind(SigningController.class).to(InternalSigningController.class).in(Scopes.SINGLETON);
binder.bind(CredentialsController.class).in(Scopes.SINGLETON);
binder.bind(SecurityController.class).in(Scopes.SINGLETON);
binder.bind(RequestLoggerController.class).in(Scopes.SINGLETON);

// TODO config, etc.
httpClientBinder(binder).bindHttpClient("ProxyClient", ForProxyClient.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.s3.proxy.server.credentials;

import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.trino.s3.proxy.server.remote.RemoteS3Facade;
import io.trino.s3.proxy.spi.credentials.Credential;
import io.trino.s3.proxy.spi.credentials.Credentials;
Expand Down Expand Up @@ -43,6 +44,8 @@

public class CredentialsController
{
private static final Logger log = Logger.get(CredentialsController.class);

private final RemoteS3Facade remoteS3Facade;
private final CredentialsProvider credentialsProvider;
private final Map<String, Session> remoteSessions = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -132,10 +135,15 @@ public void shutdown()
@SuppressWarnings("resource")
public <T> Optional<T> withCredentials(String emulatedAccessKey, Optional<String> emulatedSessionToken, Function<Credentials, Optional<T>> credentialsConsumer)
{
return credentialsProvider.credentials(emulatedAccessKey, emulatedSessionToken)
Optional<T> result = credentialsProvider.credentials(emulatedAccessKey, emulatedSessionToken)
.flatMap(credentials -> credentials.remoteSessionRole()
.flatMap(remoteSessionRole -> internalRemoteSession(remoteSessionRole, credentials).withUsage(credentials, credentialsConsumer))
.or(() -> credentialsConsumer.apply(credentials)));

result.ifPresentOrElse(_ -> log.debug("Credentials found. EmulatedAccessKey: %s", emulatedAccessKey),
() -> log.debug("Credentials not found. EmulatedAccessKey: %s", emulatedAccessKey));

return result;
}

private Session internalRemoteSession(RemoteSessionRole remoteSessionRole, Credentials credentials)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Supplier;

import static com.google.common.io.ByteStreams.toByteArray;
Expand All @@ -56,6 +57,7 @@ static Request fromRequest(ContainerRequest request)
Optional<String> securityTokenHeader = requestHeaders.getFirst("x-amz-security-token");
RequestContent requestContent = request.hasEntity() ? buildRequestContent(request.getEntityStream(), getRequestContentTypeFromHeader(requestHeaders)) : RequestContent.EMPTY;
return new Request(
UUID.randomUUID(),
mosiac1 marked this conversation as resolved.
Show resolved Hide resolved
RequestAuthorization.parse(requestHeaders.getFirst("authorization").orElse(""), securityTokenHeader),
xAmzDate,
request.getRequestUri(),
Expand Down Expand Up @@ -94,7 +96,18 @@ record BucketAndKey(String bucket, String rawKey) {}
});

String keyInBucket = URLDecoder.decode(bucketAndKey.rawKey, StandardCharsets.UTF_8);
return new ParsedS3Request(request.requestAuthorization(), request.requestDate(), bucketAndKey.bucket, keyInBucket, headers, queryParameters, httpVerb, bucketAndKey.rawKey, rawQuery, requestContent);
return new ParsedS3Request(
request.requestId(),
request.requestAuthorization(),
request.requestDate(),
bucketAndKey.bucket,
keyInBucket,
headers,
queryParameters,
httpVerb,
bucketAndKey.rawKey,
rawQuery,
requestContent);
}

private static RequestContent buildRequestContent(InputStream requestEntityStream, String requestContentType)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.s3.proxy.server.rest;

import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import io.trino.s3.proxy.spi.rest.Request;
import io.trino.s3.proxy.spi.signing.SigningServiceType;
import jakarta.annotation.PreDestroy;
import jakarta.ws.rs.WebApplicationException;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import static com.google.common.base.Preconditions.checkState;
import static io.trino.s3.proxy.spi.rest.RequestContent.ContentType.EMPTY;

public class RequestLoggerController
{
private static final Logger log = Logger.get(RequestLoggerController.class);

private interface LoggerProc
{
void log(String format, Object... args);

boolean isEnabled();
}

private static final LoggerProc debugLogger = new LoggerProc()
{
@Override
public void log(String format, Object... args)
{
log.debug(format, args);
}

@Override
public boolean isEnabled()
{
return log.isDebugEnabled();
}
};

private static final LoggerProc infoLogger = new LoggerProc()
{
@Override
public void log(String format, Object... args)
{
log.info(format, args);
}

@Override
public boolean isEnabled()
{
return log.isInfoEnabled();
}
};

private static final RequestLoggingSession NOP_REQUEST_LOGGING_SESSION = () -> {};

private volatile LoggerProc loggerProc = debugLogger;
private final Map<UUID, RequestLoggingSession> sessions = new ConcurrentHashMap<>();

@PreDestroy
public void verifyState()
{
checkState(sessions.isEmpty(), "Some logging sessions were not closed: " + sessions);
}

// TODO - allow levels to be set for only certain users, IPs, etc.

public void setLevelInfo()
{
loggerProc = infoLogger;
}

public void setLevelDebug()
{
loggerProc = debugLogger;
}

public RequestLoggingSession newRequestSession(Request request, SigningServiceType serviceType)
{
return sessions.compute(request.requestId(), (requestId, current) -> {
checkState(current == null, "There is already a logging session for the request: " + requestId);
return internalNewRequestSession(request, serviceType);
});
}

public Optional<RequestLoggingSession> currentRequestSession(UUID requestId)
{
return Optional.ofNullable(sessions.get(requestId));
}

private RequestLoggingSession internalNewRequestSession(Request request, SigningServiceType serviceType)
{
if (!loggerProc.isEnabled()) {
return NOP_REQUEST_LOGGING_SESSION;
}

Map<String, String> entries = new ConcurrentHashMap<>();

Map<String, String> properties = new ConcurrentHashMap<>();

Map<String, String> errors = new ConcurrentHashMap<>();

Map<String, Object> requestDetails = ImmutableMap.of(
"request.id", request.requestId(),
"request.type", serviceType,
"request.uri", request.requestUri(),
"request.http.method", request.httpVerb(),
"request.http.entity", request.requestContent().contentType() != EMPTY);

addAll(entries, requestDetails);

logAndClear("RequestStart", entries);

return new RequestLoggingSession()
{
private final Stopwatch stopwatch = Stopwatch.createStarted();
private volatile boolean closed;

@Override
public void logProperty(String name, Object value)
{
properties.put(name, String.valueOf(value));
}

@Override
public void logError(String name, Object value)
{
errors.put(name, String.valueOf(value));
}

@SuppressWarnings({"ThrowableNotThrown", "SwitchStatementWithTooFewBranches"})
@Override
public void logException(Throwable e)
{
switch (Throwables.getRootCause(e)) {
case WebApplicationException webApplicationException -> {
errors.put("webException.status", Integer.toString(webApplicationException.getResponse().getStatus()));
errors.put("webException.message", webApplicationException.getMessage());
}

default -> {
errors.put("exception.type", e.getClass().getName());
errors.put("exception.message", e.getMessage());
}
}
}

@SuppressWarnings("resource")
@Override
public void close()
mosiac1 marked this conversation as resolved.
Show resolved Hide resolved
{
if (closed) {
return;
}
closed = true;

try {
addAll(entries, requestDetails);
add(entries, "request.elapsed.ms", stopwatch.elapsed().toMillis());
add(entries, "request.properties", properties);
add(entries, "request.errors", errors);

logAndClear("RequestEnd", entries);
}
finally {
sessions.remove(request.requestId());
}
}
};
}

private void addAll(Map<String, String> entries, Map<String, Object> values)
{
values.forEach((key, value) -> add(entries, key, value));
}

private void add(Map<String, String> entries, String key, Object value)
{
entries.put(key, String.valueOf(value));
}

private void logAndClear(String message, Map<String, String> entries)
{
// TODO - keep a list of recent entries, etc.

Map<String, String> copy = ImmutableMap.copyOf(entries);
entries.clear();

loggerProc.log("%s: %s", message, copy);
}
}
Loading
Loading