1. Variable-naming changes following code review
2. Add the maven-assembly plugin
3. Update the parameter descriptions in the README
lklhdu committed Nov 15, 2022
1 parent d63a04f commit dc63ff3
Showing 12 changed files with 217 additions and 97 deletions.
94 changes: 52 additions & 42 deletions README.md
@@ -1,60 +1,70 @@
# Overview
Welcome to lakehouse-benchmark-ingestion.
lakehouse-benchmark-ingestion is the data ingestion tool of lakehouse-benchmark, NetEase's open-source data lake benchmark project. Built on Flink CDC, it synchronizes data from a database into a data lake in real time.

## Quick Start
1. Download the project code: `git clone https://github.com/NetEase/lakehouse-benchmark-ingestion.git`
2. Edit resource/ingestion-conf.yaml and fill in the configuration items
3. Build the project with `mvn clean install -DskipTests`
4. Enter the target directory and start the ingestion tool with `java -cp eduard-1.0-SNAPSHOT.jar com.netease.arctic.benchmark.ingestion.MainRunner -confDir [confDir] -sinkType [arctic/iceberg/hudi] -sinkDatabase [dbName]`; the end-to-end sequence is sketched below
5. Open the Flink Web UI at `localhost:8081` to watch the synchronization progress
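
Strung together, the quick-start steps look roughly like this; the sink type, database name, and configuration directory are placeholders, not values taken from the project:

```bash
# Clone and build (commands taken from the steps above)
git clone https://github.com/NetEase/lakehouse-benchmark-ingestion.git
cd lakehouse-benchmark-ingestion
# Fill in resource/ingestion-conf.yaml first; see the parameter tables below
mvn clean install -DskipTests

# Start the ingestion job ("iceberg", "benchmark_db" and the conf path are placeholders)
cd target
java -cp eduard-1.0-SNAPSHOT.jar \
  com.netease.arctic.benchmark.ingestion.MainRunner \
  -confDir /absolute/path/to/conf \
  -sinkType iceberg \
  -sinkDatabase benchmark_db
```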

## Supported Parameters

### Command-line parameters

| Parameter    | Required | Default | Description                                                    |
|--------------|----------|---------|----------------------------------------------------------------|
| confDir      |          | (none)  | Absolute path of the directory containing ingestion-conf.yaml  |
| sinkType     |          | (none)  | Target data lake format; supports Arctic/Iceberg/Hudi          |
| sinkDatabase |          | (none)  | Name of the target database                                    |
| restPort     |          | 8081    | Port of the Flink Web UI                                       |

### Configuration file parameters
All of the following parameters are configured in the resource/ingestion-conf.yaml file; a combined example follows the format-specific tables below.

| Parameter                 | Required | Default | Description                                                                                        |
|---------------------------|----------|---------|----------------------------------------------------------------------------------------------------|
| source.type               |          | (none)  | Type of the source database; currently only MySQL is supported                                      |
| source.username           |          | (none)  | Username for the source database                                                                     |
| source.password           |          | (none)  | Password for the source database                                                                     |
| source.hostname           |          | (none)  | Host of the source database                                                                          |
| source.port               |          | (none)  | Port of the source database                                                                          |
| source.table.name         |          | *       | Tables to synchronize; multiple tables may be specified; by default the whole database is synced     |
| source.scan.startup.mode  |          | initial | Startup mode of the MySQL CDC connector when consuming the binlog; supports initial/latest-offset    |
| source.parallelism        |          | 4       | Parallelism of the tasks reading from the source database                                            |

**Arctic**

| Parameter                   | Required | Default | Description                      |
|-----------------------------|----------|---------|----------------------------------|
| arctic.metastore.url        |          | (none)  | URL of the Arctic metastore      |
| arctic.optimize.group.name  |          | (none)  | Arctic Optimizer resource group  |

**Iceberg**

| Parameter             | Required | Default | Description                                        |
|-----------------------|----------|---------|----------------------------------------------------|
| iceberg.uri           |          | (none)  | Thrift URI of the Hive metastore                   |
| iceberg.warehouse     |          | (none)  | Path of the Hive warehouse                         |
| iceberg.catalog-type  |          | hive    | Type of the Iceberg catalog; supports hive/hadoop  |

**Hudi**

| Parameter                          | Required | Default       | Description                                             |
|------------------------------------|----------|---------------|---------------------------------------------------------|
| hudi.catalog.path                  |          | (none)        | Path of the Hudi catalog                                |
| hudi.hive_sync.enable              |          | true          | Whether to enable Hive sync                             |
| hudi.hive_sync.metastore.uris      |          | (none)        | Hive Metastore URL; required when Hive sync is enabled  |
| hudi.table.type                    |          | MERGE_ON_READ | Table type; supports MERGE_ON_READ/COPY_ON_WRITE        |
| hudi.read.tasks                    |          | 4             | Parallelism of the read operator                        |
| hudi.compaction.tasks              |          | 4             | Parallelism of online compaction                        |
| hudi.compaction.trigger.strategy   |          | num_or_time   | Compaction trigger strategy                             |
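
Putting the tables together, a minimal configuration for syncing MySQL into Iceberg might look like the sketch below. This assumes ingestion-conf.yaml takes flat `key: value` pairs named exactly after the parameters above; every value shown is a placeholder.

```bash
# Hypothetical minimal resource/ingestion-conf.yaml for a MySQL -> Iceberg run.
# Key names come from the tables above; all values are placeholders.
cat > resource/ingestion-conf.yaml <<'EOF'
source.type: MySQL
source.hostname: 127.0.0.1
source.port: 3306
source.username: root
source.password: secret
source.scan.startup.mode: initial
source.parallelism: 4
iceberg.uri: thrift://127.0.0.1:9083
iceberg.warehouse: hdfs://namenode:8020/user/hive/warehouse
iceberg.catalog-type: hive
EOF
# source.table.name is omitted, so the whole database is synchronized
```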


## Supported Databases and Data Lake Formats
### Source databases
1. MySQL
### Target data lake formats
1. Arctic
2. Iceberg
3. Hudi
27 changes: 27 additions & 0 deletions assembly.xml
@@ -0,0 +1,27 @@
<assembly>
<id>${project.artifactId}</id>
<formats>
<format>tar.gz</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>${project.build.directory}</directory>
<includes>
<include>eduard-1.0-SNAPSHOT.jar</include>
<!--
<include>metadata.properties</include>
-->
</includes>
<outputDirectory>/</outputDirectory>
</fileSet>
</fileSets>
<files>
<file>
<source>${project.build.outputDirectory}/ingestion-conf.yaml</source>
<outputDirectory>/conf</outputDirectory>
</file>
</files>

</assembly>

33 changes: 33 additions & 0 deletions pom.xml
@@ -15,6 +15,8 @@
<maven.compiler.target>8</maven.compiler.target>
<maven-checkstyle-plugin.version>3.1.2</maven-checkstyle-plugin.version>
<maven-shade-plugin.version>3.2.1</maven-shade-plugin.version>
<maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
<package.final.name>arctic_benchmark_ingestion</package.final.name>

<flink.version>1.14.5</flink.version>
<scala.binary.version>2.12</scala.binary.version>
@@ -83,6 +85,11 @@
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
@@ -385,6 +392,8 @@
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>ingestion-conf.yaml</exclude>
<exclude>ingestion-conf.yaml.template</exclude>
</excludes>
</filter>
</filters>
@@ -394,10 +403,34 @@
<shadedPattern>com.netease.arctic.shaded.org.apache.avro.</shadedPattern>
</relocation>
</relocations>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven-assembly-plugin.version}</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<finalName>${package.final.name}</finalName>
<descriptors>
<descriptor>assembly.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
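
With both the shade and assembly plugins bound to the `package` phase, a normal build should now also produce a tar.gz named by `package.final.name`. A quick way to sanity-check the layout declared in assembly.xml (the output path is assumed to be the default target/ directory):

```bash
mvn clean package -DskipTests
# Per assembly.xml, the archive should contain the job jar at its root
# and the configuration file under conf/
tar tzf target/arctic_benchmark_ingestion.tar.gz
# expected entries:
#   eduard-1.0-SNAPSHOT.jar
#   conf/ingestion-conf.yaml
```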
BaseCatalogSync.java
@@ -21,7 +21,7 @@
import com.netease.arctic.benchmark.ingestion.params.CallContext;
import com.netease.arctic.benchmark.ingestion.params.catalog.CatalogParams;
import com.netease.arctic.benchmark.ingestion.params.database.BaseParameters;
import com.netease.arctic.benchmark.ingestion.source.MysqlCdcCatalog;
import com.netease.arctic.benchmark.ingestion.source.MysqlCDCCatalog;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -44,8 +44,8 @@
import java.util.function.Consumer;

/**
* Basic class for data ingestion, includes getting the table schema,
* monitoring the database data via flink cdc and inserting the database data into the data lake.
* Basic class for data ingestion, includes getting the table schema, monitoring the database data
* via flink cdc and inserting the database data into the data lake.
*/
public abstract class BaseCatalogSync implements Consumer<CallContext> {

@@ -66,20 +66,20 @@ public void accept(final CallContext context) {
CatalogParams sourceCatalogParams = getSourceCatalogParam(configuration);
CatalogParams destCatalogParams = getDestCatalogParam(configuration);

final MysqlCdcCatalog mysqlCdcCatalog =
(MysqlCdcCatalog) getCatalog(tableEnv, sourceCatalogParams);
final MysqlCDCCatalog mysqlCdcCatalog =
(MysqlCDCCatalog) getCatalog(tableEnv, sourceCatalogParams);
final Catalog destCatalog = getCatalog(tableEnv, destCatalogParams);
String sourceDatabaseName = sourceCatalogParams.getDataBaseName();
String sourceDatabaseName = sourceCatalogParams.getDatabaseName();
List<String> syncTableList =
getSyncTableList(mysqlCdcCatalog, sourceDatabaseName, baseParameters);
final List<Tuple2<ObjectPath, ResolvedCatalogTable>> pathAndTable;
final MySqlSource<RowData> source;

try {
pathAndTable = SyncDbFunction.getPathAndTable(mysqlCdcCatalog,
sourceCatalogParams.getDataBaseName(), syncTableList);
sourceCatalogParams.getDatabaseName(), syncTableList);
if (baseParameters.getSourceScanStartupMode().equals("initial")) {
createTable(destCatalog, destCatalogParams.getDataBaseName(), pathAndTable);
createTable(destCatalog, destCatalogParams.getDatabaseName(), pathAndTable);
}
source = SyncDbFunction.getMySqlSource(mysqlCdcCatalog, sourceDatabaseName, syncTableList,
SyncDbFunction.getDebeziumDeserializeSchemas(pathAndTable),
@@ -102,13 +102,13 @@ public Catalog getCatalog(StreamTableEnvironment tableEnv, CatalogParams catalog
private CatalogParams getSourceCatalogParam(Configuration configuration) {
String catalogName = baseParameters.getSourceType().toLowerCase() + "_catalog";
String databaseName = baseParameters.getSourceDatabaseName();
return CatalogParams.builder().catalogName(catalogName).dataBaseName(databaseName).build();
return CatalogParams.builder().catalogName(catalogName).databaseName(databaseName).build();
}

private CatalogParams getDestCatalogParam(Configuration configuration) {
String catalogName = baseParameters.getSinkType().toLowerCase() + "_catalog_ignore";
String databaseName = baseParameters.getSinkDatabase();
return CatalogParams.builder().catalogName(catalogName).dataBaseName(databaseName).build();
return CatalogParams.builder().catalogName(catalogName).databaseName(databaseName).build();
}

private SingleOutputStreamOperator<Void> sideOutputHandler(StreamExecutionEnvironment env,
Expand All @@ -119,7 +119,7 @@ private SingleOutputStreamOperator<Void> sideOutputHandler(StreamExecutionEnviro
.uid("split stream").name("split stream").setParallelism(4);
}

private List<String> getSyncTableList(MysqlCdcCatalog mysqlCdcCatalog, String sourceDatabaseName,
private List<String> getSyncTableList(MysqlCDCCatalog mysqlCdcCatalog, String sourceDatabaseName,
BaseParameters baseParameters) {
String tableListParam = baseParameters.getSourceTableName();
List<String> tableList = new ArrayList<>();
@@ -140,10 +140,10 @@ public void insertData(StreamTableEnvironment tableEnv, SingleOutputStreamOperat
CatalogParams sourceCatalogParams, CatalogParams destCatalogParams,
List<Tuple2<ObjectPath, ResolvedCatalogTable>> s) {
final StatementSet set = tableEnv.createStatementSet();
SyncDbFunction.getParamsList(sourceCatalogParams.getDataBaseName(), s).forEach(p -> {
SyncDbFunction.getParamsList(sourceCatalogParams.getDatabaseName(), s).forEach(p -> {
tableEnv.createTemporaryView(p.getTable(), process.getSideOutput(p.getTag()), p.getSchema());
String sql = String.format("INSERT INTO %s.%s.%s SELECT f0.* FROM %s",
destCatalogParams.getCatalogName(), destCatalogParams.getDataBaseName(),
destCatalogParams.getCatalogName(), destCatalogParams.getDatabaseName(),
p.getPath().getObjectName(), p.getTable());
set.addInsertSql(sql);
});
