Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/generic consumer record #248

Merged
merged 6 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
{
"machine_info": {
"node": "Woile-MacBook-Pro.local",
"processor": "arm",
"machine": "arm64",
"python_compiler": "Clang 16.0.6 ",
"python_implementation": "CPython",
"python_implementation_version": "3.11.10",
"python_version": "3.11.10",
"python_build": [
"main",
"Sep 7 2024 01:03:31"
],
"release": "24.1.0",
"system": "Darwin",
"cpu": {
"python_version": "3.11.10.final.0 (64 bit)",
"cpuinfo_version": [
9,
0,
0
],
"cpuinfo_version_string": "9.0.0",
"arch": "ARM_8",
"bits": 64,
"count": 12,
"arch_string_raw": "arm64",
"brand_raw": "Apple M3 Pro"
}
},
"commit_info": {
"id": "81eb783e807835e049815241011f694a54404471",
"time": "2024-11-27T14:40:28+01:00",
"author_time": "2024-11-27T14:40:28+01:00",
"dirty": true,
"project": "kstreams",
"branch": "fix/generic-consumer-record"
},
"benchmarks": [
{
"group": null,
"name": "test_startup_and_processing_single_consumer_record",
"fullname": "tests/test_benchmarks.py::test_startup_and_processing_single_consumer_record",
"params": null,
"param": null,
"extra_info": {},
"options": {
"disable_gc": false,
"timer": "perf_counter",
"min_rounds": 5,
"max_time": 1.0,
"min_time": 5e-06,
"warmup": false
},
"stats": {
"min": 4.199999966658652e-05,
"max": 0.012051916011841968,
"mean": 9.800166512286035e-05,
"stddev": 0.0001816612665091389,
"rounds": 4471,
"median": 9.42080223467201e-05,
"iqr": 4.929174610879272e-05,
"q1": 6.958325684536248e-05,
"q3": 0.0001188750029541552,
"iqr_outliers": 14,
"stddev_outliers": 9,
"outliers": "9;14",
"ld15iqr": 4.199999966658652e-05,
"hd15iqr": 0.00020029101870022714,
"ops": 10203.90825754179,
"total": 0.43816544476430863,
"iterations": 1
}
},
{
"group": null,
"name": "test_startup_and_inject_all",
"fullname": "tests/test_benchmarks.py::test_startup_and_inject_all",
"params": null,
"param": null,
"extra_info": {},
"options": {
"disable_gc": false,
"timer": "perf_counter",
"min_rounds": 5,
"max_time": 1.0,
"min_time": 5e-06,
"warmup": false
},
"stats": {
"min": 4.3625012040138245e-05,
"max": 0.01566049997927621,
"mean": 0.00022631162852908016,
"stddev": 0.0002073705067274279,
"rounds": 15019,
"median": 0.0002195000124629587,
"iqr": 0.0001750317678670399,
"q1": 0.00013429098180495203,
"q3": 0.0003093227496719919,
"iqr_outliers": 19,
"stddev_outliers": 133,
"outliers": "133;19",
"ld15iqr": 4.3625012040138245e-05,
"hd15iqr": 0.0005731249984819442,
"ops": 4418.685891217931,
"total": 3.398974348878255,
"iterations": 1
}
},
{
"group": null,
"name": "test_consume_many",
"fullname": "tests/test_benchmarks.py::test_consume_many",
"params": null,
"param": null,
"extra_info": {},
"options": {
"disable_gc": false,
"timer": "perf_counter",
"min_rounds": 5,
"max_time": 1.0,
"min_time": 5e-06,
"warmup": false
},
"stats": {
"min": 0.0006483749893959612,
"max": 0.0008417500066570938,
"mean": 0.0006934208655618783,
"stddev": 2.4773934466283123e-05,
"rounds": 943,
"median": 0.0006997500022407621,
"iqr": 3.609351551858708e-05,
"q1": 0.0006723854967276566,
"q3": 0.0007084790122462437,
"iqr_outliers": 16,
"stddev_outliers": 246,
"outliers": "246;16",
"ld15iqr": 0.0006483749893959612,
"hd15iqr": 0.0007627500162925571,
"ops": 1442.1256262453264,
"total": 0.6538958762248512,
"iterations": 1
}
}
],
"datetime": "2024-11-27T14:06:35.464861+00:00",
"version": "5.1.0"
}
4 changes: 2 additions & 2 deletions .github/workflows/bench-release.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Bump version
name: Benchmark latest release

on:
push:
Expand Down Expand Up @@ -46,5 +46,5 @@ jobs:
git config --global user.email "[email protected]"
git config --global user.name "GitHub Action"
git add .benchmarks/
git commit -m "bench: bench: add benchmark current release"
git commit -m "bench: current release"
git push origin master
41 changes: 32 additions & 9 deletions .github/workflows/pr-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ on:
required: true

jobs:
build_test_bench:
test:
runs-on: ubuntu-latest
strategy:
matrix:
Expand All @@ -31,7 +31,6 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
architecture: x64

- name: Set Cache
uses: actions/cache@v4
id: cache # name for referring later
Expand All @@ -42,29 +41,53 @@ jobs:
restore-keys: |
${{ runner.os }}-cache-
${{ runner.os }}-

- name: Install Dependencies
# if: steps.cache.outputs.cache-hit != 'true'
run: |
python -m pip install -U pip poetry
poetry --version
poetry config --local virtualenvs.in-project true
poetry install

- name: Test and Lint
run: |
git config --global user.email "[email protected]"
git config --global user.name "GitHub Action"
./scripts/test

- name: Benchmark regression test
run: |
./scripts/bench-compare

- name: Upload coverage to Codecov
uses: codecov/[email protected]
with:
file: ./coverage.xml
name: kstreams
fail_ci_if_error: true
token: ${{secrets.CODECOV_TOKEN}}
bench:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Setup python
uses: actions/setup-python@v5
with:
python-version: '3.13'
architecture: x64
- name: Set Cache
uses: actions/cache@v4
id: cache # name for referring later
with:
path: .venv/
# The cache key depends on poetry.lock
key: ${{ runner.os }}-cache-${{ hashFiles('poetry.lock') }}
restore-keys: |
${{ runner.os }}-cache-
${{ runner.os }}-
- name: Install Dependencies
# if: steps.cache.outputs.cache-hit != 'true'
run: |
python -m pip install -U pip poetry
poetry --version
poetry config --local virtualenvs.in-project true
poetry install
- name: Benchmark regression test
run: |
./scripts/bench-compare
14 changes: 14 additions & 0 deletions kstreams/consts.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import enum

APPLICATION_X_AVRO_BINARY = "application/x-avro-binary"
APPLICATION_X_AVRO_JSON = "application/x-avro-json"
APPLICATION_X_JSON_SCHEMA = "application/x-json-schema"
Expand All @@ -11,3 +13,15 @@
APPLICATION_X_JSON_SCHEMA,
APPLICATION_JSON,
)


class UDFType(str, enum.Enum):
NO_TYPING = "NO_TYPING"
WITH_TYPING = "WITH_TYPING"


class StreamErrorPolicy(str, enum.Enum):
RESTART = "RESTART"
STOP = "STOP"
STOP_ENGINE = "STOP_ENGINE"
STOP_APPLICATION = "STOP_APPLICATION"
2 changes: 1 addition & 1 deletion kstreams/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from .backends.kafka import Kafka
from .clients import Consumer, Producer
from .consts import StreamErrorPolicy, UDFType
from .exceptions import DuplicateStreamException, EngineNotStartedException
from .middleware import Middleware
from .middleware.udf_middleware import UdfHandler
Expand All @@ -17,7 +18,6 @@
from .serializers import Deserializer, Serializer
from .streams import Stream, StreamFunc
from .streams import stream as stream_func
from .streams_utils import StreamErrorPolicy, UDFType
from .types import Deprecated, EngineHooks, Headers, NextMiddlewareCall
from .utils import encode_headers, execute_hooks

Expand Down
24 changes: 22 additions & 2 deletions kstreams/middleware/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import typing

from kstreams import types
from kstreams.streams_utils import StreamErrorPolicy
from kstreams.consts import StreamErrorPolicy, UDFType

if typing.TYPE_CHECKING:
from kstreams import Stream, StreamEngine # pragma: no cover
Expand All @@ -14,6 +14,10 @@


class MiddlewareProtocol(typing.Protocol):
next_call: types.NextMiddlewareCall
send: types.Send
stream: "Stream"

def __init__(
self,
*,
Expand Down Expand Up @@ -45,6 +49,10 @@ def __repr__(self) -> str:


class BaseMiddleware:
next_call: types.NextMiddlewareCall
send: types.Send
stream: "Stream"

def __init__(
self,
*,
Expand Down Expand Up @@ -92,7 +100,7 @@ async def __call__(self, cr: types.ConsumerRecord) -> typing.Any:

async def cleanup_policy(self, exc: Exception) -> None:
"""
Execute clenup policicy according to the Stream configuration.
Execute cleanup policy according to the Stream configuration.

At this point we are inside the asyncio.Lock `is_processing`
as an event is being processed and an exeption has occured.
Expand Down Expand Up @@ -145,3 +153,15 @@ async def cleanup_policy(self, exc: Exception) -> None:
await self.engine.stop()
await self.stream.is_processing.acquire()
signal.raise_signal(signal.SIGTERM)


class BaseDependcyMiddleware(MiddlewareProtocol, typing.Protocol):
marcosschroh marked this conversation as resolved.
Show resolved Hide resolved
"""Base class for Dependency Injection Middleware.

`get_type` is used to identify the way to call the user defined function,
whether to use DI or not.

On top of that, this middleware helps **avoid circular dependencies**.
"""

def get_type(self) -> UDFType: ...
10 changes: 8 additions & 2 deletions kstreams/middleware/udf_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@ class UdfHandler(BaseMiddleware):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
signature = inspect.signature(self.next_call)
self.params = list(signature.parameters.values())
self.params: typing.List[typing.Any] = [
typing.get_origin(param.annotation) or param.annotation
for param in signature.parameters.values()
]
self.type: UDFType = setup_type(self.params)

def get_type(self) -> UDFType:
return self.type

def bind_udf_params(self, cr: types.ConsumerRecord) -> typing.List:
# NOTE: When `no typing` support is deprecated then this can
# be more eficient as the CR will be always there.
Expand All @@ -30,7 +36,7 @@ def bind_udf_params(self, cr: types.ConsumerRecord) -> typing.List:
types.Send: self.send,
}

return [ANNOTATIONS_TO_PARAMS[param.annotation] for param in self.params]
return [ANNOTATIONS_TO_PARAMS[param_type] for param_type in self.params]

async def __call__(self, cr: types.ConsumerRecord) -> typing.Any:
"""
Expand Down
Loading
Loading