A simple library for dispatching work from a large line-oriented (JSONL) file to distributed workers without pre-apportioning the work.
Dispatcher is ideal for batch inference workloads where individual requests take varying amounts of time: it keeps all workers busy and avoids the long tails you can run into by dividing the work up beforehand.
Dispatcher guarantees that each completed work item is persisted to disk only once, but items may be processed more than once, so it is inappropriate for work that changes state in external systems or is otherwise not idempotent (for example, work that sends an email or charges a payment per item).
Work is checkpointed, so if a job ends unexpectedly it can resume where it left off with minimal lost work (specifically, the only work lost is work that was completed out of order and cached while waiting to be written).
To work efficiently with large data files, ensure each item is written only once, and avoid costly scans and reconciliation on restart, the dispatcher operates line by line: the nth line of the output corresponds to the nth line of the input. On restart, it only needs to determine where the output left off in order to resume.
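That restart check can be very cheap. As a minimal sketch (not the library's actual code; the helper name is hypothetical), resuming amounts to counting the newline-terminated lines already in the output file:

def resume_index(outfile_path: str) -> int:
    # Count complete (newline-terminated) lines already persisted.
    # A trailing partial line, if any, is not counted.
    count = 0
    try:
        with open(outfile_path, "rb") as f:
            for line in f:
                if line.endswith(b"\n"):
                    count += 1
    except FileNotFoundError:
        pass  # No output yet: start from the beginning.
    return count

Line resume_index(...) of the input is then the first line that still needs to be dispatched.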
This means the dispatcher must cache out-of-order work until it can be written contiguously to the output file. Work that has been issued but not completed is reissued after a timeout to avoid unbounded memory growth, but in certain pathological situations (a "query of death") this could still cause an out-of-memory condition.
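A minimal sketch of that reordering cache (names are hypothetical, not the library's API): completed results are held in a dict keyed by line index and flushed whenever a contiguous run starting at the next expected index becomes available:

class ReorderBuffer:
    def __init__(self, outfile, next_index=0):
        self.outfile = outfile        # output file opened in append mode
        self.next_index = next_index  # next line index expected on disk
        self.pending = {}             # line index -> completed result line

    def add(self, index, line):
        self.pending[index] = line
        # Flush the longest contiguous run starting at next_index.
        while self.next_index in self.pending:
            self.outfile.write(self.pending.pop(self.next_index) + "\n")
            self.next_index += 1
        self.outfile.flush()

The memory risk described above is exactly self.pending growing without bound while one slow or failing item holds up next_index.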
Incomplete work should probably be timed out after a certain number of retries and written to a rejected file, but that is not yet implemented.
pip install -e .[dev]
python -m dispatcher.server --infile path/to/input.jsonl --outfile path/to/output.jsonl
# or
dispatcher-server --infile path/to/input.jsonl --outfile path/to/output.jsonl
import json
import time

from dispatcher.client import WorkClient
from dispatcher.models import WorkStatus

client = WorkClient("http://127.0.0.1:8000")

while True:
    resp = client.get_work(batch_size=5)
    if resp.status == WorkStatus.ALL_WORK_COMPLETE:
        print("All work complete. Exiting.")
        break
    elif resp.status == WorkStatus.RETRY:
        print(f"No work available; retrying in {resp.retry_in} seconds.")
        time.sleep(resp.retry_in)
        continue
    elif resp.status == WorkStatus.SERVER_UNAVAILABLE:
        # The server is not running. Since the server exits once all
        # work is complete, assume that is what happened here.
        print("Server is unavailable. Exiting.")
        break
    elif resp.status == WorkStatus.OK:
        batch = resp.items
        for work in batch:
            print(f"Got work: work_id={work.work_id}, content='{work.content}'")
            # Process the work.
            # NOTE: work.content is still plain text here. If it contains
            # JSON, you still need to parse it yourself:
            # content = json.loads(work.content)
            # ... do the actual work here ...
            work.set_result(f"processed_{work.work_id}")
        submit_resp = client.submit_result(batch)
        # TODO: check submit_resp.status and handle failed submissions.
        print(f"Submitted {submit_resp.count} items, status={submit_resp.status}")
    else:
        print(f"Unexpected status: {resp.status}")
        break
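Because a worker only talks to the server over HTTP, scaling out is just a matter of running this loop in more processes or on more machines. For example (assuming the loop above has been wrapped in a function named run_worker, which is not part of the library):

from multiprocessing import Process

if __name__ == "__main__":
    workers = [Process(target=run_worker) for _ in range(4)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()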
If too much work is expiring because inference requests are taking a long time, you can update the work timeout without restarting the job:
HOSTNAME=...
curl -X POST -H "Content-Type: application/json" \
-d '{"timeout": 1600}' \
http://${HOSTNAME}:8000/work_timeout