Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/di class #260

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
{
"machine_info": {
"node": "Woile-MacBook-Pro.local",
"processor": "arm",
"machine": "arm64",
"python_compiler": "Clang 16.0.6 ",
"python_implementation": "CPython",
"python_implementation_version": "3.11.10",
"python_version": "3.11.10",
"python_build": [
"main",
"Sep 7 2024 01:03:31"
],
"release": "24.2.0",
"system": "Darwin",
"cpu": {
"python_version": "3.11.10.final.0 (64 bit)",
"cpuinfo_version": [
9,
0,
0
],
"cpuinfo_version_string": "9.0.0",
"arch": "ARM_8",
"bits": 64,
"count": 12,
"arch_string_raw": "arm64",
"brand_raw": "Apple M3 Pro"
}
},
"commit_info": {
"id": "ce85f3617c6291590250df4dd5247280067a4773",
"time": "2024-12-30T18:30:06+01:00",
"author_time": "2024-12-30T18:30:06+01:00",
"dirty": true,
"project": "kstreams",
"branch": "feat/di-class"
},
"benchmarks": [
{
"group": null,
"name": "test_startup_and_processing_single_consumer_record",
"fullname": "tests/test_benchmarks.py::test_startup_and_processing_single_consumer_record",
"params": null,
"param": null,
"extra_info": {},
"options": {
"disable_gc": false,
"timer": "perf_counter",
"min_rounds": 5,
"max_time": 1.0,
"min_time": 5e-06,
"warmup": false
},
"stats": {
"min": 4.329101648181677e-05,
"max": 0.010624374961480498,
"mean": 0.00010476869028665672,
"stddev": 0.00014913532189803988,
"rounds": 5273,
"median": 0.0001015830785036087,
"iqr": 5.8636273024603724e-05,
"q1": 7.308297790586948e-05,
"q3": 0.0001317192509304732,
"iqr_outliers": 11,
"stddev_outliers": 11,
"outliers": "11;11",
"ld15iqr": 4.329101648181677e-05,
"hd15iqr": 0.0002988340565934777,
"ops": 9544.836317643263,
"total": 0.5524453038815409,
"iterations": 1
}
},
{
"group": null,
"name": "test_startup_and_inject_all",
"fullname": "tests/test_benchmarks.py::test_startup_and_inject_all",
"params": null,
"param": null,
"extra_info": {},
"options": {
"disable_gc": false,
"timer": "perf_counter",
"min_rounds": 5,
"max_time": 1.0,
"min_time": 5e-06,
"warmup": false
},
"stats": {
"min": 4.475004971027374e-05,
"max": 0.019209666061215103,
"mean": 0.00021860343115481218,
"stddev": 0.00027677358158200696,
"rounds": 14520,
"median": 0.00020762498024851084,
"iqr": 0.0001650420017540455,
"q1": 0.0001278329873457551,
"q3": 0.0002928749890998006,
"iqr_outliers": 37,
"stddev_outliers": 46,
"outliers": "46;37",
"ld15iqr": 4.475004971027374e-05,
"hd15iqr": 0.0005416660569608212,
"ops": 4574.493614840898,
"total": 3.1741218203678727,
"iterations": 1
}
},
{
"group": null,
"name": "test_consume_many",
"fullname": "tests/test_benchmarks.py::test_consume_many",
"params": null,
"param": null,
"extra_info": {},
"options": {
"disable_gc": false,
"timer": "perf_counter",
"min_rounds": 5,
"max_time": 1.0,
"min_time": 5e-06,
"warmup": false
},
"stats": {
"min": 0.0006425828905776143,
"max": 0.0007407079683616757,
"mean": 0.0006802422760321444,
"stddev": 2.059963208525067e-05,
"rounds": 1207,
"median": 0.0006902500754222274,
"iqr": 3.9405771531164646e-05,
"q1": 0.0006585522787645459,
"q3": 0.0006979580502957106,
"iqr_outliers": 0,
"stddev_outliers": 496,
"outliers": "496;0",
"ld15iqr": 0.0006425828905776143,
"hd15iqr": 0.0007407079683616757,
"ops": 1470.064468549358,
"total": 0.8210524271707982,
"iterations": 1
}
}
],
"datetime": "2024-12-30T17:33:48.146167+00:00",
"version": "5.1.0"
}
13 changes: 1 addition & 12 deletions .github/workflows/bench-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,8 @@ jobs:
- name: Setup python
uses: actions/setup-python@v5
with:
python-version: '3.13'
python-version: '3.10'
architecture: x64
- name: Set Cache
uses: actions/cache@v4
id: cache # name for referring later
with:
path: .venv/
# The cache key depends on poetry.lock
key: ${{ runner.os }}-cache-${{ hashFiles('poetry.lock') }}
restore-keys: |
${{ runner.os }}-cache-
${{ runner.os }}-

- name: Install Dependencies
# if: steps.cache.outputs.cache-hit != 'true'
run: |
Expand Down
12 changes: 1 addition & 11 deletions .github/workflows/pr-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,8 @@ jobs:
- name: Setup python
uses: actions/setup-python@v5
with:
python-version: '3.13'
python-version: '3.10'
architecture: x64
- name: Set Cache
uses: actions/cache@v4
id: cache # name for referring later
with:
path: .venv/
# The cache key depends on poetry.lock
key: ${{ runner.os }}-cache-${{ hashFiles('poetry.lock') }}
restore-keys: |
${{ runner.os }}-cache-
${{ runner.os }}-
- name: Install Dependencies
# if: steps.cache.outputs.cache-hit != 'true'
run: |
Expand Down
59 changes: 48 additions & 11 deletions kstreams/middleware/udf_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,66 @@
return await async_gen.__anext__()


class UdfParam(typing.NamedTuple):
annotation: type
args: typing.Tuple[typing.Any]
is_generic: bool = False


def build_params(signature: inspect.Signature) -> typing.List[UdfParam]:
return [

Check warning on line 24 in kstreams/middleware/udf_middleware.py

View check run for this annotation

Codecov / codecov/patch

kstreams/middleware/udf_middleware.py#L24

Added line #L24 was not covered by tests
UdfParam(
typing.get_origin(param.annotation) or param.annotation,
typing.get_args(param.annotation),
typing.get_origin(param.annotation) is not None,
)
for param in signature.parameters.values()
]


class UdfHandler(BaseMiddleware):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
signature = inspect.signature(self.next_call)
self.params: typing.List[typing.Any] = [
typing.get_origin(param.annotation) or param.annotation
self.params: typing.List[UdfParam] = [
UdfParam(
typing.get_origin(param.annotation) or param.annotation,
typing.get_args(param.annotation),
typing.get_origin(param.annotation) is not None,
)
for param in signature.parameters.values()
]
self.type: UDFType = setup_type(self.params)

def get_type(self) -> UDFType:
return self.type
self.type: UDFType = setup_type([p.annotation for p in self.params])

def bind_udf_params(self, cr: types.ConsumerRecord) -> typing.List:
# NOTE: When `no typing` support is deprecated then this can
# be more efficient as the CR will always be there.
ANNOTATIONS_TO_PARAMS = {
types.ConsumerRecord: cr,
self.ANNOTATIONS_TO_PARAMS: dict[type, typing.Any] = {
types.ConsumerRecord: None,
Stream: self.stream,
types.Send: self.send,
}

return [ANNOTATIONS_TO_PARAMS[param_type] for param_type in self.params]
def get_type(self) -> UDFType:
"""Used by the stream_engine to know whether to call this middleware or not."""
return self.type

def bind_udf_params(self, cr: types.ConsumerRecord) -> typing.List:
self.ANNOTATIONS_TO_PARAMS[types.ConsumerRecord] = cr

args = []
for param in self.params:
if param.annotation is types.ConsumerRecord and param.is_generic:
if len(param.args) == 2:
cr_type = param.args[1]

# Check if it's compatible with a pydantic model
if hasattr(cr_type, "model_validate"):
pydantic_value = cr_type.model_validate(cr.value)
self.ANNOTATIONS_TO_PARAMS[types.ConsumerRecord] = cr._replace(
value=pydantic_value
)
args.append(self.ANNOTATIONS_TO_PARAMS[param.annotation])

return args

async def __call__(self, cr: types.ConsumerRecord) -> typing.Any:
"""
Expand Down
2 changes: 1 addition & 1 deletion kstreams/streams_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# NOTE: remove this module when Stream with `no typing` support is deprecated


def setup_type(params: List[inspect.Parameter]) -> UDFType:
def setup_type(params: List[type]) -> UDFType:
"""
Inspect the user defined function (coroutine) to get the proper way to call it

Expand Down
19 changes: 19 additions & 0 deletions kstreams/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,25 @@ def __call__(

@dataclass
class ConsumerRecord(typing.Generic[KT, VT]):
"""
ConsumerRecord represents a record received from a Kafka topic.

Attributes:
topic (str): The topic this record is received from.
partition (int): The partition from which this record is received.
offset (int): The position of this record in the corresponding Kafka partition.
timestamp (int): The timestamp of this record.
timestamp_type (int): The timestamp type of this record.
key (Optional[KT]): The key (or `None` if no key is specified).
value (Optional[VT]): The value.
checksum (Optional[int]): Deprecated.
serialized_key_size (int): The size of the serialized,
uncompressed key in bytes.
serialized_value_size (int): The size of the serialized,
uncompressed value in bytes.
headers (EncodedHeaders): The headers.
"""

topic: str
"The topic this record is received from"

Expand Down
60 changes: 60 additions & 0 deletions tests/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from unittest import mock

import pytest
from pydantic import BaseModel

from kstreams import ConsumerRecord, Send, TopicPartition
from kstreams.clients import Consumer, Producer
Expand Down Expand Up @@ -124,6 +125,65 @@ async def stream(cr: ConsumerRecord[str, bytes]):
await stream.stop()


@pytest.mark.asyncio
async def test_stream_generic_cr_with_pydantic_type(
    stream_engine: StreamEngine, consumer_record_factory
):
    """Pydantic models can be used as the value generic type in ConsumerRecord.

    When the stream's annotation is `ConsumerRecord[str, Customer]`, the
    UDF middleware should validate the raw record value into the model:

    ```python
    class Customer(BaseModel):
        id: int

    @stream_engine.stream("local--kstreams")
    async def stream(cr: ConsumerRecord[str, Customer]):
        assert cr.value.id == 1
    ```
    """

    class Profile(BaseModel):
        name: str

    class Customer(BaseModel):
        id: int
        profile: Profile

    # Raw payload matching the nested Customer/Profile schema above.
    data = {"id": 1, "profile": {"name": "John"}}
    topic_name = "local--kstreams"
    value = "John"

    async def getone(_):
        # The consumer yields a record whose value is the raw dict; the typed
        # annotation on the stream is expected to trigger pydantic validation
        # before the UDF sees it.
        return consumer_record_factory(value=data)

    with mock.patch.multiple(
        Consumer,
        start=mock.DEFAULT,
        subscribe=mock.DEFAULT,
        getone=getone,
    ):

        @stream_engine.stream(topic_name)
        async def stream(cr: ConsumerRecord[str, Customer]):
            if cr.value is None:
                raise ValueError("Value is None")
            # cr.value must already be a validated Customer instance here.
            assert cr.value.profile.name == value
            await asyncio.sleep(0.1)

        assert stream.consumer is None
        assert stream.topics == [topic_name]

        with contextlib.suppress(TimeoutErrorException):
            # now it is possible to run a stream directly, so we need
            # to stop the `forever` consumption
            await asyncio.wait_for(stream.start(), timeout=0.1)

        assert stream.consumer
        Consumer.subscribe.assert_called_once_with(
            topics=[topic_name], listener=stream.rebalance_listener, pattern=None
        )
        await stream.stop()


@pytest.mark.asyncio
async def test_stream_cr_and_stream_with_typing(
stream_engine: StreamEngine, consumer_record_factory
Expand Down
Loading