From 9a1f6872602dd09e51482abceca3cabfab7c1a24 Mon Sep 17 00:00:00 2001
From: Ivan Burmistrov <90776737+isburmistrov@users.noreply.github.com>
Date: Thu, 31 Oct 2024 14:43:43 +0000
Subject: [PATCH] [ShareChat] Introduced the concept of uniform parallelism
(#1)
### Context
In Tardis, the autoscaler struggled to find the right balance. The
issue is that with heterogeneous parallelism across vertices, the
autoscaler's decisions can be suboptimal. Vertices are not independent:
the current parallelism of a "parent" vertex determines how much
traffic a "child" vertex receives, and therefore influences the
decision when we pick the new parallelism of the "child" vertex. In
short, the relative parallelism of vertices can change after a scaling
event, and in the new, changed situation the optimal scaling decision
can be very different.
In practice this shows up as endless "bouncing": the autoscaler scales
down, then quickly realizes it needs to scale back up, and the cycle
repeats no matter how hard we try to tune the parameters.
### This PR
Introduces the concept of "uniform parallelism". To reduce the
"cognitive load" on the autoscaler and to prevent the relative
parallelism from changing over time, we simply maintain the same
parallelism across all vertices, much like we do in Tardis today.
With this setting, the Tardis job autoscales cleanly, maintaining a
small lag without "bouncing" back.
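For reference, a minimal sketch of how the new option is enabled,
mirroring the added test; the `conf` handle stands in for the job's
autoscaler `Configuration`:
```java
// Enforce the same parallelism across all vertices (added by this patch,
// disabled by default). Equivalent config entry:
//   job.autoscaler.vertex.uniform-parallelism.enabled: true
conf.set(AutoScalerOptions.VERTEX_UNIFORM_PARALLELISM, true);
```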
---
.github/workflows/ci.yml | 1 +
.../generated/auto_scaler_configuration.html | 6 ++
.../flink/autoscaler/ScalingExecutor.java | 55 ++++++++++-
.../autoscaler/config/AutoScalerOptions.java | 8 ++
.../flink/autoscaler/ScalingExecutorTest.java | 94 +++++++++++++++++++
5 files changed, 163 insertions(+), 1 deletion(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index bff879088c..35c313f23e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -26,6 +26,7 @@ on:
branches:
- main
- release-*
+ - main-sharechat
pull_request:
jobs:
test_ci:
diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index 49ff71efa2..7ec8aab883 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -224,5 +224,11 @@
             <td>Integer</td>
             <td>The minimum parallelism the autoscaler can use.</td>
         </tr>
+        <tr>
+            <td><h5>job.autoscaler.vertex.uniform-parallelism.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, the autoscaler will enforce the same parallelism for all vertices.</td>
+        </tr>
     </tbody>
</table>
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index 02e5ad4f15..e78bef9978 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -238,6 +238,8 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
var excludeVertexIdList =
context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
+ var uniformParallelism =
+ context.getConfiguration().get(AutoScalerOptions.VERTEX_UNIFORM_PARALLELISM);
evaluatedMetrics
.getVertexMetrics()
.forEach(
@@ -261,6 +263,15 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
restartTime,
delayedScaleDown);
if (NO_CHANGE == parallelismChange.getChangeType()) {
+                                if (uniformParallelism) {
+                                    // Even if this vertex needs no change, under uniform
+                                    // parallelism it must still take part in the final scaling
+                                    // decision, so we add it to the output map.
+                                    out.put(
+                                            v,
+                                            getNoChangeScalingSummary(
+                                                    metrics, currentParallelism));
+                                }
return;
} else if (REQUIRED_CHANGE == parallelismChange.getChangeType()) {
requiredVertices.add(v);
@@ -281,7 +292,49 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
return Map.of();
}
- return out;
+ return uniformParallelism ? enforceUniformParallelism(out) : out;
+ }
+
+    private static ScalingSummary getNoChangeScalingSummary(
+            Map<ScalingMetric, EvaluatedScalingMetric> metrics, int currentParallelism) {
+        // We can't use the parameterized constructor here because it requires the new
+        // parallelism to differ from the current parallelism.
+        var noChangeScalingSummary = new ScalingSummary();
+        noChangeScalingSummary.setCurrentParallelism(currentParallelism);
+        noChangeScalingSummary.setNewParallelism(currentParallelism);
+        // EXPECTED_PROCESSING_RATE is expected to be present in a candidate for scaling.
+        metrics.put(ScalingMetric.EXPECTED_PROCESSING_RATE, metrics.get(TRUE_PROCESSING_RATE));
+        noChangeScalingSummary.setMetrics(metrics);
+        return noChangeScalingSummary;
+    }
+
+    // Equalize parallelism across all vertices.
+    // The logic is simple: we compute the maximum newParallelism across all summaries and
+    // set the newParallelism of each ScalingSummary to that value.
+    // Vertices whose currentParallelism already equals the computed maximum are omitted
+    // from the result.
+    private Map<JobVertexID, ScalingSummary> enforceUniformParallelism(
+            Map<JobVertexID, ScalingSummary> perVertexSummary) {
+        final var maxParallelism =
+                perVertexSummary.values().stream()
+                        .mapToInt(ScalingSummary::getNewParallelism)
+                        .max();
+        if (maxParallelism.isEmpty()) {
+            return perVertexSummary;
+        }
+        var result = new HashMap<JobVertexID, ScalingSummary>();
+        perVertexSummary.forEach(
+                (v, s) -> {
+                    if (s.getCurrentParallelism() != maxParallelism.getAsInt()) {
+                        result.put(
+                                v,
+                                new ScalingSummary(
+                                        s.getCurrentParallelism(),
+                                        maxParallelism.getAsInt(),
+                                        s.getMetrics()));
+                    }
+                });
+        return result;
}
private boolean isJobUnderMemoryPressure(
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index e1ea6a8695..505f5478f6 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -132,6 +132,14 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
.withDescription(
"The maximum parallelism the autoscaler can use. Note that this limit will be ignored if it is higher than the max parallelism configured in the Flink config or directly on each operator.");
+    public static final ConfigOption<Boolean> VERTEX_UNIFORM_PARALLELISM =
+ autoScalerConfig("vertex.uniform-parallelism.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withFallbackKeys(oldOperatorConfigKey("vertex.uniform-parallelism.enabled"))
+ .withDescription(
+ "If true, the autoscaler will enforce the same parallelism for all vertices.");
+
    public static final ConfigOption<Double> MAX_SCALE_DOWN_FACTOR =
autoScalerConfig("scale-down.max-factor")
.doubleType()
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index 3eeac97b06..712a41f40d 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -759,6 +759,100 @@ public void testAdjustByMaxParallelism() throws Exception {
"8"));
}
+ @Test
+ public void testUniformParallelism() throws Exception {
+ var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a";
+ var source = JobVertexID.fromHexString(sourceHexString);
+ var filterOperatorHexString = "869fb403873411306404e9f2e4438c0e";
+ var filterOperator = JobVertexID.fromHexString(filterOperatorHexString);
+ var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
+ var sink = JobVertexID.fromHexString(sinkHexString);
+ conf.set(AutoScalerOptions.VERTEX_UNIFORM_PARALLELISM, true);
+ // set scale down interval to zero so we consider scale downs immediately
+ conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.d);
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+
+ JobTopology jobTopology =
+ new JobTopology(
+ new VertexInfo(source, Map.of(), 100, MAX_PARALLELISM, false, null),
+ new VertexInfo(
+ filterOperator,
+ Map.of(source, REBALANCE),
+ 25,
+ MAX_PARALLELISM,
+ false,
+ null),
+ new VertexInfo(
+ sink,
+ Map.of(filterOperator, HASH),
+ 50,
+ MAX_PARALLELISM,
+ false,
+ null));
+
+        // Expected new parallelisms without uniform enforcement: source 70, filterOperator 25 (no change), sink 80.
+ var metrics =
+ new EvaluatedMetrics(
+ Map.of(
+ source,
+ evaluated(100, 70, 100),
+ filterOperator,
+ evaluated(25, 70, 70),
+ sink,
+ evaluated(50, 80, 50)),
+ dummyGlobalMetrics);
+
+ conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of(sourceHexString, sinkHexString));
+        // No scaling is expected: filterOperator needs no change, and the other vertices
+        // are excluded.
+ scaleAndCheck(jobTopology, metrics, Map.of());
+
+ conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of(sinkHexString));
+ // parallelism is set to be the maximum across 2 vertices
+ scaleAndCheck(
+ jobTopology,
+ metrics,
+ Map.of(
+ sourceHexString, "70",
+ filterOperatorHexString, "70"));
+
+ conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of());
+ // parallelism is set to be the maximum across 3 vertices
+ scaleAndCheck(
+ jobTopology,
+ metrics,
+ Map.of(
+ sourceHexString, "80",
+ filterOperatorHexString, "80",
+ sinkHexString, "80"));
+ }
+
+ private void scaleAndCheck(
+ JobTopology jobTopology,
+ EvaluatedMetrics metrics,
+            Map<String, String> expectedParallelismOverrides)
+ throws Exception {
+ var now = Instant.now();
+ var assertion =
+ assertThat(
+ scalingExecutor.scaleResource(
+ context,
+ metrics,
+ new HashMap<>(),
+ new ScalingTracking(),
+ now,
+ jobTopology,
+ new DelayedScaleDown()));
+ if (!expectedParallelismOverrides.isEmpty()) {
+ assertion.isTrue();
+            Map<String, String> parallelismOverrides = stateStore.getParallelismOverrides(context);
+ assertThat(parallelismOverrides).containsAllEntriesOf(expectedParallelismOverrides);
+ } else {
+ assertion.isFalse();
+ }
+ }
+
@ParameterizedTest
@MethodSource("testDataForQuota")
public void testQuota(