[ShareChat] Introduced the concept of uniform parallelism (#1)
### Context

In Tardis, the autoscaler struggled to find the right balance. The
issue is that with heterogeneous parallelism across vertices, the
autoscaler's decisions can be suboptimal. Vertices are not independent:
the current parallelism of a "parent" vertex influences how much
traffic a "child" vertex receives, and therefore affects the decision
when we choose the new parallelism of the "child" vertex. In short, the
relative parallelism of vertices can change after a scaling event, and
in the new, changed situation the optimal scaling decision can be very
different.

In practice this shows up as constant "bouncing": the autoscaler scales
down, then quickly realizes it needs to scale back up, and the cycle
never ends, no matter how hard we try to tune the parameters.

<img width="896" alt="image"
src="https://github.com/user-attachments/assets/05d32637-cc96-4d60-8021-49396e297234">

### This PR

Introduces the concept of "uniform parallelism". To reduce the
"cognitive load" on the autoscaler and prevent the relative parallelism
from changing over time, we simply maintain the same parallelism across
all vertices, pretty much as we do in Tardis now.
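
For example, if the per-vertex evaluations propose parallelisms of 70,
25, and 50, uniform parallelism pins every vertex to the maximum, 70,
so the relative parallelism of the vertices stays fixed across scaling
events.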

With this setting, the Tardis job autoscales cleanly, maintaining a
small lag without any "bouncing" back.

<img width="895" alt="image"
src="https://github.com/user-attachments/assets/ca2f77e8-7ad4-4009-8fdd-3be5971dd8f6">

<img width="893" alt="image"
src="https://github.com/user-attachments/assets/f9060ee1-0503-47b0-ac89-8babfc472862">
isburmistrov authored Oct 31, 2024
1 parent 4b0aae9 commit 9a1f687
Showing 5 changed files with 163 additions and 1 deletion.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -26,6 +26,7 @@ on:
branches:
- main
- release-*
- main-sharechat
pull_request:
jobs:
test_ci:
@@ -224,5 +224,11 @@
<td>Integer</td>
<td>The minimum parallelism the autoscaler can use.</td>
</tr>
<tr>
<td><h5>job.autoscaler.vertex.uniform-parallelism.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, the autoscaler will enforce the same parallelism for all vertices.</td>
</tr>
</tbody>
</table>
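
For reference, a minimal sketch of enabling this flag programmatically,
mirroring how the test further below sets it (the string key is the one
from the table above):

```java
import org.apache.flink.configuration.Configuration;

// Assumes AutoScalerOptions from this PR is on the classpath.
var conf = new Configuration();
conf.set(AutoScalerOptions.VERTEX_UNIFORM_PARALLELISM, true);
// Equivalent string form, e.g. in the Flink configuration:
// job.autoscaler.vertex.uniform-parallelism.enabled: true
```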
@@ -238,6 +238,8 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(

var excludeVertexIdList =
context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
var uniformParallelism =
context.getConfiguration().get(AutoScalerOptions.VERTEX_UNIFORM_PARALLELISM);
evaluatedMetrics
.getVertexMetrics()
.forEach(
@@ -261,6 +263,15 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
restartTime,
delayedScaleDown);
if (NO_CHANGE == parallelismChange.getChangeType()) {
if (uniformParallelism) {
                                    // Even if this vertex needs no change, under uniform
                                    // parallelism it must still participate in the overall
                                    // scaling decision, so we add it to out.
out.put(
v,
getNoChangeScalingSummary(
metrics, currentParallelism));
}
return;
} else if (REQUIRED_CHANGE == parallelismChange.getChangeType()) {
requiredVertices.add(v);
@@ -281,7 +292,49 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
return Map.of();
}

return out;
return uniformParallelism ? enforceUniformParallelism(out) : out;
}

private static ScalingSummary getNoChangeScalingSummary(
Map<ScalingMetric, EvaluatedScalingMetric> metrics, int currentParallelism) {
        // We can't use the constructor with parameters here because it asserts that the new
        // parallelism differs from the current parallelism.
var noChangeScalingSummary = new ScalingSummary();
noChangeScalingSummary.setCurrentParallelism(currentParallelism);
noChangeScalingSummary.setNewParallelism(currentParallelism);
        // EXPECTED_PROCESSING_RATE must be present in any candidate for scaling.
metrics.put(ScalingMetric.EXPECTED_PROCESSING_RATE, metrics.get(TRUE_PROCESSING_RATE));
noChangeScalingSummary.setMetrics(metrics);
return noChangeScalingSummary;
}

    // Equalizes parallelism across all vertices: computes the maximum newParallelism across
    // the given summaries and sets newParallelism of every ScalingSummary to that value.
    // Vertices whose currentParallelism already equals the computed maximum are omitted
    // from the result.
private Map<JobVertexID, ScalingSummary> enforceUniformParallelism(
Map<JobVertexID, ScalingSummary> perVertexSummary) {
final var maxParallelism =
perVertexSummary.values().stream()
.mapToInt(ScalingSummary::getNewParallelism)
.max();
if (maxParallelism.isEmpty()) {
return perVertexSummary;
}
var result = new HashMap<JobVertexID, ScalingSummary>();
perVertexSummary.forEach(
(v, s) -> {
if (s.getCurrentParallelism() != maxParallelism.getAsInt()) {
result.put(
v,
new ScalingSummary(
s.getCurrentParallelism(),
maxParallelism.getAsInt(),
s.getMetrics()));
}
});
return result;
}
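
To experiment with the equalization step in isolation, here is a
self-contained sketch of the same logic, using a hypothetical `Summary`
record in place of the repo's `ScalingSummary` (an illustration under
those assumptions, not the PR's API):

```java
import java.util.HashMap;
import java.util.Map;

public class UniformParallelismSketch {
    // Hypothetical stand-in for ScalingSummary: current and proposed parallelism.
    record Summary(int current, int proposed) {}

    static Map<String, Summary> enforceUniform(Map<String, Summary> perVertex) {
        // Find the maximum proposed parallelism across all vertices.
        var max = perVertex.values().stream().mapToInt(Summary::proposed).max();
        if (max.isEmpty()) {
            return perVertex; // nothing to equalize
        }
        var result = new HashMap<String, Summary>();
        // Re-target every vertex to the maximum; vertices already running
        // at that parallelism need no override and are skipped.
        perVertex.forEach(
                (v, s) -> {
                    if (s.current() != max.getAsInt()) {
                        result.put(v, new Summary(s.current(), max.getAsInt()));
                    }
                });
        return result;
    }

    public static void main(String[] args) {
        var proposals =
                Map.of(
                        "source", new Summary(100, 70),
                        "filter", new Summary(25, 25),
                        "sink", new Summary(50, 70));
        // All three vertices end up targeting parallelism 70.
        System.out.println(enforceUniform(proposals));
    }
}
```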

private boolean isJobUnderMemoryPressure(
@@ -132,6 +132,14 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
.withDescription(
"The maximum parallelism the autoscaler can use. Note that this limit will be ignored if it is higher than the max parallelism configured in the Flink config or directly on each operator.");

public static final ConfigOption<Boolean> VERTEX_UNIFORM_PARALLELISM =
autoScalerConfig("vertex.uniform-parallelism.enabled")
.booleanType()
.defaultValue(false)
.withFallbackKeys(oldOperatorConfigKey("vertex.uniform-parallelism.enabled"))
.withDescription(
"If true, the autoscaler will enforce the same parallelism for all vertices.");

public static final ConfigOption<Double> MAX_SCALE_DOWN_FACTOR =
autoScalerConfig("scale-down.max-factor")
.doubleType()
@@ -759,6 +759,100 @@ public void testAdjustByMaxParallelism() throws Exception {
"8"));
}

@Test
public void testUniformParallelism() throws Exception {
var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a";
var source = JobVertexID.fromHexString(sourceHexString);
var filterOperatorHexString = "869fb403873411306404e9f2e4438c0e";
var filterOperator = JobVertexID.fromHexString(filterOperatorHexString);
var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
var sink = JobVertexID.fromHexString(sinkHexString);
conf.set(AutoScalerOptions.VERTEX_UNIFORM_PARALLELISM, true);
// set scale down interval to zero so we consider scale downs immediately
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.d);
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);

JobTopology jobTopology =
new JobTopology(
new VertexInfo(source, Map.of(), 100, MAX_PARALLELISM, false, null),
new VertexInfo(
filterOperator,
Map.of(source, REBALANCE),
25,
MAX_PARALLELISM,
false,
null),
new VertexInfo(
sink,
Map.of(filterOperator, HASH),
50,
MAX_PARALLELISM,
false,
null));

        // Per-vertex evaluated targets (before uniform parallelism is applied):
        // source -> 70, filterOperator -> no change (25), sink -> 80.
var metrics =
new EvaluatedMetrics(
Map.of(
source,
evaluated(100, 70, 100),
filterOperator,
evaluated(25, 70, 70),
sink,
evaluated(50, 80, 50)),
dummyGlobalMetrics);

conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of(sourceHexString, sinkHexString));
        // We don't expect scaling to happen: filterOperator needs no change, and the
        // other vertices are excluded.
scaleAndCheck(jobTopology, metrics, Map.of());

conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of(sinkHexString));
        // Parallelism is set to the maximum across the 2 non-excluded vertices.
scaleAndCheck(
jobTopology,
metrics,
Map.of(
sourceHexString, "70",
filterOperatorHexString, "70"));

conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of());
        // Parallelism is set to the maximum across all 3 vertices.
scaleAndCheck(
jobTopology,
metrics,
Map.of(
sourceHexString, "80",
filterOperatorHexString, "80",
sinkHexString, "80"));
}

private void scaleAndCheck(
JobTopology jobTopology,
EvaluatedMetrics metrics,
Map<String, String> expectedParallelismOverrides)
throws Exception {
var now = Instant.now();
var assertion =
assertThat(
scalingExecutor.scaleResource(
context,
metrics,
new HashMap<>(),
new ScalingTracking(),
now,
jobTopology,
new DelayedScaleDown()));
if (!expectedParallelismOverrides.isEmpty()) {
assertion.isTrue();
Map<String, String> parallelismOverrides = stateStore.getParallelismOverrides(context);
assertThat(parallelismOverrides).containsAllEntriesOf(expectedParallelismOverrides);
} else {
assertion.isFalse();
}
}

@ParameterizedTest
@MethodSource("testDataForQuota")
public void testQuota(