[flink] Add tests for chaining and fix writer chaining behavior
yunfengzhou-hub committed Nov 30, 2024 · 1 parent 0333afe · commit dda6bbf
Showing 41 changed files with 936 additions and 140 deletions.
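
Note on the pattern: every file below makes the same change. Each CDC write operator's constructor becomes non-public and gains a StreamOperatorParameters argument, and a nested Factory becomes the only way to build the operator, so the runtime hands the operator its parameters (containing task, stream config, output collector) at construction time. Sink classes then return the Factory from createWriteOperatorFactory instead of a pre-built operator instance. The sketch below is not part of this commit; it is a minimal, self-contained illustration of that pattern against Flink's stock operator API, using a hypothetical UpperCaseOperator.

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Toy operator that upper-cases strings; constructed only through its Factory. */
public class UpperCaseOperator extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    // Private constructor: the runtime parameters are available on construction,
    // so the operator can wire up its task, config, and output immediately.
    private UpperCaseOperator(StreamOperatorParameters<String> parameters) {
        setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
    }

    @Override
    public void processElement(StreamRecord<String> element) {
        output.collect(element.replace(element.getValue().toUpperCase()));
    }

    /** Factory that defers operator creation until Flink supplies the parameters. */
    public static class Factory extends AbstractStreamOperatorFactory<String>
            implements OneInputStreamOperatorFactory<String, String> {

        @Override
        @SuppressWarnings("unchecked")
        public <T extends StreamOperator<String>> T createStreamOperator(
                StreamOperatorParameters<String> parameters) {
            return (T) new UpperCaseOperator(parameters);
        }

        @Override
        @SuppressWarnings("rawtypes")
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return UpperCaseOperator.class;
        }
    }
}

A pipeline would attach this with dataStream.transform("upper", Types.STRING, new UpperCaseOperator.Factory()), and Flink's OneInputStreamOperatorTestHarness accepts such a factory directly, which is how the updated tests at the end of this diff build their harnesses.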
CdcDynamicBucketSink.java
@@ -25,7 +25,7 @@
import org.apache.paimon.table.sink.KeyAndBucketExtractor;

import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

/** {@link CdcDynamicBucketSinkBase} for {@link CdcRecord}. */
public class CdcDynamicBucketSink extends CdcDynamicBucketSinkBase<CdcRecord> {
@@ -42,8 +42,8 @@ protected KeyAndBucketExtractor<CdcRecord> createExtractor(TableSchema schema) {
}

@Override
-    protected OneInputStreamOperator<Tuple2<CdcRecord, Integer>, Committable> createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new CdcDynamicBucketWriteOperator(table, writeProvider, commitUser);
+    protected OneInputStreamOperatorFactory<Tuple2<CdcRecord, Integer>, Committable>
+            createWriteOperatorFactory(StoreSinkWrite.Provider writeProvider, String commitUser) {
+        return new CdcDynamicBucketWriteOperator.Factory(table, writeProvider, commitUser);
}
}
CdcDynamicBucketWriteOperator.java
@@ -19,13 +19,17 @@
package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.IOException;
@@ -43,11 +47,12 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<Cdc

private final long retrySleepMillis;

-    public CdcDynamicBucketWriteOperator(
+    private CdcDynamicBucketWriteOperator(
+            StreamOperatorParameters<Committable> parameters,
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
-        super(table, storeSinkWriteProvider, initialCommitUser);
+        super(parameters, table, storeSinkWriteProvider, initialCommitUser);
this.retrySleepMillis =
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
}
@@ -85,4 +90,30 @@ public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) thr
throw new IOException(e);
}
}

+    /** {@link StreamOperatorFactory} of {@link CdcDynamicBucketWriteOperator}. */
+    public static class Factory extends TableWriteOperator.Factory<Tuple2<CdcRecord, Integer>> {
+
+        public Factory(
+                FileStoreTable table,
+                StoreSinkWrite.Provider storeSinkWriteProvider,
+                String initialCommitUser) {
+            super(table, storeSinkWriteProvider, initialCommitUser);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<Committable>> T createStreamOperator(
+                StreamOperatorParameters<Committable> parameters) {
+            return (T)
+                    new CdcDynamicBucketWriteOperator(
+                            parameters, table, storeSinkWriteProvider, initialCommitUser);
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return CdcDynamicBucketWriteOperator.class;
+        }
+    }
}
CdcFixedBucketSink.java
@@ -24,7 +24,7 @@
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.table.FileStoreTable;

-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

/**
* A {@link FlinkSink} for fixed-bucket table which accepts {@link CdcRecord} and waits for a schema
@@ -39,8 +39,8 @@ public CdcFixedBucketSink(FileStoreTable table) {
}

@Override
-    protected OneInputStreamOperator<CdcRecord, Committable> createWriteOperator(
+    protected OneInputStreamOperatorFactory<CdcRecord, Committable> createWriteOperatorFactory(
StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new CdcRecordStoreWriteOperator(table, writeProvider, commitUser);
+        return new CdcRecordStoreWriteOperator.Factory(table, writeProvider, commitUser);
}
}
CdcRecordStoreMultiWriteOperator.java
@@ -38,6 +38,9 @@

import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.IOException;
@@ -74,12 +77,13 @@ public class CdcRecordStoreMultiWriteOperator
private String commitUser;
private ExecutorService compactExecutor;

-    public CdcRecordStoreMultiWriteOperator(
+    private CdcRecordStoreMultiWriteOperator(
+            StreamOperatorParameters<MultiTableCommittable> parameters,
Catalog.Loader catalogLoader,
StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
String initialCommitUser,
Options options) {
-        super(options);
+        super(parameters, options);
this.catalogLoader = catalogLoader;
this.storeSinkWriteProvider = storeSinkWriteProvider;
this.initialCommitUser = initialCommitUser;
@@ -254,4 +258,42 @@ public Map<Identifier, StoreSinkWrite> writes() {
public String commitUser() {
return commitUser;
}

+    /** {@link StreamOperatorFactory} of {@link CdcRecordStoreMultiWriteOperator}. */
+    public static class Factory
+            extends PrepareCommitOperator.Factory<CdcMultiplexRecord, MultiTableCommittable> {
+        private final StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider;
+        private final String initialCommitUser;
+        private final Catalog.Loader catalogLoader;
+
+        public Factory(
+                Catalog.Loader catalogLoader,
+                StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
+                String initialCommitUser,
+                Options options) {
+            super(options);
+            this.catalogLoader = catalogLoader;
+            this.storeSinkWriteProvider = storeSinkWriteProvider;
+            this.initialCommitUser = initialCommitUser;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<MultiTableCommittable>> T createStreamOperator(
+                StreamOperatorParameters<MultiTableCommittable> parameters) {
+            return (T)
+                    new CdcRecordStoreMultiWriteOperator(
+                            parameters,
+                            catalogLoader,
+                            storeSinkWriteProvider,
+                            initialCommitUser,
+                            options);
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return CdcRecordStoreMultiWriteOperator.class;
+        }
+    }
}
CdcRecordStoreWriteOperator.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
@@ -27,6 +28,9 @@
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.IOException;
@@ -50,11 +54,12 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {

private final long retrySleepMillis;

-    public CdcRecordStoreWriteOperator(
+    protected CdcRecordStoreWriteOperator(
+            StreamOperatorParameters<Committable> parameters,
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
-        super(table, storeSinkWriteProvider, initialCommitUser);
+        super(parameters, table, storeSinkWriteProvider, initialCommitUser);
this.retrySleepMillis =
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
}
@@ -92,4 +97,30 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
throw new IOException(e);
}
}

+    /** {@link StreamOperatorFactory} of {@link CdcRecordStoreWriteOperator}. */
+    public static class Factory extends TableWriteOperator.Factory<CdcRecord> {
+
+        public Factory(
+                FileStoreTable table,
+                StoreSinkWrite.Provider storeSinkWriteProvider,
+                String initialCommitUser) {
+            super(table, storeSinkWriteProvider, initialCommitUser);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<Committable>> T createStreamOperator(
+                StreamOperatorParameters<Committable> parameters) {
+            return (T)
+                    new CdcRecordStoreWriteOperator(
+                            parameters, table, storeSinkWriteProvider, initialCommitUser);
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return CdcRecordStoreWriteOperator.class;
+        }
+    }
}
CdcUnawareBucketSink.java
@@ -24,7 +24,7 @@
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

import javax.annotation.Nullable;

@@ -42,9 +42,9 @@ public CdcUnawareBucketSink(FileStoreTable table, Integer parallelism) {
}

@Override
-    protected OneInputStreamOperator<CdcRecord, Committable> createWriteOperator(
+    protected OneInputStreamOperatorFactory<CdcRecord, Committable> createWriteOperatorFactory(
StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new CdcUnawareBucketWriteOperator(table, writeProvider, commitUser);
+        return new CdcUnawareBucketWriteOperator.Factory(table, writeProvider, commitUser);
}

@Override
CdcUnawareBucketWriteOperator.java
@@ -18,21 +18,26 @@

package org.apache.paimon.flink.sink.cdc;

+import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowKind;

+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** A {@link PrepareCommitOperator} to write {@link CdcRecord} to unaware-bucket mode table. */
public class CdcUnawareBucketWriteOperator extends CdcRecordStoreWriteOperator {

-    public CdcUnawareBucketWriteOperator(
+    private CdcUnawareBucketWriteOperator(
+            StreamOperatorParameters<Committable> parameters,
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
-        super(table, storeSinkWriteProvider, initialCommitUser);
+        super(parameters, table, storeSinkWriteProvider, initialCommitUser);
}

@Override
@@ -42,4 +47,30 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
super.processElement(element);
}
}

+    /** {@link StreamOperatorFactory} of {@link CdcUnawareBucketWriteOperator}. */
+    public static class Factory extends CdcRecordStoreWriteOperator.Factory {
+
+        public Factory(
+                FileStoreTable table,
+                StoreSinkWrite.Provider storeSinkWriteProvider,
+                String initialCommitUser) {
+            super(table, storeSinkWriteProvider, initialCommitUser);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<Committable>> T createStreamOperator(
+                StreamOperatorParameters<Committable> parameters) {
+            return (T)
+                    new CdcUnawareBucketWriteOperator(
+                            parameters, table, storeSinkWriteProvider, initialCommitUser);
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return CdcUnawareBucketWriteOperator.class;
+        }
+    }
}
FlinkCdcMultiTableSink.java
@@ -41,7 +41,7 @@
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

import javax.annotation.Nullable;

@@ -137,9 +137,10 @@ public DataStreamSink<?> sinkFrom(
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}

-    protected OneInputStreamOperator<CdcMultiplexRecord, MultiTableCommittable> createWriteOperator(
-            StoreSinkWrite.WithWriteBufferProvider writeProvider, String commitUser) {
-        return new CdcRecordStoreMultiWriteOperator(
+    protected OneInputStreamOperatorFactory<CdcMultiplexRecord, MultiTableCommittable>
+            createWriteOperator(
+                    StoreSinkWrite.WithWriteBufferProvider writeProvider, String commitUser) {
+        return new CdcRecordStoreMultiWriteOperator.Factory(
catalogLoader, writeProvider, commitUser, new Options());
}

CdcRecordStoreMultiWriteOperatorTest.java
@@ -689,8 +689,8 @@

private OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable>
createTestHarness(Catalog.Loader catalogLoader) throws Exception {
-        CdcRecordStoreMultiWriteOperator operator =
-                new CdcRecordStoreMultiWriteOperator(
+        CdcRecordStoreMultiWriteOperator.Factory operatorFactory =
+                new CdcRecordStoreMultiWriteOperator.Factory(
catalogLoader,
(t, commitUser, state, ioManager, memoryPoolFactory, metricGroup) ->
new StoreSinkWriteImpl(
@@ -709,7 +709,7 @@ public void testUsingTheSameCompactExecutor() throws Exception {
TypeSerializer<MultiTableCommittable> outputSerializer =
new MultiTableCommittableTypeInfo().createSerializer(new ExecutionConfig());
OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> harness =
-                new OneInputStreamOperatorTestHarness<>(operator, inputSerializer);
+                new OneInputStreamOperatorTestHarness<>(operatorFactory, inputSerializer);
harness.setup(outputSerializer);
return harness;
}
Expand Down
CdcRecordStoreWriteOperatorTest.java
@@ -253,8 +253,8 @@ public void testUpdateColumnType() throws Exception {

private OneInputStreamOperatorTestHarness<CdcRecord, Committable> createTestHarness(
FileStoreTable table) throws Exception {
-        CdcRecordStoreWriteOperator operator =
-                new CdcRecordStoreWriteOperator(
+        CdcRecordStoreWriteOperator.Factory operatorFactory =
+                new CdcRecordStoreWriteOperator.Factory(
table,
(t, commitUser, state, ioManager, memoryPool, metricGroup) ->
new StoreSinkWriteImpl(
@@ -272,7 +272,7 @@ private OneInputStreamOperatorTestHarness<CdcRecord, Committable> createTestHarn
TypeSerializer<Committable> outputSerializer =
new CommittableTypeInfo().createSerializer(new ExecutionConfig());
OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness =
-                new OneInputStreamOperatorTestHarness<>(operator, inputSerializer);
+                new OneInputStreamOperatorTestHarness<>(operatorFactory, inputSerializer);
harness.setup(outputSerializer);
return harness;
}
Expand Down