Trigger Luigi tasks on multiple worker machines. Python modules for tasks and their dependencies are installed automatically in virtualenvs on each worker.
The Luigi documentation states:

> The most important aspect is that no execution is transferred. When you run a Luigi workflow, the worker schedules all tasks, and also executes the tasks within the process.
>
> The benefit of this scheme is that it's super easy to debug since all execution takes place in the process. It also makes deployment a non-event. During development, you typically run the Luigi workflow from the command line, whereas when you deploy it, you can trigger it using crontab or any other scheduler.
However, in practice this is exactly what makes deployment hard: you have to figure out how to install dependencies and code, and how to trigger your tasks on your worker nodes. For pipelines that don't simply repeat on a fixed schedule (daily etc.), this becomes increasingly complex.
Poltergust takes care of this for you! You run the Poltergust main task on a set of machines (restarting it as soon as it finishes), and can then submit tasks to be run using files. Each submitted task consists of a Luigi task to run, the code that implements it, and any dependencies to be installed.
Poltergust supports all target types supported by `luigi.contrib.opener`, plus Google Cloud Storage targets (`gs://` URLs) for file storage.
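For illustration, `luigi.contrib.opener` resolves a target class from the URL scheme at runtime, so the same code can read task files from any supported backend. A short sketch (which schemes are available depends on the luigi extras installed; the `s3://` URL here is just an example):

```python
from luigi.contrib.opener import OpenerTarget

# The URL scheme selects the target implementation at runtime:
# plain paths become local targets, s3:// URLs S3 targets, and so on.
target = OpenerTarget("s3://mybucket/pipeline/mypipeline.config.yaml")
with target.open("r") as f:
    print(f.read())
```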
Docker compose file to start poltergust and luigi:
version: "3.9"
services:
poltergust:
# build: .
image: redhogorg/poltergust:0.0.2
volumes:
- ~/.config/gcloud:/root/.config/gcloud
environment:
- PIPELINE_URL=gs://mybucket/pipeline
- SCHEDULER_URL=http://scheduler:8082
deploy:
replicas: 1 # Set this to how many workers you want
scheduler:
image: spotify/luigi
ports:
- 8082:8082
`gs://mybucket/pipeline/mypipeline.config.yaml`:

```yaml
environment: gs://mybucket/environment/myenv.yaml
task:
  name: SomeTask
  module: some_luigi_pipeline_module
  some-task-argument: some-value
variables:
  DB_HOST: database.com
```
`gs://mybucket/environment/myenv.yaml`:

```yaml
virtualenv:
  python: python3.8
  dependencies:
    - pandas
    - matplotlib
variables:
  PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION: python
```
An environment file specifies a virtualenv to be created with the arguments given (`--python=python3.8`), and a set of dependencies to be installed using pip. Each dependency is installed inside the virtualenv with `pip install`, using the verbatim dependency string given. It can optionally also specify environment variables to be set, using the key `variables`.
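Conceptually, materializing such an environment amounts to something like the following sketch (illustrative only; the function and paths are assumptions, not Poltergust's actual implementation):

```python
import subprocess

def build_environment(env_dir, python, dependencies):
    """Hypothetical sketch of how an environment file is realized."""
    # Create the virtualenv with the requested interpreter
    # (--python=python3.8 in the example above).
    subprocess.check_call(["virtualenv", f"--python={python}", env_dir])
    # Install each dependency string verbatim with pip install.
    for dep in dependencies:
        subprocess.check_call([f"{env_dir}/bin/pip", "install", dep])

build_environment("/tmp/myenv", "python3.8", ["pandas", "matplotlib"])
```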
A task file specifies an environment to run the task in, a Luigi root task name, and any other arguments to give to the `luigi` command (with the leading `--` removed). Note: `--scheduler-url` will be added automatically. It can optionally also specify environment variables to be set, using the key `variables`.
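As an illustration of that mapping, here is a rough sketch of how such a task file could be turned into a `luigi` command line (a hypothetical helper, not Poltergust's actual code):

```python
def task_to_command(task, scheduler_url):
    """Hypothetical sketch: flatten a task section into a luigi command line."""
    task = dict(task)  # don't mutate the caller's config
    # "name" becomes the root task, "module" becomes --module.
    args = ["luigi", task.pop("name"), "--module", task.pop("module"),
            "--scheduler-url", scheduler_url]  # added automatically
    # Every remaining key/value pair becomes an ordinary luigi flag.
    for key, value in task.items():
        args.append(f"--{key}={value}")
    return args

print(task_to_command(
    {"name": "SomeTask",
     "module": "some_luigi_pipeline_module",
     "some-task-argument": "some-value"},
    "http://scheduler:8082"))
# ['luigi', 'SomeTask', '--module', 'some_luigi_pipeline_module',
#  '--scheduler-url', 'http://scheduler:8082', '--some-task-argument=some-value']
```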
When a task is done, `gs://mybucket/pipeline/mypipeline.config.yaml` is renamed to `gs://mybucket/pipeline/mypipeline.done.yaml` (since a task is run on multiple nodes, the first node to mark the task as done renames the file). If the task fails, the config file is instead renamed to `gs://mybucket/pipeline/mypipeline.error.yaml`.
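Submitting a task is then just a matter of writing a `*.config.yaml` file into the pipeline directory. A minimal sketch using the `google-cloud-storage` client (an assumption; any tool that can write to the bucket, such as `gsutil`, works equally well):

```python
from google.cloud import storage

# Write a task config into the pipeline directory watched by the workers.
client = storage.Client()
bucket = client.bucket("mybucket")
blob = bucket.blob("pipeline/mypipeline.config.yaml")
blob.upload_from_filename("mypipeline.config.yaml")
```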
If you provide the optional environment variable `DB_URL`, a GET request will be made to it whenever a task finishes, with a query parameter `pipeline_url` set to the pipeline URL of the task:

```yaml
services:
  poltergust:
    environment:
      - DB_URL=http://uiserver:8000/update
```
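On the receiving end, any HTTP endpoint that accepts such a GET will do. A minimal sketch of a receiver using Flask (Flask is an assumption; any web framework works):

```python
from flask import Flask, request

app = Flask(__name__)

@app.route("/update")
def update():
    # Called as e.g. GET /update?pipeline_url=gs://mybucket/pipeline/mypipeline.config.yaml
    pipeline_url = request.args.get("pipeline_url")
    print("Task finished:", pipeline_url)
    return "ok"

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)
```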
To run the Poltergust main task manually on a machine (restarting it as soon as it finishes):

```sh
while true; do
  luigi RunTasks --module poltergust --hostname="$(hostname -f)" --path=gs://mybucket/pipeline
  sleep 1
done
```
If you run multiple Docker service replicas, you have the option to download a prebuilt virtual environment from GCS instead of building it from scratch. This can save time, but note that for this to work, the machines running the replicas have to be exactly the same. The following environment variables control this behaviour:

- `DOWNLOAD_ENVIRONMENT`
- `UPLOAD_ENVIRONMENT`

Both are set to False by default.
Other environment variables used by Poltergust:

- `ENVIRONMENTS_DIR`: directory in which to store virtual environments
- `GITHUB_TOKEN_EMRLD`: GitHub token used to pip install from emerald private repos
To create a cluster with 2 nodes and the name `mycluster`:

```sh
cd clusters
gsutil mb gs://mycluster
./dataproc-create-cluster.sh mycluster 2
```

The above will open an ssh connection to the master node after creating the cluster, forwarding port 8082, so that you can view the cluster status in a browser at `http://localhost:8082`.
- Build a docker image using `docker build --tag poltergust:0.0.1 .`
- Modify the `docker-compose.yml` to set the number of replicas and the GCS bucket.
- Deploy the docker stack using `docker stack deploy poltergust -c docker-compose.yml`
When writing your Luigi pipelines, it is sometimes necessary to yield tasks from the `run` method. This can limit parallelization if done wrongly. To make sure your pipeline works optimally, do not yield a list of tasks; instead, yield a single task that in turn has those tasks returned by its `requires` method:
Replace:

```python
class MainTask(luigi.Task):
    def run(self):
        ...
        listofnumbers = makeList()
        # Yielding a list of tasks directly from run()
        yield [SubTask(number) for number in listofnumbers]
```
With:

```python
class MiddleTask(luigi.Task):
    # A list-valued parameter needs luigi.ListParameter, not luigi.Parameter.
    listofnumbers = luigi.ListParameter()

    def requires(self):
        # The subtasks are ordinary static requirements of this task.
        return [SubTask(number) for number in self.listofnumbers]

    ...

class MainTask(luigi.Task):
    def run(self):
        ...
        listofnumbers = makeList()
        yield MiddleTask(listofnumbers=listofnumbers)
```
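With this structure, the SubTasks become ordinary static requirements of MiddleTask, so the scheduler can distribute them across idle workers as usual, while MainTask only waits on a single dynamic dependency.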