Skip to content

Commit

Permalink
Introduced the concept of uniform parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
isburmistrov committed Oct 31, 2024
1 parent 4b0aae9 commit 5a36e40
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 1 deletion.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ on:
branches:
- main
- release-*
- main-sharechat
pull_request:
jobs:
test_ci:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,5 +224,11 @@
<td>Integer</td>
<td>The minimum parallelism the autoscaler can use.</td>
</tr>
<tr>
<td><h5>job.autoscaler.vertex.uniform-parallelism.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, the autoscaler will enforce the same parallelism for all vertices.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(

var excludeVertexIdList =
context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
var uniformParallelism =
context.getConfiguration().get(AutoScalerOptions.VERTEX_UNIFORM_PARALLELISM);
evaluatedMetrics
.getVertexMetrics()
.forEach(
Expand All @@ -261,6 +263,15 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
restartTime,
delayedScaleDown);
if (NO_CHANGE == parallelismChange.getChangeType()) {
if (uniformParallelism) {
// even if no change for the given vertex, in the case of
// uniform parallelism, such vertices must participate in
// the further scaling decision, so we put them to out
out.put(
v,
getNoChangeScalingSummary(
metrics, currentParallelism));
}
return;
} else if (REQUIRED_CHANGE == parallelismChange.getChangeType()) {
requiredVertices.add(v);
Expand All @@ -281,7 +292,49 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
return Map.of();
}

return out;
return uniformParallelism ? enforceUniformParallelism(out) : out;
}

private static ScalingSummary getNoChangeScalingSummary(
Map<ScalingMetric, EvaluatedScalingMetric> metrics, int currentParallelism) {
// we can't use constructor with parameters here because it checks whether new parallelism
// != current parallelism.
var noChangeScalingSummary = new ScalingSummary();
noChangeScalingSummary.setCurrentParallelism(currentParallelism);
noChangeScalingSummary.setNewParallelism(currentParallelism);
// EXPECTED_PROCESSING_RATE is expected to be present in a candidate for scaling
metrics.put(ScalingMetric.EXPECTED_PROCESSING_RATE, metrics.get(TRUE_PROCESSING_RATE));
noChangeScalingSummary.setMetrics(metrics);
return noChangeScalingSummary;
}

// Equalize parallelism across all vertices.
// The logic is simple: we compute maximum parallelism, and update newParallelism for each
// ScalingSummary to this value.
// This function doesn't return those vertices that have currentParallelism == computed max
// parallelism
private Map<JobVertexID, ScalingSummary> enforceUniformParallelism(
Map<JobVertexID, ScalingSummary> perVertexSummary) {
final var maxParallelism =
perVertexSummary.values().stream()
.mapToInt(ScalingSummary::getNewParallelism)
.max();
if (maxParallelism.isEmpty()) {
return perVertexSummary;
}
var result = new HashMap<JobVertexID, ScalingSummary>();
perVertexSummary.forEach(
(v, s) -> {
if (s.getCurrentParallelism() != maxParallelism.getAsInt()) {
result.put(
v,
new ScalingSummary(
s.getCurrentParallelism(),
maxParallelism.getAsInt(),
s.getMetrics()));
}
});
return result;
}

private boolean isJobUnderMemoryPressure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
.withDescription(
"The maximum parallelism the autoscaler can use. Note that this limit will be ignored if it is higher than the max parallelism configured in the Flink config or directly on each operator.");

public static final ConfigOption<Boolean> VERTEX_UNIFORM_PARALLELISM =
autoScalerConfig("vertex.uniform-parallelism.enabled")
.booleanType()
.defaultValue(false)
.withFallbackKeys(oldOperatorConfigKey("vertex.uniform-parallelism.enabled"))
.withDescription(
"If true, the autoscaler will enforce the same parallelism for all vertices.");

public static final ConfigOption<Double> MAX_SCALE_DOWN_FACTOR =
autoScalerConfig("scale-down.max-factor")
.doubleType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,100 @@ public void testAdjustByMaxParallelism() throws Exception {
"8"));
}

@Test
public void testUniformParallelism() throws Exception {
var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a";
var source = JobVertexID.fromHexString(sourceHexString);
var filterOperatorHexString = "869fb403873411306404e9f2e4438c0e";
var filterOperator = JobVertexID.fromHexString(filterOperatorHexString);
var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
var sink = JobVertexID.fromHexString(sinkHexString);
conf.set(AutoScalerOptions.VERTEX_UNIFORM_PARALLELISM, true);
// set scale down interval to zero so we consider scale downs immediately
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.d);
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);

JobTopology jobTopology =
new JobTopology(
new VertexInfo(source, Map.of(), 100, MAX_PARALLELISM, false, null),
new VertexInfo(
filterOperator,
Map.of(source, REBALANCE),
25,
MAX_PARALLELISM,
false,
null),
new VertexInfo(
sink,
Map.of(filterOperator, HASH),
50,
MAX_PARALLELISM,
false,
null));

// The expected new parallelism is 7 without adjustment by max parallelism.
var metrics =
new EvaluatedMetrics(
Map.of(
source,
evaluated(100, 70, 100),
filterOperator,
evaluated(25, 70, 70),
sink,
evaluated(50, 80, 50)),
dummyGlobalMetrics);

conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of(sourceHexString, sinkHexString));
// we don't expect the scaling to happen because filterOperator must not scale, and others
// are ignored
scaleAndCheck(jobTopology, metrics, Map.of());

conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of(sinkHexString));
// parallelism is set to be the maximum across 2 vertices
scaleAndCheck(
jobTopology,
metrics,
Map.of(
sourceHexString, "70",
filterOperatorHexString, "70"));

conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of());
// parallelism is set to be the maximum across 3 vertices
scaleAndCheck(
jobTopology,
metrics,
Map.of(
sourceHexString, "80",
filterOperatorHexString, "80",
sinkHexString, "80"));
}

private void scaleAndCheck(
JobTopology jobTopology,
EvaluatedMetrics metrics,
Map<String, String> expectedParallelismOverrides)
throws Exception {
var now = Instant.now();
var assertion =
assertThat(
scalingExecutor.scaleResource(
context,
metrics,
new HashMap<>(),
new ScalingTracking(),
now,
jobTopology,
new DelayedScaleDown()));
if (!expectedParallelismOverrides.isEmpty()) {
assertion.isTrue();
Map<String, String> parallelismOverrides = stateStore.getParallelismOverrides(context);
assertThat(parallelismOverrides).containsAllEntriesOf(expectedParallelismOverrides);
} else {
assertion.isFalse();
}
}

@ParameterizedTest
@MethodSource("testDataForQuota")
public void testQuota(
Expand Down

0 comments on commit 5a36e40

Please sign in to comment.