diff --git a/.gitignore b/.gitignore index 93b05a6acc..b6876a3284 100644 --- a/.gitignore +++ b/.gitignore @@ -15,10 +15,8 @@ zookeeper.dockerfile controller.dockerfile broker.dockerfile app.dockerfile -spark.dockerfile deps.dockerfile worker.dockerfile -etl.dockerfile hadoop.dockerfile # we don't put binary file to git repo gradle-wrapper.jar diff --git a/README.md b/README.md index 523b2a7c48..cbe1fa3c2c 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,6 @@ 11. [GUI](docs/gui/README.md): 簡單好用的叢集資訊圖形化工具 12. [Connector](./docs/connector/README.md): 提供基於 `kafka connector` 實作的高效平行化工具,包含效能測試和資料遷移等工具 13. [Build](docs/build_project.md): 說明如何建構與測試本專案各模組 -14. [etl](./docs/etl/README.md): 構建 spark-kafka 的資料傳輸通道 # Kafka Q&A diff --git a/build.gradle b/build.gradle index f6be5ea3f5..a3800de960 100644 --- a/build.gradle +++ b/build.gradle @@ -36,9 +36,4 @@ spotless { } } } - scala { - licenseHeaderFile(file("$rootDir/checkstyle/apache.header"), "package ") - target '**/scala/**/*.scala' - scalafmt() - } } \ No newline at end of file diff --git a/config/spark2kafka.properties b/config/spark2kafka.properties deleted file mode 100644 index 7e6d195533..0000000000 --- a/config/spark2kafka.properties +++ /dev/null @@ -1,40 +0,0 @@ -#Parameters you must configure -#============================================================== -#The data source path should be a directory. -source.path = - -#The CSV Column Name.For example:sA=string,sB=integer,sC=boolean... -column.names = - -#Primary keys.For example:sA=string,sB=integer,sC=boolean... -primary.keys = - -#The Kafka bootstrap servers -kafka.bootstrap.servers = - -#Set your topic name. -topic.name = - -#Spark checkpoint path -checkpoint = - -#Parameters that can be selected for configuration -#============================================================== - -#Set the number of topic partitions, if it is empty that will set it to 15. -topic.partitions = - -#Set the number of topic replicas, if it is empty that will set it to 1. -topic.replicas = - -#The rest of the topic can be configured parameters.For example: keyA=valueA,keyB=valueB,keyC=valueC... -topic.config = - -#Option to clean up completed files after processing.Available options are "archive", "delete", "off". Default:delete -clean.source = - -#You must config it when clean.source set archive. -archive.path = - -#recursive.file is used to recursively load files. Default:true -recursive.file = diff --git a/docker/start_etl.sh b/docker/start_etl.sh deleted file mode 100755 index 3c829ff388..0000000000 --- a/docker/start_etl.sh +++ /dev/null @@ -1,146 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Configure the path of Spark2Kafka.properties to run. For example: ./docker/start_etl.sh PropertiesPath -declare -r DOCKER_FOLDER=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd) -source "$DOCKER_FOLDER"/docker_build_common.sh - -# ===============================[version control]================================= -declare -r SPARK_VERSION=${SPARK_VERSION:-3.3.1} -# ===============================[global variables]================================ -declare -r LOCAL_PATH=$(cd -- "$(dirname -- "${DOCKER_FOLDER}")" &>/dev/null && pwd) -# ===============================[properties keys]================================= -declare -r SOURCE_KEY="source.path" -declare -r CHECKPOINT_KEY="checkpoint" -# ===============================[spark driver/executor resource]================== -declare -r RESOURCES_CONFIGS="${RESOURCES_CONFIGS:-"--conf spark.driver.memory=4g --conf spark.executor.memory=4g"}" -# ===================================[functions]=================================== - -function showHelp() { - echo "Usage: [ENV] start_etl.sh properties_path" - echo "required Arguments: " - echo " master where to submit spark job" - echo " property.file The path of Spark2Kafka.properties." - echo "ENV: " - echo " ACCOUNT=skiptests set the account to clone from" - echo " VERSION=main set branch of astraea" - echo " RESOURCES_CONFIGS=\"--conf spark.driver.memory=4g\" set spark memory" -} - -function runContainer() { - local master=$1 - local propertiesPath=$2 - - if [[ ! -f "$propertiesPath" ]]; then - echo "$propertiesPath is not a property file" - exit 1 - fi - - source_path=$(cat $propertiesPath | grep $SOURCE_KEY | cut -d "=" -f2) - checkpoint_path=$(cat $propertiesPath | grep $CHECKPOINT_KEY | cut -d "=" -f2) - source_name=$(echo "${source_path}" | tr '/' '-') - - docker run --rm -v "$LOCAL_PATH":/tmp/astraea \ - ghcr.io/skiptests/astraea/deps \ - /bin/bash \ - -c "cd /tmp/astraea && ./gradlew clean shadowJar --no-daemon" - - jar_path=$(find "$LOCAL_PATH"/etl/build/libs -type f -name "*all.jar") - - if [[ ! -f "$jar_path" ]]; then - echo "$jar_path is not a uber jar" - exit 1 - fi - - # the driver is running on client mode, so the source path must be readable from following container. - # hence, we will mount source path to container directly - mkdir -p "$source_path" - if [ $? -ne 0 ]; then - echo "failed to create folder on $source_path" - exit 1 - fi - - mkdir -p "$checkpoint_path" - if [ $? -ne 0 ]; then - echo "failed to create folder on $checkpoint_path" - exit 1 - fi - - ui_port=$(($(($RANDOM % 10000)) + 10000)) - if [[ "$master" == "local"* ]]; then - network_config="-p ${ui_port}:${ui_port}" - else - # expose the driver network - network_config="--network host" - fi - - if [[ "$master" == "spark:"* ]] || [[ "$master" == "local"* ]]; then - docker run -d --init \ - --name "csv-kafka-${source_name}" \ - $network_config \ - -v "$propertiesPath":"$propertiesPath":ro \ - -v "$jar_path":/tmp/astraea-etl.jar:ro \ - -v "${source_path}":"${source_path}" \ - -v "${checkpoint_path}":"${checkpoint_path}" \ - -e JAVA_OPTS="$HEAP_OPTS" \ - ghcr.io/skiptests/astraea/spark:$SPARK_VERSION \ - ./bin/spark-submit \ - --conf "spark.ui.port=$ui_port" \ - $RESOURCES_CONFIGS \ - --packages org.apache.spark:spark-sql-kafka-0-10_2.13:"$SPARK_VERSION" \ - --class org.astraea.etl.Spark2Kafka \ - --jars file:///tmp/astraea-etl.jar \ - --master $master \ - /tmp/astraea-etl.jar \ - "$propertiesPath" - - echo "application web: http:$ADDRESS:$ui_port" - else - echo "$master is unsupported" - exit 1 - fi - -} - -# ===================================[main]=================================== -checkDocker - -master="local[*]" -property_file_path="" - -while [[ $# -gt 0 ]]; do - if [[ "$1" == "help" ]]; then - showHelp - exit 0 - fi - - if [[ "$1" == "master"* ]]; then - master=$(echo "$1" | cut -d "=" -f 2) - fi - - if [[ "$1" == "property.file"* ]]; then - property_file_path=$(echo "$1" | cut -d "=" -f 2) - fi - - shift -done - -if [[ "$property_file_path" == "" ]]; then - showHelp - exit 0 -fi - -runContainer "$master" "$property_file_path" diff --git a/docker/start_spark.sh b/docker/start_spark.sh deleted file mode 100755 index c628aa53dc..0000000000 --- a/docker/start_spark.sh +++ /dev/null @@ -1,225 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -declare -r DOCKER_FOLDER=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd) -source $DOCKER_FOLDER/docker_build_common.sh - -# ===============================[global variables]=============================== -declare -r VERSION=${REVISION:-${VERSION:-3.3.2}} -declare -r HADOOP_VERSION=${HADOOP_REVERSION:-${HADOOP_REVERSION:-3}} -declare -r REPO=${REPO:-ghcr.io/skiptests/astraea/spark} -declare -r IMAGE_NAME="$REPO:$VERSION" -declare -r SPARK_PORT=${SPARK_PORT:-"$(getRandomPort)"} -declare -r SPARK_UI_PORT=${SPARK_UI_PORT:-"$(getRandomPort)"} -declare -r DOCKERFILE=$DOCKER_FOLDER/spark.dockerfile -declare -r MASTER_NAME="spark-master" -declare -r WORKER_NAME="spark-worker" - -# ===================================[functions]=================================== - -function showHelp() { - echo "Usage: [ENV] start_spark.sh" - echo "Optional Arguments: " - echo " master=spark://node00:1111 start a spark worker. Or start a spark master if master-url is not defined" - echo " folder=/tmp/aa:/tmp/bb mount the host folder /tmp/aa to spark container /tmp/bb" - echo "ENV: " - echo " VERSION=3.3.1 set version of spark distribution" - echo " BUILD=false set true if you want to build image locally" - echo " RUN=false set false if you want to build/pull image only" - echo " PYTHON_DEPS=delta-spark=1.0.0 set the python dependencies which are pre-installed in the docker image" -} - -function checkOs() { - if [[ "$OSTYPE" == "darwin"* ]]; then - echo "This script requires to run container with \"--network host\", but the feature is unsupported by Mac OS" - exit 2 - fi -} - -# Spark needs to manage the hardware resource for this node, so we don't run multiples workers/masters in same node. -function checkConflictContainer() { - local name=$1 - local role=$2 - local container_names=$(docker ps --format "{{.Names}}") - if [[ $(echo "${container_names}" | grep "$name") != "" ]]; then - echo "It is disallowed to run multiples spark $role in same node" - exit 2 - fi -} - -function generateDockerfileByVersion() { - echo "# this dockerfile is generated dynamically -FROM ubuntu:20.04 AS build - -# install tools -RUN apt-get update && apt-get install -y wget unzip - -# download spark -WORKDIR /tmp -RUN wget https://archive.apache.org/dist/spark/spark-${VERSION}/spark-${VERSION}-bin-hadoop${HADOOP_VERSION}-scala2.13.tgz -RUN mkdir /opt/spark -RUN tar -zxvf spark-${VERSION}-bin-hadoop${HADOOP_VERSION}-scala2.13.tgz -C /opt/spark --strip-components=1 - -# the python3 in ubuntu 22.04 is 3.10 by default, and it has a known issue (https://github.com/vmprof/vmprof-python/issues/240) -# The issue obstructs us from installing 3-third python libraries, so we downgrade the ubuntu to 20.04 -FROM ubuntu:20.04 - -# Do not ask for confirmations when running apt-get, etc. -ENV DEBIAN_FRONTEND noninteractive - -# install tools -RUN apt-get update && apt-get install -y openjdk-11-jre python3 python3-pip - -# copy spark -COPY --from=build /opt/spark /opt/spark - -# add user -RUN groupadd $USER && useradd -ms /bin/bash -g $USER $USER - -# change user -RUN chown -R $USER:$USER /opt/spark -USER $USER - -# export ENV -ENV SPARK_HOME /opt/spark -WORKDIR /opt/spark -" >"$DOCKERFILE" -} - -function generateDockerfileBySource() { - echo "# this dockerfile is generated dynamically -FROM ubuntu:23.10 AS build - -# Do not ask for confirmations when running apt-get, etc. -ENV DEBIAN_FRONTEND noninteractive - -# install tools -RUN apt-get update && apt-get install -y openjdk-11-jdk python3 python3-pip git curl - -# build spark from source code -RUN git clone https://github.com/apache/spark /tmp/spark -WORKDIR /tmp/spark -RUN git checkout $VERSION -ENV MAVEN_OPTS=\"-Xmx3g\" -RUN ./dev/make-distribution.sh --pip --tgz -RUN mkdir /opt/spark -RUN tar -zxvf \$(find ./ -maxdepth 1 -type f -name spark-*SNAPSHOT*.tgz) -C /opt/spark --strip-components=1 -RUN ./build/mvn install -DskipTests - -# the python3 in ubuntu 22.04 is 3.10 by default, and it has a known issue (https://github.com/vmprof/vmprof-python/issues/240) -# The issue obstructs us from installing 3-third python libraries, so we downgrade the ubuntu to 20.04 -FROM ubuntu:20.04 - -# Do not ask for confirmations when running apt-get, etc. -ENV DEBIAN_FRONTEND noninteractive - -# install tools -RUN apt-get update && apt-get install -y openjdk-11-jre python3 python3-pip - -# copy spark -COPY --from=build /opt/spark /opt/spark - -# add user -RUN groupadd $USER && useradd -ms /bin/bash -g $USER $USER - -# change user -RUN chown -R $USER:$USER /opt/spark -USER $USER - -# export ENV -ENV SPARK_HOME /opt/spark -WORKDIR /opt/spark -" >"$DOCKERFILE" -} - -function generateDockerfile() { - if [[ -n "$REVISION" ]]; then - generateDockerfileBySource - else - generateDockerfileByVersion - fi -} - -# ===================================[main]=================================== - -raw_folder_mapping="" -master_url="" -mount_command="" - -while [[ $# -gt 0 ]]; do - if [[ "$1" == "help" ]]; then - showHelp - exit 0 - fi - - if [[ "$1" == "master"* ]]; then - master_url=$(echo "$1" | cut -d "=" -f 2) - fi - - if [[ "$1" == "folder"* ]]; then - raw_folder_mapping=$1 - folder_mapping=$(echo "$1" | cut -d "=" -f 2) - host_folder=$(echo "$folder_mapping" | cut -d ":" -f 1) - container_folder=$(echo "$folder_mapping" | cut -d ":" -f 2) - mount_command="-v $host_folder:$container_folder" - fi - shift -done - -checkDocker -generateDockerfile -buildImageIfNeed "$IMAGE_NAME" - -if [[ "$RUN" != "true" ]]; then - echo "docker image: $IMAGE_NAME is created" - exit 0 -fi - -checkNetwork -checkOs - -if [[ "$master_url" != "" ]]; then - checkConflictContainer $WORKER_NAME "worker" - docker run -d --init \ - -e SPARK_WORKER_WEBUI_PORT=$SPARK_UI_PORT \ - -e SPARK_WORKER_PORT=$SPARK_PORT \ - -e SPARK_NO_DAEMONIZE=true \ - $mount_command \ - --name "$WORKER_NAME" \ - --network host \ - "$IMAGE_NAME" ./sbin/start-worker.sh "$master_url" - - echo "=================================================" - echo "Starting Spark worker $ADDRESS:$SPARK_PORT" - echo "Bound WorkerWebUI started at http://${ADDRESS}:${SPARK_UI_PORT}" - echo "=================================================" -else - checkConflictContainer $MASTER_NAME "master" - docker run -d --init \ - -e SPARK_MASTER_WEBUI_PORT=$SPARK_UI_PORT \ - -e SPARK_MASTER_PORT=$SPARK_PORT \ - -e SPARK_NO_DAEMONIZE=true \ - $mount_command \ - --name "$MASTER_NAME" \ - --network host \ - "$IMAGE_NAME" ./sbin/start-master.sh - - echo "=================================================" - echo "Starting Spark master at spark://$ADDRESS:$SPARK_PORT" - echo "Bound MasterWebUI started at http://${ADDRESS}:${SPARK_UI_PORT}" - echo "execute $DOCKER_FOLDER/start_spark.sh master=spark://$ADDRESS:$SPARK_PORT $raw_folder_mapping to add worker" - echo "=================================================" -fi diff --git a/docs/build_project.md b/docs/build_project.md index 6d5d4003e5..3c5c160651 100644 --- a/docs/build_project.md +++ b/docs/build_project.md @@ -4,7 +4,6 @@ - app: 處理與web server,成本分析與負載平衡等功能之模組 - common: 有關叢集,客戶端等分析,管理與平衡等功能之模組 -- etl: 轉換 csv 檔,再匯入 delta 並使串接部份有更好的平行化處理之模組 - gui: 與圖形化界面功能相關的模組 - connector: 基於 Kafka connector 實作的各式分散式工具 - it: 針對專案測試所提供的叢集環境 diff --git a/docs/etl/README.md b/docs/etl/README.md deleted file mode 100644 index a10eef0793..0000000000 --- a/docs/etl/README.md +++ /dev/null @@ -1,51 +0,0 @@ -Astraea etl 中文文件 -=== - -Astraea etl 的目標是構建 kafka-spark-delta 的資料傳輸通道。目前支持的功能爲讀取csv檔案透過 spark streaming 轉入 kafka topic。 - -### 系統架構 - -spark -> kafka 將csv檔案透過 spark streaming 轉入 kafka topic,包括讀取資料,清洗資料與寫入資料。該模組分爲三個部分。 -1. 讀取資料: spark streaming 會讀取source path 下所有csv檔案,並將其轉化為dataframe。 -2. 清洗資料: dataframe 以行為單位,通過將json convertor 封裝進 udf 的方式,將資料變為 json 格式字串的集合。 -3. 寫入資料: 將轉化為 json 格式的資料以一行對應一條 record 的方式,發送到kafka中指定的topic。record的key為table的primary key,value 為轉換為json 字串的csv row。 - -![framework diagram](../pictures/etl_README_1.png) - -### Astraea etl 使用 - -Astraea etl 通過讀取[property file](../../config/spark2kafka.properties) 來獲取系統運行時可能需要的資訊。 -以下是property中參數簡介 - -| 參數名稱 | 說明 | 預設值 | -|:-----------------:|:----------------------------------------------------------------------------|:-----:| -| source.path | (必填) 資料來源路徑 | 無 | -| column.names | (必填) csv table的欄位名稱及該欄位對應的屬性 For example:sA=string,sB=integer,sC=boolean... | 無 | -| primary.keys | (必填) csv table中的primary key. For example:sA=string,sB=integer,sC=boolean... | 無 | -| bootstrap.servers | (必填) 欲連接的Kafka server address | 無 | -| topic.name | (必填) 欲發往的topic name | 無 | -| checkpoint | (必填) spark checkpoint 存放路徑 | 無 | -| topic.partitions | (選填) 目標topic的partition數量 | 15 | -| topic.replicas | (選填) 目標topic的replica數量 | 1 | -| topic.configs | (選填) 配置kafka的其他參數 For example: compression.type\=lz4 | 無 | - -#### 使用範例 - -專案內的工具都有整合到`container`中,使用者利用docker運行,可方便管理,使用前請注意兩件事情: - -1. 確認自己的Kafka server ip與spark master ip,並且Kafka與spark 有正常運作,關於啟動Kafka 可參考 [run_kafka_broker](run_kafka_broker.md)。 -2. 可使用 [Astraea GUI](../gui/README.md) 來建構測試用途的 `topics`。 - -使用`docker`執行`start_etl`,以下爲範例 - -```bash -# Run Spark-submit -./docker/start_etl.sh master=spark://192.168.103.189:8080 \ -property.file=/home/kafka/spark2kafkaTest/spark2kafka.properties -``` - -### Astraea etl 實驗 - -experiments 資料夾中收錄不同版本的實驗紀錄,主要使用 [performance tool](../performance_benchmark.md) 測試並紀錄數據。 - -* [2022 Dec26](experiments/etl_1.md), 測試 spark->kafka 模組功能及替換[Strict Cost Partitioner](../partitioner/strict_cost_partitioner.md) 的影響 (Astraea revision: [be8c3ffdf35ab0651dfc1a33b5552fd7e3381069](https://github.com/skiptests/astraea/tree/be8c3ffdf35ab0651dfc1a33b5552fd7e3381069)) \ No newline at end of file diff --git a/docs/etl/experiments/etl_1.md b/docs/etl/experiments/etl_1.md deleted file mode 100644 index 8290d49a18..0000000000 --- a/docs/etl/experiments/etl_1.md +++ /dev/null @@ -1,138 +0,0 @@ -# etl 測試 - -此次實驗目的主要是測試 Astraea etl 的三個部分,包括測試 -1. 10GB 資料需要多少時間跑完 -2. 檢查 input output 資料的一致性,例如資料筆數,抽檢資料是否一致 -3. 替換 spark 中的 kafka partitioner 再次測試效能,看有沒有變好 - -## 測試環境 - -### 硬體規格 - -實驗使用6台實體機器,以下皆以代號表示,分別是 B1, B2, B3, B4, M1, C1 ,六台實體機器規格均相同 - -| 硬體 | 品名 | -| ---------- | ------------------------------------------------------------ | -| CPU | Intel i9-12900K 3.2G(5.2G)/30M/UHD770/125W | -| 主機板 | 微星 Z690 CARBON WIFI(ATX/1H1P/Intel 2.5G+Wi-Fi 6E) | -| 記憶體 | 十銓 T-Force Vulcan 32G(16G*2) DDR5-5200 (CL40) | -| 硬碟 | 威剛XPG SX8200Pro 2TB/M.2 2280/讀:3500M/寫:3000M/TLC/SMI控 * 2 | -| 散熱器 | NZXT Kraken Z53 24cm水冷排/2.4吋液晶冷頭/6年/厚:5.6cm | -| 電源供應器 | 海韻 FOCUS GX-850(850W) 雙8/金牌/全模組 | -| 網卡 | Marvell AQtion 10Gbit Network Adapter | - -### 網路拓樸 - -``` - switch(10G) -┌─────┬─────┬─────┬─────┬─────┐ -B1 B2 B3 S1 S2 C1 -``` - -### 軟體版本 - -| 軟體 | 版本(/image ID) | -| ---------------------- |------------------------------------------| -| 作業系統 | ubuntu-20.04.3-live-server-amd64 | -| Astraea revision | 75bcc3faa39864d5ec5f5ed530346184e79fc0c9 | -| Zookeeper version | 3.8.0 | -| Apache Kafka version | 3.3.1 | -| Java version | OpenJDK 11 | -| Docker version | 20.10.17, build 100c701 | -| grafana image ID | b6ea013786be | -| prometheus version | v2.32.1 | -| node-exporter image ID | 1dbe0e931976 | - -實驗執行軟體 - -| 執行軟體 | B1 | B2 | B3 | S1 | S2 | C1 | -| ------------------------ | :--: | :--: | :--: | :--: | :--: | :--: | -| Spark master | | | | V | | | -| Spark worker | | | | V | V | | -| Zookeeper | V | | | | | | -| Kafka Broker | V | V | V | | | | -| Node Exporter | V | V | V | | | | -| Prometheus | | | V | | | | -| Grafana | | | V | | | | -| Astraea Performance tool | | | | | | V | - -## 測試情境 - -整個實驗分爲兩種情景,在普通情景下測試與在kafka叢集中一臺broker,網路延遲較高、且網路頻寬較低的不平衡情境下進行測試。 - -測試流程:兩種情景都需要首先啓動spark cluster。可以參考[start_spark](../../run_spark.md)啓動spark cluster. - - -### 普通情景 - -在普通情景下,只會用到上述的五臺機器{B1, B2, B3, S1, S2}。本情景將測試10GB 資料需要多少時間跑完與檢查 input output 資料的一致性。 - -#### 測試10GB資料需要多少時間跑完 - -```bash -# Run Spark-submit -./docker/start_etl.sh master=spark://192.168.103.189:8080 \ -property.file=/home/kafka/spark2kafkaTest/spark2kafka.properties -``` - -從圖中可以看出10GB的資料處理完畢需要花費2分55秒 -![processing time](../../pictures/etl_experiment_1_1.png) - -#### 檢查 input output 資料的一致性 - -10GB資料共98090266筆資料。 -![number of records](../../pictures/etl_experiment_1_2.png) - -當資料經過etl處理完畢發往指定topic後。通過subscribe topic來消費所有該topic下的資料,以測定資料是否有缺失。 -可以看到資料的筆數是相同的。 -![number of consumer records](../../pictures/etl_experiment_1_3.png) - -抽查對比consumer到的資料,各欄位資料內容也與原資料一致,所以資料一致性方面未發現問題。 -![record comparison](../../pictures/etl_experiment_1_8.png) - -### 不平衡情景 -#### 不平衡情景下會誘發的問題 -以下圖爲例 - -實驗環境:testTopic用來接受etl產生的資料,分佈於B1, B2, B3 - costTopic用來對kafka叢集中的單一節點造成負載,分佈於B1 - -圖中左側爲不平衡情景,右側爲普通情景,方便直觀感受差別 -costTopic: 接受使一個節點較忙碌的資料。它只分布在B1上。 -testTopic: etl產生的資料會發往該topic。它分布在B1, B2, B3上。 -圖中的testTopic有三個是因為它顯示了該topic在三個節點中各自的流量。 -而costTopic之所以只有一個是因為只有B1一個節點接收到資料。 - -左側實驗開始時先向costTopic發送資料,使其到達節點的頻寬上線。在一段時間後啓動etl,可以看到因爲etl發送資料分走了原先costTopic所佔據的頻寬,造成其效能下降。等到etl運行完畢costTopic的效能恢復到開始狀態。 -左側數據處理完畢共花費3分40秒 - -右側實驗在普通情景下的效能即每秒處理的資料量要明顯高於左側,數據處理完畢的時間也更短總共花費3分鐘。這證明在kafka叢集不平衡的情景下,會影響到etl的效能。 -![imbalance kafka cluster](../../pictures/etl_experiment_1_9.png) - -#### 實驗過程 -在該情景下會用到上述的全部六臺機器,同時B1, B2, B3的網路頻寬將被設置爲2.5G,確保etl效能的變化在叢集高負載的情況下會有較明顯的體現。 -其中C1將被用來向B1發送資料,以確保B1處在高負載的狀態。 - -使用default partitioner進行測試 -不替換時總共花費了4min處理完畢數據 - -![processing time of default partitioner](../../pictures/etl_experiment_1_4.png) - -替換 spark 中的 kafka partitioner進行測試 -替換partitioner後的花費5min處理完畢數據 - -![processing time of strict partitioner](../../pictures/etl_experiment_1_5.png) - -所以結論替換partitioner後反而變成了負優化,進一步觀察分布在各Broker partition的offset。處在負載最重的B1的partitions反而吃到了最多的資料。這與設想的不符合,這一問題還需要再探究其發生原因。 - -![partition offset of strict partitioner](../../pictures/etl_experiment_1_6.png) - -## 結論 - -在普通情景下,擁有兩個worker的spark cluster中,使用standalone mode 啓動 astraea etl ,處理資料的平均速率爲58.5MB/s。 - -在kafka叢集不平衡情境下,etl的效能會收到影響,這會導致處理相同的資料要花費更多的時間表。以下爲該情景下,替換partitioner前後的效能對比。 - -| 吞吐/延遲比較 | default partitioner | strict partitioner | 改善 | -| ------------- |---------------------|--------------------| ---------------------------- | -| 吞吐量 | 42.7 MB/second | 34.1 MiB/second | 平均吞吐提升:約 0.80 倍 | diff --git a/docs/run_spark.md b/docs/run_spark.md deleted file mode 100644 index becc90fa61..0000000000 --- a/docs/run_spark.md +++ /dev/null @@ -1,14 +0,0 @@ -#TODO astraea#1365 撰寫start_spark.sh中文文件 - -```bash -# Run Spark Master -SPARK_PORT=8080 SPARK_UI_PORT=7077 docker/start_spark.sh \ -folder=/home/kafka/spark2kafkaTest/ImportcsvTest/source:/home/kafka/spark2kafkaTest/ImportcsvTest/source -``` - -```bash -# Run Spark Worker -SPARK_PORT=8081 SPARK_UI_PORT=7078 docker/start_spark.sh \ -master=spark://192.168.103.189:8080 \ -folder=/home/kafka/spark2kafkaTest/ImportcsvTest/source:/home/kafka/spark2kafkaTest/ImportcsvTest/source -``` \ No newline at end of file diff --git a/etl/build.gradle b/etl/build.gradle deleted file mode 100644 index c99e2b5df7..0000000000 --- a/etl/build.gradle +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -plugins { - id "com.github.johnrengelman.shadow" version "8.1.1" - id 'scala' -} - -apply from: "$rootDir/gradle/dependencies.gradle" - -repositories { - // Use Maven Central for resolving dependencies. - mavenCentral() -} - - -dependencies { - // Gradle module metadata for jackson-databind 2.13.4.1 references non-existent jackson-bom. - // Unfortunately, spark 3.3.1 uses this version. - // This will be removed when spark is updated to 3.3.2. https://issues.apache.org/jira/browse/SPARK-40886 - compileOnly 'com.fasterxml.jackson:jackson-bom:2.13.4.20221013' - // Astraea etl is run by spark framework, so we don't need to package spark-related dependencies - compileOnly libs["scala"] - compileOnly libs["spark-sql"] - compileOnly libs["spark-kafka"] - compileOnly libs["jackson-databind"] - implementation libs["kafka-client"] - implementation project(':common') - - - testImplementation libs["junit"] - // This will be removed when spark is updated to 3.3.2. https://issues.apache.org/jira/browse/SPARK-40886 - testImplementation enforcedPlatform('com.fasterxml.jackson:jackson-bom:2.13.4.20221013') - // there are unit tests requiring spark, so we add them back for test scope - testImplementation libs["spark-sql"] - testImplementation libs["spark-kafka"] - testImplementation project(':it') -} - -ext { - numberOfForks = project.hasProperty('maxParallelForks') ? maxParallelForks.toInteger() : Math.max((int) (Runtime.runtime.availableProcessors() / 2), 1) -} - -archivesBaseName = "astraea-etl" - -tasks.named('test') { - // spark can't run with JDK 17 as it uses internal class - // see https://lists.apache.org/thread/814cpb1rpp73zkhtv9t4mkzzrznl82yn - jvmArgs '--add-opens=java.base/sun.nio.ch=ALL-UNNAMED' - // Use JUnit Platform for unit tests. - useJUnitPlatform() - - maxParallelForks = numberOfForks - // make isolation for tests. It may be expensive but stability is first choice. - forkEvery = 1 - testLogging { - events "PASSED", "STARTED", "FAILED", "SKIPPED" - exceptionFormat = 'full' - } - - minHeapSize = "1024m" - maxHeapSize = "2048m" -} \ No newline at end of file diff --git a/etl/src/main/scala/org/astraea/etl/DataColumn.scala b/etl/src/main/scala/org/astraea/etl/DataColumn.scala deleted file mode 100644 index 552ccca173..0000000000 --- a/etl/src/main/scala/org/astraea/etl/DataColumn.scala +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.etl - -case class DataColumn(name: String, isPk: Boolean = false, dataType: DataType) diff --git a/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala b/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala deleted file mode 100644 index 18d85ccf2d..0000000000 --- a/etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.etl - -import org.apache.spark.sql.expressions.UserDefinedFunction -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.{StructField, StructType} -import org.apache.spark.sql.{DataFrame, Row, SparkSession} -import org.astraea.common.json.JsonConverter - -import scala.jdk.CollectionConverters._ -import scala.language.implicitConversions - -class DataFrameProcessor(dataFrame: DataFrame) { - - val defaultConverter: UserDefinedFunction = - udf[String, Map[String, String]]((value: Map[String, String]) => { - JsonConverter - .jackson() - .toJson( - value.asJava - ) - }) - - /** Turn the original DataFrame into a key-value table.Integrate all columns - * into one value->josh. If there are multiple primary keys, key will become - * keyA_keyB_.... - * - * {{{ - * Seq(Person("Michael","A.K", 29), Person("Andy","B.C", 30), Person("Justin","C.L", 19)) - * - * Person - * |-- FirstName: string - * |-- SecondName: string - * |-- Age: Integer - * - * Key:FirstName,SecondName - * - * // +-----------+---------------------------------------------------+ - * // | key| value| - * // +-----------+---------------------------------------------------+ - * // |Michael,A.K|{"FirstName":"Michael","SecondName":"A.K","Age":29}| - * // | Andy,B.C|{"FirstName":"Andy","SecondName":"B.C","Age":30} | - * // | Justin,C.L|{"FirstName":"Justin","SecondName":"C.L","Age":19} | - * // +-----------+---------------------------------------------------+ - * }}} - * - * @param cols - * cols metadata - * @return - * json df - */ - def csvToJSON(cols: Seq[DataColumn]): DataFrameProcessor = { - new DataFrameProcessor( - dataFrame - .withColumn( - "value", - defaultConverter( - map(cols.flatMap(c => List(lit(c.name), col(c.name))): _*) - ) - ) - .withColumn( - "key", - defaultConverter( - map( - cols - .filter(dataColumn => dataColumn.isPk) - .flatMap(c => List(lit(c.name), col(c.name))): _* - ) - ) - ) - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - ) - } - - def dataFrame(): DataFrame = { - dataFrame - } - - def toKafkaWriterBuilder(metadata: Metadata): SparkStreamWriter[Row] = { - SparkStreamWriter.writeToKafka(this, metadata) - } -} - -object DataFrameProcessor { - def builder(sparkSession: SparkSession) = new Builder(sparkSession) - def fromLocalCsv( - sparkSession: SparkSession, - metadata: Metadata - ): DataFrameProcessor = { - builder(sparkSession) - .source(metadata.sourcePath) - .columns(metadata.columns) - .cleanSource(metadata.cleanSource) - .recursiveFileLookup(metadata.recursiveFile) - .sourceArchiveDir(metadata.archivePath) - .buildFromCsv() - } - class Builder( - private val sparkSession: SparkSession - ) { - private val SOURCE_ARCHIVE_DIR = "sourceArchiveDir" - private val CLEAN_SOURCE = "cleanSource" - private val RECURSIVE_FILE_LOOK_UP = "recursiveFileLookup" - - private var _recursiveFileLookup: String = "" - private var _cleanSource: String = "" - private var _source: String = "" - private var _sourceArchiveDir: String = "" - private var _columns: Seq[DataColumn] = Seq.empty - - def recursiveFileLookup(r: String): Builder = { - _recursiveFileLookup = r - this - } - - def cleanSource(c: String): Builder = { - _cleanSource = c - this - } - - def source(s: String): Builder = { - _source = s - this - } - - def columns(c: Seq[DataColumn]): Builder = { - _columns = c - this - } - - def sourceArchiveDir(a: String): Builder = { - _sourceArchiveDir = a - this - } - - def buildFromCsv(): DataFrameProcessor = { - if (_cleanSource == "archive") { - if (_sourceArchiveDir.isBlank) - throw new IllegalArgumentException( - s"$SOURCE_ARCHIVE_DIR is blank.When you set cleanSource to 'archive', you must configure ArchiveDir in spark2kafka config" - ) - } - - val df = sparkSession.readStream - .option(CLEAN_SOURCE, _cleanSource) - .option(SOURCE_ARCHIVE_DIR, _sourceArchiveDir) - .option(RECURSIVE_FILE_LOOK_UP, _recursiveFileLookup) - .schema(schema(_columns)) - .csv(_source) - .filter(row => { - val bool = (0 until row.length).exists(i => !row.isNullAt(i)) - bool - }) - new DataFrameProcessor(df) - } - - private def schema(columns: Seq[DataColumn]): StructType = - StructType(columns.map { col => - if (col.dataType != DataType.StringType) - throw new IllegalArgumentException( - "Sorry, only string type is currently supported.Because a problem(astraea #1286) has led to the need to wrap the non-nullable type." - ) - StructField(col.name, col.dataType.sparkType) - }) - } -} diff --git a/etl/src/main/scala/org/astraea/etl/DataType.scala b/etl/src/main/scala/org/astraea/etl/DataType.scala deleted file mode 100644 index de776c4707..0000000000 --- a/etl/src/main/scala/org/astraea/etl/DataType.scala +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.etl - -sealed abstract class DataType( - typeName: String, - sparkDataType: org.apache.spark.sql.types.DataType -) extends Serializable { - def name: String = { - typeName - } - - def sparkType: org.apache.spark.sql.types.DataType = { - sparkDataType - } -} - -/** Column Types supported by astraea partitioner. */ -object DataType { - private val STRING_TYPE = "string" - private val BOOLEAN_TYPE = "boolean" - private val DATE_TYPE = "date" - private val DOUBLE_TYPE = "double" - private val BYTE_TYPE = "byte" - private val INTEGER_TYPE = "integer" - private val LONG_TYPE = "long" - private val SHORT_TYPE = "short" - private val TIMESTAMP_TYPE = "timestamp" - - case object StringType - extends DataType(STRING_TYPE, org.apache.spark.sql.types.StringType) - case object BooleanType - extends DataType(BOOLEAN_TYPE, org.apache.spark.sql.types.BooleanType) - case object DateType - extends DataType(DATE_TYPE, org.apache.spark.sql.types.DateType) - case object DoubleType - extends DataType(DOUBLE_TYPE, org.apache.spark.sql.types.DoubleType) - case object ByteType - extends DataType(BYTE_TYPE, org.apache.spark.sql.types.ByteType) - case object IntegerType - extends DataType(INTEGER_TYPE, org.apache.spark.sql.types.IntegerType) - case object LongType - extends DataType(LONG_TYPE, org.apache.spark.sql.types.LongType) - case object ShortType - extends DataType(SHORT_TYPE, org.apache.spark.sql.types.ShortType) - case object TimestampType - extends DataType(TIMESTAMP_TYPE, org.apache.spark.sql.types.TimestampType) - - /** @param str - * String that needs to be parsed as a DataType. - * @return - * DataType - */ - def of(str: String): DataType = { - val value = all.filter(_.name == str) - if (value.isEmpty) { - throw new IllegalArgumentException( - s"$str is not supported data type.The data types supported ${all.mkString(",")}." - ) - } - value.head - } - - /** @param map - * A set of string needs to be parsed as DataType - * @return - * Map[String, DataType] - */ - def of(map: Map[String, String]): Map[String, DataType] = { - map.map(x => (x._1, of(x._2))) - } - - /** @return - * All supported data types. - */ - def all: Seq[DataType] = { - Seq( - StringType, - BooleanType, - DateType, - DoubleType, - ByteType, - IntegerType, - LongType, - ShortType, - TimestampType - ) - } -} diff --git a/etl/src/main/scala/org/astraea/etl/Metadata.scala b/etl/src/main/scala/org/astraea/etl/Metadata.scala deleted file mode 100644 index fe012c7adb..0000000000 --- a/etl/src/main/scala/org/astraea/etl/Metadata.scala +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.etl - -import java.io.File -import java.nio.file.{Files, Path} -import java.util.Properties -import scala.jdk.CollectionConverters._ -import scala.util.Using - -/** Parameters required for Astraea ETL. - * - * @param sourcePath - * The data source path should be a directory. - * @param columns - * The CSV Column metadata.Contains column name, data type and is pk or not. - * @param kafkaBootstrapServers - * The Kafka bootstrap servers. - * @param topicName - * Set your topic name, if it is empty that will set it to "'spark'-'current - * time by hourDay'-'random number'". - * @param numberOfPartitions - * Set the number of topic partitions, if it is empty that will set it to 15. - * @param numberOfReplicas - * Set the number of topic replicas, if it is empty that will set it to 3. - * @param checkpoint - * Spark checkpoint path. - */ -case class Metadata private ( - sourcePath: String, - checkpoint: String, - columns: Seq[DataColumn], - kafkaBootstrapServers: String, - topicName: String, - topicConfigs: Map[String, String], - numberOfPartitions: Int, - numberOfReplicas: Short, - cleanSource: String, - recursiveFile: String, - archivePath: String -) - -object Metadata { - private[etl] val ARCHIVE_PATH = "archive.path" - private[etl] val SOURCE_PATH_KEY = "source.path" - private[etl] val CHECKPOINT_KEY = "checkpoint" - private[etl] val COLUMN_NAME_KEY = "column.names" - private[etl] val COLUMN_TYPES_KEY = "column.types" - private[etl] val CLEAN_SOURCE = "clean.source" - private[etl] val PRIMARY_KEY_KEY = "primary.keys" - private[etl] val KAFKA_BOOTSTRAP_SERVERS_KEY = "kafka.bootstrap.servers" - private[etl] val TOPIC_NAME_KEY = "topic.name" - private[etl] val TOPIC_CONFIGS_KEY = "topic.configs" - private[etl] val TOPIC_PARTITIONS_KEY = "topic.partitions" - private[etl] val TOPIC_REPLICAS_KEY = "topic.replicas" - private[etl] val RECURSIVE_FILE = "recursive.file" - - private[etl] val DEFAULT_PARTITIONS = 15 - private[etl] val DEFAULT_REPLICAS = 1.toShort - private[etl] val DEFAULT_RECURSIVE = "ture" - private[etl] val DEFAULT_CLEAN_SOURCE = "delete" - - // Parameters needed to configure ETL. - def of(path: Path): Metadata = { - // remove the empty/blank value - val properties = - readProp(path).asScala.filter(_._2.nonEmpty).filterNot(_._2.isBlank) - val columnNames = properties(COLUMN_NAME_KEY).split(",").toSeq - if (columnNames.isEmpty) - throw new IllegalArgumentException("columns must be defined") - val pks = properties - .get(PRIMARY_KEY_KEY) - .map(_.split(",").toSet) - .getOrElse(columnNames.toSet) - val types = properties - .get(COLUMN_TYPES_KEY) - .map(s => - s.split(",") - .map(t => DataType.of(t)) - .toSeq - ) - .getOrElse(Seq.empty) - Metadata( - sourcePath = properties(SOURCE_PATH_KEY), - checkpoint = properties.getOrElse( - CHECKPOINT_KEY, - Files.createTempDirectory("astraea-etl").toString - ), - columns = columnNames.zipWithIndex.map { case (c, index) => - DataColumn( - name = c, - isPk = pks.contains(c), - dataType = if (types.isEmpty) DataType.StringType else types(index) - ) - }, - kafkaBootstrapServers = properties(KAFKA_BOOTSTRAP_SERVERS_KEY), - topicName = - properties.getOrElse(TOPIC_NAME_KEY, "astraea-etl-" + Math.random()), - topicConfigs = properties - .get(TOPIC_CONFIGS_KEY) - .map(s => - s.split(",") - .map(item => item.split("=")(0) -> item.split("=")(1)) - .toMap - ) - .getOrElse(Map.empty), - numberOfPartitions = properties - .get(TOPIC_PARTITIONS_KEY) - .map(_.toInt) - .getOrElse(DEFAULT_PARTITIONS), - numberOfReplicas = properties - .get(TOPIC_REPLICAS_KEY) - .map(_.toShort) - .getOrElse(DEFAULT_REPLICAS), - cleanSource = properties.getOrElse(CLEAN_SOURCE, DEFAULT_CLEAN_SOURCE), - recursiveFile = properties.getOrElse(RECURSIVE_FILE, DEFAULT_RECURSIVE), - archivePath = properties.getOrElse(ARCHIVE_PATH, "") - ) - } - - private[this] def readProp(path: Path): Properties = { - val properties = new Properties() - Using(scala.io.Source.fromInputStream(Files.newInputStream(path))) { - bufferedSource => - properties.load(bufferedSource.reader()) - } - properties - } -} diff --git a/etl/src/main/scala/org/astraea/etl/Spark2Kafka.scala b/etl/src/main/scala/org/astraea/etl/Spark2Kafka.scala deleted file mode 100644 index 6e7afeae6d..0000000000 --- a/etl/src/main/scala/org/astraea/etl/Spark2Kafka.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.etl - -import org.apache.spark.sql.SparkSession -import org.astraea.common.admin.Admin - -import java.io.File -import java.nio.file.Path -import scala.concurrent.duration.Duration -import scala.jdk.CollectionConverters._ -import scala.util.Using - -object Spark2Kafka { - def executor( - sparkSession: SparkSession, - metadata: Metadata, - duration: Duration - ): Unit = { - Using(Admin.of(metadata.kafkaBootstrapServers))( - _.creator() - .topic(metadata.topicName) - .configs(metadata.topicConfigs.asJava) - .numberOfPartitions(metadata.numberOfPartitions) - .numberOfReplicas(metadata.numberOfReplicas) - .run() - .toCompletableFuture - .join() - ) - - DataFrameProcessor - .fromLocalCsv(sparkSession, metadata) - .csvToJSON(metadata.columns) - .toKafkaWriterBuilder(metadata) - .start(duration) - } - - def main(args: Array[String]): Unit = { - executor( - SparkSession - .builder() - .appName("astraea etl") - .getOrCreate(), - Metadata.of(Path.of(args(0))), - Duration("1000 seconds") - ) - } -} diff --git a/etl/src/main/scala/org/astraea/etl/SparkStreamWriter.scala b/etl/src/main/scala/org/astraea/etl/SparkStreamWriter.scala deleted file mode 100644 index 12d8fa2051..0000000000 --- a/etl/src/main/scala/org/astraea/etl/SparkStreamWriter.scala +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.etl - -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.spark.sql.Row -import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode} - -import scala.concurrent.duration.Duration -class SparkStreamWriter[T](dataStreamWriter: DataStreamWriter[T]) { - def start(duration: Duration): Boolean = { - dataStreamWriter - .start() - .awaitTermination(duration.toMillis) - } -} - -object SparkStreamWriter { - def builder(dataFrameProcessor: DataFrameProcessor) = - new Builder(dataFrameProcessor) - - def writeToKafka( - dataFrameProcessor: DataFrameProcessor, - metadata: Metadata - ): SparkStreamWriter[Row] = { - builder(dataFrameProcessor) - .target(metadata.topicName) - .checkpoint(metadata.checkpoint) - .buildToKafka(metadata.kafkaBootstrapServers) - } - class Builder( - var dataFrameProcessor: DataFrameProcessor - ) { - private var _target: String = "" - private var _checkpoint: String = "" - - def buildToKafka( - bootstrap: String - ): SparkStreamWriter[Row] = { - new SparkStreamWriter[Row]( - dataFrameProcessor - .dataFrame() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .writeStream - .outputMode(OutputMode.Append()) - .format("kafka") - .option( - "kafka." + - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, - bootstrap - ) - // Spark to kafka transfer support for StringSerializer and ByteSerializer in spark 3.3.0 . - .option( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.StringSerializer" - ) - .option( - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.StringSerializer" - ) - .option("topic", _target) - .option(ProducerConfig.ACKS_CONFIG, "all") - .option(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") - .option("checkpointLocation", _checkpoint) - ) - } - - /** Target represents the destination of the data, for Kafka it represents - * the topic, for other targets it can represent the data path. - * - * @param target - * destination of the data - * @return - * Writer - */ - def target(target: String): Builder = { - _target = target - this - } - - def checkpoint( - checkpoint: String - ): Builder = { - _checkpoint = checkpoint - this - } - } -} diff --git a/etl/src/main/scala/org/astraea/etl/Utils.scala b/etl/src/main/scala/org/astraea/etl/Utils.scala deleted file mode 100644 index 84849758e7..0000000000 --- a/etl/src/main/scala/org/astraea/etl/Utils.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.etl - -import org.astraea.common.admin.Admin - -import java.awt.geom.IllegalPathStateException -import java.io.File -import java.nio.file.{Files, Path} -import java.util.concurrent.CompletionStage -import scala.concurrent.{Future, Promise} -import scala.util.control.NonFatal -import scala.jdk.CollectionConverters._ - -object Utils { - def requireFolder(path: String): Path = { - val file = Path.of(path) - if (!Files.isDirectory(file)) { - throw new IllegalPathStateException( - s"$path is not a folder. The path should be a folder." - ) - } - file - } - - def requireFile(path: String): Path = { - val file = Path.of(path) - if (!Files.isRegularFile(file)) { - throw new IllegalPathStateException( - s"$path is not a file. The file does not exist." - ) - } - file - } - -} diff --git a/etl/src/test/scala/org/astraea/etl/DataFrameProcessorTest.scala b/etl/src/test/scala/org/astraea/etl/DataFrameProcessorTest.scala deleted file mode 100644 index 0bbd6ee6a6..0000000000 --- a/etl/src/test/scala/org/astraea/etl/DataFrameProcessorTest.scala +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.etl - -import org.apache.spark.sql.functions.col -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{Row, SparkSession} -import org.astraea.etl.DataType.{IntegerType, StringType} -import org.astraea.etl.FileCreator.{createCSV, generateCSVF, getCSVFile} -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} -import org.junit.jupiter.api.Test - -import java.io._ -import java.nio.file.{Files, Path} -import java.util.concurrent.TimeUnit -import scala.concurrent.duration.Duration -import scala.jdk.CollectionConverters._ - -class DataFrameProcessorTest { - @Test - def skipBlankLineTest(): Unit = { - val sourceDir = Files.createTempDirectory("source") - val dataDir = Files.createTempDirectory("data") - val checkoutDir = Files.createTempDirectory("checkpoint") - - val columnOne: List[String] = - List("A1", "B1", null, "D1") - val columnTwo: List[String] = - List("52", "36", null, "25") - val columnThree: List[String] = - List("fghgh", "gjgbn", null, "dfjf") - - val row = columnOne - .zip(columnTwo.zip(columnThree)) - .foldLeft(List.empty[List[String]]) { case (acc, (a, (b, c))) => - List(a, b, c) +: acc - } - .reverse - - createCSV(sourceDir, row, 0) - - val df = DataFrameProcessor.fromLocalCsv( - createSpark(), - Metadata( - sourcePath = sourceDir.toAbsolutePath.toString, - checkpoint = "", - columns = Seq( - DataColumn("RecordNumber", true, StringType), - DataColumn("Size", true, StringType), - DataColumn("Type", true, StringType) - ), - kafkaBootstrapServers = "", - topicName = "", - topicConfigs = Map.empty, - numberOfPartitions = 10, - numberOfReplicas = 1, - cleanSource = "delete", - recursiveFile = "true", - archivePath = "" - ) - ) - - df.dataFrame() - .writeStream - .format("csv") - .option("path", dataDir.toAbsolutePath.toString) - .option("checkpointLocation", checkoutDir.toAbsolutePath.toString) - .outputMode("append") - .start() - .awaitTermination(Duration(20, TimeUnit.SECONDS).toMillis) - - val writeFile = getCSVFile(dataDir).head - val lines = Files.readAllLines(writeFile) - - assertEquals("A1,52,fghgh", lines.get(0)) - assertEquals("B1,36,gjgbn", lines.get(1)) - assertEquals("D1,25,dfjf", lines.get(2)) - } - - @Test - def sparkReadCSVTest(): Unit = { - val sourceDir = Files.createTempDirectory("source") - generateCSVF(sourceDir, rows) - - val checkoutDir = Files.createTempDirectory("checkpoint") - val dataDir = Files.createTempDirectory("data") - - val csvDF = DataFrameProcessor.fromLocalCsv( - createSpark(), - Metadata( - sourcePath = sourceDir.toAbsolutePath.toString, - checkpoint = "", - columns = Seq( - DataColumn("RecordNumber", true, StringType), - DataColumn("Size", true, StringType), - DataColumn("Type", true, StringType) - ), - kafkaBootstrapServers = "", - topicName = "", - topicConfigs = Map.empty, - numberOfPartitions = 10, - numberOfReplicas = 1, - cleanSource = "delete", - recursiveFile = "true", - archivePath = "" - ) - ) - - assertTrue( - csvDF.dataFrame().isStreaming, - "sessions must be a streaming Dataset" - ) - - csvDF - .dataFrame() - .writeStream - .format("csv") - .option("path", dataDir.toAbsolutePath.toString) - .option("checkpointLocation", checkoutDir.toAbsolutePath.toString) - .outputMode("append") - .start() - .awaitTermination(Duration(20, TimeUnit.SECONDS).toMillis) - - val writeFile = getCSVFile(dataDir).head - val lines = Files.readAllLines(writeFile) - assertEquals("A1,52,fghgh", lines.get(0)) - assertEquals("B1,36,gjgbn", lines.get(1)) - assertEquals("C1,45,fgbhjf", lines.get(2)) - assertEquals("D1,25,dfjf", lines.get(3)) - } - - @Test - def csvToJSONTest(): Unit = { - val spark = SparkSession - .builder() - .master("local[2]") - .appName("Astraea ETL") - .getOrCreate() - import spark.implicits._ - - val columns = Seq( - DataColumn("name", isPk = true, dataType = StringType), - DataColumn("age", isPk = false, dataType = IntegerType) - ) - - val result = new DataFrameProcessor( - Seq(("Michael", 29)).toDF().toDF("name", "age") - ).csvToJSON(columns) - .dataFrame() - .collectAsList() - .asScala - .map(row => (row.getAs[String]("key"), row.getAs[String]("value"))) - .toMap - - assertEquals(1, result.size) - assertEquals( - "{\"age\":\"29\",\"name\":\"Michael\"}", - result("{\"name\":\"Michael\"}") - ) - - val resultExchange = new DataFrameProcessor( - Seq((29, "Michael")).toDF().toDF("age", "name") - ).csvToJSON(columns) - .dataFrame() - .collectAsList() - .asScala - .map(row => (row.getAs[String]("key"), row.getAs[String]("value"))) - .toMap - - assertEquals(1, resultExchange.size) - assertEquals( - "{\"age\":\"29\",\"name\":\"Michael\"}", - resultExchange("{\"name\":\"Michael\"}") - ) - } - - @Test - def csvToJsonMulKeysTest(): Unit = { - val spark = SparkSession - .builder() - .master("local[2]") - .appName("Astraea ETL") - .getOrCreate() - import spark.implicits._ - val columns = Seq( - DataColumn("firstName", isPk = true, DataType.StringType), - DataColumn("secondName", isPk = true, DataType.StringType), - DataColumn("age", isPk = false, dataType = IntegerType) - ) - val result = new DataFrameProcessor( - Seq(("Michael", "A", 29)).toDF().toDF("firstName", "secondName", "age") - ).csvToJSON(columns) - .dataFrame() - .collectAsList() - .asScala - .map(row => (row.getAs[String]("key"), row.getAs[String]("value"))) - .toMap - - assertEquals(1, result.size) - assertEquals( - "{\"age\":\"29\",\"firstName\":\"Michael\",\"secondName\":\"A\"}", - result("{\"firstName\":\"Michael\",\"secondName\":\"A\"}") - ) - } - - @Test def csvToJsonNullTest(): Unit = { - val spark = SparkSession - .builder() - .master("local[2]") - .appName("Astraea ETL") - .getOrCreate() - import spark.implicits._ - val columns = Seq( - DataColumn("firstName", isPk = true, DataType.StringType), - DataColumn("secondName", isPk = true, DataType.StringType), - DataColumn("age", isPk = false, dataType = IntegerType) - ) - val result = new DataFrameProcessor( - Seq(("Michael", "A", null)).toDF().toDF("firstName", "secondName", "age") - ).csvToJSON(columns) - .dataFrame() - .collectAsList() - .asScala - .map(row => (row.getAs[String]("key"), row.getAs[String]("value"))) - .toMap - - assertEquals(1, result.size) - assertEquals( - "{\"firstName\":\"Michael\",\"secondName\":\"A\"}", - result("{\"firstName\":\"Michael\",\"secondName\":\"A\"}") - ) - } - - @Test - def jsonToByteTest(): Unit = { - val spark = SparkSession - .builder() - .master("local[2]") - .appName("Astraea ETL") - .getOrCreate() - - var data = Seq(Row("A1", 52, "fghgh", "sfjojs", "zzz", "final", 5)) - (0 to 10000).iterator.foreach(_ => - data = data ++ Seq(Row("A1", 52, "fghgh", "sfjojs", "zzz", "final", 5)) - ) - - val structType = new StructType() - .add("name", "string") - .add("age", "integer") - .add("xx", "string") - .add("yy", "string") - .add("zz", "string") - .add("f", "string") - .add("fInt", "integer") - - val columns = Seq( - DataColumn("name", isPk = false, dataType = StringType), - DataColumn("age", isPk = false, dataType = IntegerType), - DataColumn("xx", isPk = false, dataType = StringType), - DataColumn("yy", isPk = false, dataType = StringType), - DataColumn("zz", isPk = false, dataType = StringType), - DataColumn("f", isPk = false, dataType = StringType), - DataColumn("fInt", isPk = false, dataType = IntegerType) - ) - - val json = new DataFrameProcessor( - spark.createDataFrame(spark.sparkContext.parallelize(data), structType) - ).csvToJSON(columns) - .dataFrame() - .withColumn("byte", col("value").cast("Byte")) - .selectExpr("CAST(byte AS BYTE)") - val head = json.head() - assertTrue(json.filter(_ != head).isEmpty) - } - - private[this] def rows: List[List[String]] = { - val columnOne: List[String] = - List("A1", "B1", "C1", "D1") - val columnTwo: List[String] = - List("52", "36", "45", "25") - val columnThree: List[String] = - List("fghgh", "gjgbn", "fgbhjf", "dfjf") - - columnOne - .zip(columnTwo.zip(columnThree)) - .foldLeft(List.empty[List[String]]) { case (acc, (a, (b, c))) => - List(a, b, c) +: acc - } - .reverse - } - - private def createSpark(): SparkSession = { - SparkSession - .builder() - .master("local[2]") - .appName("Astraea ETL") - .getOrCreate() - } -} diff --git a/etl/src/test/scala/org/astraea/etl/FileCreator.scala b/etl/src/test/scala/org/astraea/etl/FileCreator.scala deleted file mode 100644 index 2cdecc08db..0000000000 --- a/etl/src/test/scala/org/astraea/etl/FileCreator.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.etl - -import org.astraea.common.csv.CsvWriter - -import java.io.{BufferedWriter, FileWriter} -import java.nio.file.{Files, Path} -import java.util.concurrent.TimeUnit -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future -import scala.concurrent.duration.Duration -import scala.jdk.CollectionConverters._ -import scala.util.{Failure, Try} -object FileCreator { - def generateCSVF( - sourceDir: Path, - rows: List[List[String]] - ): Future[Boolean] = { - Future { generateCSV(sourceDir, rows) } - } - - def generateCSV(sourceDir: Path, rows: List[List[String]]): Boolean = { - createCSV(sourceDir, rows, 0) - Thread.sleep(Duration(12, TimeUnit.SECONDS).toMillis) - createCSV(sourceDir, rows, 1) - true - } - - def createCSV( - sourceDir: Path, - rows: List[List[String]], - int: Int - ): Try[Unit] = { - val str = - sourceDir.toAbsolutePath.toString + "/local_kafka" + "-" + int + ".csv" - val fileCSV2 = Files.createFile(Path.of(str)) - writeCsvFile(fileCSV2.toAbsolutePath.toString, rows) - } - - def writeCsvFile( - path: String, - rows: List[List[String]] - ): Try[Unit] = - Try( - CsvWriter - .builder(new BufferedWriter(new FileWriter(path))) - .build() - ).flatMap((csvWriter: CsvWriter) => - Try { - rows.map(_.asJava).foreach(csvWriter.append) - csvWriter.close() - } match { - case f @ Failure(_) => - Try(csvWriter.close()).recoverWith { case _ => - f - } - case success => - success - } - ) - - def getCSVFile(file: Path): Seq[Path] = { - Files - .list(file) - .filter(f => Files.isRegularFile(f)) - .filter(f => f.getFileName.toString.endsWith(".csv")) - .iterator() - .asScala - .toSeq - } -} diff --git a/etl/src/test/scala/org/astraea/etl/MetadataTest.scala b/etl/src/test/scala/org/astraea/etl/MetadataTest.scala deleted file mode 100644 index bd6f4ae8b4..0000000000 --- a/etl/src/test/scala/org/astraea/etl/MetadataTest.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.etl - -import org.junit.jupiter.api.{Assertions, Test} - -import java.io.{File, FileOutputStream} -import java.nio.file.{Files, Path} -import java.util.Properties -import scala.util.Using - -class MetadataTest { - - @Test - def testTopicConfigs(): Unit = { - val path = save( - Map( - Metadata.SOURCE_PATH_KEY -> "/tmp/aa", - Metadata.TOPIC_NAME_KEY -> "bb", - Metadata.COLUMN_NAME_KEY -> "a,b", - Metadata.KAFKA_BOOTSTRAP_SERVERS_KEY -> "host:1122", - Metadata.TOPIC_CONFIGS_KEY -> "a=b,c=d" - ) - ) - val metadata = Metadata.of(path) - Assertions.assertEquals(2, metadata.topicConfigs.size) - Assertions.assertEquals("b", metadata.topicConfigs("a")) - Assertions.assertEquals("d", metadata.topicConfigs("c")) - } - - @Test - def testDefault(): Unit = { - val path = save( - Map( - Metadata.SOURCE_PATH_KEY -> "/tmp/aa", - Metadata.TOPIC_NAME_KEY -> "bb", - Metadata.COLUMN_NAME_KEY -> "a,b", - Metadata.KAFKA_BOOTSTRAP_SERVERS_KEY -> "host:1122" - ) - ) - val metadata = Metadata.of(path) - - Assertions.assertEquals("/tmp/aa", metadata.sourcePath) - Assertions.assertEquals("host:1122", metadata.kafkaBootstrapServers) - Assertions.assertEquals("bb", metadata.topicName) - Assertions.assertEquals(2, metadata.columns.size) - Assertions.assertEquals("a", metadata.columns.head.name) - Assertions.assertTrue(metadata.columns.head.isPk) - Assertions.assertEquals(DataType.StringType, metadata.columns.head.dataType) - Assertions.assertEquals("b", metadata.columns(1).name) - Assertions.assertTrue(metadata.columns(1).isPk) - Assertions.assertEquals(DataType.StringType, metadata.columns(1).dataType) - } - - private def save(props: Map[String, String]): Path = { - val prop = new Properties - props.foreach { case (k, v) => - prop.put(k, v) - } - val file = Files.createTempFile("test", "props") - Using(Files.newOutputStream(file)) { output => - prop.store(output, null) - } - file - } -} diff --git a/etl/src/test/scala/org/astraea/etl/Spark2KafkaTest.scala b/etl/src/test/scala/org/astraea/etl/Spark2KafkaTest.scala deleted file mode 100644 index 68ef733061..0000000000 --- a/etl/src/test/scala/org/astraea/etl/Spark2KafkaTest.scala +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.etl - -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.spark.sql.SparkSession -import org.astraea.common.consumer.{Consumer, Deserializer} -import org.astraea.etl.FileCreator.generateCSVF -import org.astraea.etl.Metadata.{ - ARCHIVE_PATH, - DEFAULT_RECURSIVE, - RECURSIVE_FILE -} -import org.astraea.etl.Spark2KafkaTest.{COL_NAMES, rows} -import org.astraea.it.Service -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.{AfterAll, BeforeAll, Test} - -import java.nio.file.Files -import java.util -import scala.collection.immutable -import scala.concurrent.duration.Duration -import scala.jdk.CollectionConverters._ - -class Spark2KafkaTest { - @Test - def consumerDataTest(): Unit = { - val topic = new util.HashSet[String] - topic.add("testTopic") - - val consumer = - Consumer - .forTopics(topic) - .bootstrapServers(Spark2KafkaTest.SERVICE.bootstrapServers()) - .keyDeserializer(Deserializer.STRING) - .valueDeserializer(Deserializer.STRING) - .configs( - Map(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest").asJava - ) - .build() - - val records = - Range - .inclusive(0, 5) - .flatMap(_ => consumer.poll(java.time.Duration.ofSeconds(1)).asScala) - .map(record => (record.key(), record.value())) - .toMap - - val rowData = s2kType(rows) - - records.foreach(records => assertEquals(records._2, rowData(records._1))) - } - - def s2kType(rows: List[List[String]]): Map[String, String] = { - val colNames = - COL_NAMES.split(",").map(_.split("=")).map(elem => elem(0)).toSeq - Range - .inclusive(0, 3) - .map(i => - ( - s"""{"${colNames.head}":"${rows( - i - ).head}","${colNames(1)}":"${rows(i)(1)}"}""", - s"""{"${colNames(2)}":"${rows( - i - )( - 2 - )}","${colNames.head}":"${rows( - i - ).head}","${colNames(1)}":"${rows(i)(1)}"}""" - ) - ) - .toMap - } -} - -object Spark2KafkaTest { - private val SERVICE = Service.builder.numberOfBrokers(3).build() - - @AfterAll - private def closeService(): Unit = SERVICE.close() - - private val COL_NAMES = - "FirstName=string,SecondName=string,Age=integer" - - @BeforeAll - def setup(): Unit = { - val sourceDir = Files.createTempDirectory("source") - val checkoutDir = Files.createTempDirectory("checkpoint") - generateCSVF(sourceDir, rows) - - val metadata = Metadata( - sourcePath = sourceDir.toAbsolutePath.toString, - checkpoint = checkoutDir.toAbsolutePath.toString, - columns = immutable.Seq( - DataColumn( - name = "FirstName", - isPk = true, - dataType = DataType.StringType - ), - DataColumn( - name = "SecondName", - isPk = true, - dataType = DataType.StringType - ), - DataColumn(name = "Age", dataType = DataType.StringType) - ), - kafkaBootstrapServers = SERVICE.bootstrapServers(), - topicName = "testTopic", - topicConfigs = Map("compression.type" -> "lz4"), - numberOfPartitions = 10, - numberOfReplicas = 1, - cleanSource = "delete", - recursiveFile = "true", - archivePath = "" - ) - - Spark2Kafka.executor( - SparkSession - .builder() - .master("local[2]") - .getOrCreate(), - metadata, - Duration("10 seconds") - ) - } - - private def rows: List[List[String]] = { - val columnOne: List[String] = - List("Michael", "Andy", "Justin", "") - val columnTwo: List[String] = - List("A.K", "B.C", "C.L", "") - val columnThree: List[String] = - List("29", "30", "19", "") - - columnOne - .zip(columnTwo.zip(columnThree)) - .foldLeft(List.empty[List[String]]) { case (acc, (a, (b, c))) => - List(a, b, c) +: acc - } - .reverse - } -} diff --git a/etl/src/test/scala/org/astraea/etl/UtilsTest.scala b/etl/src/test/scala/org/astraea/etl/UtilsTest.scala deleted file mode 100644 index 99b9fee374..0000000000 --- a/etl/src/test/scala/org/astraea/etl/UtilsTest.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.etl - -import org.junit.jupiter.api.Assertions.assertThrows -import org.junit.jupiter.api.Test - -import java.awt.geom.IllegalPathStateException -class UtilsTest { - @Test def requireFolderTest(): Unit = { - assertThrows( - classOf[IllegalPathStateException], - () => Utils.requireFolder("??") - ) - } -} diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 87cd3985e7..af67be773c 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -32,9 +32,7 @@ def versions = [ kafka : project.properties['kafka.version'] ?: "3.7.0", mockito : project.properties['mockito.version'] ?: "5.11.0", "mockito-inline" : project.properties['mockito-inline.version'] ?: "5.2.0", - scala : project.properties['scala.version'] ?: "2.13.13", slf4j : project.properties['slf4j.version'] ?: "2.0.12", - spark : project.properties['spark.version'] ?: "3.5.1", hadoop : project.properties["hadoop.version"] ?: "3.3.6", ] @@ -59,10 +57,7 @@ libs += [ "kafka-metadata" : "org.apache.kafka:kafka-metadata:${versions["kafka"]}", "mockito-core" : "org.mockito:mockito-core:${versions["mockito"]}", "mockito-inline" : "org.mockito:mockito-inline:${versions["mockito-inline"]}", - scala : "org.scala-lang:scala-library:${versions["scala"]}", "slf4j-nop" : "org.slf4j:slf4j-nop:${versions["slf4j"]}", - "spark-kafka" : "org.apache.spark:spark-sql-kafka-0-10_2.13:${versions["spark"]}", - "spark-sql" : "org.apache.spark:spark-sql_2.13:${versions["spark"]}", "hadoop-common" : "org.apache.hadoop:hadoop-common:${versions["hadoop"]}", "hadoop-client" : "org.apache.hadoop:hadoop-client:${versions["hadoop"]}", "hadoop-minicluster" : "org.apache.hadoop:hadoop-minicluster:${versions["hadoop"]}", diff --git a/settings.gradle b/settings.gradle index 2af44f31e2..a227433177 100644 --- a/settings.gradle +++ b/settings.gradle @@ -19,4 +19,4 @@ pluginManagement { } } rootProject.name = 'astraea' -include('app', "common", "connector", "etl", "fs", "gui", "it") +include('app', "common", "connector", "fs", "gui", "it")