diff --git a/README.md b/README.md
index d475493..476e0c7 100644
--- a/README.md
+++ b/README.md
@@ -1,60 +1,70 @@
 # Overview
 Welcome to lakehouse-benchmark-ingestion.
 lakehouse-benchmark-ingestion is the data ingestion tool of lakehouse-benchmark, NetEase's open-source data lake benchmarking project. Built on Flink CDC, it synchronizes data from a database into a data lake in real time.
 
 ## Quick Start
-1. Download the project code: `git clone xxx`
+1. Download the project code: `git clone https://github.com/NetEase/lakehouse-benchmark-ingestion.git`
 2. Edit resource/ingestion-conf.yaml and fill in the configuration items
 3. Build the project with `mvn clean install -DskipTests`
-4. Enter the target directory and start the ingestion tool with `java -cp eduard-1.0-SNAPSHOT.jar com.netease.arctic.benchmark.ingestion.MainRunner -sinkType [arctic/iceberg/hudi] -sinkDatabase [dbName]`
+4. Enter the target directory and start the ingestion tool with `java -cp eduard-1.0-SNAPSHOT.jar com.netease.arctic.benchmark.ingestion.MainRunner -confDir [confDir] -sinkType [arctic/iceberg/hudi] -sinkDatabase [dbName]`, as in the example below
 5. Open the Flink Web UI at `localhost:8081` to observe the progress of the synchronization
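+
+For reference, a complete launch command might look like the following (the paths, sink type
+and database name are illustrative values, not project defaults):
+
+```bash
+# Run from the target directory after `mvn clean install -DskipTests`.
+# -confDir must be the absolute path of the directory holding ingestion-conf.yaml.
+java -cp eduard-1.0-SNAPSHOT.jar com.netease.arctic.benchmark.ingestion.MainRunner \
+  -confDir /opt/lakehouse-benchmark-ingestion/conf \
+  -sinkType iceberg \
+  -sinkDatabase benchmark_db \
+  -restPort 8081
+```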
 
 ## Supported Parameters
+### Command-line parameters
+
+| Parameter    | Required | Default | Description                                                       |
+|--------------|----------|---------|-------------------------------------------------------------------|
+| confDir      | Yes      | (none)  | Absolute path of the directory containing ingestion-conf.yaml     |
+| sinkType     | Yes      | (none)  | Type of the target data lake format; supports Arctic/Iceberg/Hudi |
+| sinkDatabase | Yes      | (none)  | Name of the target database                                       |
+| restPort     | No       | 8081    | Port of the Flink Web UI                                          |
+
+### Configuration file parameters
 The following parameters can all be configured in the resource/ingestion-conf.yaml file (a sample source-side snippet follows this table).
 
 | Parameter                | Required | Default | Description                                                                                              |
 |--------------------------|----------|---------|----------------------------------------------------------------------------------------------------------|
 | source.type              | Yes      | (none)  | Type of the source database; currently only MySQL is supported                                            |
 | source.username          | Yes      | (none)  | Username for the source database                                                                           |
 | source.password          | Yes      | (none)  | Password for the source database                                                                           |
 | source.hostname          | Yes      | (none)  | Host of the source database                                                                                |
 | source.port              | Yes      | (none)  | Port of the source database                                                                                |
 | source.table.name        | No       | *       | Names of the tables to synchronize; multiple tables can be specified, and by default the whole database is synchronized |
 | source.scan.startup.mode | No       | initial | Startup mode of the MySQL CDC connector when consuming the binlog; supports initial/latest-offset          |
-| source.parallelism       | No       | 4       | Parallelism of the tasks reading source data | | | |
+| source.parallelism       | No       | 4       | Parallelism of the tasks reading source data |
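+
+A minimal source section of ingestion-conf.yaml might look like this (all values are
+illustrative; MySQL is currently the only supported source type):
+
+```yaml
+# Connection to the source MySQL instance
+source.type: mysql
+source.hostname: 127.0.0.1
+source.port: 3306
+source.username: benchmark_user
+source.password: benchmark_password
+# Optional: limit the sync to specific tables (defaults to the whole database)
+source.table.name: orders,customers
+source.scan.startup.mode: initial
+source.parallelism: 4
+```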
 
-### Arctic
+**Arctic**
 
 | Parameter                  | Required | Default | Description                     |
 |----------------------------|----------|---------|---------------------------------|
 | arctic.metastore.url       | Yes      | (none)  | URL of the Arctic metastore     |
 | arctic.optimize.group.name | No       | (none)  | Arctic Optimizer resource group |
 
-### Iceberg
+**Iceberg**
 
 | Parameter            | Required | Default | Description                                       |
 |----------------------|----------|---------|---------------------------------------------------|
 | iceberg.uri          | Yes      | (none)  | Thrift URI of the Hive metastore                  |
 | iceberg.warehouse    | Yes      | (none)  | Path of the Hive warehouse                        |
 | iceberg.catalog-type | No       | hive    | Type of the Iceberg catalog; supports hive/hadoop |
 
-### Hudi
+**Hudi**
 
 | Parameter                        | Required | Default       | Description                                            |
 |----------------------------------|----------|---------------|--------------------------------------------------------|
 | hudi.catalog.path                | Yes      | (none)        | Path of the Hudi catalog                               |
 | hudi.hive_sync.enable            | No       | true          | Whether to enable Hive sync                            |
 | hudi.hive_sync.metastore.uris    | No       | (none)        | Hive metastore URL; required when Hive sync is enabled |
 | hudi.table.type                  | No       | MERGE_ON_READ | Table type; supports MERGE_ON_READ/COPY_ON_WRITE       |
 | hudi.read.tasks                  | No       | 4             | Parallelism of the read operator                       |
 | hudi.compaction.tasks            | No       | 4             | Parallelism of online compaction                       |
 | hudi.compaction.trigger.strategy | No       | num_or_time   | Compaction trigger strategy                            |
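+
+Sink-side parameters go in the same file. For example, an Iceberg sink backed by a Hive
+metastore could be configured as follows (the URIs and paths are illustrative):
+
+```yaml
+# Iceberg sink via a Hive metastore
+iceberg.uri: thrift://hive-metastore-host:9083
+iceberg.warehouse: hdfs://namenode:8020/user/hive/warehouse
+iceberg.catalog-type: hive
+```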
 
 ## Supported databases and data lake formats
 ### Source databases
 1. MySQL
 ### Target data lake formats
 1. Arctic
 2. Iceberg
 3. Hudi
diff --git a/assembly.xml b/assembly.xml
new file mode 100644
index 0000000..17b79de
--- /dev/null
+++ b/assembly.xml
@@ -0,0 +1,27 @@
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.1.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.1.0 http://maven.apache.org/xsd/assembly-2.1.0.xsd">
+  <id>${project.artifactId}</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${project.build.directory}</directory>
+      <includes>
+        <include>eduard-1.0-SNAPSHOT.jar</include>
+      </includes>
+      <outputDirectory>/</outputDirectory>
+    </fileSet>
+  </fileSets>
+  <files>
+    <file>
+      <source>${project.build.outputDirectory}/ingestion-conf.yaml</source>
+      <outputDirectory>/conf</outputDirectory>
+    </file>
+  </files>
+</assembly>
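
Together with the maven-assembly-plugin configuration added to pom.xml below, this descriptor bundles the job jar and the configuration file into a tar.gz distribution at package time. A usage sketch, assuming `package.final.name` resolves to `arctic_benchmark_ingestion` and the assembly id is not appended to the file name:

```bash
# Build the project and the tar.gz distribution in one step
mvn clean install -DskipTests
# Expected contents: eduard-1.0-SNAPSHOT.jar at the archive root plus conf/ingestion-conf.yaml
tar -tzf target/arctic_benchmark_ingestion.tar.gz
```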
diff --git a/pom.xml b/pom.xml
index e3f0d62..8057ef4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,6 +15,8 @@
     8
     3.1.2
     3.2.1
+    <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
+    <package.final.name>arctic_benchmark_ingestion</package.final.name>
     1.14.5
     2.12
@@ -83,6 +85,11 @@
       <artifactId>hadoop-hdfs</artifactId>
       <version>${hadoop.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-client</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
@@ -385,6 +392,8 @@
               <exclude>META-INF/*.SF</exclude>
               <exclude>META-INF/*.DSA</exclude>
               <exclude>META-INF/*.RSA</exclude>
+              <exclude>ingestion-conf.yaml</exclude>
+              <exclude>ingestion-conf.yaml.template</exclude>
@@ -394,10 +403,34 @@
                   <shadedPattern>com.netease.arctic.shaded.org.apache.avro.</shadedPattern>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>${maven-assembly-plugin.version}</version>
+        <configuration>
+          <appendAssemblyId>false</appendAssemblyId>
+          <finalName>${package.final.name}</finalName>
+          <descriptors>
+            <descriptor>assembly.xml</descriptor>
+          </descriptors>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
\ No newline at end of file
diff --git a/src/main/java/com/netease/arctic/benchmark/ingestion/BaseCatalogSync.java b/src/main/java/com/netease/arctic/benchmark/ingestion/BaseCatalogSync.java
index c019da3..325d86a 100644
--- a/src/main/java/com/netease/arctic/benchmark/ingestion/BaseCatalogSync.java
+++ b/src/main/java/com/netease/arctic/benchmark/ingestion/BaseCatalogSync.java
@@ -21,7 +21,7 @@
 import com.netease.arctic.benchmark.ingestion.params.CallContext;
 import com.netease.arctic.benchmark.ingestion.params.catalog.CatalogParams;
 import com.netease.arctic.benchmark.ingestion.params.database.BaseParameters;
-import com.netease.arctic.benchmark.ingestion.source.MysqlCdcCatalog;
+import com.netease.arctic.benchmark.ingestion.source.MysqlCDCCatalog;
 import com.ververica.cdc.connectors.mysql.source.MySqlSource;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -44,8 +44,8 @@
 import java.util.function.Consumer;
 
 /**
- * Basic class for data ingestion, includes getting the table schema,
- * monitoring the database data via flink cdc and inserting the database data into the data lake.
+ * Basic class for data ingestion, includes getting the table schema, monitoring the database data
+ * via flink cdc and inserting the database data into the data lake.
  */
 public abstract class BaseCatalogSync implements Consumer<CallContext> {
 
@@ -66,10 +66,10 @@ public void accept(final CallContext context) {
     CatalogParams sourceCatalogParams = getSourceCatalogParam(configuration);
     CatalogParams destCatalogParams = getDestCatalogParam(configuration);
-    final MysqlCdcCatalog mysqlCdcCatalog =
-        (MysqlCdcCatalog) getCatalog(tableEnv, sourceCatalogParams);
+    final MysqlCDCCatalog mysqlCdcCatalog =
+        (MysqlCDCCatalog) getCatalog(tableEnv, sourceCatalogParams);
     final Catalog destCatalog = getCatalog(tableEnv, destCatalogParams);
-    String sourceDatabaseName = sourceCatalogParams.getDataBaseName();
+    String sourceDatabaseName = sourceCatalogParams.getDatabaseName();
     List<String> syncTableList = getSyncTableList(mysqlCdcCatalog, sourceDatabaseName, baseParameters);
     final List<Tuple2<ObjectPath, ResolvedCatalogTable>> pathAndTable;
 
     try {
       pathAndTable = SyncDbFunction.getPathAndTable(mysqlCdcCatalog,
-          sourceCatalogParams.getDataBaseName(), syncTableList);
+          sourceCatalogParams.getDatabaseName(), syncTableList);
       if (baseParameters.getSourceScanStartupMode().equals("initial")) {
-        createTable(destCatalog, destCatalogParams.getDataBaseName(), pathAndTable);
+        createTable(destCatalog, destCatalogParams.getDatabaseName(), pathAndTable);
       }
       source = SyncDbFunction.getMySqlSource(mysqlCdcCatalog, sourceDatabaseName, syncTableList,
           SyncDbFunction.getDebeziumDeserializeSchemas(pathAndTable),
@@ -102,13 +102,13 @@ public Catalog getCatalog(StreamTableEnvironment tableEnv, CatalogParams catalog
 
   private CatalogParams getSourceCatalogParam(Configuration configuration) {
     String catalogName = baseParameters.getSourceType().toLowerCase() + "_catalog";
     String databaseName = baseParameters.getSourceDatabaseName();
-    return CatalogParams.builder().catalogName(catalogName).dataBaseName(databaseName).build();
+    return CatalogParams.builder().catalogName(catalogName).databaseName(databaseName).build();
   }
 
   private CatalogParams getDestCatalogParam(Configuration configuration) {
     String catalogName = baseParameters.getSinkType().toLowerCase() + "_catalog_ignore";
     String databaseName = baseParameters.getSinkDatabase();
-    return CatalogParams.builder().catalogName(catalogName).dataBaseName(databaseName).build();
+    return CatalogParams.builder().catalogName(catalogName).databaseName(databaseName).build();
   }
 
   private SingleOutputStreamOperator sideOutputHandler(StreamExecutionEnvironment env,
@@ -119,7 +119,7 @@ private SingleOutputStreamOperator sideOutputHandler(StreamExecutionEnviro
         .uid("split stream").name("split stream").setParallelism(4);
   }
 
-  private List<String> getSyncTableList(MysqlCdcCatalog mysqlCdcCatalog, String sourceDatabaseName,
+  private List<String> getSyncTableList(MysqlCDCCatalog mysqlCdcCatalog, String sourceDatabaseName,
       BaseParameters baseParameters) {
     String tableListParam = baseParameters.getSourceTableName();
     List<String> tableList = new ArrayList<>();
@@ -140,10 +140,10 @@ public void insertData(StreamTableEnvironment tableEnv, SingleOutputStreamOperat
       CatalogParams sourceCatalogParams, CatalogParams destCatalogParams,
       List<Tuple2<ObjectPath, ResolvedCatalogTable>> s) {
     final StatementSet set = tableEnv.createStatementSet();
-    SyncDbFunction.getParamsList(sourceCatalogParams.getDataBaseName(), s).forEach(p -> {
+    SyncDbFunction.getParamsList(sourceCatalogParams.getDatabaseName(), s).forEach(p -> {
       tableEnv.createTemporaryView(p.getTable(), process.getSideOutput(p.getTag()), p.getSchema());
       String sql = String.format("INSERT INTO %s.%s.%s SELECT f0.* FROM %s",
-          destCatalogParams.getCatalogName(), destCatalogParams.getDataBaseName(),
+          destCatalogParams.getCatalogName(), destCatalogParams.getDatabaseName(),
           p.getPath().getObjectName(), p.getTable());
       set.addInsertSql(sql);
     });
diff --git a/src/main/java/com/netease/arctic/benchmark/ingestion/MainRunner.java b/src/main/java/com/netease/arctic/benchmark/ingestion/MainRunner.java
index a37fb5c..c0eab16 100644
--- a/src/main/java/com/netease/arctic/benchmark/ingestion/MainRunner.java
+++ b/src/main/java/com/netease/arctic/benchmark/ingestion/MainRunner.java
@@ -18,9 +18,9 @@
 package com.netease.arctic.benchmark.ingestion;
 
 import com.netease.arctic.benchmark.ingestion.config.CatalogConfigUtil;
-import com.netease.arctic.benchmark.ingestion.params.database.BaseParameters;
 import com.netease.arctic.benchmark.ingestion.params.CallContext;
 import com.netease.arctic.benchmark.ingestion.params.ParameterUtil;
+import com.netease.arctic.benchmark.ingestion.params.database.BaseParameters;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
@@ -29,7 +29,9 @@
 import org.apache.commons.cli.ParseException;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
@@ -38,19 +40,20 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.BufferedReader;
+import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Consumer;
 import lombok.extern.slf4j.Slf4j;
 
 /**
- * Start-up class, includes parsing configuration files, creating source and sink catalogs
- * and calling synchronisation functions
+ * Start-up class, includes parsing configuration files, creating source and sink catalogs and
+ * calling synchronisation functions
  */
 @Slf4j
 public class MainRunner {
 
@@ -63,18 +66,21 @@ public class MainRunner {
   public static void main(String[] args)
       throws ClassNotFoundException, InstantiationException, IllegalAccessException {
     Class.forName("com.mysql.jdbc.Driver");
-    System.setProperty("HADOOP_USER_NAME", "sloth");
-    Map<String, String> props = new HashMap<>();
-    Configuration configuration = loadYAMLResource(
-        MainRunner.class.getClassLoader().getResourceAsStream(EDUARD_CONF_FILENAME), props);
     String[] params = parseParams(args);
-    String sinkType = params[0];
-    String sinkDatabase = params[1];
+    String confDir = params[0];
+    String sinkType = params[1];
+    String sinkDatabase = params[2];
+    int restPort = Integer.parseInt(params[3]);
+    Map<String, String> props = new HashMap<>();
+    Configuration configuration = loadConfiguration(confDir, props);
+
     BaseParameters baseParameters = new BaseParameters(configuration, sinkType, sinkDatabase);
-    env = StreamExecutionEnvironment.getExecutionEnvironment(setFlinkConf());
+    env = StreamExecutionEnvironment.getExecutionEnvironment(setFlinkConf(restPort));
+    env.setStateBackend(new FsStateBackend("file:///tmp/benchmark-ingestion"));
     env.getCheckpointConfig().setCheckpointInterval(60 * 1000L);
+    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
     tableEnv = StreamTableEnvironment.create(env);
     createSourceCatalog(baseParameters.getSourceType(), baseParameters);
     createSinkCatalog(sinkType, props);
@@ -130,11 +136,38 @@ private static void createSinkCatalog(String sinkType, Map<String, String> props
     ((StreamTableEnvironmentImpl) tableEnv).executeInternal(operation);
   }
 
-  private static Configuration loadYAMLResource(InputStream inputStream,
-      Map<String, String> props) {
+  private static Configuration loadConfiguration(final String configDir, Map<String, String> props) {
+
+    if (configDir == null) {
+      throw new IllegalArgumentException(
+          "Given configuration directory is null, cannot load configuration");
+    }
+
+    final File confDirFile = new File(configDir);
+    if (!(confDirFile.exists())) {
+      throw new IllegalConfigurationException(
+          "The given configuration directory name '" + configDir + "' (" +
+              confDirFile.getAbsolutePath() + ") does not describe an existing directory.");
+    }
+
+    // get Flink yaml configuration file
+    final File yamlConfigFile = new File(confDirFile, EDUARD_CONF_FILENAME);
+
+    if (!yamlConfigFile.exists()) {
+      throw new IllegalConfigurationException("The Flink config file '" + yamlConfigFile + "' (" +
+          yamlConfigFile.getAbsolutePath() + ") does not exist.");
+    }
+
+    Configuration configuration = loadYAMLResource(yamlConfigFile, props);
+
+    return configuration;
+  }
+
+  private static Configuration loadYAMLResource(File file, Map<String, String> props) {
     final Configuration config = new Configuration();
-    try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
+    try (BufferedReader reader =
+        new BufferedReader(new InputStreamReader(Files.newInputStream(file.toPath())))) {
       String line;
       int lineNo = 0;
@@ -173,9 +206,9 @@ private static Configuration loadYAMLResource(InputStream inputStream,
     return config;
   }
 
-  private static Configuration setFlinkConf() {
+  private static Configuration setFlinkConf(int restPort) {
     Configuration configuration = new Configuration();
-    configuration.setInteger(RestOptions.PORT, 8081);
+    configuration.setInteger(RestOptions.PORT, restPort);
     return configuration;
   }
 
@@ -185,12 +218,18 @@ private static String toUpperFirstCase(String str) {
 
   private static String[] parseParams(String[] args) {
     Options options = new Options();
+    Option confDir = Option.builder("confDir").required(true).hasArg().argName("confDir")
+        .desc("Specify the directory of ingestion-conf yaml").build();
     Option sinkType = Option.builder("sinkType").required(true).hasArg().argName("sinkType")
         .desc("Specify the type of target database").build();
     Option sinkDatabase = Option.builder("sinkDatabase").required(true).hasArg()
         .argName("sinkDatabase").desc("Specify the database name of target database").build();
+    Option restPort = Option.builder("restPort").required(false).hasArg().argName("restPort")
+        .desc("Specify the port of Flink Web UI").build();
+    options.addOption(confDir);
     options.addOption(sinkType);
     options.addOption(sinkDatabase);
+    options.addOption(restPort);
 
     CommandLineParser parser = new DefaultParser();
     CommandLine cmd = null;
@@ -200,18 +239,29 @@
       throw new RuntimeException(e);
     }
 
-    String[] params = new String[2];
+    String[] params = new String[4];
+    if (cmd.hasOption("confDir")) {
+      params[0] = cmd.getOptionValue("confDir");
+    } else {
+      throw new RuntimeException("parse Param 'confDir' fail");
+    }
     if (cmd.hasOption("sinkType")) {
-      params[0] = cmd.getOptionValue("sinkType");
+      params[1] = cmd.getOptionValue("sinkType");
    } else {
       throw new RuntimeException("parse Param 'sinkType' fail");
     }
     if (cmd.hasOption("sinkDatabase")) {
-      params[1] = cmd.getOptionValue("sinkDatabase");
+      params[2] = cmd.getOptionValue("sinkDatabase");
     } else {
       throw new RuntimeException("parse Param 'sinkDatabase' fail");
     }
+    if (cmd.hasOption("restPort")) {
+      params[3] = cmd.getOptionValue("restPort");
+    } else {
+      params[3] = "8081";
+      LOG.info("No rest port specified, will bind to 8081");
+    }
     return params;
   }
 }
diff --git a/src/main/java/com/netease/arctic/benchmark/ingestion/SyncDbFunction.java b/src/main/java/com/netease/arctic/benchmark/ingestion/SyncDbFunction.java
index d09c022..57cecc8 100644
--- a/src/main/java/com/netease/arctic/benchmark/ingestion/SyncDbFunction.java
+++ b/src/main/java/com/netease/arctic/benchmark/ingestion/SyncDbFunction.java
@@ -17,8 +17,8 @@
 package com.netease.arctic.benchmark.ingestion;
 
-import com.netease.arctic.benchmark.ingestion.params.database.SyncDbParams;
-import com.netease.arctic.benchmark.ingestion.source.MysqlCdcCatalog;
+import com.netease.arctic.benchmark.ingestion.params.database.SyncDBParams;
+import com.netease.arctic.benchmark.ingestion.source.MysqlCDCCatalog;
 import com.ververica.cdc.connectors.mysql.source.MySqlSource;
 import com.ververica.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory;
 import com.ververica.cdc.connectors.mysql.table.StartupOptions;
@@ -60,7 +60,7 @@ public class SyncDbFunction {
   private static final Logger LOG = LoggerFactory.getLogger(SyncDbFunction.class);
 
   public static List<Tuple2<ObjectPath, ResolvedCatalogTable>> getPathAndTable(
-      final MysqlCdcCatalog mysql, final String mysqlDb, List<String> tableList)
+      final MysqlCDCCatalog mysql, final String mysqlDb, List<String> tableList)
       throws DatabaseNotExistException {
     return mysql.listTables(mysqlDb).stream().filter(t -> !t.equals("heartbeat"))
         .filter(tableList::contains).map(t -> {
@@ -74,7 +74,7 @@ public static List<Tuple2<ObjectPath, ResolvedCatalogTable>> getPathAndTable(
         }).collect(toList());
   }
 
-  public static MySqlSource<RowData> getMySqlSource(final MysqlCdcCatalog mysql,
+  public static MySqlSource<RowData> getMySqlSource(final MysqlCDCCatalog mysql,
       final String srcCatalogDb, List<String> tableList,
       final Map maps, String startUpMode) {
     return MySqlSource.<RowData>builder().hostname(mysql.getHostname()).port(mysql.getPort())
@@ -102,7 +102,7 @@ public static Map getConverters(
         e -> RowRowConverter.create(e.f1.getResolvedSchema().toPhysicalRowDataType())));
   }
 
-  public static List<SyncDbParams> getParamsList(final String mysqlDb,
+  public static List<SyncDBParams> getParamsList(final String mysqlDb,
       final List<Tuple2<ObjectPath, ResolvedCatalogTable>> pathAndTable) {
     return pathAndTable.stream().map(e -> {
       final OutputTag tag = new OutputTag(e.f0.getFullName()) {};
       final List<DataTypes.Field> fields = e.f1.getResolvedSchema().getColumns().stream()
           .map(c -> DataTypes.FIELD(c.getName(), c.getDataType())).collect(toList());
       final Schema schema = Schema.newBuilder()
           .column("f0", DataTypes.ROW(fields.toArray(new DataTypes.Field[] {}))).build();
-      return SyncDbParams.builder().table(e.f0.getObjectName())
+      return SyncDBParams.builder().table(e.f0.getObjectName())
           .path(new ObjectPath(mysqlDb, e.f0.getObjectName())).tag(tag).schema(schema).build();
     }).collect(toList());
   }
diff --git a/src/main/java/com/netease/arctic/benchmark/ingestion/params/catalog/CatalogParams.java b/src/main/java/com/netease/arctic/benchmark/ingestion/params/catalog/CatalogParams.java
index d473829..5f3e00a 100644
--- a/src/main/java/com/netease/arctic/benchmark/ingestion/params/catalog/CatalogParams.java
+++ b/src/main/java/com/netease/arctic/benchmark/ingestion/params/catalog/CatalogParams.java
@@ -29,5 +29,5 @@
 public class CatalogParams {
 
   String catalogName;
-  String dataBaseName;
+  String databaseName;
 }
diff --git a/src/main/java/com/netease/arctic/benchmark/ingestion/params/database/SyncDbParams.java b/src/main/java/com/netease/arctic/benchmark/ingestion/params/database/SyncDBParams.java
similarity index 96%
rename from src/main/java/com/netease/arctic/benchmark/ingestion/params/database/SyncDbParams.java
rename to src/main/java/com/netease/arctic/benchmark/ingestion/params/database/SyncDBParams.java
index 1a20237..e483731 100644
--- a/src/main/java/com/netease/arctic/benchmark/ingestion/params/database/SyncDbParams.java
+++ b/src/main/java/com/netease/arctic/benchmark/ingestion/params/database/SyncDBParams.java
@@ -32,7 +32,7 @@
 @AllArgsConstructor
 @NoArgsConstructor
 @Builder
-public class SyncDbParams implements Serializable {
+public class SyncDBParams implements Serializable {
   Schema schema;
   OutputTag tag;
   String db;
diff --git a/src/main/java/com/netease/arctic/benchmark/ingestion/sink/HudiCatalogSync.java b/src/main/java/com/netease/arctic/benchmark/ingestion/sink/HudiCatalogSync.java
index 825ad2e..d539af1 100644
--- a/src/main/java/com/netease/arctic/benchmark/ingestion/sink/HudiCatalogSync.java
+++ b/src/main/java/com/netease/arctic/benchmark/ingestion/sink/HudiCatalogSync.java
@@ -59,15 +59,15 @@ public void createTable(Catalog catalog, String dbName,
     }
     final String HIVE_META_STORE_URI = hudiParameters.getHiveMetastoreUri();
-    boolean is_hive_sync = hudiParameters.getHiveSyncEnable();
+    boolean isHiveSync = hudiParameters.getHiveSyncEnable();
     final Map<String, String> options = new HashMap<>();
-    if (is_hive_sync) {
+    if (isHiveSync) {
       options.put("hive_sync.metastore.uris", HIVE_META_STORE_URI);
     }
     pathAndTable.forEach(e -> {
       try {
-        fillHudiTableOptions(options, is_hive_sync, dbName, e.f0.getObjectName());
+        fillHudiTableOptions(options, isHiveSync, dbName, e.f0.getObjectName());
         ObjectPath objectPath = new ObjectPath(dbName, e.f0.getObjectName());
         if (hudi.tableExists(objectPath)) {
@@ -85,9 +85,9 @@ public void createTable(Catalog catalog, String dbName,
     });
   }
 
-  private void fillHudiTableOptions(Map<String, String> options, boolean is_hive_sync,
-      String dbName, String tableName) {
-    if (is_hive_sync) {
+  private void fillHudiTableOptions(Map<String, String> options, boolean isHiveSync, String dbName,
+      String tableName) {
+    if (isHiveSync) {
       options.put("hive_sync.enable", "true");
       options.put("hive_sync.mode", "hms");
       options.put("hive_sync.db", dbName);
diff --git a/src/main/java/com/netease/arctic/benchmark/ingestion/source/MysqlCdcCatalog.java b/src/main/java/com/netease/arctic/benchmark/ingestion/source/MysqlCDCCatalog.java
similarity index 97%
rename from src/main/java/com/netease/arctic/benchmark/ingestion/source/MysqlCdcCatalog.java
rename to src/main/java/com/netease/arctic/benchmark/ingestion/source/MysqlCDCCatalog.java
index be7c971..02af3a4 100644
--- a/src/main/java/com/netease/arctic/benchmark/ingestion/source/MysqlCdcCatalog.java
+++ b/src/main/java/com/netease/arctic/benchmark/ingestion/source/MysqlCDCCatalog.java
@@ -63,7 +63,7 @@
 /**
  * Catalog for Mysql.
  */
-public class MysqlCdcCatalog extends AbstractJdbcCatalog {
+public class MysqlCDCCatalog extends AbstractJdbcCatalog {
 
   private static final Set<String> builtinDatabases = new HashSet<String>() {
     {
@@ -83,7 +83,7 @@ public int getPort() {
 
   private final int port;
 
-  public MysqlCdcCatalog(String catalogName, String defaultDatabase, String username, String pwd,
+  public MysqlCDCCatalog(String catalogName, String defaultDatabase, String username, String pwd,
       final String hostname, final int port) {
     super(catalogName, defaultDatabase, username, pwd,
         String.format("jdbc:mysql://%s:%d", hostname, port));
@@ -92,8 +92,8 @@ public MysqlCdcCatalog(String catalogName, String defaultDatabase, String userna
 
   public static void main(String[] args) throws DatabaseNotExistException, TableNotExistException {
-    final MysqlCdcCatalog catalog =
-        new MysqlCdcCatalog("mysql", "chbenchmark", "sys", "netease", "10.171.161.168", 3332);
+    final MysqlCDCCatalog catalog =
+        new MysqlCDCCatalog("mysql", "chbenchmark", "sys", "netease", "10.171.161.168", 3332);
     catalog.listDatabases().forEach(System.out::println);
     catalog.listTables("test").forEach(System.out::println);
     System.out.println(catalog.getTable(new ObjectPath("test", "test")).getOptions());
diff --git a/src/main/java/com/netease/arctic/benchmark/ingestion/source/MysqlCdcCatalogFactory.java b/src/main/java/com/netease/arctic/benchmark/ingestion/source/MysqlCDCCatalogFactory.java
similarity index 93%
rename from src/main/java/com/netease/arctic/benchmark/ingestion/source/MysqlCdcCatalogFactory.java
rename to src/main/java/com/netease/arctic/benchmark/ingestion/source/MysqlCDCCatalogFactory.java
index e91324f..464989c 100644
--- a/src/main/java/com/netease/arctic/benchmark/ingestion/source/MysqlCdcCatalogFactory.java
+++ b/src/main/java/com/netease/arctic/benchmark/ingestion/source/MysqlCDCCatalogFactory.java
@@ -33,11 +33,11 @@
 import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
 
 /**
- * Factory for {@link MysqlCdcCatalog}.
+ * Factory for {@link MysqlCDCCatalog}.
*/ -public class MysqlCdcCatalogFactory implements CatalogFactory { +public class MysqlCDCCatalogFactory implements CatalogFactory { - private static final Logger LOG = LoggerFactory.getLogger(MysqlCdcCatalogFactory.class); + private static final Logger LOG = LoggerFactory.getLogger(MysqlCDCCatalogFactory.class); @Override public String factoryIdentifier() { @@ -68,7 +68,7 @@ public Catalog createCatalog(Context context) { FactoryUtil.createCatalogFactoryHelper(this, context); helper.validate(); - return new MysqlCdcCatalog(context.getName(), helper.getOptions().get(DEFAULT_DATABASE), + return new MysqlCDCCatalog(context.getName(), helper.getOptions().get(DEFAULT_DATABASE), helper.getOptions().get(USERNAME), helper.getOptions().get(PASSWORD), helper.getOptions().get(HOSTNAME), helper.getOptions().get(PORT)); } diff --git a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index e352c8e..34a4287 100644 --- a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -com.netease.arctic.benchmark.ingestion.source.MysqlCdcCatalogFactory +com.netease.arctic.benchmark.ingestion.source.MysqlCDCCatalogFactory com.netease.arctic.flink.catalog.factories.ArcticCatalogFactory org.apache.hudi.table.catalog.HoodieCatalogFactory org.apache.hudi.table.HoodieTableFactory
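
The services entry above registers the renamed catalog factory with Flink's Factory SPI, so the source catalog can also be created through SQL DDL. A sketch of such a statement, assuming the factory identifier is `mysql_cdc` (the factoryIdentifier() body is elided in this diff, so the identifier and all option values below are assumptions):

```sql
CREATE CATALOG mysql_catalog WITH (
  'type' = 'mysql_cdc',
  'default-database' = 'benchmark_db',
  'username' = 'benchmark_user',
  'password' = 'benchmark_password',
  'hostname' = '127.0.0.1',
  'port' = '3306'
);
```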