Skip to content

Commit

Permalink
Merge pull request #248 from kpn/fix/generic-consumer-record
Browse files Browse the repository at this point in the history
Fix/generic consumer record
  • Loading branch information
woile authored Nov 27, 2024
2 parents 3cafa58 + 122b4ca commit 3280be0
Show file tree
Hide file tree
Showing 16 changed files with 383 additions and 113 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
{
"machine_info": {
"node": "Woile-MacBook-Pro.local",
"processor": "arm",
"machine": "arm64",
"python_compiler": "Clang 16.0.6 ",
"python_implementation": "CPython",
"python_implementation_version": "3.11.10",
"python_version": "3.11.10",
"python_build": [
"main",
"Sep 7 2024 01:03:31"
],
"release": "24.1.0",
"system": "Darwin",
"cpu": {
"python_version": "3.11.10.final.0 (64 bit)",
"cpuinfo_version": [
9,
0,
0
],
"cpuinfo_version_string": "9.0.0",
"arch": "ARM_8",
"bits": 64,
"count": 12,
"arch_string_raw": "arm64",
"brand_raw": "Apple M3 Pro"
}
},
"commit_info": {
"id": "81eb783e807835e049815241011f694a54404471",
"time": "2024-11-27T14:40:28+01:00",
"author_time": "2024-11-27T14:40:28+01:00",
"dirty": true,
"project": "kstreams",
"branch": "fix/generic-consumer-record"
},
"benchmarks": [
{
"group": null,
"name": "test_startup_and_processing_single_consumer_record",
"fullname": "tests/test_benchmarks.py::test_startup_and_processing_single_consumer_record",
"params": null,
"param": null,
"extra_info": {},
"options": {
"disable_gc": false,
"timer": "perf_counter",
"min_rounds": 5,
"max_time": 1.0,
"min_time": 5e-06,
"warmup": false
},
"stats": {
"min": 4.199999966658652e-05,
"max": 0.012051916011841968,
"mean": 9.800166512286035e-05,
"stddev": 0.0001816612665091389,
"rounds": 4471,
"median": 9.42080223467201e-05,
"iqr": 4.929174610879272e-05,
"q1": 6.958325684536248e-05,
"q3": 0.0001188750029541552,
"iqr_outliers": 14,
"stddev_outliers": 9,
"outliers": "9;14",
"ld15iqr": 4.199999966658652e-05,
"hd15iqr": 0.00020029101870022714,
"ops": 10203.90825754179,
"total": 0.43816544476430863,
"iterations": 1
}
},
{
"group": null,
"name": "test_startup_and_inject_all",
"fullname": "tests/test_benchmarks.py::test_startup_and_inject_all",
"params": null,
"param": null,
"extra_info": {},
"options": {
"disable_gc": false,
"timer": "perf_counter",
"min_rounds": 5,
"max_time": 1.0,
"min_time": 5e-06,
"warmup": false
},
"stats": {
"min": 4.3625012040138245e-05,
"max": 0.01566049997927621,
"mean": 0.00022631162852908016,
"stddev": 0.0002073705067274279,
"rounds": 15019,
"median": 0.0002195000124629587,
"iqr": 0.0001750317678670399,
"q1": 0.00013429098180495203,
"q3": 0.0003093227496719919,
"iqr_outliers": 19,
"stddev_outliers": 133,
"outliers": "133;19",
"ld15iqr": 4.3625012040138245e-05,
"hd15iqr": 0.0005731249984819442,
"ops": 4418.685891217931,
"total": 3.398974348878255,
"iterations": 1
}
},
{
"group": null,
"name": "test_consume_many",
"fullname": "tests/test_benchmarks.py::test_consume_many",
"params": null,
"param": null,
"extra_info": {},
"options": {
"disable_gc": false,
"timer": "perf_counter",
"min_rounds": 5,
"max_time": 1.0,
"min_time": 5e-06,
"warmup": false
},
"stats": {
"min": 0.0006483749893959612,
"max": 0.0008417500066570938,
"mean": 0.0006934208655618783,
"stddev": 2.4773934466283123e-05,
"rounds": 943,
"median": 0.0006997500022407621,
"iqr": 3.609351551858708e-05,
"q1": 0.0006723854967276566,
"q3": 0.0007084790122462437,
"iqr_outliers": 16,
"stddev_outliers": 246,
"outliers": "246;16",
"ld15iqr": 0.0006483749893959612,
"hd15iqr": 0.0007627500162925571,
"ops": 1442.1256262453264,
"total": 0.6538958762248512,
"iterations": 1
}
}
],
"datetime": "2024-11-27T14:06:35.464861+00:00",
"version": "5.1.0"
}
4 changes: 2 additions & 2 deletions .github/workflows/bench-release.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Bump version
name: Benchmark latest release

on:
push:
Expand Down Expand Up @@ -46,5 +46,5 @@ jobs:
git config --global user.email "[email protected]"
git config --global user.name "GitHub Action"
git add .benchmarks/
git commit -m "bench: bench: add benchmark current release"
git commit -m "bench: current release"
git push origin master
41 changes: 32 additions & 9 deletions .github/workflows/pr-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ on:
required: true

jobs:
build_test_bench:
test:
runs-on: ubuntu-latest
strategy:
matrix:
Expand All @@ -31,7 +31,6 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
architecture: x64

- name: Set Cache
uses: actions/cache@v4
id: cache # name for referring later
Expand All @@ -42,29 +41,53 @@ jobs:
restore-keys: |
${{ runner.os }}-cache-
${{ runner.os }}-
- name: Install Dependencies
# if: steps.cache.outputs.cache-hit != 'true'
run: |
python -m pip install -U pip poetry
poetry --version
poetry config --local virtualenvs.in-project true
poetry install
- name: Test and Lint
run: |
git config --global user.email "[email protected]"
git config --global user.name "GitHub Action"
./scripts/test
- name: Benchmark regression test
run: |
./scripts/bench-compare
- name: Upload coverage to Codecov
uses: codecov/[email protected]
with:
file: ./coverage.xml
name: kstreams
fail_ci_if_error: true
token: ${{secrets.CODECOV_TOKEN}}
bench:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Setup python
uses: actions/setup-python@v5
with:
python-version: '3.13'
architecture: x64
- name: Set Cache
uses: actions/cache@v4
id: cache # name for referring later
with:
path: .venv/
# The cache key depends on poetry.lock
key: ${{ runner.os }}-cache-${{ hashFiles('poetry.lock') }}
restore-keys: |
${{ runner.os }}-cache-
${{ runner.os }}-
- name: Install Dependencies
# if: steps.cache.outputs.cache-hit != 'true'
run: |
python -m pip install -U pip poetry
poetry --version
poetry config --local virtualenvs.in-project true
poetry install
- name: Benchmark regression test
run: |
./scripts/bench-compare
14 changes: 14 additions & 0 deletions kstreams/consts.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import enum

APPLICATION_X_AVRO_BINARY = "application/x-avro-binary"
APPLICATION_X_AVRO_JSON = "application/x-avro-json"
APPLICATION_X_JSON_SCHEMA = "application/x-json-schema"
Expand All @@ -11,3 +13,15 @@
APPLICATION_X_JSON_SCHEMA,
APPLICATION_JSON,
)


class UDFType(str, enum.Enum):
NO_TYPING = "NO_TYPING"
WITH_TYPING = "WITH_TYPING"


class StreamErrorPolicy(str, enum.Enum):
RESTART = "RESTART"
STOP = "STOP"
STOP_ENGINE = "STOP_ENGINE"
STOP_APPLICATION = "STOP_APPLICATION"
2 changes: 1 addition & 1 deletion kstreams/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from .backends.kafka import Kafka
from .clients import Consumer, Producer
from .consts import StreamErrorPolicy, UDFType
from .exceptions import DuplicateStreamException, EngineNotStartedException
from .middleware import Middleware
from .middleware.udf_middleware import UdfHandler
Expand All @@ -17,7 +18,6 @@
from .serializers import Deserializer, Serializer
from .streams import Stream, StreamFunc
from .streams import stream as stream_func
from .streams_utils import StreamErrorPolicy, UDFType
from .types import Deprecated, EngineHooks, Headers, NextMiddlewareCall
from .utils import encode_headers, execute_hooks

Expand Down
24 changes: 22 additions & 2 deletions kstreams/middleware/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import typing

from kstreams import types
from kstreams.streams_utils import StreamErrorPolicy
from kstreams.consts import StreamErrorPolicy, UDFType

if typing.TYPE_CHECKING:
from kstreams import Stream, StreamEngine # pragma: no cover
Expand All @@ -14,6 +14,10 @@


class MiddlewareProtocol(typing.Protocol):
next_call: types.NextMiddlewareCall
send: types.Send
stream: "Stream"

def __init__(
self,
*,
Expand Down Expand Up @@ -45,6 +49,10 @@ def __repr__(self) -> str:


class BaseMiddleware:
next_call: types.NextMiddlewareCall
send: types.Send
stream: "Stream"

def __init__(
self,
*,
Expand Down Expand Up @@ -92,7 +100,7 @@ async def __call__(self, cr: types.ConsumerRecord) -> typing.Any:

async def cleanup_policy(self, exc: Exception) -> None:
"""
Execute clenup policicy according to the Stream configuration.
Execute cleanup policy according to the Stream configuration.
At this point we are inside the asyncio.Lock `is_processing`
as an event is being processed and an exeption has occured.
Expand Down Expand Up @@ -145,3 +153,15 @@ async def cleanup_policy(self, exc: Exception) -> None:
await self.engine.stop()
await self.stream.is_processing.acquire()
signal.raise_signal(signal.SIGTERM)


class BaseDependcyMiddleware(MiddlewareProtocol, typing.Protocol):
"""Base class for Dependency Injection Middleware.
`get_type` is used to identify the way to call the user defined function,
whether to use DI or not.
On top of that, this middleware helps **avoid circular dependencies**.
"""

def get_type(self) -> UDFType: ...
10 changes: 8 additions & 2 deletions kstreams/middleware/udf_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@ class UdfHandler(BaseMiddleware):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
signature = inspect.signature(self.next_call)
self.params = list(signature.parameters.values())
self.params: typing.List[typing.Any] = [
typing.get_origin(param.annotation) or param.annotation
for param in signature.parameters.values()
]
self.type: UDFType = setup_type(self.params)

def get_type(self) -> UDFType:
return self.type

def bind_udf_params(self, cr: types.ConsumerRecord) -> typing.List:
# NOTE: When `no typing` support is deprecated then this can
# be more eficient as the CR will be always there.
Expand All @@ -30,7 +36,7 @@ def bind_udf_params(self, cr: types.ConsumerRecord) -> typing.List:
types.Send: self.send,
}

return [ANNOTATIONS_TO_PARAMS[param.annotation] for param in self.params]
return [ANNOTATIONS_TO_PARAMS[param_type] for param_type in self.params]

async def __call__(self, cr: types.ConsumerRecord) -> typing.Any:
"""
Expand Down
Loading

0 comments on commit 3280be0

Please sign in to comment.