From 98a6450509a9eaa58ae299d32122b06fa0750664 Mon Sep 17 00:00:00 2001 From: David Phillips Date: Tue, 1 Jun 2021 14:41:00 -0700 Subject: [PATCH 1/2] Use FluentFuture in QueuedStatementResource This makes the code easier to read, but does not change behavior. # Conflicts: # presto-main/src/main/java/io/prestosql/dispatcher/QueuedStatementResource.java --- .../dispatcher/QueuedStatementResource.java | 42 +++++++++---------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/presto-main/src/main/java/io/prestosql/dispatcher/QueuedStatementResource.java b/presto-main/src/main/java/io/prestosql/dispatcher/QueuedStatementResource.java index 7f79f9db100b..9ae8fb09b5a0 100644 --- a/presto-main/src/main/java/io/prestosql/dispatcher/QueuedStatementResource.java +++ b/presto-main/src/main/java/io/prestosql/dispatcher/QueuedStatementResource.java @@ -16,7 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Ordering; -import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.ListenableFuture; import io.airlift.log.Logger; import io.airlift.units.Duration; @@ -64,12 +64,12 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; -import static io.airlift.concurrent.MoreFutures.addTimeout; import static io.airlift.concurrent.Threads.threadsNamed; import static io.airlift.jaxrs.AsyncResponseHandler.bindAsyncResponse; import static io.prestosql.execution.QueryState.FAILED; @@ -185,7 +185,7 @@ public Response postStatement( // let authentication filter know that identity lifecycle has been handed off servletRequest.setAttribute(AUTHENTICATED_IDENTITY, null); - return createQueryResultsResponse(query.getQueryResults(query.getLastToken(), uriInfo), compressionEnabled); + return createQueryResultsResponse(query.getQueryResults(query.getLastToken(), uriInfo)); } @ResourceSecurity(PUBLIC) @@ -202,25 +202,21 @@ public void getStatus( { Query query = getQuery(queryId, slug, token); - // wait for query to be dispatched, up to the wait timeout - ListenableFuture futureStateChange = addTimeout( - query.waitForDispatched(), - () -> null, - WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait), - timeoutExecutor); - - // when state changes, fetch the next result - ListenableFuture queryResultsFuture = Futures.transform( - futureStateChange, - ignored -> query.getQueryResults(token, uriInfo), - responseExecutor); - - // transform to Response - ListenableFuture response = Futures.transform( - queryResultsFuture, - queryResults -> createQueryResultsResponse(queryResults, compressionEnabled), - directExecutor()); - bindAsyncResponse(asyncResponse, response, responseExecutor); + ListenableFuture future = getStatus(query, token, maxWait, uriInfo); + bindAsyncResponse(asyncResponse, future, responseExecutor); + } + + private ListenableFuture getStatus(Query query, long token, Duration maxWait, UriInfo uriInfo) + { + long waitMillis = WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait).toMillis(); + + return FluentFuture.from(query.waitForDispatched()) + // wait for query to be dispatched, up to the wait timeout + .withTimeout(waitMillis, MILLISECONDS, timeoutExecutor) + .catching(TimeoutException.class, ignored -> null, directExecutor()) + // when state changes, fetch the next result + .transform(ignored -> query.getQueryResults(token, uriInfo), responseExecutor) + .transform(this::createQueryResultsResponse, directExecutor()); } @ResourceSecurity(PUBLIC) @@ -246,7 +242,7 @@ private Query getQuery(QueryId queryId, String slug, long token) return query; } - private static Response createQueryResultsResponse(QueryResults results, boolean compressionEnabled) + private Response createQueryResultsResponse(QueryResults results) { Response.ResponseBuilder builder = Response.ok(results); if (!compressionEnabled) { From 1c5238afbed291ebc83c49ef02c6203c786d83eb Mon Sep 17 00:00:00 2001 From: Eric Hwang Date: Mon, 15 Nov 2021 17:51:25 -0800 Subject: [PATCH 2/2] Add QueuedStatementResource timeout for query submission (mem leak fix) QueuedStatementResource attempts to maintain an in-memory query cache by only expiring queries that are no longer tracked by DispatchManager. It is possible for a query to be created in QueuedStatementResource, but never submitted to DispatchManager if the client never comes back and calls getStatus() (which is the trigger for the query to be submitted to DispatchManager). To handle this, we add a timeout to purge queries from this cache if it has not been submitted by the query abandoned client timeout threshold. # Conflicts: # presto-main/src/main/java/io/prestosql/dispatcher/QueuedStatementResource.java --- .../dispatcher/QueuedStatementResource.java | 207 ++++++++++++------ 1 file changed, 142 insertions(+), 65 deletions(-) diff --git a/presto-main/src/main/java/io/prestosql/dispatcher/QueuedStatementResource.java b/presto-main/src/main/java/io/prestosql/dispatcher/QueuedStatementResource.java index 9ae8fb09b5a0..e5db058aa5cb 100644 --- a/presto-main/src/main/java/io/prestosql/dispatcher/QueuedStatementResource.java +++ b/presto-main/src/main/java/io/prestosql/dispatcher/QueuedStatementResource.java @@ -14,16 +14,17 @@ package io.prestosql.dispatcher; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Ordering; import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.airlift.log.Logger; import io.airlift.units.Duration; import io.prestosql.client.QueryError; import io.prestosql.client.QueryResults; import io.prestosql.client.StatementStats; import io.prestosql.execution.ExecutionFailureInfo; +import io.prestosql.execution.QueryManagerConfig; import io.prestosql.execution.QueryState; import io.prestosql.server.HttpRequestSessionContext; import io.prestosql.server.ServerConfig; @@ -35,8 +36,10 @@ import io.prestosql.spi.security.GroupProvider; import io.prestosql.spi.security.Identity; +import javax.annotation.Nullable; +import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; import javax.inject.Inject; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.DELETE; @@ -58,7 +61,6 @@ import javax.ws.rs.core.UriInfo; import java.net.URI; -import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -66,11 +68,14 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import static com.clearspring.analytics.util.Preconditions.checkState; import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; -import static io.airlift.concurrent.Threads.threadsNamed; +import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.airlift.jaxrs.AsyncResponseHandler.bindAsyncResponse; import static io.prestosql.execution.QueryState.FAILED; import static io.prestosql.execution.QueryState.QUEUED; @@ -103,16 +108,16 @@ public class QueuedStatementResource private final Executor responseExecutor; private final ScheduledExecutorService timeoutExecutor; - private final ConcurrentMap queries = new ConcurrentHashMap<>(); - private final ScheduledExecutorService queryPurger = newSingleThreadScheduledExecutor(threadsNamed("dispatch-query-purger")); private final boolean compressionEnabled; + private final QueryManager queryManager; @Inject public QueuedStatementResource( GroupProvider groupProvider, DispatchManager dispatchManager, DispatchExecutor executor, - ServerConfig serverConfig) + ServerConfig serverConfig, + QueryManagerConfig queryManagerConfig) { this.groupProvider = requireNonNull(groupProvider, "groupProvider is null"); this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null"); @@ -122,43 +127,20 @@ public QueuedStatementResource( this.timeoutExecutor = requireNonNull(executor, "timeoutExecutor is null").getScheduledExecutor(); this.compressionEnabled = requireNonNull(serverConfig, "serverConfig is null").isQueryResultsCompressionEnabled(); - queryPurger.scheduleWithFixedDelay( - () -> { - try { - // snapshot the queries before checking states to avoid registration race - for (Entry entry : ImmutableSet.copyOf(queries.entrySet())) { - if (!entry.getValue().isSubmissionFinished()) { - continue; - } - - // forget about this query if the query manager is no longer tracking it - if (!dispatchManager.isQueryRegistered(entry.getKey())) { - Query query = queries.remove(entry.getKey()); - if (query != null) { - try { - query.destroy(); - } - catch (Throwable e) { - // this catch clause is broad so query purger does not get stuck - log.warn(e, "Error destroying identity"); - } - } - } - } - } - catch (Throwable e) { - log.warn(e, "Error removing old queries"); - } - }, - 200, - 200, - MILLISECONDS); + requireNonNull(queryManagerConfig, "queryManagerConfig is null"); + queryManager = new QueryManager(queryManagerConfig.getClientTimeout()); + } + + @PostConstruct + public void start() + { + queryManager.initialize(dispatchManager); } @PreDestroy public void stop() { - queryPurger.shutdownNow(); + queryManager.destroy(); } @ResourceSecurity(AUTHENTICATED_USER) @@ -174,18 +156,25 @@ public Response postStatement( throw badRequest(BAD_REQUEST, "SQL statement is empty"); } + Query query = registerQuery(statement, servletRequest, httpHeaders); + + return createQueryResultsResponse(query.getQueryResults(query.getLastToken(), uriInfo)); + } + + private Query registerQuery(String statement, HttpServletRequest servletRequest, HttpHeaders httpHeaders) + { String remoteAddress = servletRequest.getRemoteAddr(); Optional identity = Optional.ofNullable((Identity) servletRequest.getAttribute(AUTHENTICATED_IDENTITY)); MultivaluedMap headers = httpHeaders.getRequestHeaders(); SessionContext sessionContext = new HttpRequestSessionContext(headers, remoteAddress, identity, groupProvider); Query query = new Query(statement, sessionContext, dispatchManager); - queries.put(query.getQueryId(), query); + queryManager.registerQuery(query); // let authentication filter know that identity lifecycle has been handed off servletRequest.setAttribute(AUTHENTICATED_IDENTITY, null); - return createQueryResultsResponse(query.getQueryResults(query.getLastToken(), uriInfo)); + return query; } @ResourceSecurity(PUBLIC) @@ -235,7 +224,7 @@ public Response cancelQuery( private Query getQuery(QueryId queryId, String slug, long token) { - Query query = queries.get(queryId); + Query query = queryManager.getQuery(queryId); if (query == null || !query.getSlug().isValid(QUEUED_QUERY, slug, token)) { throw badRequest(NOT_FOUND, "Query not found"); } @@ -316,8 +305,9 @@ private static final class Query private final Slug slug = Slug.createNew(); private final AtomicLong lastToken = new AtomicLong(); - @GuardedBy("this") - private ListenableFuture querySubmissionFuture; + private final long initTime = System.nanoTime(); + private final AtomicReference submissionGate = new AtomicReference<>(); + private final SettableFuture creationFuture = SettableFuture.create(); public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager) { @@ -342,27 +332,38 @@ public long getLastToken() return lastToken.get(); } - public synchronized boolean isSubmissionFinished() + public boolean tryAbandonSubmissionWithTimeout(Duration querySubmissionTimeout) + { + return Duration.nanosSince(initTime).compareTo(querySubmissionTimeout) >= 0 && submissionGate.compareAndSet(null, false); + } + + public boolean isSubmissionAbandoned() + { + return Boolean.FALSE.equals(submissionGate.get()); + } + + public boolean isCreated() { - return querySubmissionFuture != null && querySubmissionFuture.isDone(); + return creationFuture.isDone(); } private ListenableFuture waitForDispatched() { - // if query query submission has not finished, wait for it to finish - synchronized (this) { - if (querySubmissionFuture == null) { - querySubmissionFuture = dispatchManager.createQuery(queryId, slug, sessionContext, query); - } - if (!querySubmissionFuture.isDone()) { - return querySubmissionFuture; - } + submitIfNeeded(); + if (!creationFuture.isDone()) { + return nonCancellationPropagating(creationFuture); } - // otherwise, wait for the query to finish return dispatchManager.waitForDispatched(queryId); } + private void submitIfNeeded() + { + if (submissionGate.compareAndSet(null, true)) { + creationFuture.setFuture(dispatchManager.createQuery(queryId, slug, sessionContext, query)); + } + } + public QueryResults getQueryResults(long token, UriInfo uriInfo) { long lastToken = this.lastToken.get(); @@ -373,14 +374,12 @@ public QueryResults getQueryResults(long token, UriInfo uriInfo) // advance (or stay at) the token this.lastToken.compareAndSet(lastToken, token); - synchronized (this) { - // if query submission has not finished, return simple empty result - if (querySubmissionFuture == null || !querySubmissionFuture.isDone()) { - return createQueryResults( - token + 1, - uriInfo, - DispatchInfo.queued(NO_DURATION, NO_DURATION)); - } + // if query submission has not finished, return simple empty result + if (!creationFuture.isDone()) { + return createQueryResults( + token + 1, + uriInfo, + DispatchInfo.queued(NO_DURATION, NO_DURATION)); } Optional dispatchInfo = dispatchManager.getDispatchInfo(queryId); @@ -394,9 +393,9 @@ public QueryResults getQueryResults(long token, UriInfo uriInfo) return createQueryResults(token + 1, uriInfo, dispatchInfo.get()); } - public synchronized void cancel() + public void cancel() { - querySubmissionFuture.addListener(() -> dispatchManager.cancelQuery(queryId), directExecutor()); + creationFuture.addListener(() -> dispatchManager.cancelQuery(queryId), directExecutor()); } public void destroy() @@ -464,4 +463,82 @@ private QueryError toQueryError(ExecutionFailureInfo executionFailureInfo) executionFailureInfo.toFailureInfo()); } } + + @ThreadSafe + private static class QueryManager + { + private final ConcurrentMap queries = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduledExecutorService = newSingleThreadScheduledExecutor(daemonThreadsNamed("drain-state-query-manager")); + + private final Duration querySubmissionTimeout; + + public QueryManager(Duration querySubmissionTimeout) + { + this.querySubmissionTimeout = requireNonNull(querySubmissionTimeout, "querySubmissionTimeout is null"); + } + + public void initialize(DispatchManager dispatchManager) + { + scheduledExecutorService.scheduleWithFixedDelay(() -> syncWith(dispatchManager), 200, 200, MILLISECONDS); + } + + public void destroy() + { + scheduledExecutorService.shutdownNow(); + } + + private void syncWith(DispatchManager dispatchManager) + { + queries.forEach((queryId, query) -> { + if (shouldBePurged(dispatchManager, query)) { + removeQuery(queryId); + } + }); + } + + private boolean shouldBePurged(DispatchManager dispatchManager, Query query) + { + if (query.isSubmissionAbandoned()) { + // Query submission was explicitly abandoned + return true; + } + if (query.tryAbandonSubmissionWithTimeout(querySubmissionTimeout)) { + // Query took too long to be submitted by the client + return true; + } + if (query.isCreated() && !dispatchManager.isQueryRegistered(query.getQueryId())) { + // Query was created in the DispatchManager, and DispatchManager has already purged the query + return true; + } + return false; + } + + private void removeQuery(QueryId queryId) + { + Optional.ofNullable(queries.remove(queryId)) + .ifPresent(QueryManager::destroyQuietly); + } + + private static void destroyQuietly(Query query) + { + try { + query.destroy(); + } + catch (Throwable t) { + log.error(t, "Error destroying query"); + } + } + + public void registerQuery(Query query) + { + Query existingQuery = queries.putIfAbsent(query.getQueryId(), query); + checkState(existingQuery == null, "Query already registered"); + } + + @Nullable + public Query getQuery(QueryId queryId) + { + return queries.get(queryId); + } + } }