Skip to content

Commit 8608371

Browse files
authored
[FLINK-36443][table][test] Improve flaky tests related to partition specs (#26335)
* [FLINK-36443][table][test] Improve flaky partitionable table source tests * Add sorting to partition spec
1 parent c6c90b8 commit 8608371

File tree

4 files changed

+105
-5
lines changed

4 files changed

+105
-5
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpec.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.table.api.TableException;
2222
import org.apache.flink.table.connector.source.DynamicTableSource;
2323
import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
24+
import org.apache.flink.table.planner.utils.PartitionUtils;
2425

2526
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
2627
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -30,7 +31,6 @@
3031
import java.util.List;
3132
import java.util.Map;
3233
import java.util.Objects;
33-
import java.util.stream.Collectors;
3434

3535
import static org.apache.flink.util.Preconditions.checkNotNull;
3636

@@ -74,9 +74,7 @@ public boolean needAdjustFieldReferenceAfterProjection() {
7474

7575
@Override
7676
public String getDigests(SourceAbilityContext context) {
77-
return "partitions=["
78-
+ this.partitions.stream().map(Object::toString).collect(Collectors.joining(", "))
79-
+ "]";
77+
return "partitions=[" + PartitionUtils.sortPartitionsByKey(this.partitions) + "]";
8078
}
8179

8280
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.utils;
20+
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.stream.Collectors;
24+
25+
/** Helper functions for partitions. */
26+
public class PartitionUtils {
27+
28+
private PartitionUtils() {}
29+
30+
/**
31+
* Returns partitions sorted by key.
32+
*
33+
* @param partitions list of partition key value pairs
34+
* @return sorted partitions
35+
*/
36+
public static String sortPartitionsByKey(final List<Map<String, String>> partitions) {
37+
return partitions.stream()
38+
.map(PartitionUtils::sortPartitionByKey)
39+
.collect(Collectors.joining(", "));
40+
}
41+
42+
private static String sortPartitionByKey(final Map<String, String> partition) {
43+
return partition.entrySet().stream()
44+
.sorted(Map.Entry.comparingByKey())
45+
.map(entry -> entry.getKey() + "=" + entry.getValue())
46+
.collect(Collectors.joining(", ", "{", "}"));
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.abilities.source;
20+
21+
import org.junit.jupiter.api.BeforeEach;
22+
import org.junit.jupiter.api.Test;
23+
24+
import java.util.Collections;
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
import static org.assertj.core.api.Assertions.assertThat;
29+
30+
class PartitionPushDownSpecTest {
31+
32+
private List<Map<String, String>> partitions;
33+
34+
@BeforeEach
35+
void beforeEach() {
36+
partitions =
37+
List.of(
38+
Map.of("part2", "A", "part1", "B", "part3", "C"),
39+
Map.of("part1", "C", "part2", "D"));
40+
}
41+
42+
@Test
43+
void testDigestsEmpty() {
44+
PartitionPushDownSpec spec = new PartitionPushDownSpec(Collections.emptyList());
45+
assertThat(spec.getDigests(null)).isEqualTo("partitions=[]");
46+
}
47+
48+
@Test
49+
void testDigestsSorted() {
50+
PartitionPushDownSpec spec = new PartitionPushDownSpec(partitions);
51+
assertThat(spec.getDigests(null))
52+
.isEqualTo("partitions=[{part1=B, part2=A, part3=C}, {part1=C, part2=D}]");
53+
}
54+
}

flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ class TestPartitionableTableSource(
329329

330330
override def explainSource(): String = {
331331
if (remainingPartitions != null) {
332-
s"partitions=${remainingPartitions.mkString(", ")}"
332+
s"partitions=${PartitionUtils.sortPartitionsByKey(remainingPartitions)}"
333333
} else {
334334
""
335335
}

0 commit comments

Comments
 (0)