Skip to content

Commit

Permalink
[Flink] Support session job when sync to external db (#530)
Browse files Browse the repository at this point in the history
* support session when syncdatabase

Signed-off-by: ChenYunHey <[email protected]>

* fix code

Signed-off-by: ChenYunHey <[email protected]>

---------

Signed-off-by: ChenYunHey <[email protected]>
  • Loading branch information
ChenYunHey authored Aug 27, 2024
1 parent 793c353 commit a824ec1
Showing 1 changed file with 102 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.types.DataType;
Expand All @@ -45,28 +44,44 @@ public class SyncDatabase {
static String password;
static boolean useBatch;
static int sinkParallelism;
static String jdbcOrDorisOptions;

public static void main(String[] args) throws Exception {
StringBuilder connectorOptions = new StringBuilder();
ParameterTool parameter = ParameterTool.fromArgs(args);

sourceDatabase = parameter.get(SOURCE_DB_DB_NAME.key());
sourceTableName = parameter.get(SOURCE_DB_LAKESOUL_TABLE.key()).toLowerCase();
dbType = parameter.get(TARGET_DATABASE_TYPE.key());
targetDatabase = parameter.get(TARGET_DB_DB_NAME.key());
targetTableName = parameter.get(TARGET_DB_TABLE_NAME.key()).toLowerCase();
url = parameter.get(TARGET_DB_URL.key());
if (dbType.equals("mysql") || dbType.equals("postgresql") || dbType.equals("doris")){
for (int i = 0; i < args.length; i++) {
if ( args[i].startsWith("--jdbc") || args[i].startsWith("--doris")){
connectorOptions.append("'")
.append(args[i].substring(7))
.append("'")
.append("=")
.append("'")
.append(args[i+1])
.append("'")
.append(",");
}
}
if (connectorOptions.length()>0){
jdbcOrDorisOptions = connectorOptions.substring(0, connectorOptions.length() - 1);
}
}
if (!dbType.equals("mongodb")) {
username = parameter.get(TARGET_DB_USER.key());
password = parameter.get(TARGET_DB_PASSWORD.key());
}
sinkParallelism = parameter.getInt(SINK_PARALLELISM.key(), SINK_PARALLELISM.defaultValue());
useBatch = parameter.getBoolean(BATHC_STREAM_SINK.key(), BATHC_STREAM_SINK.defaultValue());
//int replicationNum = parameter.getInt(DORIS_REPLICATION_NUM.key(), DORIS_REPLICATION_NUM.defaultValue());

String fenodes = parameter.get(DORIS_FENODES.key(), DORIS_FENODES.defaultValue());
Configuration conf = new Configuration();
conf.setString(RestOptions.BIND_PORT, "8081-8089");
//StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(sinkParallelism);

Expand Down Expand Up @@ -118,7 +133,7 @@ public static String[] getMysqlFieldsTypes(DataType[] fieldTypes, String[] field
String mysqlType = "TEXT";
if (pk != null) {
if (pk.contains(fieldNames[i])) {
mysqlType = "VARCHAR(100)";
mysqlType = "VARCHAR(255)";
}
}
stringFieldTypes[i] = mysqlType;
Expand Down Expand Up @@ -147,7 +162,7 @@ public static String[] getPgFieldsTypes(DataType[] fieldTypes, String[] fieldNam
String mysqlType = "TEXT";
if (pk != null) {
if (pk.contains(fieldNames[i])) {
mysqlType = "VARCHAR(100)";
mysqlType = "VARCHAR(255)";
}
}
stringFieldTypes[i] = mysqlType;
Expand Down Expand Up @@ -175,7 +190,7 @@ public static String[] getDorisFieldTypes(DataType[] fieldTypes) {
stringFieldTypes[i] = "DATETIME";
} else if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
stringFieldTypes[i] = "VARCHAR";
} else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType | fieldTypes[i].getLogicalType() instanceof TimestampType) {
} else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType ) {
stringFieldTypes[i] = "TIMESTAMP";
} else {
stringFieldTypes[i] = fieldTypes[i].toString();
Expand Down Expand Up @@ -212,32 +227,55 @@ public static void xsyncToPg(StreamExecutionEnvironment env) throws SQLException
StreamTableEnvironment tEnvs = StreamTableEnvironment.create(env);
Catalog lakesoulCatalog = new LakeSoulCatalog();
tEnvs.registerCatalog("lakeSoul", lakesoulCatalog);
tEnvs.useCatalog("lakeSoul");
tEnvs.useDatabase(sourceDatabase);
String jdbcUrl = url + targetDatabase;
Connection conn = DriverManager.getConnection(jdbcUrl, username, password);

TableResult schemaResult = tEnvs.executeSql(
"SELECT * FROM lakeSoul.`" + sourceDatabase + "`.`" + sourceTableName + "` LIMIT 1");

String[] fieldNames = schemaResult.getTableSchema().getFieldNames();
DataType[] fieldTypes = schemaResult.getTableSchema().getFieldDataTypes();
Table lakesoulTable = tEnvs.from("`lakeSoul`.`" + sourceDatabase + "`.`" + sourceTableName + "`");
DataType[] fieldDataTypes = lakesoulTable.getSchema().getFieldDataTypes();
String[] fieldNames = lakesoulTable.getSchema().getFieldNames();
String tablePk = getTablePk(sourceDatabase, sourceTableName);
String[] stringFieldsTypes = getPgFieldsTypes(fieldTypes, fieldNames, tablePk);
String[] stringFieldsTypes = getPgFieldsTypes(fieldDataTypes, fieldNames, tablePk);

String createTableSql = pgAndMsqlCreateTableSql(stringFieldsTypes, fieldNames, targetTableName, tablePk);
Statement statement = conn.createStatement();
// Create the target table in MySQL
statement.executeUpdate(createTableSql.toString());
String createCatalog = "create catalog postgres_catalog with('type'='jdbc','default-database'=" + "'" + targetDatabase + "'" + "," + "'username'=" +
"'" + username + "'" + "," + "'password'=" + "'" + password + "'" + "," + "'base-url'=" + "'" + url + "'" + ")";
// Move data from LakeSoul to MySQL

tEnvs.executeSql(createCatalog);
String insertQuery = "INSERT INTO postgres_catalog.`" + targetDatabase + "`.`" + targetTableName +
"` SELECT * FROM lakeSoul.`" + sourceDatabase + "`.`" + sourceTableName + "`";

tEnvs.executeSql(insertQuery);
StringBuilder coulmns = new StringBuilder();
for (int i = 0; i < fieldDataTypes.length; i++) {
if (stringFieldsTypes[i].equals("BYTEA")) {
coulmns.append("`").append(fieldNames[i]).append("` ").append("BYTES");
} else if (stringFieldsTypes[i].equals("TEXT")) {
coulmns.append("`").append(fieldNames[i]).append("` ").append("VARCHAR");
} else if (stringFieldsTypes[i].equals("FLOAT8")) {
coulmns.append("`").append(fieldNames[i]).append("`").append("DOUBLE");
} else {
coulmns.append("`").append(fieldNames[i]).append("` ").append(stringFieldsTypes[i]);
}
if (i < fieldDataTypes.length - 1) {
coulmns.append(",");
}
}
String sql;
if (jdbcOrDorisOptions==null){
if (tablePk != null) {
sql = String.format(
"create table %s(%s ,PRIMARY KEY (%s) NOT ENFORCED) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s')",
targetTableName, coulmns, tablePk, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism);
} else {
sql = String.format("create table %s(%s) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s')",
targetTableName, coulmns, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism);
}
}else {
if (tablePk != null) {
sql = String.format(
"create table %s(%s ,PRIMARY KEY (%s) NOT ENFORCED) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s', %s)",
targetTableName, coulmns, tablePk, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism, jdbcOrDorisOptions);
} else {
sql = String.format("create table %s(%s) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s', %s)",
targetTableName, coulmns, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism, jdbcOrDorisOptions);
}
}
tEnvs.executeSql(sql);
tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName);
statement.close();
conn.close();
}
Expand All @@ -255,10 +293,9 @@ public static void xsyncToMysql(StreamExecutionEnvironment env) throws SQLExcept
Catalog lakesoulCatalog = new LakeSoulCatalog();
tEnvs.registerCatalog("lakeSoul", lakesoulCatalog);
String jdbcUrl = url + targetDatabase;
TableResult schemaResult = tEnvs.executeSql(
"SELECT * FROM lakeSoul.`" + sourceDatabase + "`.`" + sourceTableName + "` LIMIT 1");
DataType[] fieldDataTypes = schemaResult.getTableSchema().getFieldDataTypes();
String[] fieldNames = schemaResult.getTableSchema().getFieldNames();
Table lakesoulTable = tEnvs.from("`lakeSoul`.`" + sourceDatabase + "`.`" + sourceTableName + "`");
DataType[] fieldDataTypes = lakesoulTable.getSchema().getFieldDataTypes();
String[] fieldNames = lakesoulTable.getSchema().getFieldNames();
String tablePk = getTablePk(sourceDatabase, sourceTableName);
String[] stringFieldsTypes = getMysqlFieldsTypes(fieldDataTypes, fieldNames, tablePk);
String createTableSql = pgAndMsqlCreateTableSql(stringFieldsTypes, fieldNames, targetTableName, tablePk);
Expand All @@ -271,6 +308,9 @@ public static void xsyncToMysql(StreamExecutionEnvironment env) throws SQLExcept
for (int i = 0; i < fieldDataTypes.length; i++) {
if (stringFieldsTypes[i].equals("BLOB")) {
coulmns.append("`").append(fieldNames[i]).append("` ").append("BYTES");
} else if (stringFieldsTypes[i].equals("TEXT")) {
coulmns.append("`").append(fieldNames[i]).append("` ").append("VARCHAR");

} else {
coulmns.append("`").append(fieldNames[i]).append("` ").append(stringFieldsTypes[i]);
}
Expand All @@ -279,14 +319,26 @@ public static void xsyncToMysql(StreamExecutionEnvironment env) throws SQLExcept
}
}
String sql;
if (tablePk != null) {
sql = String.format(
"create table %s(%s ,PRIMARY KEY (%s) NOT ENFORCED) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s')",
targetTableName, coulmns, tablePk, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism);
} else {
sql = String.format("create table %s(%s) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s' , 'sink.parallelism' = '%s')",
targetTableName, coulmns, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism);
if (jdbcOrDorisOptions==null){
if (tablePk != null) {
sql = String.format(
"create table %s(%s ,PRIMARY KEY (%s) NOT ENFORCED) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s')",
targetTableName, coulmns, tablePk, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism);
} else {
sql = String.format("create table %s(%s) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s')",
targetTableName, coulmns, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism);
}
}else {
if (tablePk != null) {
sql = String.format(
"create table %s(%s ,PRIMARY KEY (%s) NOT ENFORCED) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s', %s)",
targetTableName, coulmns, tablePk, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism, jdbcOrDorisOptions);
} else {
sql = String.format("create table %s(%s) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s', %s)",
targetTableName, coulmns, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism, jdbcOrDorisOptions);
}
}

tEnvs.executeSql(sql);
tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName);

Expand All @@ -306,22 +358,28 @@ public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes)
Catalog lakesoulCatalog = new LakeSoulCatalog();
tEnvs.registerCatalog("lakeSoul", lakesoulCatalog);
String jdbcUrl = url + targetDatabase;
TableResult schemaResult = tEnvs.executeSql(
"SELECT * FROM lakeSoul.`" + sourceDatabase + "`.`" + sourceTableName + "` LIMIT 1");
DataType[] fieldDataTypes = schemaResult.getTableSchema().getFieldDataTypes();
Table lakesoulTable = tEnvs.from("`lakeSoul`.`" + sourceDatabase + "`.`" + sourceTableName + "`");
DataType[] fieldDataTypes = lakesoulTable.getSchema().getFieldDataTypes();
String[] fieldNames = lakesoulTable.getSchema().getFieldNames();
String[] dorisFieldTypes = getDorisFieldTypes(fieldDataTypes);
String[] fieldNames = schemaResult.getTableSchema().getFieldNames();

StringBuilder coulmns = new StringBuilder();
for (int i = 0; i < fieldDataTypes.length; i++) {
coulmns.append("`").append(fieldNames[i]).append("` ").append(dorisFieldTypes[i]);
if (i < fieldDataTypes.length - 1) {
coulmns.append(",");
}
}
String sql = String.format(
"create table %s(%s) with ('connector' = '%s', 'jdbc-url' = '%s', 'fenodes' = '%s', 'table.identifier' = '%s', 'username' = '%s', 'password' = '%s')",
targetTableName, coulmns, "doris", jdbcUrl, fenodes, targetDatabase + "." + targetTableName, username, password);
String sql;
if (jdbcOrDorisOptions == null){
sql = String.format(
"create table %s(%s) with ('connector' = '%s', 'jdbc-url' = '%s', 'fenodes' = '%s', 'table.identifier' = '%s', 'username' = '%s', 'password' = '%s')",
targetTableName, coulmns, "doris", jdbcUrl, fenodes, targetDatabase + "." + targetTableName, username, password);
}else {
sql = String.format(
"create table %s(%s) with ('connector' = '%s', 'jdbc-url' = '%s', 'fenodes' = '%s', 'table.identifier' = '%s', 'username' = '%s', 'password' = '%s', %s)",
targetTableName, coulmns, "doris", jdbcUrl, fenodes, targetDatabase + "." + targetTableName, username, password, jdbcOrDorisOptions);
}

tEnvs.executeSql(sql);
tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName);
}
Expand Down

0 comments on commit a824ec1

Please sign in to comment.