Skip to content

Commit

Permalink
Fix SharedStorageUtilsTest
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanoid committed May 5, 2023
1 parent 9a3ef6b commit 0885794
Showing 1 changed file with 12 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ static SharedStorageBody.SharedStorageBodyResult sharedStorageBody(List<DataStre

BOperator bOp = new BOperator();
SingleOutputStreamOperator<Long> afterBOp =
data.transform("b", TypeInformation.of(Long.class), bOp);
afterAOp.transform("b", TypeInformation.of(Long.class), bOp);

Map<ItemDescriptor<?>, SharedStorageStreamOperator> ownerMap = new HashMap<>();
ownerMap.put(SUM, aOp);
Expand Down Expand Up @@ -89,7 +89,9 @@ public void testSharedStorage() throws Exception {

/** Operator A: add input elements to the shared {@link #SUM}. */
static class AOperator extends AbstractStreamOperator<Long>
implements OneInputStreamOperator<Long, Long>, SharedStorageStreamOperator {
implements OneInputStreamOperator<Long, Long>,
SharedStorageStreamOperator,
BoundedOneInput {

private final String sharedStorageAccessorID;
private SharedStorageContext sharedStorageContext;
Expand All @@ -115,15 +117,18 @@ public void processElement(StreamRecord<Long> element) throws Exception {
Long currentSum = getter.get(SUM);
setter.set(SUM, currentSum + element.getValue());
});
output.collect(element);
}

@Override
public void endInput() throws Exception {
// Informs BOperator to get the value from shared {@link #SUM}.
output.collect(new StreamRecord<>(0L));
}
}

/** Operator B: when input ends, get the value from shared {@link #SUM}. */
static class BOperator extends AbstractStreamOperator<Long>
implements OneInputStreamOperator<Long, Long>,
SharedStorageStreamOperator,
BoundedOneInput {
implements OneInputStreamOperator<Long, Long>, SharedStorageStreamOperator {

private final String sharedStorageAccessorID;
private SharedStorageContext sharedStorageContext;
Expand All @@ -143,10 +148,7 @@ public String getSharedStorageAccessorID() {
}

@Override
public void processElement(StreamRecord<Long> element) throws Exception {}

@Override
public void endInput() throws Exception {
public void processElement(StreamRecord<Long> element) throws Exception {
sharedStorageContext.invoke(
(getter, setter) -> {
output.collect(new StreamRecord<>(getter.get(SUM)));
Expand Down

0 comments on commit 0885794

Please sign in to comment.