Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[For debugging purpose] add runtime stats for task create/update on the worker side #24161

Closed
wants to merge 7 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ private RuntimeMetricName()
// Blocked time for the operators due to waiting for inputs.
public static final String TASK_BLOCKED_TIME_NANOS = "taskBlockedTimeNanos";
public static final String TASK_UPDATE_DELIVERED_WALL_TIME_NANOS = "taskUpdateDeliveredWallTimeNanos";
// Wall time measured on the worker from the start of the task create/update request handler
// until the task manager's updateTask call has returned (recorded through
// TaskResourceUtils.recordTaskUpdateReceivedTimeNanos).
public static final String TASK_UPDATE_RECEIVED_WALL_TIME_NANOS = "taskUpdateReceivedWallTimeNanos";
// Current-thread CPU time for the same span as TASK_UPDATE_RECEIVED_WALL_TIME_NANOS.
public static final String TASK_UPDATE_RECEIVED_CPU_TIME_NANOS = "taskUpdateReceivedCpuTimeNanos";
// Wall time spent in the section of SqlTask.updateTask that ensures the task execution is
// created only once (recorded through TaskResourceUtils.recordTaskExecutionCreationTimeNanos).
public static final String TASK_EXECUTION_CREATION_WALL_TIME_NANOS = "taskExecutionCreationWallTimeNanos";
// Current-thread CPU time for the same span as TASK_EXECUTION_CREATION_WALL_TIME_NANOS.
public static final String TASK_EXECUTION_CREATION_CPU_TIME_NANOS = "taskExecutionCreationCpuTimeNanos";

public static final String TASK_UPDATE_SERIALIZED_CPU_TIME_NANOS = "taskUpdateSerializedCpuNanos";
public static final String TASK_PLAN_SERIALIZED_CPU_TIME_NANOS = "taskPlanSerializedCpuNanos";
// Time taken for a read call to storage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.CounterStat;
import com.facebook.presto.Session;
import com.facebook.presto.common.RuntimeUnit;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.execution.buffer.BufferResult;
import com.facebook.presto.execution.buffer.LazyOutputBuffer;
Expand All @@ -34,6 +35,7 @@
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.operator.TaskExchangeClientManager;
import com.facebook.presto.operator.TaskStats;
import com.facebook.presto.server.TaskResourceUtils;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorMetadataUpdateHandle;
import com.facebook.presto.spi.connector.ConnectorMetadataUpdater;
Expand All @@ -49,6 +51,7 @@

import javax.annotation.Nullable;

import java.lang.management.ManagementFactory;
import java.net.URI;
import java.util.List;
import java.util.Optional;
Expand All @@ -66,6 +69,7 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.lang.Math.max;
import static java.util.Objects.requireNonNull;

public class SqlTask
Expand Down Expand Up @@ -179,7 +183,7 @@ public void stateChanged(TaskState newState)
return;
}

if (taskHolderReference.compareAndSet(taskHolder, new TaskHolder(createTaskInfo(taskHolder), taskHolder.getIoStats()))) {
if (taskHolderReference.compareAndSet(taskHolder, new TaskHolder(createTaskInfo(taskHolder, null), taskHolder.getIoStats()))) {
break;
}
}
Expand Down Expand Up @@ -239,10 +243,26 @@ public DateTime getTaskCreatedTime()
return taskStateMachine.getCreatedTime();
}

/**
 * Builds a {@link TaskInfo} from the current task holder and records how long that took.
 *
 * <p>Two values are added to the session's runtime stats after the task info has been built:
 * the current thread's CPU time under {@code getTaskInfoOnWorkerCPUTimeNano} and the elapsed
 * wall time under {@code getTaskInfoOnWorkerWallTimeNano}. Both are clamped to be non-negative.
 *
 * @param session session whose runtime stats receive the timing metrics; it is also passed on
 *         to {@code createTaskInfo}
 * @return the task info built from the task holder currently held in {@code taskHolderReference}
 */
public TaskInfo getTaskInfo(Session session)
{
    final long wallStartNanos = System.nanoTime();
    final long cpuStartNanos = ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();

    final TaskInfo snapshot;
    try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
        snapshot = createTaskInfo(taskHolderReference.get(), session);
    }

    // CPU time is sampled and recorded first, then wall time, so the wall-time figure also
    // covers the cost of recording the CPU metric.
    long cpuElapsedNanos = max(0, ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime() - cpuStartNanos);
    session.getRuntimeStats().addMetricValue("getTaskInfoOnWorkerCPUTimeNano", RuntimeUnit.NANO, cpuElapsedNanos);

    long wallElapsedNanos = max(0, System.nanoTime() - wallStartNanos);
    session.getRuntimeStats().addMetricValue("getTaskInfoOnWorkerWallTimeNano", RuntimeUnit.NANO, wallElapsedNanos);

    return snapshot;
}

/**
 * Builds a {@link TaskInfo} from the task holder currently held in {@code taskHolderReference},
 * without a session. The call runs inside a {@link SetThreadName} scope created with the format
 * {@code "Task-%s"} and the task id.
 *
 * @return the task info produced by {@code createTaskInfo}
 */
public TaskInfo getTaskInfo()
{
try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
// NOTE(review): two return statements appear here; presumably the first is the pre-change
// line of this diff view and the second its replacement — confirm against the merged file.
// With a null session, createTaskInfo skips recording its timing metrics.
return createTaskInfo(taskHolderReference.get());
return createTaskInfo(taskHolderReference.get(), null);
}
}

Expand Down Expand Up @@ -381,14 +401,17 @@ private static Set<PlanNodeId> getNoMoreSplits(TaskHolder taskHolder)
return ImmutableSet.of();
}

private TaskInfo createTaskInfo(TaskHolder taskHolder)
private TaskInfo createTaskInfo(TaskHolder taskHolder, Session session)
{
long startWallTimeCreateTaskInfo = System.nanoTime();
long startCpuTimeCreateTaskInfo = ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();

TaskStats taskStats = getTaskStats(taskHolder);
Set<PlanNodeId> noMoreSplits = getNoMoreSplits(taskHolder);
MetadataUpdates metadataRequests = getMetadataUpdateRequests(taskHolder);

TaskStatus taskStatus = createTaskStatus(taskHolder);
return new TaskInfo(
TaskInfo taskInfo = new TaskInfo(
taskStateMachine.getTaskId(),
taskStatus,
lastHeartbeat.get(),
Expand All @@ -398,6 +421,11 @@ private TaskInfo createTaskInfo(TaskHolder taskHolder)
needsPlan.get(),
metadataRequests,
nodeId);
if (session != null) {
session.getRuntimeStats().addMetricValue("createTaskInfoOnWorkerCPUTimeNano", RuntimeUnit.NANO, max(0, ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime() - startCpuTimeCreateTaskInfo));
session.getRuntimeStats().addMetricValue("createTaskInfoOnWorkerWallTimeNano", RuntimeUnit.NANO, max(0, System.nanoTime() - startWallTimeCreateTaskInfo));
}
return taskInfo;
}

public ListenableFuture<TaskStatus> getTaskStatus(TaskState callersCurrentState)
Expand Down Expand Up @@ -439,8 +467,11 @@ public TaskInfo updateTask(
// The LazyOutput buffer does not support write methods, so the actual
// output buffer must be established before drivers are created (e.g.
// a VALUES query).

outputBuffer.setOutputBuffers(outputBuffers);

long startWallTimeTaskExecutionCreation = System.nanoTime();
long startCpuTimeTaskExecutionCreation = ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();
// assure the task execution is only created once
SqlTaskExecution taskExecution;
synchronized (this) {
Expand All @@ -466,10 +497,16 @@ public TaskInfo updateTask(
needsPlan.set(false);
}
}
TaskResourceUtils.recordTaskExecutionCreationTimeNanos(session.getRuntimeStats(), ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime() - startCpuTimeTaskExecutionCreation, System.nanoTime() - startWallTimeTaskExecutionCreation);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

anchor


long startWallTimeAddSources = System.nanoTime();
long startCpuTimeAddSources = ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();

if (taskExecution != null) {
taskExecution.addSources(sources);
}
session.getRuntimeStats().addMetricValue("addSourcesOnWorkerCPUTimeNano", RuntimeUnit.NANO, max(0, ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime() - startCpuTimeAddSources));
session.getRuntimeStats().addMetricValue("addSourcesOnWorkerWallTimeNano", RuntimeUnit.NANO, max(0, System.nanoTime() - startWallTimeAddSources));
}
catch (Error e) {
failed(e);
Expand All @@ -479,7 +516,7 @@ public TaskInfo updateTask(
failed(e);
}

return getTaskInfo();
return getTaskInfo(session);
}

public TaskMetadataContext getTaskMetadataContext()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.facebook.airlift.stats.CounterStat;
import com.facebook.airlift.stats.GcMonitor;
import com.facebook.presto.Session;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.RuntimeUnit;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.event.SplitMonitor;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
Expand Down Expand Up @@ -68,6 +70,7 @@
import javax.inject.Inject;

import java.io.Closeable;
import java.lang.management.ManagementFactory;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -95,6 +98,7 @@
import static com.google.common.base.Predicates.notNull;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import static java.lang.Math.max;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newFixedThreadPool;
Expand Down Expand Up @@ -407,6 +411,10 @@ public TaskInfo updateTask(
requireNonNull(sources, "sources is null");
requireNonNull(outputBuffers, "outputBuffers is null");

RuntimeStats runtimeStats = session.getRuntimeStats();
long startHouseKeepingWallTime = System.nanoTime();
long starthouseKeepingCPUTime = ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();

SqlTask sqlTask = tasks.getUnchecked(taskId);
QueryContext queryContext = sqlTask.getQueryContext();
if (!queryContext.isMemoryLimitsInitialized()) {
Expand All @@ -431,6 +439,9 @@ public TaskInfo updateTask(
queryContext.setHeapDumpFilePath(heapDumpFilePath);

sqlTask.recordHeartbeat();
runtimeStats.addMetricValue("houseKeepingOnWorkerCPUTimeNano", RuntimeUnit.NANO, max(0, ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime() - starthouseKeepingCPUTime));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.

runtimeStats.addMetricValue("houseKeepingOnWorkerWallTimeNano", RuntimeUnit.NANO, max(0, System.nanoTime() - startHouseKeepingWallTime));

return sqlTask.updateTask(session, fragment, sources, outputBuffers, tableWriteInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.facebook.airlift.json.Codec;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.Session;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.RuntimeUnit;
import com.facebook.presto.connector.ConnectorTypeSerdeManager;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskInfo;
Expand Down Expand Up @@ -54,6 +56,7 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;

import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -74,6 +77,7 @@
import static com.facebook.presto.util.TaskUtils.randomizeWaitTime;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.lang.Math.max;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -135,19 +139,49 @@ public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdat
{
requireNonNull(taskUpdateRequest, "taskUpdateRequest is null");

long startWallTime = System.nanoTime();
long startCpuTime = ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();

Session session = taskUpdateRequest.getSession().toSession(sessionPropertyManager, taskUpdateRequest.getExtraCredentials());
RuntimeStats runtimeStats = session.getRuntimeStats();

runtimeStats.addMetricValue("sessionCreationOnWorkerCPUTimeNano", RuntimeUnit.NANO, max(0, ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime() - startCpuTime));
runtimeStats.addMetricValue("sessionCreationOnWorkerWallTimeNano", RuntimeUnit.NANO, max(0, System.nanoTime() - startWallTime));

long startUpdateTaskWallTime = System.nanoTime();
long startUpdateTaskCPUTime = ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();
TaskInfo taskInfo = taskManager.updateTask(session,
taskId,
taskUpdateRequest.getFragment().map(planFragmentCodec::fromBytes),
taskUpdateRequest.getSources(),
taskUpdateRequest.getOutputIds(),
taskUpdateRequest.getTableWriteInfo());
runtimeStats.addMetricValue("taskUpdateOnWorkerCPUTimeNano", RuntimeUnit.NANO, max(0, ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime() - startUpdateTaskCPUTime));
runtimeStats.addMetricValue("taskUpdateOnWorkerWallTimeNano", RuntimeUnit.NANO, max(0, System.nanoTime() - startUpdateTaskWallTime));

TaskResourceUtils.recordTaskUpdateReceivedTimeNanos(session.getRuntimeStats(), ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime() - startCpuTime, System.nanoTime() - startWallTime);

long startSummarizeWallTime = System.nanoTime();
long startSummarizeCPUTime = ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();

if (shouldSummarize(uriInfo)) {
taskInfo = taskInfo.summarize();
}
runtimeStats.addMetricValue("taskInfoSummarizeOnWorkerCPUTimeNano", RuntimeUnit.NANO, max(0, ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime() - startSummarizeCPUTime));
runtimeStats.addMetricValue("taskInfoSummarizeOnWorkerWallTimeNano", RuntimeUnit.NANO, max(0, System.nanoTime() - startSummarizeWallTime));

long responseGenerationWallTime = System.nanoTime();
long responseGenerationCPUTime = ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();
Response response = Response.ok().entity(taskInfo).build();
runtimeStats.addMetricValue("responseGenerationOnWorkerCPUTimeNano", RuntimeUnit.NANO, max(0, ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime() - responseGenerationCPUTime));
runtimeStats.addMetricValue("responseGenerationOnWorkerWallTimeNano", RuntimeUnit.NANO, max(0, System.nanoTime() - responseGenerationWallTime));

runtimeStats.addMetricValue("createOrUpdateTaskOnWorkerCPUTimeNano", RuntimeUnit.NANO, max(0, ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime() - startCpuTime));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

anchor

runtimeStats.addMetricValue("createOrUpdateTaskOnWorkerWallTimeNano", RuntimeUnit.NANO, max(0, System.nanoTime() - startWallTime));

return Response.ok().entity(taskInfo).build();
//send task's runtime stats back
taskInfo.getStats().getRuntimeStats().mergeWith(runtimeStats);
return response;
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
*/
package com.facebook.presto.server;

import com.facebook.presto.common.RuntimeMetricName;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.RuntimeUnit;
import com.facebook.presto.connector.ConnectorTypeSerdeManager;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.metadata.HandleResolver;
Expand All @@ -32,6 +35,7 @@

import static com.facebook.presto.operator.OperatorInfoUnion.convertToOperatorInfo;
import static com.facebook.presto.operator.OperatorInfoUnion.convertToOperatorInfoUnion;
import static java.lang.Math.max;
import static java.util.stream.Collectors.toList;

public class TaskResourceUtils
Expand Down Expand Up @@ -523,4 +527,16 @@ private static List<ConnectorMetadataUpdateHandle> convertToConnector(
.map(e -> connectorTypeSerde.deserialize(handleResolver.getMetadataUpdateHandleClass(e.getId()), e.getBytes()))
.collect(toList());
}

/**
 * Adds the CPU and wall time of a received task update to the given runtime stats.
 *
 * <p>Each duration is clamped to zero when negative, then recorded in nanoseconds under
 * {@link RuntimeMetricName#TASK_UPDATE_RECEIVED_CPU_TIME_NANOS} and
 * {@link RuntimeMetricName#TASK_UPDATE_RECEIVED_WALL_TIME_NANOS} respectively.
 *
 * @param runtimeStats stats container that receives the two metric values
 * @param cpuTimeNanos measured CPU time in nanoseconds
 * @param wallTimeNanos measured wall time in nanoseconds
 */
public static void recordTaskUpdateReceivedTimeNanos(RuntimeStats runtimeStats, long cpuTimeNanos, long wallTimeNanos)
{
    final long clampedCpuNanos = max(0, cpuTimeNanos);
    final long clampedWallNanos = max(0, wallTimeNanos);

    runtimeStats.addMetricValue(RuntimeMetricName.TASK_UPDATE_RECEIVED_CPU_TIME_NANOS, RuntimeUnit.NANO, clampedCpuNanos);
    runtimeStats.addMetricValue(RuntimeMetricName.TASK_UPDATE_RECEIVED_WALL_TIME_NANOS, RuntimeUnit.NANO, clampedWallNanos);
}

/**
 * Adds the CPU and wall time of task execution creation to the given runtime stats.
 *
 * <p>Each duration is clamped to zero when negative, then recorded in nanoseconds under
 * {@link RuntimeMetricName#TASK_EXECUTION_CREATION_CPU_TIME_NANOS} and
 * {@link RuntimeMetricName#TASK_EXECUTION_CREATION_WALL_TIME_NANOS} respectively.
 *
 * @param runtimeStats stats container that receives the two metric values
 * @param cpuTimeNanos measured CPU time in nanoseconds
 * @param wallTimeNanos measured wall time in nanoseconds
 */
public static void recordTaskExecutionCreationTimeNanos(RuntimeStats runtimeStats, long cpuTimeNanos, long wallTimeNanos)
{
    final long clampedCpuNanos = max(0, cpuTimeNanos);
    final long clampedWallNanos = max(0, wallTimeNanos);

    runtimeStats.addMetricValue(RuntimeMetricName.TASK_EXECUTION_CREATION_CPU_TIME_NANOS, RuntimeUnit.NANO, clampedCpuNanos);
    runtimeStats.addMetricValue(RuntimeMetricName.TASK_EXECUTION_CREATION_WALL_TIME_NANOS, RuntimeUnit.NANO, clampedWallNanos);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,7 @@ public void success(TaskInfo value)
taskUpdateTimeline.removeElements(0, deliveredUpdates);
}
updateStats(currentRequestStartNanos);
mergeTaskRuntimeStats(value);
processTaskUpdate(value, sources);
updateErrorTracker.requestSucceeded();
if (oldestTaskUpdateTime != 0) {
Expand All @@ -1171,6 +1172,11 @@ public void success(TaskInfo value)
}
}

/**
 * Passes the runtime stats carried in the given task info's stats to {@code mergeWith} on this
 * object's session runtime stats. Called from {@code success} before the task update is processed.
 *
 * @param taskInfo task info returned by a successful task update request
 */
private void mergeTaskRuntimeStats(TaskInfo taskInfo)
{
session.getRuntimeStats().mergeWith(taskInfo.getStats().getRuntimeStats());
}

@Override
public void failed(Throwable cause)
{
Expand Down
Loading