-
Notifications
You must be signed in to change notification settings - Fork 56
Home
This project implements a remote shuffle service for batch data processing of Flink. By adopting the storage and compute separation architecture, it brings several important benefits:
-
The scale up/down of computing resources and storage resources is now decoupled which means you can scale each up/down on demand freely.
-
Compute and storage stability never influence each other anymore. The remote shuffle service is free of user-code which can improve shuffle stability. For example, the termination of
TaskExecutor
s will not lead to data loss and the termination of remoteShuffleWorker
s is tolerable. -
By offloading the data shuffle work to the remote shuffle service, the computation resources can be released immediately after the upstream map tasks finish which can save resources.
In addition, the remote shuffle implementation borrows some good designs from Flink which can benefit both stability and performance, for example:
-
Managed memory is preferred. Both the storage and network memory are managed which can significantly solve the OutOfMemory issue.
-
The credit-based backpressure mechanism is adopted which is good for both network stability and performance.
-
The zero-copy network data transmission is implemented which can save memory and is also good for stability and performance.
Besides, there are other important optimizations like load balancing and better sequential IO ( benefiting from the centralized service per node), tcp connection reuse, shuffle data compression, adaptive execution (together with FLIP-187), etc.
The remote shuffle system can support standalone, yarn and k8s deployment. Before going open source, it has been used in production widely and behaves well on both stability and performance. Hope you enjoy it.
The remote shuffle service works together with Flink 1.14+. Some patches are needed to be applied to Flink to support lower Flink versions. If you need any help on that, please let us know, we can offer some help to prepare the patches for the Flink version you use.
To build this flink remote shuffle project from source, you should first clone the project:
git clone [email protected]:flink-extended/flink-remote-shuffle.git
Then you can build the project using maven (Maven and Java 8 required):
cd flink-remote-shuffle # switch to the remote shuffle project home directory
mvn package -DskipTests
After finish, you can find the target distribution in the build-target folder. Note that if you want to run tests locally, we suggest you to run mvn install -DskipTests
first to avoid potential failures.
For k8s deployment, you can run the following command to build the docker image (Docker required):
cd flink-remote-shuffle # switch to the remote shuffle project home directory
sh ./tools/build_docker_image.sh
You can also publish the docker image by running the following command. The script that publishes the docker image takes three arguments: the first one is the registry address (default value is 'docker.io'), the second one is the namespace (default value is 'flinkremoteshuffle'), the third one is the repository name (default value is 'flink-remote-shuffle').
cd flink-remote-shuffle # switch to the remote shuffle project home directory
sh ./tools/publish_docker_image.sh REGISTRY NAMESPACE REPOSITORY
After building the code from source, you can start and run a demo flink batch job using the remote shuffle service locally (Flink 1.14+ required):
As the first step, you can download the Flink distribution from the Flink's download page, for example, Apache Flink 1.14.0 for Scala 2.11:
wget https://dlcdn.apache.org/flink/flink-1.14.0/flink-1.14.0-bin-scala_2.11.tgz
tar zxvf flink-1.14.0-bin-scala_2.11.tgz
Then after building the remote shuffle project from source, you can copy the shuffle plugin jar file from build-target/lib directory (for example, build-target/lib/shuffle-plugin-1.0-SNAPSHOT.jar) to the Flink lib directory and copy the build-in example job jar file to the flink home directory ( flink-1.14.0):
cp flink-remote-shuffle/build-target/lib/shuffle-plugin-1.0-SNAPSHOT.jar flink-1.14.0/lib/
cp flink-remote-shuffle/build-target/examples/BatchJobDemo.jar flink-1.14.0/
After that, you can start a local remote shuffle cluster by running the following command:
cd flink-remote-shuffle # switch to the remote shuffle project home directory
cd build-target # run after building from source
./bin/start-cluster.sh -D remote-shuffle.storage.local-data-dirs="[HDD]/tmp/" -D remote-shuffle.memory.data-writing-size=256m -D remote-shuffle.memory.data-reading-size=256m
Then you can start a local Flink cluster and config Flink to use the remote shuffle service by running the following command:
cd flink-1.14.0 # switch to the flink home directory
./bin/start-cluster.sh -D shuffle-service-factory.class=com.alibaba.flink.shuffle.plugin.RemoteShuffleServiceFactory -D remote-shuffle.manager.rpc-address=127.0.0.1
Finally, you can run the demo batch job:
cd flink-1.14.0 # switch to the flink home directory
bin/flink run -c com.alibaba.flink.shuffle.examples.BatchJobDemo ./BatchJobDemo.jar
To stop the local cluster, you can just run the stop-cluster.sh script in the bin directory:
cd flink-1.14.0 # switch to the flink home directory
bin/stop-cluster.sh
cd flink-remote-shuffle # switch to the remote shuffle project home directory
bin/stop-cluster.sh
Any feedback of this project is highly appreciated. You can report a bug by opening an issue on GitHub. You can also contribute any new features or improvements. See the contribution guide for more information.
We provide free support for users using this project. You can scan the following QR code to join the DingTalk user support group for further help and collaboration:
English:
Chinese:
Another way is to join the Slack channel by clicking this invitation .
This is a Flink ecosystem project. Apache Flink is an excellent unified stateful data processing engine. This project borrows some good designs (e.g. the credit-based backpressure) and building blocks (e.g. rpc and high availability) from Flink.