Skip to content

Commit

Permalink
deploy: 5b7df38
Browse files Browse the repository at this point in the history
  • Loading branch information
phacops committed Sep 13, 2023
0 parents commit 6f28cf4
Show file tree
Hide file tree
Showing 91 changed files with 9,982 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .buildinfo
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Sphinx build info version 1
# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done.
config: 9db9da0647afe63ce71f2b065b7306e5
tags: 645f666f9bcd5a90fca523b33c5a78b7
Binary file added .doctrees/architecture.doctree
Binary file not shown.
Binary file added .doctrees/backpressure.doctree
Binary file not shown.
Binary file added .doctrees/dlqs.doctree
Binary file not shown.
Binary file added .doctrees/environment.pickle
Binary file not shown.
Binary file added .doctrees/getstarted.doctree
Binary file not shown.
Binary file added .doctrees/index.doctree
Binary file not shown.
Binary file added .doctrees/intro.doctree
Binary file not shown.
Binary file added .doctrees/metrics.doctree
Binary file not shown.
Binary file added .doctrees/offsets.doctree
Binary file not shown.
Binary file added .doctrees/strategies/batching.doctree
Binary file not shown.
Binary file added .doctrees/strategies/commit_offsets.doctree
Binary file not shown.
Binary file added .doctrees/strategies/filter.doctree
Binary file not shown.
Binary file added .doctrees/strategies/healthcheck.doctree
Binary file not shown.
Binary file added .doctrees/strategies/index.doctree
Binary file not shown.
Binary file added .doctrees/strategies/produce.doctree
Binary file not shown.
Binary file added .doctrees/strategies/reduce.doctree
Binary file not shown.
Binary file added .doctrees/strategies/run_task.doctree
Binary file not shown.
Binary file added .doctrees/strategies/run_task_in_threads.doctree
Binary file not shown.
Binary file not shown.
Binary file added .doctrees/strategies/unfold.doctree
Binary file not shown.
Binary file added .doctrees/what_for.doctree
Binary file not shown.
Empty file added .nojekyll
Empty file.
Binary file added _images/arroyo-banner.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added _images/arroyo_processing.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added _images/consumer_groups.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added _images/kafka_producer.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
120 changes: 120 additions & 0 deletions _sources/architecture.rst.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
===================
Arroyo Architecture
===================

Arroyo is a set of high level abstractions to interact with Kafka.
These are meant to help the developer in writing performant consumers with
specific delivery guarantees.

Common problems addressed by Arroyo are guaranteeing at-least-once delivery,
providing a dead letter queue abstraction, support parallel (multi-processing)
message processing, etc.

The library is divided into three layers: the basic Kafka connectivity, the
streaming engine and the high level abstractions.

The basic connectivity layer is a simple wrapper around the Confluent python
library, which is itself based on librdkafka. Besides some cosmetic changes,
this level provides a Fake in memory broker and consumer to make unit test quick
to run.

The streaming engine provides an asynchronous processing interface to write
consumers. The consumer is written as a pipeline where each segment is an
asynchronous operation. The streaming engine implements the main consumer loop
and delegates the processing to the pipeline.

On top of the streaming engine, the library provides high-level abstractions that
are common when writing Kafka consumers like: *map*, *reduce*, *filter* together
with some common messaging application patterns like the dead letter queue.

Streaming Interface and Streaming Engine
----------------------------------------

A Kafka consumer is built as a pipeline where each segment processes messages in
an asynchronous way. The Streaming engine provides a message to a segment. The
segment is not supposed to execute small CPU work in a blocking way or do IO in a
non-blocking way. We generally use futures for this, and heavier CPU work in a
separate process.

Arroyo provides an interface to implement to write a pipeline segment.
The segment interface is called *ProcessingStrategy* and is in
`this module <https://github.com/getsentry/arroyo/blob/main/arroyo/processing/strategies/abstract.py>`_.
(TODO: bring the docstrings to the docs and reference that).

In most cases, when developing a consumer, the developer would not implement
that interface directly. A higher level abstraction would be used.

.. figure:: _static/diagrams/arroyo_processing.png

The main consumer loop is managed by the `stream engine <https://github.com/getsentry/arroyo/blob/main/arroyo/processing/processor.py>`_.
These are the phases:

* Poll from the Kafka consumer through the basic library. If a message is there
proceed or repeat.

* Submit the message to the first *ProcessingStrategy*. This is supposed to deliver
work for the strategy to do. It is not supposed to be a blocking operation. The
strategy should return immediately.

* Poll the strategy to execute work or to forward results to the following step
in the pipeline. Ideally all IO should be done in separate threads and heavy cpu
work should be done in separate processes so the *poll* method should check for
completed work, dispatch to the next step and return. In practice, work is executed
here in a blocking way if the overhead of offloading the work is too high.

The *ProcessingStrategy* may decide not to take the message and instead apply back-pressure.
This is done by raising the *MessageRejected* exception. In this case, the streaming
engine pauses the consumer till the strategy is ready to accept the message.

The *ProcessingStrategy* decides when it is time to commit a message. This is done
through a commit callback provided to the strategy when it is instantiated.

The streaming engine orchestrates the life cycle of the *ProcessingStrategy*, thus
when it thinks it is time to shut the strategy down it would wait for all in-flight
work to be completed and then destroy the strategy.

There are two scenarios where this can happen:

* The consumer is being terminated.
* A rebalancing happened. A rebalancing revokes partitions and assigns new ones.
After a rebalancing is complete it is impossible to commit a message from a partition
that was revoked. In order to ensure the consumer behaves in a consistent way,
upon rebalancing, the streaming engine destroys the strategy and builds a new one.
This allows the strategy to complete all in-flight work before being terminated.

High level strategies
-----------------------

Most consumers follow the same few patterns, so Arroyo provides abstractions that
are based on the *ProcessingStrategy* but are simpler to implement for the common
use cases.

Common examples are:

* ``run task, run task in threads, run task with multiprocessing``. The run task
set of strategies are designed to be the most flexible and simple to use. They take
a function provided by the user and execute it on every message, passing the output
to the next step. The library includes synchronous and asynchronous versions depending
on the kind of concurrency required by the user.

* ``filter, map and forward``. This type of consumer inspects a message, decides
whether to process it or discard it, transforms its content, and produces the result
on a new topic. In this case, Arroyo provides three implementations of the
*ProcessingStrategy*: *filter*, *transform*, and *produce*. The developer only needs
to wire them together and provide the map and filtering logic.

* ``consume, apply side effects, produce``. This is a variation of the one above.
In this case, the transform operation can have side-effects like storing the content
of the message somewhere.

* ``high throughput cpu intensive transform``. The python GIL does not allow CPU intensive
work to take advantage of parallelism. Arroyo provides an implementation of the *map*
pattern that batches messages and dispatches the work to separate processes via shared
memory. This is largely transparent to the developers.

* ``map, reduce and store``. The reduce function is carried out by the *Collector*, which
batches messages and executes some logic with side-effects when the batch is full.
This is a typical way to write messages on a storages in batches to reduce the
round trips.

All strategies included with Arroyo are in `the strategies module <https://github.com/getsentry/arroyo/tree/main/arroyo/processing/strategies>`_.
43 changes: 43 additions & 0 deletions _sources/backpressure.rst.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
Backpressure
============

.. py:currentmodule:: arroyo.processing.strategies
Arroyo's own processing strategies internally apply backpressure by raising
:py:class:`~abstract.MessageRejected`. Most
consumers do not require additional work to deal with backpressure correctly.

If you want to slow down the consumer based on some external signal or
condition, you can achieve that most effectively by raising the same exception
from within a callback passed to :py:class:`~run_task.RunTask` while the
consumer is supposed to be paused

.. code-block:: Python
class ConsumerStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
def __init__(self):
self.is_paused = False
def create_with_partitions(
self,
commit: Commit,
partitions: Mapping[Partition, int],
) -> ProcessingStrategy[KafkaPayload]:
def handle_message(message: Message[KafkaPayload]) -> Message[KafkaPayload]:
if self.is_paused:
raise MessageRejected()
print(f"MSG: {message.payload}")
return message
return RunTask(handle_message, CommitOffsets(commit))
It is not recommended to apply backpressure by just ``sleep()``-ing in
:py:class:`~abstract.ProcessingStrategy.submit` (or, in this example,
``handle_message``) for more than a few milliseconds. While this definitely
pauses the consumer, it will block the main thread for too long and and prevent
things like consumer rebalancing from occuring.

A 0.01 second sleep is applied each time :py:class:`~abstract.MessageRejected` is
raised to prevent the main thread spinning at 100% CPU. However background thread
performance may be impacted during this time.
21 changes: 21 additions & 0 deletions _sources/dlqs.rst.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
==================
Dead letter queues
==================

.. warning::
Dead letter queues should be used with caution as they break some of the ordering guarantees
otherwise offered by Arroyo and Kafka consumer code. In particular, it must be safe for the
consumer to drop a message. If replaying or later re-processing of the DLQ'ed messages is done,
it is critical that ordering is not a requirement in the relevant downstream code.

Arroyo provides support for routing invalid messages to dead letter queues in consumers.
Dead letter queues are critical in some applications because messages are ordered in Kafka
and a single invalid message can cause a consumer to crash and every subsequent message to
not be processed.

The dead letter queue configuration is passed to the `StreamProcessor` and, if provided, any
`InvalidMessage` raise by a strategy will be produced to the dead letter queue.


.. automodule:: arroyo.dlq
:members: InvalidMessage, DlqLimit, DlqPolicy, DlqProducer, KafkaDlqProducer, NoopDlqProducer
Loading

0 comments on commit 6f28cf4

Please sign in to comment.