From 291a08b584f3ddec1b46fad0bc0a676f672a29d8 Mon Sep 17 00:00:00 2001 From: Marcos Schroh Date: Mon, 15 Jul 2024 14:58:21 +0200 Subject: [PATCH] feat: subscribe topics by pattern --- README.md | 1 + docs/stream.md | 15 +- docs/test_client.md | 55 +++ .../subscribe-topics-by-pattern/README.md | 47 +++ .../subscribe-topics-by-pattern/poetry.lock | 346 ++++++++++++++++++ .../pyproject.toml | 19 + .../subscribe_topics_by_pattern/__init__.py | 0 .../subscribe_topics_by_pattern/app.py | 51 +++ kstreams/engine.py | 2 + kstreams/streams.py | 121 +++++- kstreams/test_utils/test_clients.py | 13 +- kstreams/test_utils/test_utils.py | 3 +- kstreams/test_utils/topics.py | 13 +- scripts/cluster/start | 2 + tests/__init__.py | 7 + tests/test_client.py | 48 ++- tests/test_stream_engine.py | 13 - tests/test_streams.py | 127 ++++++- 18 files changed, 822 insertions(+), 61 deletions(-) create mode 100644 examples/subscribe-topics-by-pattern/README.md create mode 100644 examples/subscribe-topics-by-pattern/poetry.lock create mode 100644 examples/subscribe-topics-by-pattern/pyproject.toml create mode 100644 examples/subscribe-topics-by-pattern/subscribe_topics_by_pattern/__init__.py create mode 100644 examples/subscribe-topics-by-pattern/subscribe_topics_by_pattern/app.py diff --git a/README.md b/README.md index b0749031..cfbb6dc1 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,7 @@ if __name__ == "__main__": - [x] Produce events - [x] Consumer events with `Streams` +- [x] Subscribe to topics by `pattern` - [x] `Prometheus` metrics and custom monitoring - [x] TestClient - [x] Custom Serialization and Deserialization diff --git a/docs/stream.md b/docs/stream.md index be95bfcc..5e10de50 100644 --- a/docs/stream.md +++ b/docs/stream.md @@ -8,7 +8,7 @@ Consuming can be done using `kstreams.Stream`. You only need to decorate a `coro options: show_root_heading: true docstring_section_style: table - show_signature_annotations: false + show_source: false members: - @@ -187,19 +187,6 @@ Traceback (most recent call last): AttributeError: 'ConsumerRecord' object has no attribute 'payload' ``` -## Consuming from multiple topics - -Consuming from multiple topics using one `stream` is possible. A `List[str]` of topics must be provided. - -```python title="Consume from multiple topics" -stream_engine = create_engine(title="my-stream-engine") - - -@stream_engine.stream(["local--kstreams", "local--hello-world"], group_id="example-group") -async def consume(cr: ConsumerRecord) -> None: - print(f"Event consumed from topic {cr.topic}: headers: {cr.headers}, payload: {cr.value}") -``` - ## Changing consumer behavior Most of the time you will only set the `topic` and the `group_id` to the `consumer`, but sometimes you might want more control over it, for example changing the `policy for resetting offsets on OffsetOutOfRange errors` or `session timeout`. To do this, you have to use the same `kwargs` as the [aiokafka consumer](https://aiokafka.readthedocs.io/en/stable/api.html#aiokafkaconsumer-class) API diff --git a/docs/test_client.md b/docs/test_client.md index 16901c9e..3284b83f 100644 --- a/docs/test_client.md +++ b/docs/test_client.md @@ -353,6 +353,61 @@ async with client: assert event.key == key ``` +## Topics subscribed by pattern + +When a `Stream` is using `pattern` subscription it is not possible to know before hand how many topics the `Stream` will consume from. +To solve this problem the `topics` must be pre defined using the `extra topics` features from the `TestClient`: + +In the following example we have a `Stream` that will consume from topics that match the regular expression `^dev--customer-.*$`, for example `dev--customer-invoice` and `dev--customer-profile`. + +```python +# app.py +from kstreams import ConsumerRecord + +stream_engine = create_engine(title="my-stream-engine") + + +@stream_engine.stream(topics="^dev--customer-.*$", subscribe_by_pattern=True) +async def stream(cr: ConsumerRecord): + if cr.topic == customer_invoice_topic: + assert cr.value == invoice_event + elif cr.topic == customer_profile_topic: + assert cr.value == profile_event + else: + raise ValueError(f"Invalid topic {cr.topic}") +``` + +Then to test our `Stream`, we need to pre define the topics: + +```python +# test_stream.py +import pytest +from kstreams.test_utils import TestStreamClient + +from app import stream_engine + + +@pytest.mark.asyncio +async def test_consume_events_topics_by_pattern(): + """ + This test shows the possibility to subscribe to multiple topics using a pattern + """ + customer_invoice_topic = "dev--customer-invoice" + customer_profile_topic = "dev--customer-profile" + + client = TestStreamClient( + stream_engine, topics=[customer_invoice_topic, customer_profile_topic] + ) + + async with client: + await client.send(customer_invoice_topic, value=b"invoice-1", key="1") + await client.send(customer_profile_topic, value=b"profile-1", key="1") + + # give some time to consume all the events + await asyncio.sleep(0.1) + assert TopicManager.all_messages_consumed() +``` + ## Disabling monitoring during testing Monitoring streams and producers is vital for streaming application but it requires extra effort. Sometimes during testing, diff --git a/examples/subscribe-topics-by-pattern/README.md b/examples/subscribe-topics-by-pattern/README.md new file mode 100644 index 00000000..404aff12 --- /dev/null +++ b/examples/subscribe-topics-by-pattern/README.md @@ -0,0 +1,47 @@ +# Subscribe topics by pattern + +In the following example we have a `Stream` that will consume from topics that match the regular expression `^local--customer-.*$`, for example +`local--customer-invoice` and `local--customer-profile`. + +## Requirements + +python 3.8+, poetry, docker-compose + +### Installation + +```bash +poetry install +``` + +## Usage + +1. Start the kafka cluster: From `kstreams` project root execute `./scripts/cluster/start` +2. Inside this folder execute `poetry run app` + +The app publishes events to the topics `local--customer-invoice` and `local--customer-profile`, then the events are consumed by the `stream` that has subscribed them using the pattern `^local--customer-.*$`. + +You should see something similar to the following logs: + +```bash +❯ poetry run app + +INFO:aiokafka.consumer.consumer:Subscribed to topic pattern: re.compile('^local--customer-.*$') +INFO:kstreams.prometheus.monitor:Starting Prometheus Monitoring started... +INFO:aiokafka.consumer.subscription_state:Updating subscribed topics to: frozenset({'local--customer-profile', 'local--customer-invoice'}) +INFO:aiokafka.consumer.group_coordinator:Discovered coordinator 1 for group topics-by-pattern-group +INFO:aiokafka.consumer.group_coordinator:Revoking previously assigned partitions set() for group topics-by-pattern-group +INFO:aiokafka.consumer.group_coordinator:(Re-)joining group topics-by-pattern-group +INFO:aiokafka.consumer.group_coordinator:Joined group 'topics-by-pattern-group' (generation 7) with member_id aiokafka-0.11.0-d4e8d901-666d-4286-8c6c-621a12b7216f +INFO:aiokafka.consumer.group_coordinator:Elected group leader -- performing partition assignments using roundrobin +INFO:aiokafka.consumer.group_coordinator:Successfully synced group topics-by-pattern-group with generation 7 +INFO:aiokafka.consumer.group_coordinator:Setting newly assigned partitions {TopicPartition(topic='local--customer-profile', partition=0), TopicPartition(topic='local--customer-invoice', partition=0)} for group topics-by-pattern-group +INFO:subscribe_topics_by_pattern.app:Event b'profile-1' consumed from topic local--customer-profile +INFO:subscribe_topics_by_pattern.app:Event b'profile-1' consumed from topic local--customer-profile +INFO:subscribe_topics_by_pattern.app:Event b'invoice-1' consumed from topic local--customer-invoice +INFO:subscribe_topics_by_pattern.app:Event b'invoice-1' consumed from topic local--customer-invoice +``` + +## Note + +If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where +`kstreams` is pointing to the parent folder. You will have to set the latest version. diff --git a/examples/subscribe-topics-by-pattern/poetry.lock b/examples/subscribe-topics-by-pattern/poetry.lock new file mode 100644 index 00000000..b06c15d3 --- /dev/null +++ b/examples/subscribe-topics-by-pattern/poetry.lock @@ -0,0 +1,346 @@ +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. + +[[package]] +name = "aiokafka" +version = "0.11.0" +description = "Kafka integration with asyncio" +optional = false +python-versions = ">=3.8" +files = [ + {file = "aiokafka-0.11.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:926f93fb6a39891fd4364494432b479c0602f9cac708778d4a262a2c2e20d3b4"}, + {file = "aiokafka-0.11.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:38e1917e706c1158d5e1f612d1fc1b40f706dc46c534e73ab4de8ae2868a31be"}, + {file = "aiokafka-0.11.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:516e1d68d9a377860b2e17453580afe304605bc71894f684d3e7b6618f6f939f"}, + {file = "aiokafka-0.11.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:acfd0a5d0aec762ba73eeab73b23edce14f315793f063b6a4b223b6f79e36bb8"}, + {file = "aiokafka-0.11.0-cp310-cp310-win32.whl", hash = "sha256:0d80590c4ef0ba546a299cee22ea27c3360c14241ec43a8e6904653f7b22d328"}, + {file = "aiokafka-0.11.0-cp310-cp310-win_amd64.whl", hash = "sha256:1d519bf9875ac867fb19d55de3750833b1eb6379a08de29a68618e24e6a49fc0"}, + {file = "aiokafka-0.11.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:0e957b42ae959365efbb45c9b5de38032c573608553c3670ad8695cc210abec9"}, + {file = "aiokafka-0.11.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:224db2447f6c1024198d8342e7099198f90401e2fa29c0762afbc51eadf5c490"}, + {file = "aiokafka-0.11.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6ef3e7c8a923e502caa4d24041f2be778fd7f9ee4587bf0bcb4f74cac05122fa"}, + {file = "aiokafka-0.11.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:59f4b935589ebb244620afad8bf3320e3bc86879a8b1c692ad06bd324f6c6127"}, + {file = "aiokafka-0.11.0-cp311-cp311-win32.whl", hash = "sha256:560839ae6bc13e71025d71e94df36980f5c6e36a64916439e598b6457267a37f"}, + {file = "aiokafka-0.11.0-cp311-cp311-win_amd64.whl", hash = "sha256:1f8ae91f0373830e4664376157fe61b611ca7e573d8a559b151aef5bf53df46c"}, + {file = "aiokafka-0.11.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:4e0cc080a7f4c659ee4e1baa1c32adedcccb105a52156d4909f357d76fac0dc1"}, + {file = "aiokafka-0.11.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:55a07a39d82c595223a17015ea738d152544cee979d3d6d822707a082465621c"}, + {file = "aiokafka-0.11.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3711fa64ee8640dcd4cb640f1030f9439d02e85acd57010d09053017092d8cc2"}, + {file = "aiokafka-0.11.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:818a6f8e44b02113b9e795bee6029c8a4e525ab38f29d7adb0201f3fec74c808"}, + {file = "aiokafka-0.11.0-cp312-cp312-win32.whl", hash = "sha256:8ba981956243767b37c929845c398fda2a2e35a4034d218badbe2b62e6f98f96"}, + {file = "aiokafka-0.11.0-cp312-cp312-win_amd64.whl", hash = "sha256:9a478a14fd23fd1ffe9c7a21238d818b5f5e0626f7f06146b687f3699298391b"}, + {file = "aiokafka-0.11.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:0973a245b8b9daf8ef6814253a80a700f1f54d2da7d88f6fe479f46e0fd83053"}, + {file = "aiokafka-0.11.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ee0c61a2dcabbe4474ff237d708f9bd663dd2317e03a9cb7239a212c9ee05b12"}, + {file = "aiokafka-0.11.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:230170ce2e8a0eb852e2e8b78b08ce2e29b77dfe2c51bd56f5ab4be0f332a63b"}, + {file = "aiokafka-0.11.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:eac78a009b713e28b5b4c4daae9d062acbf2b7980e5734467643a810134583b5"}, + {file = "aiokafka-0.11.0-cp38-cp38-win32.whl", hash = "sha256:73584be8ba7906e3f33ca0f08f6af21a9ae31b86c6b635b93db3b1e6f452657b"}, + {file = "aiokafka-0.11.0-cp38-cp38-win_amd64.whl", hash = "sha256:d724b6fc484e453b373052813e4e543fc028a22c3fbda10e13b6829740000b8a"}, + {file = "aiokafka-0.11.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:419dd28c8ed6e926061bdc60929af08a6b52f1721e1179d9d21cc72ae28fd6f6"}, + {file = "aiokafka-0.11.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f1c85f66eb3564c5e74d8e4c25df4ac1fd94f1a6f6e66f005aafa6f791bde215"}, + {file = "aiokafka-0.11.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eaafe134de57b184f3c030e1a11051590caff7953c8bf58048eefd8d828e39d7"}, + {file = "aiokafka-0.11.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:807f699cf916369b1a512e4f2eaec714398c202d8803328ef8711967d99a56ce"}, + {file = "aiokafka-0.11.0-cp39-cp39-win32.whl", hash = "sha256:d59fc7aec088c9ffc02d37e61591f053459bd11912cf04c70ac4f7e60405667d"}, + {file = "aiokafka-0.11.0-cp39-cp39-win_amd64.whl", hash = "sha256:702aec15b63bad5e4476294bcb1cb177559149fce3e59335794f004c279cbd6a"}, + {file = "aiokafka-0.11.0.tar.gz", hash = "sha256:f2def07fe1720c4fe37c0309e355afa9ff4a28e0aabfe847be0692461ac69352"}, +] + +[package.dependencies] +async-timeout = "*" +packaging = "*" +typing-extensions = ">=4.10.0" + +[package.extras] +all = ["cramjam (>=2.8.0)", "gssapi"] +gssapi = ["gssapi"] +lz4 = ["cramjam (>=2.8.0)"] +snappy = ["cramjam"] +zstd = ["cramjam"] + +[[package]] +name = "aiorun" +version = "2024.5.1" +description = "Boilerplate for asyncio applications" +optional = false +python-versions = ">=3.7" +files = [ + {file = "aiorun-2024.5.1-py3-none-any.whl", hash = "sha256:4613a4fb8c23fff0c49a19ee6100953e7447e20e0ca14f21abffa4ef765e68c7"}, + {file = "aiorun-2024.5.1.tar.gz", hash = "sha256:c9911e414b6eb4f32ed50c964c4b8fded072dfd09b7fae6b953b9694bf7d0deb"}, +] + +[package.extras] +dev = ["pytest", "pytest-cov"] + +[[package]] +name = "annotated-types" +version = "0.7.0" +description = "Reusable constraint types to use with typing.Annotated" +optional = false +python-versions = ">=3.8" +files = [ + {file = "annotated_types-0.7.0-py3-none-any.whl", hash = "sha256:1f02e8b43a8fbbc3f3e0d4f0f4bfc8131bcb4eebe8849b8e5c773f3a1c582a53"}, + {file = "annotated_types-0.7.0.tar.gz", hash = "sha256:aff07c09a53a08bc8cfccb9c85b05f1aa9a2a6f23728d790723543408344ce89"}, +] + +[package.dependencies] +typing-extensions = {version = ">=4.0.0", markers = "python_version < \"3.9\""} + +[[package]] +name = "async-timeout" +version = "4.0.3" +description = "Timeout context manager for asyncio programs" +optional = false +python-versions = ">=3.7" +files = [ + {file = "async-timeout-4.0.3.tar.gz", hash = "sha256:4640d96be84d82d02ed59ea2b7105a0f7b33abe8703703cd0ab0bf87c427522f"}, + {file = "async_timeout-4.0.3-py3-none-any.whl", hash = "sha256:7405140ff1230c310e51dc27b3145b9092d659ce68ff733fb0cefe3ee42be028"}, +] + +[[package]] +name = "future" +version = "0.18.3" +description = "Clean single-source support for Python 3 and 2" +optional = false +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" +files = [ + {file = "future-0.18.3.tar.gz", hash = "sha256:34a17436ed1e96697a86f9de3d15a3b0be01d8bc8de9c1dffd59fb8234ed5307"}, +] + +[[package]] +name = "kstreams" +version = "0.20.0" +description = "Build simple kafka streams applications" +optional = false +python-versions = "^3.8" +files = [] +develop = true + +[package.dependencies] +aiokafka = "<1.0" +future = "^0.18.2" +prometheus-client = "<1.0" +pydantic = ">=2.0.0,<3.0.0" +PyYAML = ">=5.4,<7.0.0" + +[package.source] +type = "directory" +url = "../.." + +[[package]] +name = "packaging" +version = "24.1" +description = "Core utilities for Python packages" +optional = false +python-versions = ">=3.8" +files = [ + {file = "packaging-24.1-py3-none-any.whl", hash = "sha256:5b8f2217dbdbd2f7f384c41c628544e6d52f2d0f53c6d0c3ea61aa5d1d7ff124"}, + {file = "packaging-24.1.tar.gz", hash = "sha256:026ed72c8ed3fcce5bf8950572258698927fd1dbda10a5e981cdf0ac37f4f002"}, +] + +[[package]] +name = "prometheus-client" +version = "0.20.0" +description = "Python client for the Prometheus monitoring system." +optional = false +python-versions = ">=3.8" +files = [ + {file = "prometheus_client-0.20.0-py3-none-any.whl", hash = "sha256:cde524a85bce83ca359cc837f28b8c0db5cac7aa653a588fd7e84ba061c329e7"}, + {file = "prometheus_client-0.20.0.tar.gz", hash = "sha256:287629d00b147a32dcb2be0b9df905da599b2d82f80377083ec8463309a4bb89"}, +] + +[package.extras] +twisted = ["twisted"] + +[[package]] +name = "pydantic" +version = "2.8.2" +description = "Data validation using Python type hints" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pydantic-2.8.2-py3-none-any.whl", hash = "sha256:73ee9fddd406dc318b885c7a2eab8a6472b68b8fb5ba8150949fc3db939f23c8"}, + {file = "pydantic-2.8.2.tar.gz", hash = "sha256:6f62c13d067b0755ad1c21a34bdd06c0c12625a22b0fc09c6b149816604f7c2a"}, +] + +[package.dependencies] +annotated-types = ">=0.4.0" +pydantic-core = "2.20.1" +typing-extensions = [ + {version = ">=4.12.2", markers = "python_version >= \"3.13\""}, + {version = ">=4.6.1", markers = "python_version < \"3.13\""}, +] + +[package.extras] +email = ["email-validator (>=2.0.0)"] + +[[package]] +name = "pydantic-core" +version = "2.20.1" +description = "Core functionality for Pydantic validation and serialization" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pydantic_core-2.20.1-cp310-cp310-macosx_10_12_x86_64.whl", hash = "sha256:3acae97ffd19bf091c72df4d726d552c473f3576409b2a7ca36b2f535ffff4a3"}, + {file = "pydantic_core-2.20.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:41f4c96227a67a013e7de5ff8f20fb496ce573893b7f4f2707d065907bffdbd6"}, + {file = "pydantic_core-2.20.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5f239eb799a2081495ea659d8d4a43a8f42cd1fe9ff2e7e436295c38a10c286a"}, + {file = "pydantic_core-2.20.1-cp310-cp310-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:53e431da3fc53360db73eedf6f7124d1076e1b4ee4276b36fb25514544ceb4a3"}, + {file = "pydantic_core-2.20.1-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:f1f62b2413c3a0e846c3b838b2ecd6c7a19ec6793b2a522745b0869e37ab5bc1"}, + {file = "pydantic_core-2.20.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:5d41e6daee2813ecceea8eda38062d69e280b39df793f5a942fa515b8ed67953"}, + {file = "pydantic_core-2.20.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3d482efec8b7dc6bfaedc0f166b2ce349df0011f5d2f1f25537ced4cfc34fd98"}, + {file = "pydantic_core-2.20.1-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:e93e1a4b4b33daed65d781a57a522ff153dcf748dee70b40c7258c5861e1768a"}, + {file = "pydantic_core-2.20.1-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:e7c4ea22b6739b162c9ecaaa41d718dfad48a244909fe7ef4b54c0b530effc5a"}, + {file = "pydantic_core-2.20.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:4f2790949cf385d985a31984907fecb3896999329103df4e4983a4a41e13e840"}, + {file = "pydantic_core-2.20.1-cp310-none-win32.whl", hash = "sha256:5e999ba8dd90e93d57410c5e67ebb67ffcaadcea0ad973240fdfd3a135506250"}, + {file = "pydantic_core-2.20.1-cp310-none-win_amd64.whl", hash = "sha256:512ecfbefef6dac7bc5eaaf46177b2de58cdf7acac8793fe033b24ece0b9566c"}, + {file = "pydantic_core-2.20.1-cp311-cp311-macosx_10_12_x86_64.whl", hash = "sha256:d2a8fa9d6d6f891f3deec72f5cc668e6f66b188ab14bb1ab52422fe8e644f312"}, + {file = "pydantic_core-2.20.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:175873691124f3d0da55aeea1d90660a6ea7a3cfea137c38afa0a5ffabe37b88"}, + {file = "pydantic_core-2.20.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:37eee5b638f0e0dcd18d21f59b679686bbd18917b87db0193ae36f9c23c355fc"}, + {file = "pydantic_core-2.20.1-cp311-cp311-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:25e9185e2d06c16ee438ed39bf62935ec436474a6ac4f9358524220f1b236e43"}, + {file = "pydantic_core-2.20.1-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:150906b40ff188a3260cbee25380e7494ee85048584998c1e66df0c7a11c17a6"}, + {file = "pydantic_core-2.20.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:8ad4aeb3e9a97286573c03df758fc7627aecdd02f1da04516a86dc159bf70121"}, + {file = "pydantic_core-2.20.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d3f3ed29cd9f978c604708511a1f9c2fdcb6c38b9aae36a51905b8811ee5cbf1"}, + {file = "pydantic_core-2.20.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:b0dae11d8f5ded51699c74d9548dcc5938e0804cc8298ec0aa0da95c21fff57b"}, + {file = "pydantic_core-2.20.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:faa6b09ee09433b87992fb5a2859efd1c264ddc37280d2dd5db502126d0e7f27"}, + {file = "pydantic_core-2.20.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:9dc1b507c12eb0481d071f3c1808f0529ad41dc415d0ca11f7ebfc666e66a18b"}, + {file = "pydantic_core-2.20.1-cp311-none-win32.whl", hash = "sha256:fa2fddcb7107e0d1808086ca306dcade7df60a13a6c347a7acf1ec139aa6789a"}, + {file = "pydantic_core-2.20.1-cp311-none-win_amd64.whl", hash = "sha256:40a783fb7ee353c50bd3853e626f15677ea527ae556429453685ae32280c19c2"}, + {file = "pydantic_core-2.20.1-cp312-cp312-macosx_10_12_x86_64.whl", hash = "sha256:595ba5be69b35777474fa07f80fc260ea71255656191adb22a8c53aba4479231"}, + {file = "pydantic_core-2.20.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:a4f55095ad087474999ee28d3398bae183a66be4823f753cd7d67dd0153427c9"}, + {file = "pydantic_core-2.20.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f9aa05d09ecf4c75157197f27cdc9cfaeb7c5f15021c6373932bf3e124af029f"}, + {file = "pydantic_core-2.20.1-cp312-cp312-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:e97fdf088d4b31ff4ba35db26d9cc472ac7ef4a2ff2badeabf8d727b3377fc52"}, + {file = "pydantic_core-2.20.1-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:bc633a9fe1eb87e250b5c57d389cf28998e4292336926b0b6cdaee353f89a237"}, + {file = "pydantic_core-2.20.1-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:d573faf8eb7e6b1cbbcb4f5b247c60ca8be39fe2c674495df0eb4318303137fe"}, + {file = "pydantic_core-2.20.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:26dc97754b57d2fd00ac2b24dfa341abffc380b823211994c4efac7f13b9e90e"}, + {file = "pydantic_core-2.20.1-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:33499e85e739a4b60c9dac710c20a08dc73cb3240c9a0e22325e671b27b70d24"}, + {file = "pydantic_core-2.20.1-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:bebb4d6715c814597f85297c332297c6ce81e29436125ca59d1159b07f423eb1"}, + {file = "pydantic_core-2.20.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:516d9227919612425c8ef1c9b869bbbee249bc91912c8aaffb66116c0b447ebd"}, + {file = "pydantic_core-2.20.1-cp312-none-win32.whl", hash = "sha256:469f29f9093c9d834432034d33f5fe45699e664f12a13bf38c04967ce233d688"}, + {file = "pydantic_core-2.20.1-cp312-none-win_amd64.whl", hash = "sha256:035ede2e16da7281041f0e626459bcae33ed998cca6a0a007a5ebb73414ac72d"}, + {file = "pydantic_core-2.20.1-cp313-cp313-macosx_10_12_x86_64.whl", hash = "sha256:0827505a5c87e8aa285dc31e9ec7f4a17c81a813d45f70b1d9164e03a813a686"}, + {file = "pydantic_core-2.20.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:19c0fa39fa154e7e0b7f82f88ef85faa2a4c23cc65aae2f5aea625e3c13c735a"}, + {file = "pydantic_core-2.20.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4aa223cd1e36b642092c326d694d8bf59b71ddddc94cdb752bbbb1c5c91d833b"}, + {file = "pydantic_core-2.20.1-cp313-cp313-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:c336a6d235522a62fef872c6295a42ecb0c4e1d0f1a3e500fe949415761b8a19"}, + {file = "pydantic_core-2.20.1-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:7eb6a0587eded33aeefea9f916899d42b1799b7b14b8f8ff2753c0ac1741edac"}, + {file = "pydantic_core-2.20.1-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:70c8daf4faca8da5a6d655f9af86faf6ec2e1768f4b8b9d0226c02f3d6209703"}, + {file = "pydantic_core-2.20.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e9fa4c9bf273ca41f940bceb86922a7667cd5bf90e95dbb157cbb8441008482c"}, + {file = "pydantic_core-2.20.1-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:11b71d67b4725e7e2a9f6e9c0ac1239bbc0c48cce3dc59f98635efc57d6dac83"}, + {file = "pydantic_core-2.20.1-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:270755f15174fb983890c49881e93f8f1b80f0b5e3a3cc1394a255706cabd203"}, + {file = "pydantic_core-2.20.1-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:c81131869240e3e568916ef4c307f8b99583efaa60a8112ef27a366eefba8ef0"}, + {file = "pydantic_core-2.20.1-cp313-none-win32.whl", hash = "sha256:b91ced227c41aa29c672814f50dbb05ec93536abf8f43cd14ec9521ea09afe4e"}, + {file = "pydantic_core-2.20.1-cp313-none-win_amd64.whl", hash = "sha256:65db0f2eefcaad1a3950f498aabb4875c8890438bc80b19362cf633b87a8ab20"}, + {file = "pydantic_core-2.20.1-cp38-cp38-macosx_10_12_x86_64.whl", hash = "sha256:4745f4ac52cc6686390c40eaa01d48b18997cb130833154801a442323cc78f91"}, + {file = "pydantic_core-2.20.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:a8ad4c766d3f33ba8fd692f9aa297c9058970530a32c728a2c4bfd2616d3358b"}, + {file = "pydantic_core-2.20.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:41e81317dd6a0127cabce83c0c9c3fbecceae981c8391e6f1dec88a77c8a569a"}, + {file = "pydantic_core-2.20.1-cp38-cp38-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:04024d270cf63f586ad41fff13fde4311c4fc13ea74676962c876d9577bcc78f"}, + {file = "pydantic_core-2.20.1-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:eaad4ff2de1c3823fddf82f41121bdf453d922e9a238642b1dedb33c4e4f98ad"}, + {file = "pydantic_core-2.20.1-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:26ab812fa0c845df815e506be30337e2df27e88399b985d0bb4e3ecfe72df31c"}, + {file = "pydantic_core-2.20.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3c5ebac750d9d5f2706654c638c041635c385596caf68f81342011ddfa1e5598"}, + {file = "pydantic_core-2.20.1-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:2aafc5a503855ea5885559eae883978c9b6d8c8993d67766ee73d82e841300dd"}, + {file = "pydantic_core-2.20.1-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:4868f6bd7c9d98904b748a2653031fc9c2f85b6237009d475b1008bfaeb0a5aa"}, + {file = "pydantic_core-2.20.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:aa2f457b4af386254372dfa78a2eda2563680d982422641a85f271c859df1987"}, + {file = "pydantic_core-2.20.1-cp38-none-win32.whl", hash = "sha256:225b67a1f6d602de0ce7f6c1c3ae89a4aa25d3de9be857999e9124f15dab486a"}, + {file = "pydantic_core-2.20.1-cp38-none-win_amd64.whl", hash = "sha256:6b507132dcfc0dea440cce23ee2182c0ce7aba7054576efc65634f080dbe9434"}, + {file = "pydantic_core-2.20.1-cp39-cp39-macosx_10_12_x86_64.whl", hash = "sha256:b03f7941783b4c4a26051846dea594628b38f6940a2fdc0df00b221aed39314c"}, + {file = "pydantic_core-2.20.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:1eedfeb6089ed3fad42e81a67755846ad4dcc14d73698c120a82e4ccf0f1f9f6"}, + {file = "pydantic_core-2.20.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:635fee4e041ab9c479e31edda27fcf966ea9614fff1317e280d99eb3e5ab6fe2"}, + {file = "pydantic_core-2.20.1-cp39-cp39-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:77bf3ac639c1ff567ae3b47f8d4cc3dc20f9966a2a6dd2311dcc055d3d04fb8a"}, + {file = "pydantic_core-2.20.1-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:7ed1b0132f24beeec5a78b67d9388656d03e6a7c837394f99257e2d55b461611"}, + {file = "pydantic_core-2.20.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:c6514f963b023aeee506678a1cf821fe31159b925c4b76fe2afa94cc70b3222b"}, + {file = "pydantic_core-2.20.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:10d4204d8ca33146e761c79f83cc861df20e7ae9f6487ca290a97702daf56006"}, + {file = "pydantic_core-2.20.1-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:2d036c7187b9422ae5b262badb87a20a49eb6c5238b2004e96d4da1231badef1"}, + {file = "pydantic_core-2.20.1-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:9ebfef07dbe1d93efb94b4700f2d278494e9162565a54f124c404a5656d7ff09"}, + {file = "pydantic_core-2.20.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:6b9d9bb600328a1ce523ab4f454859e9d439150abb0906c5a1983c146580ebab"}, + {file = "pydantic_core-2.20.1-cp39-none-win32.whl", hash = "sha256:784c1214cb6dd1e3b15dd8b91b9a53852aed16671cc3fbe4786f4f1db07089e2"}, + {file = "pydantic_core-2.20.1-cp39-none-win_amd64.whl", hash = "sha256:d2fe69c5434391727efa54b47a1e7986bb0186e72a41b203df8f5b0a19a4f669"}, + {file = "pydantic_core-2.20.1-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:a45f84b09ac9c3d35dfcf6a27fd0634d30d183205230a0ebe8373a0e8cfa0906"}, + {file = "pydantic_core-2.20.1-pp310-pypy310_pp73-macosx_11_0_arm64.whl", hash = "sha256:d02a72df14dfdbaf228424573a07af10637bd490f0901cee872c4f434a735b94"}, + {file = "pydantic_core-2.20.1-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d2b27e6af28f07e2f195552b37d7d66b150adbaa39a6d327766ffd695799780f"}, + {file = "pydantic_core-2.20.1-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:084659fac3c83fd674596612aeff6041a18402f1e1bc19ca39e417d554468482"}, + {file = "pydantic_core-2.20.1-pp310-pypy310_pp73-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:242b8feb3c493ab78be289c034a1f659e8826e2233786e36f2893a950a719bb6"}, + {file = "pydantic_core-2.20.1-pp310-pypy310_pp73-musllinux_1_1_aarch64.whl", hash = "sha256:38cf1c40a921d05c5edc61a785c0ddb4bed67827069f535d794ce6bcded919fc"}, + {file = "pydantic_core-2.20.1-pp310-pypy310_pp73-musllinux_1_1_x86_64.whl", hash = "sha256:e0bbdd76ce9aa5d4209d65f2b27fc6e5ef1312ae6c5333c26db3f5ade53a1e99"}, + {file = "pydantic_core-2.20.1-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:254ec27fdb5b1ee60684f91683be95e5133c994cc54e86a0b0963afa25c8f8a6"}, + {file = "pydantic_core-2.20.1-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:407653af5617f0757261ae249d3fba09504d7a71ab36ac057c938572d1bc9331"}, + {file = "pydantic_core-2.20.1-pp39-pypy39_pp73-macosx_11_0_arm64.whl", hash = "sha256:c693e916709c2465b02ca0ad7b387c4f8423d1db7b4649c551f27a529181c5ad"}, + {file = "pydantic_core-2.20.1-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5b5ff4911aea936a47d9376fd3ab17e970cc543d1b68921886e7f64bd28308d1"}, + {file = "pydantic_core-2.20.1-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:177f55a886d74f1808763976ac4efd29b7ed15c69f4d838bbd74d9d09cf6fa86"}, + {file = "pydantic_core-2.20.1-pp39-pypy39_pp73-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:964faa8a861d2664f0c7ab0c181af0bea66098b1919439815ca8803ef136fc4e"}, + {file = "pydantic_core-2.20.1-pp39-pypy39_pp73-musllinux_1_1_aarch64.whl", hash = "sha256:4dd484681c15e6b9a977c785a345d3e378d72678fd5f1f3c0509608da24f2ac0"}, + {file = "pydantic_core-2.20.1-pp39-pypy39_pp73-musllinux_1_1_x86_64.whl", hash = "sha256:f6d6cff3538391e8486a431569b77921adfcdef14eb18fbf19b7c0a5294d4e6a"}, + {file = "pydantic_core-2.20.1-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:a6d511cc297ff0883bc3708b465ff82d7560193169a8b93260f74ecb0a5e08a7"}, + {file = "pydantic_core-2.20.1.tar.gz", hash = "sha256:26ca695eeee5f9f1aeeb211ffc12f10bcb6f71e2989988fda61dabd65db878d4"}, +] + +[package.dependencies] +typing-extensions = ">=4.6.0,<4.7.0 || >4.7.0" + +[[package]] +name = "pyyaml" +version = "6.0.1" +description = "YAML parser and emitter for Python" +optional = false +python-versions = ">=3.6" +files = [ + {file = "PyYAML-6.0.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d858aa552c999bc8a8d57426ed01e40bef403cd8ccdd0fc5f6f04a00414cac2a"}, + {file = "PyYAML-6.0.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:fd66fc5d0da6d9815ba2cebeb4205f95818ff4b79c3ebe268e75d961704af52f"}, + {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:69b023b2b4daa7548bcfbd4aa3da05b3a74b772db9e23b982788168117739938"}, + {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:81e0b275a9ecc9c0c0c07b4b90ba548307583c125f54d5b6946cfee6360c733d"}, + {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ba336e390cd8e4d1739f42dfe9bb83a3cc2e80f567d8805e11b46f4a943f5515"}, + {file = "PyYAML-6.0.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:326c013efe8048858a6d312ddd31d56e468118ad4cdeda36c719bf5bb6192290"}, + {file = "PyYAML-6.0.1-cp310-cp310-win32.whl", hash = "sha256:bd4af7373a854424dabd882decdc5579653d7868b8fb26dc7d0e99f823aa5924"}, + {file = "PyYAML-6.0.1-cp310-cp310-win_amd64.whl", hash = "sha256:fd1592b3fdf65fff2ad0004b5e363300ef59ced41c2e6b3a99d4089fa8c5435d"}, + {file = "PyYAML-6.0.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6965a7bc3cf88e5a1c3bd2e0b5c22f8d677dc88a455344035f03399034eb3007"}, + {file = "PyYAML-6.0.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:f003ed9ad21d6a4713f0a9b5a7a0a79e08dd0f221aff4525a2be4c346ee60aab"}, + {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:42f8152b8dbc4fe7d96729ec2b99c7097d656dc1213a3229ca5383f973a5ed6d"}, + {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:062582fca9fabdd2c8b54a3ef1c978d786e0f6b3a1510e0ac93ef59e0ddae2bc"}, + {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d2b04aac4d386b172d5b9692e2d2da8de7bfb6c387fa4f801fbf6fb2e6ba4673"}, + {file = "PyYAML-6.0.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:e7d73685e87afe9f3b36c799222440d6cf362062f78be1013661b00c5c6f678b"}, + {file = "PyYAML-6.0.1-cp311-cp311-win32.whl", hash = "sha256:1635fd110e8d85d55237ab316b5b011de701ea0f29d07611174a1b42f1444741"}, + {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, + {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, + {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, + {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, + {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, + {file = "PyYAML-6.0.1-cp312-cp312-win_amd64.whl", hash = "sha256:0d3304d8c0adc42be59c5f8a4d9e3d7379e6955ad754aa9d6ab7a398b59dd1df"}, + {file = "PyYAML-6.0.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:50550eb667afee136e9a77d6dc71ae76a44df8b3e51e41b77f6de2932bfe0f47"}, + {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1fe35611261b29bd1de0070f0b2f47cb6ff71fa6595c077e42bd0c419fa27b98"}, + {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:704219a11b772aea0d8ecd7058d0082713c3562b4e271b849ad7dc4a5c90c13c"}, + {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:afd7e57eddb1a54f0f1a974bc4391af8bcce0b444685d936840f125cf046d5bd"}, + {file = "PyYAML-6.0.1-cp36-cp36m-win32.whl", hash = "sha256:fca0e3a251908a499833aa292323f32437106001d436eca0e6e7833256674585"}, + {file = "PyYAML-6.0.1-cp36-cp36m-win_amd64.whl", hash = "sha256:f22ac1c3cac4dbc50079e965eba2c1058622631e526bd9afd45fedd49ba781fa"}, + {file = "PyYAML-6.0.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:b1275ad35a5d18c62a7220633c913e1b42d44b46ee12554e5fd39c70a243d6a3"}, + {file = "PyYAML-6.0.1-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:18aeb1bf9a78867dc38b259769503436b7c72f7a1f1f4c93ff9a17de54319b27"}, + {file = "PyYAML-6.0.1-cp37-cp37m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:596106435fa6ad000c2991a98fa58eeb8656ef2325d7e158344fb33864ed87e3"}, + {file = "PyYAML-6.0.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:baa90d3f661d43131ca170712d903e6295d1f7a0f595074f151c0aed377c9b9c"}, + {file = "PyYAML-6.0.1-cp37-cp37m-win32.whl", hash = "sha256:9046c58c4395dff28dd494285c82ba00b546adfc7ef001486fbf0324bc174fba"}, + {file = "PyYAML-6.0.1-cp37-cp37m-win_amd64.whl", hash = "sha256:4fb147e7a67ef577a588a0e2c17b6db51dda102c71de36f8549b6816a96e1867"}, + {file = "PyYAML-6.0.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:1d4c7e777c441b20e32f52bd377e0c409713e8bb1386e1099c2415f26e479595"}, + {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a0cd17c15d3bb3fa06978b4e8958dcdc6e0174ccea823003a106c7d4d7899ac5"}, + {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:28c119d996beec18c05208a8bd78cbe4007878c6dd15091efb73a30e90539696"}, + {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7e07cbde391ba96ab58e532ff4803f79c4129397514e1413a7dc761ccd755735"}, + {file = "PyYAML-6.0.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:49a183be227561de579b4a36efbb21b3eab9651dd81b1858589f796549873dd6"}, + {file = "PyYAML-6.0.1-cp38-cp38-win32.whl", hash = "sha256:184c5108a2aca3c5b3d3bf9395d50893a7ab82a38004c8f61c258d4428e80206"}, + {file = "PyYAML-6.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:1e2722cc9fbb45d9b87631ac70924c11d3a401b2d7f410cc0e3bbf249f2dca62"}, + {file = "PyYAML-6.0.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:9eb6caa9a297fc2c2fb8862bc5370d0303ddba53ba97e71f08023b6cd73d16a8"}, + {file = "PyYAML-6.0.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:c8098ddcc2a85b61647b2590f825f3db38891662cfc2fc776415143f599bb859"}, + {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5773183b6446b2c99bb77e77595dd486303b4faab2b086e7b17bc6bef28865f6"}, + {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b786eecbdf8499b9ca1d697215862083bd6d2a99965554781d0d8d1ad31e13a0"}, + {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc1bf2925a1ecd43da378f4db9e4f799775d6367bdb94671027b73b393a7c42c"}, + {file = "PyYAML-6.0.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:04ac92ad1925b2cff1db0cfebffb6ffc43457495c9b3c39d3fcae417d7125dc5"}, + {file = "PyYAML-6.0.1-cp39-cp39-win32.whl", hash = "sha256:faca3bdcf85b2fc05d06ff3fbc1f83e1391b3e724afa3feba7d13eeab355484c"}, + {file = "PyYAML-6.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:510c9deebc5c0225e8c96813043e62b680ba2f9c50a08d3724c7f28a747d1486"}, + {file = "PyYAML-6.0.1.tar.gz", hash = "sha256:bfdf460b1736c775f2ba9f6a92bca30bc2095067b8a9d77876d1fad6cc3b4a43"}, +] + +[[package]] +name = "typing-extensions" +version = "4.12.2" +description = "Backported and Experimental Type Hints for Python 3.8+" +optional = false +python-versions = ">=3.8" +files = [ + {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"}, + {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"}, +] + +[metadata] +lock-version = "2.0" +python-versions = "^3.8" +content-hash = "cff8e570c3ee81ef8ffdc352574c282aaf1a51261d4d57cd51985cbd2d9fcdda" diff --git a/examples/subscribe-topics-by-pattern/pyproject.toml b/examples/subscribe-topics-by-pattern/pyproject.toml new file mode 100644 index 00000000..6478f4b0 --- /dev/null +++ b/examples/subscribe-topics-by-pattern/pyproject.toml @@ -0,0 +1,19 @@ +[tool.poetry] +name = "subscribe-topics-by-pattern" +version = "0.1.0" +description = "" +authors = ["Marcos Schroh "] +readme = "README.md" + +[tool.poetry.dependencies] +python = "^3.8" +aiorun = "^2024.5.1" +kstreams = { path = "../../.", develop = true } + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" + + +[tool.poetry.scripts] +app = "subscribe_topics_by_pattern.app:main" diff --git a/examples/subscribe-topics-by-pattern/subscribe_topics_by_pattern/__init__.py b/examples/subscribe-topics-by-pattern/subscribe_topics_by_pattern/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/subscribe-topics-by-pattern/subscribe_topics_by_pattern/app.py b/examples/subscribe-topics-by-pattern/subscribe_topics_by_pattern/app.py new file mode 100644 index 00000000..fda06f82 --- /dev/null +++ b/examples/subscribe-topics-by-pattern/subscribe_topics_by_pattern/app.py @@ -0,0 +1,51 @@ +import asyncio +import logging + +import aiorun + +import kstreams + +logger = logging.getLogger(__name__) + +stream_engine = kstreams.create_engine(title="my-stream-engine") + +customer_invoice_topic = "local--customer-invoice" +customer_profile_topic = "local--customer-profile" +invoice_event = b"invoice-1" +profile_event = b"profile-1" + + +@stream_engine.stream( + topics="^local--customer-.*$", + subscribe_by_pattern=True, + group_id="topics-by-pattern-group", +) +async def stream(cr: kstreams.ConsumerRecord): + if cr.topic == customer_invoice_topic: + assert cr.value == invoice_event + elif cr.topic == customer_profile_topic: + assert cr.value == profile_event + else: + raise ValueError(f"Invalid topic {cr.topic}") + + logger.info(f"Event {cr.value} consumed from topic {cr.topic}") + + +@stream_engine.after_startup +async def produce(): + for event_number in range(0, 2): + await stream_engine.send(customer_invoice_topic, value=invoice_event) + await stream_engine.send(customer_profile_topic, value=profile_event) + + +async def start(): + await stream_engine.start() + + +async def stop(loop: asyncio.AbstractEventLoop): + await stream_engine.stop() + + +def main(): + logging.basicConfig(level=logging.INFO) + aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=stop) diff --git a/kstreams/engine.py b/kstreams/engine.py index d59a5371..36d6e8a8 100644 --- a/kstreams/engine.py +++ b/kstreams/engine.py @@ -384,6 +384,7 @@ def stream( initial_offsets: typing.Optional[typing.List[TopicPartitionOffset]] = None, rebalance_listener: typing.Optional[RebalanceListener] = None, middlewares: typing.Optional[typing.List[Middleware]] = None, + subscribe_by_pattern: bool = False, **kwargs, ) -> typing.Callable[[StreamFunc], Stream]: def decorator(func: StreamFunc) -> Stream: @@ -394,6 +395,7 @@ def decorator(func: StreamFunc) -> Stream: initial_offsets=initial_offsets, rebalance_listener=rebalance_listener, middlewares=middlewares, + subscribe_by_pattern=subscribe_by_pattern, **kwargs, )(func) self.add_stream(stream_from_func) diff --git a/kstreams/streams.py b/kstreams/streams.py index 256a613c..50bd5001 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -34,6 +34,7 @@ class Stream: Attributes: name str: Stream name topics List[str]: List of topics to consume + subscribe_by_pattern bool: Whether subscribe to topics by pattern backend kstreams.backends.Kafka: backend kstreams.backends.kafka.Kafka: Backend to connect. Default `Kafka` func Callable[["Stream"], Awaitable[Any]]: Coroutine fucntion or generator @@ -48,29 +49,93 @@ class Stream: rebalance_listener kstreams.rebalance_listener.RebalanceListener: Listener callbacks when partition are assigned or revoked + ## Subscribe to a topic + !!! Example - ```python title="Usage" + ```python import aiorun from kstreams import create_engine, ConsumerRecord stream_engine = create_engine(title="my-stream-engine") - # here you can add any other AIOKafkaConsumer config - @stream_engine.stream("local--kstreams", group_id="de-my-partition") + @stream_engine.stream("local--kstreams", group_id="my-group-id") async def stream(cr: ConsumerRecord) -> None: print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}") async def start(): await stream_engine.start() - await produce() async def shutdown(loop): await stream_engine.stop() + if __name__ == "__main__": + aiorun.run( + start(), + stop_on_unhandled_errors=True, + shutdown_callback=shutdown + ) + ``` + + ## Subscribe to multiple topics + + Consuming from multiple topics using one `stream` is possible. A `List[str]` + of topics must be provided. + + !!! Example + ```python + import aiorun + from kstreams import create_engine, ConsumerRecord + + stream_engine = create_engine(title="my-stream-engine") + + + @stream_engine.stream( + ["local--kstreams", "local--hello-world"], + group_id="my-group-id", + ) + async def consume(cr: ConsumerRecord) -> None: + print(f"Event from {cr.topic}: headers: {cr.headers}, payload: {cr.value}") + ``` + + ## Subscribe to topics by pattern + + In the following example the stream will subscribe to any topic that matches + the regex `^dev--customer-.*`, for example `dev--customer-invoice` or + `dev--customer-profile`. The `subscribe_by_pattern` flag must be set to `True`. + + !!! Example + ```python + import aiorun + from kstreams import create_engine, ConsumerRecord + + stream_engine = create_engine(title="my-stream-engine") + + + @stream_engine.stream( + topics="^dev--customer-.*$", + subscribe_by_pattern=True, + group_id="my-group-id", + ) + async def stream(cr: ConsumerRecord) -> None: + if cr.topic == "dev--customer-invoice": + print("Event from topic dev--customer-invoice" + elif cr.topic == "dev--customer-profile": + print("Event from topic dev--customer-profile" + else: + raise ValueError(f"Invalid topic {cr.topic}") + + + async def start(): + await stream_engine.start() + + async def shutdown(loop): + await stream_engine.stop() + + if __name__ == "__main__": aiorun.run( start(), @@ -84,6 +149,7 @@ def __init__( self, topics: typing.Union[typing.List[str], str], *, + subscribe_by_pattern: bool = False, func: StreamFunc, backend: typing.Optional[Kafka] = None, consumer_class: typing.Type[Consumer] = Consumer, @@ -108,11 +174,8 @@ def __init__( self.rebalance_listener = rebalance_listener self.middlewares = middlewares or [] self.udf_handler = UdfHandler(handler=func, stream=self) - - # aiokafka expects topic names as arguments, meaning that - # can receive N topics -> N arguments, - # so we always create a list and then we expand it with *topics self.topics = [topics] if isinstance(topics, str) else topics + self.subscribe_by_pattern = subscribe_by_pattern def _create_consumer(self) -> Consumer: if self.backend is None: @@ -136,11 +199,49 @@ async def stop(self) -> None: ) def subscribe(self) -> None: + """ + Create Consumer and subscribe to topics + + Subsciptions uses cases: + + Case 1: + self.topics Topics is a List, which means that the end user wants to + subscribe to multiple topics explicitly: + + Stream(topics=["local--hello-kpn", "local--hello-kpn-2", ...], ...) + + Case 2: + self.topics is a string which represents a single topic (explicit) + to subscribe: + + Stream(topics="local--hello-kpn", ...) + + It is also possible to use the `subscribe_by_pattern` but in practice + it does not have any difference because the pattern will match + explicitly the topic name: + + Stream(topics="local--hello-kpn", subscribe_by_pattern=True, ...) + + Case 3: + self.topics is a pattern, then we subscribe to N number + of topics. The flag `self.subscribe_by_pattern` must be True + + Stream(topics="^dev--customer-.*$", subscribe_by_pattern=True, ...) + + It is important to notice that in `aiokafka` both `topics` and `pattern` + can not be used at the same time, so when calling self.consumer.subscribe(...) + we set only one of them according to the flag `self.subscribe_by_pattern`. + """ + if self.consumer is None: # Only create a consumer if it was not previously created self.consumer = self._create_consumer() - self.consumer.subscribe(topics=self.topics, listener=self.rebalance_listener) + self.consumer.subscribe( + topics=self.topics if not self.subscribe_by_pattern else None, + listener=self.rebalance_listener, + pattern=self.topics[0] if self.subscribe_by_pattern else None, + ) async def commit( self, offsets: typing.Optional[typing.Dict[TopicPartition, int]] = None @@ -350,6 +451,7 @@ async def consume(cr: ConsumerRecord, stream: Stream): def stream( topics: typing.Union[typing.List[str], str], *, + subscribe_by_pattern: bool = False, name: typing.Optional[str] = None, deserializer: typing.Optional[Deserializer] = None, initial_offsets: typing.Optional[typing.List[TopicPartitionOffset]] = None, @@ -366,6 +468,7 @@ def decorator(func: StreamFunc) -> Stream: initial_offsets=initial_offsets, rebalance_listener=rebalance_listener, middlewares=middlewares, + subscribe_by_pattern=subscribe_by_pattern, config=kwargs, ) update_wrapper(s, func) diff --git a/kstreams/test_utils/test_clients.py b/kstreams/test_utils/test_clients.py index 043c0341..901b7f5f 100644 --- a/kstreams/test_utils/test_clients.py +++ b/kstreams/test_utils/test_clients.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Any, Coroutine, Dict, List, Optional, Set, Tuple +from typing import Any, Coroutine, Dict, List, Optional, Sequence, Set from kstreams import ConsumerRecord, RebalanceListener, TopicPartition from kstreams.clients import Consumer, Producer @@ -66,7 +66,7 @@ class TestConsumer(Base, Consumer): def __init__(self, group_id: Optional[str] = None, **kwargs) -> None: # copy the aiokafka behavior - self.topics: Optional[Tuple[str]] = None + self.topics: Optional[Sequence[str]] = None self._group_id: Optional[str] = group_id self._assignment: List[TopicPartition] = [] self.partitions_committed: Dict[TopicPartition, int] = {} @@ -78,11 +78,16 @@ def __init__(self, group_id: Optional[str] = None, **kwargs) -> None: def subscribe( self, *, - topics: Tuple[str], + topics: Optional[Sequence[str]] = None, listener: RebalanceListener, - **kwargs, + pattern: Optional[str] = None, ) -> None: self.topics = topics + if topics is None: + # then it is a pattern subscription, we need to get the current + # topics (pre created) from the topic manager + assert pattern + topics = TopicManager.get_topics_by_pattern(pattern=pattern) for topic_name in topics: topic, created = TopicManager.get_or_create(topic_name, consumer=self) diff --git a/kstreams/test_utils/test_utils.py b/kstreams/test_utils/test_utils.py index 2b0b6c4f..34161a3e 100644 --- a/kstreams/test_utils/test_utils.py +++ b/kstreams/test_utils/test_utils.py @@ -65,6 +65,8 @@ def __init__( if not monitoring_enabled: self.stream_engine.monitor = TestMonitor() + self.create_extra_topics() + def mock_streams(self) -> None: streams: List[Stream] = self.stream_engine._streams for stream in streams: @@ -83,7 +85,6 @@ def create_extra_topics(self) -> None: async def start(self) -> None: self.setup_mocks() - self.create_extra_topics() await self.stream_engine.start() async def stop(self) -> None: diff --git a/kstreams/test_utils/topics.py b/kstreams/test_utils/topics.py index f5984861..fbfb10da 100644 --- a/kstreams/test_utils/topics.py +++ b/kstreams/test_utils/topics.py @@ -1,7 +1,8 @@ import asyncio +import re from collections import defaultdict from dataclasses import dataclass, field -from typing import ClassVar, DefaultDict, Dict, Optional, Tuple +from typing import ClassVar, DefaultDict, Dict, Optional, Sequence, Tuple from kstreams import ConsumerRecord @@ -91,6 +92,16 @@ def get(cls, name: str) -> Topic: "and the topic has events." ) + @classmethod + def get_topics_by_pattern(cls, pattern: str) -> Sequence[str]: + compile_expression = re.compile(pattern) + + return tuple( + topic_name + for topic_name in cls.topics + if compile_expression.match(topic_name) + ) + @classmethod def create( cls, name: str, consumer: Optional["test_clients.Consumer"] = None diff --git a/scripts/cluster/start b/scripts/cluster/start index 0146abd4..a96277f2 100755 --- a/scripts/cluster/start +++ b/scripts/cluster/start @@ -7,5 +7,7 @@ scripts/cluster/topics/create "local--sse" scripts/cluster/topics/create "local--avro-user" scripts/cluster/topics/create "local--avro-address" scripts/cluster/topics/create "kstreams--dlq-topic" +scripts/cluster/topics/create "local--customer-invoice" +scripts/cluster/topics/create "local--customer-profile" scripts/cluster/logs diff --git a/tests/__init__.py b/tests/__init__.py index e69de29b..81a971ca 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1,7 @@ +import asyncio +import sys + +if sys.version_info < (3, 11): + TimeoutErrorException = asyncio.TimeoutError +else: + TimeoutErrorException = TimeoutError diff --git a/tests/test_client.py b/tests/test_client.py index 49a81a73..c79636a9 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,6 +1,6 @@ import asyncio +import contextlib import importlib -import sys from typing import Set from unittest.mock import Mock, call @@ -14,11 +14,7 @@ TestStreamClient, TopicManager, ) - -if sys.version_info < (3, 11): - TimeoutErrorException = asyncio.TimeoutError -else: - TimeoutErrorException = TimeoutError +from tests import TimeoutErrorException topic = "local--kstreams-consumer" tp0 = TopicPartition(topic=topic, partition=0) @@ -204,6 +200,40 @@ async def consume(stream): assert metadata.offset == 0 +@pytest.mark.asyncio +async def test_consume_events_topics_by_pattern(stream_engine: StreamEngine): + """ + This test shows the possibility to subscribe to multiple topics using a pattern + """ + pattern = "^dev--customer-.*$" + customer_invoice_topic = "dev--customer-invoice" + customer_profile_topic = "dev--customer-profile" + invoice_event = b"invoice-1" + profile_event = b"profile-1" + customer_id = "1" + + client = TestStreamClient( + stream_engine, topics=[customer_invoice_topic, customer_profile_topic] + ) + + @stream_engine.stream(topics=pattern, subscribe_by_pattern=True) + async def stream(cr: ConsumerRecord): + if cr.topic == customer_invoice_topic: + assert cr.value == invoice_event + elif cr.topic == customer_profile_topic: + assert cr.value == profile_event + else: + raise ValueError(f"Invalid topic {cr.topic}") + + async with client: + await client.send(customer_invoice_topic, value=invoice_event, key=customer_id) + await client.send(customer_profile_topic, value=profile_event, key=customer_id) + + # give some time to consume all the events + await asyncio.sleep(0.1) + assert TopicManager.all_messages_consumed() + + @pytest.mark.asyncio async def test_topic_created(stream_engine: StreamEngine): topic_name = "local--kstreams" @@ -540,12 +570,10 @@ async def func_stream(consumer: Stream): ) stream_engine.add_stream(stream) - try: + with contextlib.suppress(TimeoutErrorException): # now it is possible to run a stream directly, so we need - # to stop the `forever` consumetion + # to stop the `forever` consumption await asyncio.wait_for(stream.start(), timeout=1.0) - except TimeoutErrorException: - ... # simulate partitions assigned on rebalance await stream.rebalance_listener.on_partitions_assigned(assigned=assignments) diff --git a/tests/test_stream_engine.py b/tests/test_stream_engine.py index 0fc1257b..e14986d9 100644 --- a/tests/test_stream_engine.py +++ b/tests/test_stream_engine.py @@ -38,19 +38,6 @@ async def my_stream(_): pass -@pytest.mark.asyncio -async def test_add_stream_multiple_topics(stream_engine: StreamEngine): - topics = ["local--hello-kpn", "local--hello-kpn-2"] - - @stream_engine.stream(topics, name="my-stream") - async def stream(_): - pass - - stream_instance = stream_engine.get_stream("my-stream") - assert stream_instance == stream - assert stream_instance.topics == topics - - @pytest.mark.asyncio async def test_add_stream_as_instance(stream_engine: StreamEngine): topics = ["local--hello-kpn", "local--hello-kpn-2"] diff --git a/tests/test_streams.py b/tests/test_streams.py index c8a36271..cbd45e39 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -1,4 +1,5 @@ import asyncio +import contextlib from typing import Callable, Set from unittest import mock @@ -9,6 +10,115 @@ from kstreams.engine import Stream, StreamEngine from kstreams.streams import stream from kstreams.structs import TopicPartitionOffset +from tests import TimeoutErrorException + + +@pytest.mark.asyncio +async def test_stream(stream_engine: StreamEngine, consumer_record_factory): + topic_name = "local--kstreams" + stream_name = "my-stream" + value = b"test" + + async def getone(_): + return consumer_record_factory(value=value) + + with mock.patch.multiple( + Consumer, + start=mock.DEFAULT, + subscribe=mock.DEFAULT, + getone=getone, + ): + + @stream_engine.stream(topic_name, name=stream_name) + async def stream(cr: ConsumerRecord): + assert cr.value == value + await asyncio.sleep(0.2) + + assert stream.consumer is None + assert stream.topics == [topic_name] + + with contextlib.suppress(TimeoutErrorException): + # now it is possible to run a stream directly, so we need + # to stop the `forever` consumption + await asyncio.wait_for(stream.start(), timeout=0.1) + + assert stream.consumer + Consumer.subscribe.assert_called_once_with( + topics=[topic_name], listener=stream.rebalance_listener, pattern=None + ) + await stream.stop() + + +@pytest.mark.asyncio +async def test_stream_multiple_topics(stream_engine: StreamEngine): + topics = ["local--hello-kpn", "local--hello-kpn-2"] + + with mock.patch.multiple( + Consumer, + start=mock.DEFAULT, + subscribe=mock.DEFAULT, + ): + + @stream_engine.stream(topics, name="my-stream") + async def stream(_): + ... + + assert stream.topics == topics + + await stream.start() + Consumer.subscribe.assert_called_once_with( + topics=topics, listener=stream.rebalance_listener, pattern=None + ) + + +@pytest.mark.asyncio +async def test_stream_subscribe_topics_pattern(stream_engine: StreamEngine): + pattern = "^dev--customer-.*$" + + with mock.patch.multiple( + Consumer, + start=mock.DEFAULT, + subscribe=mock.DEFAULT, + ): + + @stream_engine.stream(topics=pattern, subscribe_by_pattern=True) + async def stream(_): + ... + + assert stream.topics == [pattern] + assert stream.subscribe_by_pattern + + await stream.start() + Consumer.subscribe.assert_called_once_with( + topics=None, listener=stream.rebalance_listener, pattern=pattern + ) + + +@pytest.mark.asyncio +async def test_stream_subscribe_topics_only_one_pattern(stream_engine: StreamEngine): + """ + We can use only one pattern, so we use the first one + """ + patterns = ["^dev--customer-.*$", "^acc--customer-.*$"] + + with mock.patch.multiple( + Consumer, + start=mock.DEFAULT, + subscribe=mock.DEFAULT, + getone=mock.DEFAULT, + ): + + @stream_engine.stream(topics=patterns, subscribe_by_pattern=True) + async def stream(_): + ... + + assert stream.topics == patterns + assert stream.subscribe_by_pattern + + await stream.start() + Consumer.subscribe.assert_called_once_with( + topics=None, listener=stream.rebalance_listener, pattern=patterns[0] + ) @pytest.mark.asyncio @@ -22,17 +132,16 @@ async def test_stream_custom_conf(stream_engine: StreamEngine): async def stream(_): ... - stream_instance = stream_engine.get_stream("stream-hello-kpn") - - with mock.patch.multiple(Consumer, start=mock.DEFAULT, stop=mock.DEFAULT): - with mock.patch.multiple(Producer, start=mock.DEFAULT, stop=mock.DEFAULT): - await stream_engine.start_streams() + with mock.patch.multiple( + Consumer, start=mock.DEFAULT, stop=mock.DEFAULT + ), mock.patch.multiple(Producer, start=mock.DEFAULT, stop=mock.DEFAULT): + await stream_engine.start_streams() - # switch the current Task to the one running in background - await asyncio.sleep(0.1) + # switch the current Task to the one running in background + await asyncio.sleep(0.1) - assert stream_instance.consumer._auto_offset_reset == "earliest" - assert not stream_instance.consumer._enable_auto_commit + assert stream.consumer._auto_offset_reset == "earliest" + assert not stream.consumer._enable_auto_commit @pytest.mark.asyncio