Commit 0688ccb

[bug fix]
1. Upgrade the Flink version to 1.14.6
2. Perform insert data through the DataStream API
lklhdu committed Nov 22, 2022
1 parent a12e9c0 commit 0688ccb
Showing 3 changed files with 64 additions and 2 deletions.
4 changes: 2 additions & 2 deletions pom.xml
@@ -18,10 +18,10 @@
     <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
     <package.final.name>lakehouse_benchmark_ingestion</package.final.name>

-    <flink.version>1.14.5</flink.version>
+    <flink.version>1.14.6</flink.version>
     <scala.binary.version>2.12</scala.binary.version>
     <iceberg.version>0.14.0</iceberg.version>
-    <arctic.version>0.3.2</arctic.version>
+    <arctic.version>0.4.0-SNAPSHOT</arctic.version>
     <hudi.version>0.11.1</hudi.version>
     <flink-connector-mysql-cdc.version>2.3.0</flink-connector-mysql-cdc.version>
     <lombok.version>1.18.20</lombok.version>
Changes to the Hudi sink class in com.netease.arctic.benchmark.ingestion.sink:
@@ -18,10 +18,13 @@
package com.netease.arctic.benchmark.ingestion.sink;

import com.netease.arctic.benchmark.ingestion.BaseCatalogSync;
import com.netease.arctic.benchmark.ingestion.SyncDbFunction;
import com.netease.arctic.benchmark.ingestion.params.catalog.CatalogParams;
import com.netease.arctic.benchmark.ingestion.params.database.BaseParameters;
import com.netease.arctic.benchmark.ingestion.params.table.HudiParameters;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
@@ -32,6 +35,13 @@
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -118,5 +128,26 @@ public void insertData(StreamTableEnvironment tableEnv, SingleOutputStreamOperat
CatalogParams sourceCatalogParams, CatalogParams destCatalogParams,
List<Tuple2<ObjectPath, ResolvedCatalogTable>> s) {

SyncDbFunction.getParamsList(sourceCatalogParams.getDatabaseName(), s).forEach(p -> {
DataStream<RowData> dataStream = process.getSideOutput(p.getTag());
final FlinkStreamerConfig cfg = new FlinkStreamerConfig();

// Read from kafka source
// todo
RowType rowType =
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
.getLogicalType();

Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
int parallelism = 4;

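// Pipelines.bootstrap maps the RowData stream to HoodieRecords (and loads the index of
// existing records when index bootstrap is enabled); Pipelines.hoodieStreamWrite then
// assigns buckets and performs the streaming write.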
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
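// For MERGE_ON_READ tables with async compaction enabled, a compaction stage is appended;
// otherwise only the cleaning task is scheduled.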
if (StreamerUtil.needsAsyncCompaction(conf)) {
Pipelines.compact(conf, pipeline);
} else {
Pipelines.clean(conf, pipeline);
}
});
}
}
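The FlinkStreamerConfig above is created with defaults and its source wiring is still marked as a TODO. As a rough illustration only, the per-table configuration might be filled in along the lines below; the field names follow Hudi 0.11.x's FlinkStreamerConfig, while the helper name, base-path layout, table type and schema handling are assumptions, not part of this commit.

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.streamer.FlinkStreamerConfig;

// Hypothetical helper: build the Flink configuration for writing one synced table to Hudi.
static Configuration buildHudiConf(String warehousePath, String tableName, String avroSchema) {
  FlinkStreamerConfig cfg = new FlinkStreamerConfig();
  cfg.targetBasePath = warehousePath + "/" + tableName; // assumed layout under the warehouse root
  cfg.targetTableName = tableName;
  cfg.tableType = "MERGE_ON_READ";                      // or "COPY_ON_WRITE"
  cfg.sourceAvroSchema = avroSchema;                    // schema the RowType above is derived from
  return FlinkStreamerConfig.toFlinkConfig(cfg);
}

With the schema set this way, StreamerUtil.getSourceSchema(cfg) can resolve it from sourceAvroSchema (or sourceAvroSchemaPath) rather than failing on an empty config.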
Changes to IcebergCatalogSync in com.netease.arctic.benchmark.ingestion.sink:
@@ -18,6 +18,7 @@
package com.netease.arctic.benchmark.ingestion.sink;

import com.netease.arctic.benchmark.ingestion.BaseCatalogSync;
import com.netease.arctic.benchmark.ingestion.SyncDbFunction;
import com.netease.arctic.benchmark.ingestion.params.catalog.CatalogParams;
import com.netease.arctic.benchmark.ingestion.params.database.BaseParameters;
import com.netease.arctic.benchmark.ingestion.params.table.IcebergParameters;
@@ -32,6 +33,13 @@
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkCatalog;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -53,7 +61,19 @@ public IcebergCatalogSync(BaseParameters baseParameters, IcebergParameters icebe
public void insertData(StreamTableEnvironment tableEnv, SingleOutputStreamOperator<Void> process,
CatalogParams sourceCatalogParams, CatalogParams destCatalogParams,
List<Tuple2<ObjectPath, ResolvedCatalogTable>> s) {
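// FlinkCatalog keeps its CatalogLoader in a private field with no public getter, so it is
// read via reflection (see getField below) and reused to build a TableLoader per table.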
FlinkCatalog catalog = (FlinkCatalog) destCatalogParams.getCatalog();
CatalogLoader catalogLoader = getField(FlinkCatalog.class, catalog, "catalogLoader");
org.apache.iceberg.catalog.Catalog loadCatalog = catalogLoader.loadCatalog();
SyncDbFunction.getParamsList(sourceCatalogParams.getDatabaseName(), s).forEach(p -> {
TableIdentifier identifier =
TableIdentifier.of(Namespace.of(destCatalogParams.getDatabaseName()), p.getTable());
Table table = loadCatalog.loadTable(identifier);
TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, identifier);

FlinkSink.forRowData(process.getSideOutput(p.getTag())).table(table).tableLoader(tableLoader)
.writeParallelism(4).append();

});
}

@Override
@@ -94,4 +114,15 @@ public void createTable(Catalog catalog, String dbName,
private void fillIcebergTableOptions(Map<String, String> options) {
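// Iceberg format-version 2 enables row-level (equality) deletes, which CDC-style upserts rely on.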
options.put("format-version", "2");
}

private static <O, V> V getField(Class<O> clazz, O obj, String fieldName) {
try {
java.lang.reflect.Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
Object v = field.get(obj);
return v == null ? null : (V) v;
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
}
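For reference, the per-table write path added above reduces to the following FlinkSink chain. This is a minimal sketch with hypothetical database and table names; catalogLoader is obtained as in insertData.

// Sketch: write one CDC side-output stream into an Iceberg table (names are assumed).
TableIdentifier id = TableIdentifier.of(Namespace.of("benchmark_db"), "orders");
Table table = catalogLoader.loadCatalog().loadTable(id);              // table handle (schema, partition spec)
TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, id); // re-opened on the task managers
FlinkSink.forRowData(rowDataStream)      // DataStream<RowData>, e.g. process.getSideOutput(tag)
    .table(table)
    .tableLoader(tableLoader)
    .writeParallelism(4)                 // same fixed parallelism as above
    .append();                           // adds writer and committer operators to the job graph

Unlike the Hudi path, no compaction or clean stage is wired in here: the Iceberg committer commits data files when Flink checkpoints complete.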
