Fix/generic consumer record #248

Merged · 6 commits merged on Nov 27, 2024
Changes from 3 commits
4 changes: 2 additions & 2 deletions .github/workflows/bench-release.yml
@@ -1,4 +1,4 @@
name: Bump version
name: Benchmark latest release

on:
push:
@@ -46,5 +46,5 @@ jobs:
git config --global user.email "[email protected]"
git config --global user.name "GitHub Action"
git add .benchmarks/
git commit -m "bench: bench: add benchmark current release"
git commit -m "bench: current release"
git push origin master
41 changes: 32 additions & 9 deletions .github/workflows/pr-tests.yaml
@@ -17,7 +17,7 @@ on:
required: true

jobs:
build_test_bench:
test:
runs-on: ubuntu-latest
strategy:
matrix:
@@ -31,7 +31,6 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
architecture: x64

- name: Set Cache
uses: actions/cache@v4
id: cache # name for referring later
@@ -42,29 +41,53 @@
restore-keys: |
${{ runner.os }}-cache-
${{ runner.os }}-

- name: Install Dependencies
# if: steps.cache.outputs.cache-hit != 'true'
run: |
python -m pip install -U pip poetry
poetry --version
poetry config --local virtualenvs.in-project true
poetry install

- name: Test and Lint
run: |
git config --global user.email "[email protected]"
git config --global user.name "GitHub Action"
./scripts/test

- name: Benchmark regression test
run: |
./scripts/bench-compare

- name: Upload coverage to Codecov
uses: codecov/[email protected]
with:
file: ./coverage.xml
name: kstreams
fail_ci_if_error: true
token: ${{secrets.CODECOV_TOKEN}}
bench:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Setup python
uses: actions/setup-python@v5
with:
python-version: '3.13'
architecture: x64
- name: Set Cache
uses: actions/cache@v4
id: cache # name for referring later
with:
path: .venv/
# The cache key depends on poetry.lock
key: ${{ runner.os }}-cache-${{ hashFiles('poetry.lock') }}
restore-keys: |
${{ runner.os }}-cache-
${{ runner.os }}-
- name: Install Dependencies
# if: steps.cache.outputs.cache-hit != 'true'
run: |
python -m pip install -U pip poetry
poetry --version
poetry config --local virtualenvs.in-project true
poetry install
- name: Benchmark regression test
run: |
./scripts/bench-compare
7 changes: 5 additions & 2 deletions kstreams/middleware/udf_middleware.py
@@ -18,7 +18,10 @@ class UdfHandler(BaseMiddleware):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
signature = inspect.signature(self.next_call)
self.params = list(signature.parameters.values())
self.params: typing.List[typing.Any] = [
typing.get_origin(param.annotation) or param.annotation
for param in signature.parameters.values()
]
self.type: UDFType = setup_type(self.params)

def bind_udf_params(self, cr: types.ConsumerRecord) -> typing.List:
@@ -30,7 +33,7 @@ def bind_udf_params(self, cr: types.ConsumerRecord) -> typing.List:
types.Send: self.send,
}

return [ANNOTATIONS_TO_PARAMS[param.annotation] for param in self.params]
return [ANNOTATIONS_TO_PARAMS[param_type] for param_type in self.params]

async def __call__(self, cr: types.ConsumerRecord) -> typing.Any:
"""
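A note on the `UdfHandler` change above: a subscripted generic annotation such as `ConsumerRecord[str, bytes]` is a `typing` alias object, not the `ConsumerRecord` class itself, so it would never match the keys of `ANNOTATIONS_TO_PARAMS`. Normalizing with `typing.get_origin(...) or param.annotation` strips the type arguments when they are present and leaves plain annotations untouched. A minimal self-contained sketch of that behaviour (the `ConsumerRecord` class below is a stand-in defined only for illustration, not imported from kstreams):

```python
import inspect
import typing

K = typing.TypeVar("K")
V = typing.TypeVar("V")


class ConsumerRecord(typing.Generic[K, V]):
    """Stand-in for kstreams' generic ConsumerRecord (illustration only)."""


async def handler(cr: ConsumerRecord[str, bytes]) -> None:
    ...


params = list(inspect.signature(handler).parameters.values())
annotation = params[0].annotation                      # ConsumerRecord[str, bytes]
normalized = typing.get_origin(annotation) or annotation

print(annotation == ConsumerRecord)   # False -> the raw alias would miss the dict lookup
print(normalized is ConsumerRecord)   # True  -> get_origin() recovers the runtime class
print(typing.get_origin(int) or int)  # <class 'int'> -> non-generic annotations pass through
```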
3 changes: 3 additions & 0 deletions kstreams/streams.py
@@ -177,6 +177,9 @@
self.subscribe_by_pattern = subscribe_by_pattern
self.error_policy = error_policy

def __name__(self) -> str:
return self.name

Codecov / codecov/patch — check warning on line 181 in kstreams/streams.py: added line #L181 was not covered by tests.

def _create_consumer(self) -> Consumer:
if self.backend is None:
raise BackendNotSet("A backend has not been set for this stream")
8 changes: 3 additions & 5 deletions kstreams/streams_utils.py
@@ -58,10 +58,8 @@ async def consume(cr: ConsumerRecord, stream: Stream, send: Send):
"""
from .streams import Stream

first_annotation = params[0].annotation

if first_annotation in (inspect._empty, Stream) and len(params) < 2:
# use case 1 NO_TYPING
no_type = len(params) == 1 and params[0] in (inspect._empty, Stream)
if no_type:
return UDFType.NO_TYPING
# typing cases

return UDFType.WITH_TYPING
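For readers following the `setup_type` simplification above: because `UdfHandler` now hands over normalized annotation types instead of `inspect.Parameter` objects, the legacy "no typing" style reduces to a single check — a lone parameter that is either unannotated (`inspect._empty`) or annotated as `Stream`. A rough sketch of the same decision; the `Stream` and `UDFType` definitions below are placeholders for illustration, not the kstreams ones:

```python
import enum
import inspect
import typing


class Stream:
    """Placeholder for kstreams.streams.Stream (illustration only)."""


class UDFType(enum.Enum):
    NO_TYPING = "no_typing"
    WITH_TYPING = "with_typing"


def classify(params: typing.List[typing.Any]) -> UDFType:
    # `params` holds normalized annotations, e.g. [inspect._empty] for a legacy
    # `async def consume(stream)` or [bytes, Stream] for a typed udf.
    no_type = len(params) == 1 and params[0] in (inspect._empty, Stream)
    return UDFType.NO_TYPING if no_type else UDFType.WITH_TYPING


assert classify([inspect._empty]) is UDFType.NO_TYPING   # async def consume(stream)
assert classify([Stream]) is UDFType.NO_TYPING           # async def consume(stream: Stream)
assert classify([bytes, Stream]) is UDFType.WITH_TYPING  # typed parameters -> new style
```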
3 changes: 1 addition & 2 deletions kstreams/types.py
@@ -8,8 +8,7 @@

Headers = typing.Dict[str, str]
EncodedHeaders = typing.Sequence[typing.Tuple[str, bytes]]
StreamFunc = typing.Callable

StreamFunc = typing.Callable[..., typing.Any]
EngineHooks = typing.Sequence[typing.Callable[[], typing.Any]]


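On the `StreamFunc` alias change just above: a bare `typing.Callable` is already treated as `Callable[..., Any]`, but the explicit parameterization documents the intent and keeps strict type checkers from flagging a missing type argument. A tiny hedged example (the `register` helper is hypothetical, used only to show the alias accepting arbitrary signatures):

```python
import typing

StreamFunc = typing.Callable[..., typing.Any]


def register(func: StreamFunc) -> StreamFunc:
    # Any callable signature is accepted; only "it is a callable" is enforced.
    return func


async def consume(cr, stream, send) -> None:
    ...


register(consume)          # ok: three positional parameters
register(lambda: "done")   # ok: zero parameters, any return type
```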
2 changes: 1 addition & 1 deletion scripts/bench-compare
@@ -16,4 +16,4 @@ if [ -d '.venv' ] ; then
fi

# Commented out until after merge, so there will be date to compare with.
# ${PREFIX}pytest tests/test_benchmarks.py --benchmark-compare --benchmark-compare-fail=min:5%
${PREFIX}pytest tests/test_benchmarks.py --benchmark-compare --benchmark-compare-fail=mean:5%
137 changes: 74 additions & 63 deletions tests/test_monitor.py
@@ -20,62 +20,68 @@ async def my_coroutine(_):
stream_engine.add_stream(stream=stream)
await stream.start()

assert stream.consumer is not None
await stream_engine.monitor.generate_consumer_metrics(stream.consumer)
consumer = stream.consumer

for topic_partition in consumer.assignment():
# super ugly notation but for now is the only way to get the metrics
met_committed = (
stream_engine.monitor.MET_COMMITTED.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_COMMITTED.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)

met_position = (
stream_engine.monitor.MET_POSITION.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_POSITION.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)

met_highwater = (
stream_engine.monitor.MET_HIGHWATER.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_HIGHWATER.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)

met_lag = (
stream_engine.monitor.MET_LAG.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_LAG.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)

met_position_lag = (
stream_engine.monitor.MET_POSITION_LAG.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_POSITION_LAG.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)
@@ -135,56 +141,61 @@ async def my_coroutine(_):
for topic_partition in consumer.assignment():
# super ugly notation but for now is the only way to get the metrics
met_committed = (
stream_engine.monitor.MET_COMMITTED.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_COMMITTED.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)

met_position = (
stream_engine.monitor.MET_POSITION.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_POSITION.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)

met_highwater = (
stream_engine.monitor.MET_HIGHWATER.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_HIGHWATER.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)

met_lag = (
stream_engine.monitor.MET_LAG.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_LAG.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)

met_position_lag = (
stream_engine.monitor.MET_POSITION_LAG.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
)
.collect()[0]
list(
stream_engine.monitor.MET_POSITION_LAG.labels(
topic=topic_partition.topic,
partition=topic_partition.partition,
consumer_group=consumer._group_id,
).collect()
)[0]
.samples[0]
.value
)
@@ -200,9 +211,9 @@ async def my_coroutine(_):
met_position_lag == consumer.highwater(topic_partition) - consumer_position
)

assert len(stream_engine.monitor.MET_POSITION_LAG.collect()[0].samples) == 2
assert len(list(stream_engine.monitor.MET_POSITION_LAG.collect())[0].samples) == 2
await stream_engine.remove_stream(stream)
assert len(stream_engine.monitor.MET_POSITION_LAG.collect()[0].samples) == 0
assert len(list(stream_engine.monitor.MET_POSITION_LAG.collect())[0].samples) == 0


@pytest.mark.asyncio
@@ -223,6 +234,6 @@ async def my_coroutine(_):
stream_engine.add_stream(stream=stream)
await stream.start()

assert len(stream_engine.monitor.MET_POSITION_LAG.collect()[0].samples) == 0
assert len(list(stream_engine.monitor.MET_POSITION_LAG.collect())[0].samples) == 0
await stream_engine.remove_stream(stream)
assert "Metrics for consumer with group-id: my-group not found" in caplog.text