Skip to content

[Flink] Fix source split when context reader missing #709

[Flink] Fix source split when context reader missing

[Flink] Fix source split when context reader missing #709

# SPDX-FileCopyrightText: 2023 LakeSoul Contributors
#
# SPDX-License-Identifier: Apache-2.0
name: CI with flink cdc Test on HDFS
on:
push:
paths-ignore:
- "javadoc/**"
- "website/**"
- "cpp/**"
- "python/**"
- "**.md"
branches:
- 'main'
pull_request:
paths-ignore:
- "javadoc/**"
- "website/**"
- "cpp/**"
- "python/**"
- "**.md"
branches:
- 'main'
- 'release/**'
workflow_dispatch:
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Free Disk Space (Ubuntu)
uses: jlumbroso/free-disk-space@main
with:
tool-cache: false
android: true
dotnet: true
haskell: true
large-packages: false
docker-images: true
swap-storage: true
- uses: actions/checkout@v4
- name: Set up JDK 11
uses: actions/setup-java@v4
with:
java-version: '11'
distribution: 'temurin'
cache: maven
- name: Set up Python 3.9
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install dependencies
run: |
python -m pip install --upgrade pip setuptools wheel
pip install pymysql cryptography jproperties --no-cache-dir
wget https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.17.1/flink-s3-fs-hadoop-1.17.1.jar -O $HOME/flink-s3-fs-hadoop-1.17.1.jar
wget https://repo1.maven.org/maven2/org/apache/parquet/parquet-hadoop-bundle/1.12.3/parquet-hadoop-bundle-1.12.3.jar -O $HOME/parquet-hadoop-bundle-1.12.3.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-parquet/1.17.1/flink-parquet-1.17.1.jar -O $HOME/flink-parquet-1.17.1.jar
- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
version: "23.x"
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
default: true
- uses: Swatinem/rust-cache@v2
with:
workspaces: "./rust -> target"
- name: Cache Docker images
uses: ScribeMD/[email protected]
with:
key: docker-${{ runner.os }}-${{ hashFiles('rust/Cross.toml', 'docker/lakesoul-docker-compose-env/docker-compose.yml') }}
- name: Pull images
run: |
docker pull -q bitnami/spark:3.3.1
- uses: actions-rs/cargo@v1
with:
use-cross: true
command: build
args: '--manifest-path rust/Cargo.toml --target x86_64-unknown-linux-gnu --release --all-features'
- name: Build with Maven
run: |
mkdir -p rust/target/release
cp rust/target/x86_64-unknown-linux-gnu/release/liblakesoul_io_c.so rust/target/release
cp rust/target/x86_64-unknown-linux-gnu/release/liblakesoul_metadata_c.so rust/target/release
MAVEN_OPTS="-Xmx4000m" mvn -q -B clean package -f pom.xml -Pcross-build -DskipTests
- name: Get jar names
run: |
echo "FLINK_JAR_NAME=$(python script/get_jar_name.py lakesoul-flink)" >> $GITHUB_ENV
echo "FLINK_TEST_JAR_NAME=$(python script/get_jar_name.py lakesoul-flink | sed -e 's/.jar/-tests.jar/g')" >> $GITHUB_ENV
echo "SPARK_JAR_NAME=$(python script/get_jar_name.py lakesoul-spark)" >> $GITHUB_ENV
echo "SPARK_TEST_JAR_NAME=$(python script/get_jar_name.py lakesoul-spark | sed -e 's/.jar/-tests.jar/g')" >> $GITHUB_ENV
- name: Copy built jar to work-dir
run: |
cp ./lakesoul-flink/target/$FLINK_JAR_NAME ./script/benchmark/work-dir
cp ./lakesoul-flink/target/$FLINK_TEST_JAR_NAME ./script/benchmark/work-dir
cp ./lakesoul-spark/target/$SPARK_JAR_NAME ./script/benchmark/work-dir
cp ./lakesoul-spark/target/$SPARK_TEST_JAR_NAME ./script/benchmark/work-dir
- uses: beyondstorage/setup-hdfs@master
with:
hdfs-version: '3.3.6'
- name: Modify HDFS User Group Mapping
run: |
sed -i '/^<\/configuration>/i <property><name>hadoop.proxyuser.spark.hosts</name><value>*</value></property><property><name>hadoop.proxyuser.spark.groups</name><value>*</value></property>' $HADOOP_HOME/etc/hadoop/core-site.xml
- name: Modify HDFS Host
run: |
sed -i 's/localhost/172.17.0.1/g' $HADOOP_HOME/etc/hadoop/core-site.xml
sed -i 's/localhost/172.17.0.1/g' $HADOOP_HOME/etc/hadoop/hdfs-site.xml
$HADOOP_HOME/sbin/stop-dfs.sh
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/bin/hadoop fs -chmod -R 777 /
- name: Deploy cluster
run: |
cd ./docker/lakesoul-docker-compose-env
docker compose pull -q
docker compose up -d
sleep 30s
- name: Start compaction task
run: |
cd ./script/benchmark/work-dir
nohup docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --proxy-user flink --driver-memory 2G --executor-memory 2G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --conf spark.hadoop.fs.s3.buffer.dir=/tmp --conf spark.hadoop.fs.s3a.buffer.dir=/tmp --conf spark.hadoop.fs.s3a.fast.upload.buffer=disk --conf spark.hadoop.fs.s3a.fast.upload=true --conf spark.dmetasoul.lakesoul.native.io.enable=true --class com.dmetasoul.lakesoul.spark.compaction.CompactionTask --master local[4] /opt/spark/work-dir/$SPARK_JAR_NAME --threadpool.size=10 --database="" --file_num_limit=5 --file_size_limit=10KB > compaction.log 2>&1 &
- name: Start flink mysql cdc task-1
run: |
docker exec -t -u flink lakesoul-docker-compose-env-jobmanager-1 flink run -d -c org.apache.flink.lakesoul.entry.MysqlCdc /opt/flink/work-dir/$FLINK_JAR_NAME --source_db.host mysql --source_db.port 3306 --source_db.db_name test_cdc --source_db.user root --source_db.password root --source.parallelism 2 --sink.parallelism 4 --use.cdc true --warehouse_path hdfs://172.17.0.1:9000/lakesoul-test-bucket/data/ --flink.checkpoint hdfs://172.17.0.1:9000/lakesoul-test-bucket/chk --flink.savepoint hdfs://172.17.0.1:9000/lakesoul-test-bucket/svp --job.checkpoint_interval 5000 --server_time_zone UTC
sleep 30s
- name: Start flink source to sink task-2
run: |
docker exec -t -u flink lakesoul-docker-compose-env-jobmanager-1 flink run -d -c org.apache.flink.lakesoul.test.benchmark.LakeSoulSourceToSinkTable -C file:///opt/flink/work-dir/$FLINK_JAR_NAME /opt/flink/work-dir/$FLINK_TEST_JAR_NAME --source.database.name test_cdc --source.table.name default_init --sink.database.name flink_sink --sink.table.name default_init --use.cdc true --hash.bucket.number 2 --job.checkpoint_interval 10000 --server_time_zone UTC --warehouse.path hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink-sink/data --flink.checkpoint hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink-sink/chk
sleep 30s
- name: Start flink DataGenSource without primary key task-3
run: |
docker exec -t -u flink lakesoul-docker-compose-env-jobmanager-1 flink run -d -c org.apache.flink.lakesoul.test.benchmark.LakeSoulDataGenSourceTable -C file:///opt/flink/work-dir/$FLINK_JAR_NAME /opt/flink/work-dir/$FLINK_TEST_JAR_NAME --sink.database.name flink --sink.table.name sink_table --job.checkpoint_interval 10000 --server_time_zone UTC --warehouse.path hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink/ --flink.checkpoint hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink/chk --sink.parallel 2 --data.size 1000 --write.time 5
- name: Download task dependency jar
run: |
cd ./script/benchmark/work-dir
if [ ! -e mysql-connector-java-8.0.30.jar ]; then wget -q https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar; fi
if [ ! -e flink-connector-jdbc-3.1.1-1.17.jar ]; then wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar; fi
if [ ! -e assertj-core-3.23.1.jar ]; then wget -q https://repo1.maven.org/maven2/org/assertj/assertj-core/3.23.1/assertj-core-3.23.1.jar; fi
- name: Create table and insert data
run: |
cd ./script/benchmark
python 1_create_table.py
docker exec -i lakesoul-docker-compose-env-mysql-1 bash /2_insert_table_data.sh
sleep 30s
- name: Start flink engine data check task
run: |
docker exec -t lakesoul-docker-compose-env-jobmanager-1 flink run -d -c org.apache.flink.lakesoul.test.benchmark.LakeSoulFlinkDataCheck -C file:///opt/flink/work-dir/$FLINK_JAR_NAME -C file:///opt/flink/work-dir/mysql-connector-java-8.0.30.jar -C file:///opt/flink/work-dir/flink-connector-jdbc-3.1.1-1.17.jar -C file:///opt/flink/work-dir/assertj-core-3.23.1.jar /opt/flink/work-dir/$FLINK_TEST_JAR_NAME --mysql.hostname mysql --mysql.database.name test_cdc --mysql.table.name default_init_1 --mysql.username root --mysql.password root --mysql.port 3306 --server.time.zone UTC --single.table.contract true --lakesoul.database.name test_cdc --lakesoul.table.name default_init_1 --flink.checkpoint hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink-engine/chk
sleep 30s
- name: Mysql cdc data accuracy verification task
run: |
cd ./script/benchmark
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
- name: Flink source to sink data accuracy verification task
run: |
cd ./script/benchmark
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME --cdc.contract false --single.table.contract true
- name: Adding columns for tables and deleting some data from tables
run: |
cd ./script/benchmark
python3 3_add_column.py
python3 delete_data.py
docker exec -i lakesoul-docker-compose-env-mysql-1 bash /2_insert_table_data.sh
sleep 30s
- name: Start flink engine data check task
run: |
docker exec -t lakesoul-docker-compose-env-jobmanager-1 flink run -d -c org.apache.flink.lakesoul.test.benchmark.LakeSoulFlinkDataCheck -C file:///opt/flink/work-dir/$FLINK_JAR_NAME -C file:///opt/flink/work-dir/mysql-connector-java-8.0.30.jar -C file:///opt/flink/work-dir/flink-connector-jdbc-3.1.1-1.17.jar -C file:///opt/flink/work-dir/assertj-core-3.23.1.jar /opt/flink/work-dir/$FLINK_TEST_JAR_NAME --mysql.hostname mysql --mysql.database.name test_cdc --mysql.table.name default_init_1 --mysql.username root --mysql.password root --mysql.port 3306 --server.time.zone UTC --single.table.contract true --lakesoul.database.name test_cdc --lakesoul.table.name default_init_1 --flink.checkpoint hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink-engine/chk
sleep 30s
- name: Mysql cdc data accuracy verification task
run: |
cd ./script/benchmark
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
- name: Flink source to sink data accuracy verification task
run: |
cd ./script/benchmark
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME --cdc.contract false --single.table.contract true
- name: Updating data in tables
run: |
cd ./script/benchmark
python3 4_update_data.py
sleep 30s
- name: Start flink engine data check task
run: |
docker exec -t lakesoul-docker-compose-env-jobmanager-1 flink run -d -c org.apache.flink.lakesoul.test.benchmark.LakeSoulFlinkDataCheck -C file:///opt/flink/work-dir/$FLINK_JAR_NAME -C file:///opt/flink/work-dir/mysql-connector-java-8.0.30.jar -C file:///opt/flink/work-dir/flink-connector-jdbc-3.1.1-1.17.jar -C file:///opt/flink/work-dir/assertj-core-3.23.1.jar /opt/flink/work-dir/$FLINK_TEST_JAR_NAME --mysql.hostname mysql --mysql.database.name test_cdc --mysql.table.name default_init_1 --mysql.username root --mysql.password root --mysql.port 3306 --server.time.zone UTC --single.table.contract true --lakesoul.database.name test_cdc --lakesoul.table.name default_init_1 --flink.checkpoint hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink-engine/chk
sleep 30s
- name: Mysql cdc data accuracy verification task
run: |
cd ./script/benchmark
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --conf spark.dmetasoul.lakesoul.native.io.enable=true --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
- name: Flink source to sink data accuracy verification task
run: |
cd ./script/benchmark
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME --cdc.contract false --single.table.contract true
- name: Dropping columns and deleting some data in tables
run: |
cd ./script/benchmark
python3 6_drop_column.py
python3 delete_data.py
docker exec -i lakesoul-docker-compose-env-mysql-1 bash /2_insert_table_data.sh
sleep 30s
- name: Start flink engine data check task
run: |
docker exec -t lakesoul-docker-compose-env-jobmanager-1 flink run -d -c org.apache.flink.lakesoul.test.benchmark.LakeSoulFlinkDataCheck -C file:///opt/flink/work-dir/$FLINK_JAR_NAME -C file:///opt/flink/work-dir/mysql-connector-java-8.0.30.jar -C file:///opt/flink/work-dir/flink-connector-jdbc-3.1.1-1.17.jar -C file:///opt/flink/work-dir/assertj-core-3.23.1.jar /opt/flink/work-dir/$FLINK_TEST_JAR_NAME --mysql.hostname mysql --mysql.database.name test_cdc --mysql.table.name default_init_1 --mysql.username root --mysql.password root --mysql.port 3306 --server.time.zone UTC --single.table.contract true --lakesoul.database.name test_cdc --lakesoul.table.name default_init_1 --flink.checkpoint hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink-engine/chk
sleep 30s
- name: Mysql cdc data accuracy verification task
run: |
cd ./script/benchmark
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --conf spark.dmetasoul.lakesoul.native.io.enable=true --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
- name: Flink source to sink data accuracy verification task
run: |
cd ./script/benchmark
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME --cdc.contract false --single.table.contract true
- name: Table without primary key data accuracy verification task
run: |
cd ./script/benchmark
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.FlinkWriteDataCheck --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME --csv.path hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink/csv --lakesoul.table.path hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink/sink_table --server.time.zone UTC
- name: Print Flink Log
if: always()
run: |
docker logs lakesoul-docker-compose-env-jobmanager-1 > flink-session-cluster.log
- name: Upload Log
if: always()
continue-on-error: true
uses: actions/upload-artifact@v4
with:
name: flink-cluster-log
path: flink-session-cluster.log
retention-days: 5
if-no-files-found: error
- name: Upload Compaction Log
uses: actions/upload-artifact@v4
with:
name: lakesoul-compaction-log
path: ./script/benchmark/work-dir/compaction.log
retention-days: 3
if-no-files-found: error