From 0333afe7f54c8d209e74286f64f5cbdbd134e37d Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Mon, 18 Nov 2024 11:35:45 +0800 Subject: [PATCH] [flink] Avoid deprecated SetupableStreamOperator --- .../sink/cdc/FlinkCdcMultiTableSink.java | 8 +- .../cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 4 +- .../sink/cdc/FlinkCdcMultiTableSinkTest.java | 1 - .../AppendBypassCompactWorkerOperator.java | 2 - .../AutoTagForSavepointCommitterOperator.java | 24 +--- ...gForSavepointCommitterOperatorFactory.java | 94 ++++++++++++++ .../sink/BatchWriteGeneratorTagOperator.java | 24 +--- ...BatchWriteGeneratorTagOperatorFactory.java | 60 +++++++++ .../sink/CombinedTableCompactorSink.java | 6 +- .../paimon/flink/sink/CommitterOperator.java | 26 +--- .../flink/sink/CommitterOperatorFactory.java | 115 ++++++++++++++++++ .../apache/paimon/flink/sink/FlinkSink.java | 19 +-- .../paimon/flink/sink/LocalMergeOperator.java | 2 - .../flink/sink/PrepareCommitOperator.java | 2 - .../paimon/flink/sink/UnawareBucketSink.java | 2 + .../sink/index/IndexBootstrapOperator.java | 2 - .../AppendBypassCoordinateOperator.java | 8 +- ...AppendBypassCoordinateOperatorFactory.java | 6 +- ...oTagForSavepointCommitterOperatorTest.java | 35 +++--- .../BatchWriteGeneratorTagOperatorTest.java | 39 ++++-- .../flink/sink/CommitterOperatorTest.java | 110 ++++++++++------- .../flink/sink/StoreMultiCommitterTest.java | 15 ++- 22 files changed, 428 insertions(+), 176 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorFactory.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorFactory.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperatorFactory.java diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java index 55e987c6055f..c7cc6236008c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java @@ -21,7 +21,7 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.sink.CommittableStateManager; import org.apache.paimon.flink.sink.Committer; -import org.apache.paimon.flink.sink.CommitterOperator; +import org.apache.paimon.flink.sink.CommitterOperatorFactory; import org.apache.paimon.flink.sink.FlinkSink; import org.apache.paimon.flink.sink.FlinkStreamPartitioner; import org.apache.paimon.flink.sink.MultiTableCommittable; @@ -63,19 +63,16 @@ public class FlinkCdcMultiTableSink implements Serializable { private final Catalog.Loader catalogLoader; private final double commitCpuCores; @Nullable private final MemorySize commitHeapMemory; - private final boolean commitChaining; private final String commitUser; public FlinkCdcMultiTableSink( Catalog.Loader catalogLoader, double commitCpuCores, @Nullable MemorySize commitHeapMemory, - boolean commitChaining, String commitUser) { this.catalogLoader = catalogLoader; this.commitCpuCores = commitCpuCores; this.commitHeapMemory = commitHeapMemory; - this.commitChaining = commitChaining; this.commitUser = commitUser; } @@ -129,10 +126,9 @@ public DataStreamSink sinkFrom( .transform( GLOBAL_COMMITTER_NAME, typeInfo, - new CommitterOperator<>( + new CommitterOperatorFactory<>( true, false, - commitChaining, commitUser, createCommitterFactory(), createCommittableStateManager())) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java index 
ed8fdd113389..a9ad66847b4b 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java @@ -66,7 +66,6 @@ public class FlinkCdcSyncDatabaseSinkBuilder { @Nullable private Integer parallelism; private double committerCpu; @Nullable private MemorySize committerMemory; - private boolean commitChaining; // Paimon catalog used to check and create tables. There will be two // places where this catalog is used. 1) in processing function, @@ -103,7 +102,6 @@ public FlinkCdcSyncDatabaseSinkBuilder withTableOptions(Options options) { this.parallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM); this.committerCpu = options.get(FlinkConnectorOptions.SINK_COMMITTER_CPU); this.committerMemory = options.get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY); - this.commitChaining = options.get(FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING); this.commitUser = createCommitUser(options); return this; } @@ -169,7 +167,7 @@ private void buildCombinedCdcSink() { FlinkCdcMultiTableSink sink = new FlinkCdcMultiTableSink( - catalogLoader, committerCpu, committerMemory, commitChaining, commitUser); + catalogLoader, committerCpu, committerMemory, commitUser); sink.sinkFrom(partitioned); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java index fd23e500d5e5..e1bd112ca751 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java @@ -60,7 +60,6 @@ public void cancel() {} () -> FlinkCatalogFactory.createPaimonCatalog(new Options()), 
FlinkConnectorOptions.SINK_COMMITTER_CPU.defaultValue(), null, - true, UUID.randomUUID().toString()); DataStreamSink dataStreamSink = sink.sinkFrom(input); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendBypassCompactWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendBypassCompactWorkerOperator.java index 92cd31ea8aa2..7c9ba436afb3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendBypassCompactWorkerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendBypassCompactWorkerOperator.java @@ -21,7 +21,6 @@ import org.apache.paimon.append.UnawareAppendCompactionTask; import org.apache.paimon.table.FileStoreTable; -import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.types.Either; @@ -31,7 +30,6 @@ public class AppendBypassCompactWorkerOperator public AppendBypassCompactWorkerOperator(FileStoreTable table, String commitUser) { super(table, commitUser); - this.chainingStrategy = ChainingStrategy.HEAD; } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java index 6d27c6019483..0822f0461241 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java @@ -32,18 +32,13 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CheckpointStreamFactory; -import 
org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.BoundedOneInput; -import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.operators.SetupableStreamOperator; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import java.time.Duration; @@ -58,9 +53,7 @@ * time, tags are automatically created for each flink savepoint. */ public class AutoTagForSavepointCommitterOperator - implements OneInputStreamOperator, - SetupableStreamOperator, - BoundedOneInput { + implements OneInputStreamOperator, BoundedOneInput { public static final String SAVEPOINT_TAG_PREFIX = "savepoint-"; private static final long serialVersionUID = 1L; @@ -256,19 +249,4 @@ public void setKeyContextElement(StreamRecord record) throws Exception public void endInput() throws Exception { commitOperator.endInput(); } - - @Override - public void setup(StreamTask containingTask, StreamConfig config, Output output) { - commitOperator.setup(containingTask, config, output); - } - - @Override - public ChainingStrategy getChainingStrategy() { - return commitOperator.getChainingStrategy(); - } - - @Override - public void setChainingStrategy(ChainingStrategy strategy) { - commitOperator.setChainingStrategy(strategy); - } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorFactory.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorFactory.java new file mode 100644 index 000000000000..1787f8e7adce --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorFactory.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.operation.TagDeletion; +import org.apache.paimon.table.sink.TagCallback; +import org.apache.paimon.utils.SerializableSupplier; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; + +import java.time.Duration; +import java.util.List; +import java.util.NavigableSet; +import java.util.TreeSet; + +/** + * {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link + * AutoTagForSavepointCommitterOperator}. 
+ */ +public class AutoTagForSavepointCommitterOperatorFactory + extends AbstractStreamOperatorFactory + implements OneInputStreamOperatorFactory { + + private final CommitterOperatorFactory commitOperatorFactory; + + private final SerializableSupplier snapshotManagerFactory; + + private final SerializableSupplier tagManagerFactory; + + private final SerializableSupplier tagDeletionFactory; + + private final SerializableSupplier> callbacksSupplier; + + private final NavigableSet identifiersForTags; + + private final Duration tagTimeRetained; + + public AutoTagForSavepointCommitterOperatorFactory( + CommitterOperatorFactory commitOperatorFactory, + SerializableSupplier snapshotManagerFactory, + SerializableSupplier tagManagerFactory, + SerializableSupplier tagDeletionFactory, + SerializableSupplier> callbacksSupplier, + Duration tagTimeRetained) { + this.commitOperatorFactory = commitOperatorFactory; + this.tagManagerFactory = tagManagerFactory; + this.snapshotManagerFactory = snapshotManagerFactory; + this.tagDeletionFactory = tagDeletionFactory; + this.callbacksSupplier = callbacksSupplier; + this.identifiersForTags = new TreeSet<>(); + this.tagTimeRetained = tagTimeRetained; + } + + @Override + @SuppressWarnings("unchecked") + public > T createStreamOperator( + StreamOperatorParameters parameters) { + return (T) + new AutoTagForSavepointCommitterOperator<>( + commitOperatorFactory.createStreamOperator(parameters), + snapshotManagerFactory, + tagManagerFactory, + tagDeletionFactory, + callbacksSupplier, + tagTimeRetained); + } + + @Override + @SuppressWarnings("rawtypes") + public Class getStreamOperatorClass(ClassLoader classLoader) { + return AutoTagForSavepointCommitterOperator.class; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java index 23202b45077f..1cbcc4b2262f 
100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java @@ -28,18 +28,13 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.BoundedOneInput; -import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.operators.SetupableStreamOperator; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import java.time.Instant; @@ -53,9 +48,7 @@ * completed, the corresponding tag is generated. 
*/ public class BatchWriteGeneratorTagOperator - implements OneInputStreamOperator, - SetupableStreamOperator, - BoundedOneInput { + implements OneInputStreamOperator, BoundedOneInput { private static final String BATCH_WRITE_TAG_PREFIX = "batch-write-"; @@ -250,19 +243,4 @@ public void setKeyContextElement(StreamRecord record) throws Exception public void endInput() throws Exception { commitOperator.endInput(); } - - @Override - public void setup(StreamTask containingTask, StreamConfig config, Output output) { - commitOperator.setup(containingTask, config, output); - } - - @Override - public ChainingStrategy getChainingStrategy() { - return commitOperator.getChainingStrategy(); - } - - @Override - public void setChainingStrategy(ChainingStrategy strategy) { - commitOperator.setChainingStrategy(strategy); - } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorFactory.java new file mode 100644 index 000000000000..e3c0e5c49168 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; + +/** + * {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link + * BatchWriteGeneratorTagOperator}. + */ +public class BatchWriteGeneratorTagOperatorFactory + extends AbstractStreamOperatorFactory + implements OneInputStreamOperatorFactory { + private final CommitterOperatorFactory commitOperatorFactory; + + protected final FileStoreTable table; + + public BatchWriteGeneratorTagOperatorFactory( + CommitterOperatorFactory commitOperatorFactory, + FileStoreTable table) { + this.table = table; + this.commitOperatorFactory = commitOperatorFactory; + } + + @Override + @SuppressWarnings("unchecked") + public > T createStreamOperator( + StreamOperatorParameters parameters) { + return (T) + new BatchWriteGeneratorTagOperator<>( + commitOperatorFactory.createStreamOperator(parameters), table); + } + + @Override + @SuppressWarnings("rawtypes") + public Class getStreamOperatorClass(ClassLoader classLoader) { + return BatchWriteGeneratorTagOperator.class; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java index 87a28091fa30..fc84dd671469 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java 
@@ -153,15 +153,17 @@ protected DataStreamSink doCommit( .transform( GLOBAL_COMMITTER_NAME, new MultiTableCommittableTypeInfo(), - new CommitterOperator<>( + new CommitterOperatorFactory<>( streamingCheckpointEnabled, false, - options.get(SINK_COMMITTER_OPERATOR_CHAINING), commitUser, createCommitterFactory(isStreaming), createCommittableStateManager(), options.get(END_INPUT_WATERMARK))) .setParallelism(written.getParallelism()); + if (!options.get(SINK_COMMITTER_OPERATOR_CHAINING)) { + committed = committed.startNewChain(); + } return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java index 021a5db413d5..383cbcd6ebf7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java @@ -25,8 +25,8 @@ import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; -import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -91,26 +91,9 @@ public class CommitterOperator extends AbstractStreamOpe private final Long endInputWatermark; public CommitterOperator( + StreamOperatorParameters parameters, boolean streamingCheckpointEnabled, boolean forceSingleParallelism, - boolean chaining, - String initialCommitUser, - Committer.Factory committerFactory, - CommittableStateManager 
committableStateManager) { - this( - streamingCheckpointEnabled, - forceSingleParallelism, - chaining, - initialCommitUser, - committerFactory, - committableStateManager, - null); - } - - public CommitterOperator( - boolean streamingCheckpointEnabled, - boolean forceSingleParallelism, - boolean chaining, String initialCommitUser, Committer.Factory committerFactory, CommittableStateManager committableStateManager, @@ -122,7 +105,10 @@ public CommitterOperator( this.committerFactory = checkNotNull(committerFactory); this.committableStateManager = committableStateManager; this.endInputWatermark = endInputWatermark; - setChainingStrategy(chaining ? ChainingStrategy.ALWAYS : ChainingStrategy.HEAD); + this.setup( + parameters.getContainingTask(), + parameters.getStreamConfig(), + parameters.getOutput()); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperatorFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperatorFactory.java new file mode 100644 index 000000000000..cce3d4e176bf --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperatorFactory.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; + +import java.util.NavigableMap; +import java.util.TreeMap; + +import static org.apache.paimon.utils.Preconditions.checkNotNull; + +/** + * {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link + * CommitterOperator}. + */ +public class CommitterOperatorFactory + extends AbstractStreamOperatorFactory + implements OneInputStreamOperatorFactory { + protected final boolean streamingCheckpointEnabled; + + /** Whether to check the parallelism while runtime. */ + protected final boolean forceSingleParallelism; + /** + * This commitUser is valid only for new jobs. After the job starts, this commitUser will be + * recorded into the states of write and commit operators. When the job restarts, commitUser + * will be recovered from states and this value is ignored. + */ + protected final String initialCommitUser; + + /** Group the committable by the checkpoint id. */ + protected final NavigableMap committablesPerCheckpoint; + + protected final Committer.Factory committerFactory; + + protected final CommittableStateManager committableStateManager; + + /** + * Aggregate committables to global committables and commit the global committables to the + * external system. 
+ */ + protected Committer committer; + + protected final Long endInputWatermark; + + public CommitterOperatorFactory( + boolean streamingCheckpointEnabled, + boolean forceSingleParallelism, + String initialCommitUser, + Committer.Factory committerFactory, + CommittableStateManager committableStateManager) { + this( + streamingCheckpointEnabled, + forceSingleParallelism, + initialCommitUser, + committerFactory, + committableStateManager, + null); + } + + public CommitterOperatorFactory( + boolean streamingCheckpointEnabled, + boolean forceSingleParallelism, + String initialCommitUser, + Committer.Factory committerFactory, + CommittableStateManager committableStateManager, + Long endInputWatermark) { + this.streamingCheckpointEnabled = streamingCheckpointEnabled; + this.forceSingleParallelism = forceSingleParallelism; + this.initialCommitUser = initialCommitUser; + this.committablesPerCheckpoint = new TreeMap<>(); + this.committerFactory = checkNotNull(committerFactory); + this.committableStateManager = committableStateManager; + this.endInputWatermark = endInputWatermark; + } + + @Override + @SuppressWarnings("unchecked") + public > T createStreamOperator( + StreamOperatorParameters parameters) { + return (T) + new CommitterOperator<>( + parameters, + streamingCheckpointEnabled, + forceSingleParallelism, + initialCommitUser, + committerFactory, + committableStateManager, + endInputWatermark); + } + + @Override + @SuppressWarnings("rawtypes") + public Class getStreamOperatorClass(ClassLoader classLoader) { + return CommitterOperator.class; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 59f2f4b1035f..92f7db69882b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -46,6 +46,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.table.api.config.ExecutionConfigOptions; import javax.annotation.Nullable; @@ -269,11 +270,10 @@ protected DataStreamSink doCommit(DataStream written, String com } Options options = Options.fromMap(table.options()); - OneInputStreamOperator committerOperator = - new CommitterOperator<>( + OneInputStreamOperatorFactory committerOperator = + new CommitterOperatorFactory<>( streamingCheckpointEnabled, true, - options.get(SINK_COMMITTER_OPERATOR_CHAINING), commitUser, createCommitterFactory(), createCommittableStateManager(), @@ -281,8 +281,9 @@ protected DataStreamSink doCommit(DataStream written, String com if (options.get(SINK_AUTO_TAG_FOR_SAVEPOINT)) { committerOperator = - new AutoTagForSavepointCommitterOperator<>( - (CommitterOperator) committerOperator, + new AutoTagForSavepointCommitterOperatorFactory<>( + (CommitterOperatorFactory) + committerOperator, table::snapshotManager, table::tagManager, () -> table.store().newTagDeletion(), @@ -292,8 +293,9 @@ protected DataStreamSink doCommit(DataStream written, String com if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH && table.coreOptions().tagCreationMode() == TagCreationMode.BATCH) { committerOperator = - new BatchWriteGeneratorTagOperator<>( - (CommitterOperator) committerOperator, + new BatchWriteGeneratorTagOperatorFactory<>( + (CommitterOperatorFactory) + committerOperator, table); } SingleOutputStreamOperator committed = @@ -311,6 +313,9 @@ protected DataStreamSink doCommit(DataStream written, String com table.name(), 
options.get(SINK_OPERATOR_UID_SUFFIX))); } + if (!options.get(SINK_COMMITTER_OPERATOR_CHAINING)) { + committed = committed.startNewChain(); + } configureGlobalCommitter( committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY)); return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java index 6931fe907218..e38844f60a73 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java @@ -45,7 +45,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; -import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; @@ -82,7 +81,6 @@ public LocalMergeOperator(TableSchema schema) { "LocalMergeOperator currently only support tables with primary keys"); this.schema = schema; this.ignoreDelete = CoreOptions.fromMap(schema.options()).ignoreDelete(); - setChainingStrategy(ChainingStrategy.ALWAYS); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java index 3668386ddc2d..9e82ffd9ff41 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java @@ -27,7 +27,6 @@ 
import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; -import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -54,7 +53,6 @@ public abstract class PrepareCommitOperator extends AbstractStreamOpera public PrepareCommitOperator(Options options) { this.options = options; - setChainingStrategy(ChainingStrategy.ALWAYS); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java index 98b58aa8e96d..f59e728a70f4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java @@ -74,11 +74,13 @@ public DataStream doWrite( new CommittableTypeInfo(), new CompactionTaskTypeInfo()), new AppendBypassCoordinateOperatorFactory<>(table)) + .startNewChain() .forceNonParallel() .transform( "Compact Worker: " + table.name(), new CommittableTypeInfo(), new AppendBypassCompactWorkerOperator(table, initialCommitUser)) + .startNewChain() .setParallelism(written.getParallelism()); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java index 5c8ba8f9441f..3eb1f11a02a5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java @@ -27,7 +27,6 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -44,7 +43,6 @@ public IndexBootstrapOperator( IndexBootstrap bootstrap, SerializableFunction converter) { this.bootstrap = bootstrap; this.converter = converter; - setChainingStrategy(ChainingStrategy.ALWAYS); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java index 45090f7b68b4..b8b0d61e10a9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java @@ -26,8 +26,8 @@ import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.types.Either; @@ -58,10 +58,12 @@ public class AppendBypassCoordinateOperator private transient LinkedBlockingQueue compactTasks; public AppendBypassCoordinateOperator( 
- FileStoreTable table, ProcessingTimeService processingTimeService) { + StreamOperatorParameters> parameters, + FileStoreTable table, + ProcessingTimeService processingTimeService) { this.table = table; this.processingTimeService = processingTimeService; - this.chainingStrategy = ChainingStrategy.HEAD; + setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperatorFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperatorFactory.java index 7c53e01b47e6..a4c51e5b5a9b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperatorFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperatorFactory.java @@ -45,11 +45,7 @@ T createStreamOperator( StreamOperatorParameters> parameters) { AppendBypassCoordinateOperator operator = - new AppendBypassCoordinateOperator<>(table, processingTimeService); - operator.setup( - parameters.getContainingTask(), - parameters.getStreamConfig(), - parameters.getOutput()); + new AppendBypassCoordinateOperator<>(parameters, table, processingTimeService); return (T) operator; } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java index 3b58c24d16b1..ee930a06fc3d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java @@ -32,7 +32,7 @@ import 
org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.SavepointType; import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.junit.jupiter.api.Test; @@ -198,13 +198,15 @@ private void processCommittable( } @Override - protected OneInputStreamOperator createCommitterOperator( - FileStoreTable table, - String commitUser, - CommittableStateManager committableStateManager) { - return new AutoTagForSavepointCommitterOperator<>( - (CommitterOperator) - super.createCommitterOperator(table, commitUser, committableStateManager), + protected OneInputStreamOperatorFactory + createCommitterOperatorFactory( + FileStoreTable table, + String commitUser, + CommittableStateManager committableStateManager) { + return new AutoTagForSavepointCommitterOperatorFactory<>( + (CommitterOperatorFactory) + super.createCommitterOperatorFactory( + table, commitUser, committableStateManager), table::snapshotManager, table::tagManager, () -> table.store().newTagDeletion(), @@ -213,14 +215,15 @@ protected OneInputStreamOperator createCommitterOperat } @Override - protected OneInputStreamOperator createCommitterOperator( - FileStoreTable table, - String commitUser, - CommittableStateManager committableStateManager, - ThrowingConsumer initializeFunction) { - return new AutoTagForSavepointCommitterOperator<>( - (CommitterOperator) - super.createCommitterOperator( + protected OneInputStreamOperatorFactory + createCommitterOperatorFactory( + FileStoreTable table, + String commitUser, + CommittableStateManager committableStateManager, + ThrowingConsumer initializeFunction) { + return new AutoTagForSavepointCommitterOperatorFactory<>( + (CommitterOperatorFactory) + super.createCommitterOperatorFactory( table, 
commitUser, committableStateManager, initializeFunction), table::snapshotManager, table::tagManager, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java index 147110637aef..68162832eac9 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java @@ -27,13 +27,21 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; +import org.apache.flink.streaming.util.MockOutput; +import org.apache.flink.streaming.util.MockStreamConfig; import org.junit.jupiter.api.Test; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; import java.util.HashMap; import java.util.Objects; @@ -54,12 +62,23 @@ public void testBatchWriteGeneratorTag() throws Exception { StreamTableWrite write = table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite(); - OneInputStreamOperator committerOperator = - createCommitterOperator( + OneInputStreamOperatorFactory committerOperatorFactory = + createCommitterOperatorFactory( table, initialCommitUser, new RestoreAndFailCommittableStateManager<>( ManifestCommittableSerializer::new)); + + 
OneInputStreamOperator committerOperator = + committerOperatorFactory.createStreamOperator( + new StreamOperatorParameters<>( + new SourceOperatorStreamTask(new DummyEnvironment()), + new MockStreamConfig(new Configuration(), 1), + new MockOutput<>(new ArrayList<>()), + null, + null, + null)); + committerOperator.open(); TableCommitImpl tableCommit = table.newCommit(initialCommitUser); @@ -106,13 +125,15 @@ public void testBatchWriteGeneratorTag() throws Exception { } @Override - protected OneInputStreamOperator createCommitterOperator( - FileStoreTable table, - String commitUser, - CommittableStateManager committableStateManager) { - return new BatchWriteGeneratorTagOperator<>( - (CommitterOperator) - super.createCommitterOperator(table, commitUser, committableStateManager), + protected OneInputStreamOperatorFactory + createCommitterOperatorFactory( + FileStoreTable table, + String commitUser, + CommittableStateManager committableStateManager) { + return new BatchWriteGeneratorTagOperatorFactory<>( + (CommitterOperatorFactory) + super.createCommitterOperatorFactory( + table, commitUser, committableStateManager), table); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java index 668d651236fd..28c93ca79be0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java @@ -51,10 +51,13 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import 
org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.Preconditions; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -259,8 +262,8 @@ public void testRestoreCommitUser() throws Exception { // 3. Check whether success List actual = new ArrayList<>(); - OneInputStreamOperator operator = - createCommitterOperator( + OneInputStreamOperatorFactory operatorFactory = + createCommitterOperatorFactory( table, initialCommitUser, new NoopCommittableStateManager(), @@ -274,7 +277,7 @@ public void testRestoreCommitUser() throws Exception { }); OneInputStreamOperatorTestHarness testHarness1 = - createTestHarness(operator); + createTestHarness(operatorFactory); testHarness1.initializeState(snapshot); testHarness1.close(); @@ -315,10 +318,11 @@ public void testRestoreEmptyMarkDoneState() throws Exception { public void testCommitInputEnd() throws Exception { FileStoreTable table = createFileStoreTable(); String commitUser = UUID.randomUUID().toString(); - OneInputStreamOperator operator = - createCommitterOperator(table, commitUser, new NoopCommittableStateManager()); + OneInputStreamOperatorFactory operatorFactory = + createCommitterOperatorFactory( + table, commitUser, new NoopCommittableStateManager()); OneInputStreamOperatorTestHarness testHarness = - createTestHarness(operator); + createTestHarness(operatorFactory); testHarness.open(); Assertions.assertThatCode( () -> { @@ -378,10 +382,10 @@ public void testCommitInputEnd() throws Exception { }) .doesNotThrowAnyException(); - if (operator instanceof CommitterOperator) { + if (testHarness.getOneInputOperator() instanceof CommitterOperator) { Assertions.assertThat(
((ManifestCommittable) - ((CommitterOperator) operator) + ((CommitterOperator) testHarness.getOneInputOperator()) .committablesPerCheckpoint.get(Long.MAX_VALUE)) .fileCommittables() .size()) @@ -604,14 +608,14 @@ public void testCalcDataBytesSend() throws Exception { public void testCommitMetrics() throws Exception { FileStoreTable table = createFileStoreTable(); - OneInputStreamOperator operator = - createCommitterOperator( + OneInputStreamOperatorFactory operatorFactory = + createCommitterOperatorFactory( table, null, new RestoreAndFailCommittableStateManager<>( ManifestCommittableSerializer::new)); OneInputStreamOperatorTestHarness testHarness = - createTestHarness(operator); + createTestHarness(operatorFactory); testHarness.open(); long timestamp = 0; StreamTableWrite write = @@ -627,7 +631,9 @@ public void testCommitMetrics() throws Exception { testHarness.notifyOfCompletedCheckpoint(cpId); MetricGroup commitMetricGroup = - operator.getMetricGroup() + testHarness + .getOneInputOperator() + .getMetricGroup() .addGroup("paimon") .addGroup("table", table.name()) .addGroup("commit"); @@ -685,10 +691,11 @@ public void testCommitMetrics() throws Exception { public void testParallelism() throws Exception { FileStoreTable table = createFileStoreTable(); String commitUser = UUID.randomUUID().toString(); - OneInputStreamOperator operator = - createCommitterOperator(table, commitUser, new NoopCommittableStateManager()); + OneInputStreamOperatorFactory operatorFactory = + createCommitterOperatorFactory( + table, commitUser, new NoopCommittableStateManager()); try (OneInputStreamOperatorTestHarness testHarness = - createTestHarness(operator, 10, 10, 3)) { + createTestHarness(operatorFactory, 10, 10, 3)) { Assertions.assertThatCode(testHarness::open) .hasMessage("Committer Operator parallelism in paimon MUST be one."); } @@ -700,13 +707,13 @@ public void testParallelism() throws Exception { protected OneInputStreamOperatorTestHarness createRecoverableTestHarness(FileStoreTable table) throws
Exception { - OneInputStreamOperator operator = - createCommitterOperator( + OneInputStreamOperatorFactory operatorFactory = + createCommitterOperatorFactory( table, null, new RestoreAndFailCommittableStateManager<>( ManifestCommittableSerializer::new)); - return createTestHarness(operator); + return createTestHarness(operatorFactory); } private OneInputStreamOperatorTestHarness createLossyTestHarness( @@ -716,18 +723,20 @@ private OneInputStreamOperatorTestHarness createLossyT private OneInputStreamOperatorTestHarness createLossyTestHarness( FileStoreTable table, String commitUser) throws Exception { - OneInputStreamOperator operator = - createCommitterOperator(table, commitUser, new NoopCommittableStateManager()); - return createTestHarness(operator); + OneInputStreamOperatorFactory operatorFactory = + createCommitterOperatorFactory( + table, commitUser, new NoopCommittableStateManager()); + return createTestHarness(operatorFactory); } private OneInputStreamOperatorTestHarness createTestHarness( - OneInputStreamOperator operator) throws Exception { - return createTestHarness(operator, 1, 1, 0); + OneInputStreamOperatorFactory operatorFactory) + throws Exception { + return createTestHarness(operatorFactory, 1, 1, 0); } private OneInputStreamOperatorTestHarness createTestHarness( - OneInputStreamOperator operator, + OneInputStreamOperatorFactory operatorFactory, int maxParallelism, int parallelism, int subTaskIndex) @@ -736,22 +745,23 @@ private OneInputStreamOperatorTestHarness createTestHa new CommittableTypeInfo().createSerializer(new ExecutionConfig()); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>( - operator, + operatorFactory, maxParallelism, parallelism, subTaskIndex, - serializer, new OperatorID()); + harness.getStreamConfig().setupNetworkInputs(Preconditions.checkNotNull(serializer)); + harness.getStreamConfig().serializeAllConfigs(); harness.setup(serializer); return harness; } - protected OneInputStreamOperator 
createCommitterOperator( - FileStoreTable table, - String commitUser, - CommittableStateManager committableStateManager) { - return new CommitterOperator<>( - true, + protected OneInputStreamOperatorFactory + createCommitterOperatorFactory( + FileStoreTable table, + String commitUser, + CommittableStateManager committableStateManager) { + return new CommitterOperatorFactory<>( true, true, commitUser == null ? initialCommitUser : commitUser, @@ -765,13 +775,13 @@ protected OneInputStreamOperator createCommitterOperat committableStateManager); } - protected OneInputStreamOperator createCommitterOperator( - FileStoreTable table, - String commitUser, - CommittableStateManager committableStateManager, - ThrowingConsumer initializeFunction) { - return new CommitterOperator( - true, + protected OneInputStreamOperatorFactory + createCommitterOperatorFactory( + FileStoreTable table, + String commitUser, + CommittableStateManager committableStateManager, + ThrowingConsumer initializeFunction) { + return new CommitterOperatorFactory( true, true, commitUser == null ? 
initialCommitUser : commitUser, @@ -784,8 +794,24 @@ protected OneInputStreamOperator createCommitterOperat context), committableStateManager) { @Override - public void initializeState(StateInitializationContext context) throws Exception { - initializeFunction.accept(context); + @SuppressWarnings("unchecked") + public > T createStreamOperator( + StreamOperatorParameters parameters) { + return (T) + new CommitterOperator( + parameters, + streamingCheckpointEnabled, + forceSingleParallelism, + initialCommitUser, + committerFactory, + committableStateManager, + endInputWatermark) { + @Override + public void initializeState(StateInitializationContext context) + throws Exception { + initializeFunction.accept(context); + } + }; } }; } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java index 10e432f3c8c2..752679fb5903 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java @@ -645,11 +645,10 @@ public void testCommitMetrics() throws Exception { private OneInputStreamOperatorTestHarness createRecoverableTestHarness() throws Exception { - CommitterOperator operator = - new CommitterOperator<>( + CommitterOperatorFactory operator = + new CommitterOperatorFactory<>( true, false, - true, initialCommitUser, context -> new StoreMultiCommitter(catalogLoader, context), new RestoreAndFailCommittableStateManager<>( @@ -659,11 +658,10 @@ public void testCommitMetrics() throws Exception { private OneInputStreamOperatorTestHarness createLossyTestHarness() throws Exception { - CommitterOperator operator = - new CommitterOperator<>( + CommitterOperatorFactory operator = + new CommitterOperatorFactory<>( true, false, - true, initialCommitUser, 
context -> new StoreMultiCommitter(catalogLoader, context), new CommittableStateManager() { @@ -682,12 +680,13 @@ public void snapshotState( private OneInputStreamOperatorTestHarness createTestHarness( - CommitterOperator operator) + CommitterOperatorFactory + operatorFactory) throws Exception { TypeSerializer serializer = new MultiTableCommittableTypeInfo().createSerializer(new ExecutionConfig()); OneInputStreamOperatorTestHarness harness = - new OneInputStreamOperatorTestHarness<>(operator, serializer); + new OneInputStreamOperatorTestHarness<>(operatorFactory, serializer); harness.setup(serializer); return harness; }