import inspect
from typing import Any, AsyncIterator, Awaitable, Protocol, TypeVar, Union

from di.api.dependencies import CacheKey
from di.dependant import Dependant, Marker

from kstreams.types import ConsumerRecord


class ExtractorTrait(Protocol):
    """Implement to extract data from an incoming `ConsumerRecord`.

    Consumers will always work with a consumer record.
    Implementing this would let you extract information from the
    `ConsumerRecord`.
    """

    def __hash__(self) -> int:
        """Required by di in order to cache the deps"""
        ...

    def __eq__(self, __o: object) -> bool:
        """Required by di in order to cache the deps"""
        ...

    async def extract(
        self, consumer_record: ConsumerRecord
    ) -> Union[Awaitable[Any], AsyncIterator[Any]]:
        """This is where the magic should happen.

        For example, you could "extract" here a json from the
        `ConsumerRecord.value`
        """
        ...


T = TypeVar("T", covariant=True)


class MarkerTrait(Protocol[T]):
    """Given a function parameter, build the matching extractor."""

    def register_parameter(self, param: inspect.Parameter) -> T:
        ...


class Binder(Dependant[Any]):
    """Adapt an `ExtractorTrait` into a `di` dependant.

    The extractor's `extract` coroutine is the dependency's `call`,
    scoped per consumer record.
    """

    def __init__(
        self,
        *,
        extractor: ExtractorTrait,
    ) -> None:
        super().__init__(call=extractor.extract, scope="consumer_record")
        self.extractor = extractor

    @property
    def cache_key(self) -> CacheKey:
        # The extractor itself keys the cache: equal extractors
        # (see ExtractorTrait.__hash__/__eq__) resolve once per scope.
        return self.extractor


class BinderMarker(Marker):
    """Bind together the different dependencies.

    NEXT: Add asyncapi marker here, like `MarkerTrait[AsyncApiTrait]`.
    Recommendation to wait until 3.0:
    - [#618](https://github.com/asyncapi/spec/issues/618)
    """

    def __init__(self, *, extractor_marker: MarkerTrait[ExtractorTrait]) -> None:
        self.extractor_marker = extractor_marker

    def register_parameter(self, param: inspect.Parameter) -> Binder:
        return Binder(extractor=self.extractor_marker.register_parameter(param))
class HeaderExtractor(NamedTuple):
    """Pull a single header value out of a `ConsumerRecord` by name.

    Hashable and comparable so `di` can cache identical extractors.
    Raises `HeaderNotFound` (from kstreams.exceptions) when the header
    is absent.
    """

    name: str

    def __hash__(self) -> int:
        # Cache key: the class plus the header name being extracted.
        return hash((self.__class__, self.name))

    def __eq__(self, __o: object) -> bool:
        return isinstance(__o, HeaderExtractor) and __o.name == self.name

    async def extract(self, consumer_record: "ConsumerRecord") -> Any:
        raw_headers = consumer_record.headers
        if isinstance(raw_headers, dict):
            raw_headers = tuple(raw_headers.items())
            # NEXT: Check also if it is a sequence, if not it means
            # someone modified the headers in the serializer in a way
            # that we cannot extract it, raise a readable error

        # Scan from the end: when the same header appears more than once,
        # the last occurrence wins (same semantics as popping the list tail).
        for header_name, header_value in reversed(tuple(raw_headers)):
            if header_name == self.name:
                return header_value

        raise HeaderNotFound(
            f"No header `{self.name}` found.\n"
            "Check if your broker is sending the header.\n"
            "Try adding a default value to your parameter like `None`.\n"
            "Or set `convert_underscores = False`."
        )


class HeaderMarker(NamedTuple):
    """Marker holding header-lookup configuration for one parameter."""

    alias: Optional[str]
    convert_underscores: bool

    def register_parameter(self, param: inspect.Parameter) -> HeaderExtractor:
        # Precedence: explicit alias, then kebab-cased parameter name,
        # then the raw parameter name.
        if self.alias is not None:
            header_name = self.alias
        elif self.convert_underscores:
            header_name = param.name.replace("_", "-")
        else:
            header_name = param.name
        return HeaderExtractor(name=header_name)
from typing import Any, Callable, Optional

from di.container import Container, bind_by_type
from di.dependant import Dependant
from di.executors import AsyncExecutor

from kstreams.types import ConsumerRecord


class StreamDependencyManager:
    """Core of dependency injection on kstreams.

    Solve the user function's dependency graph once (`build`), then
    execute it once per record (`execute`).

    Attributes:
        container: dependency store.

    Usage

    ```python
    consumer_record = ConsumerRecord(...)

    def user_func(event_type: FromHeader[str]):
        ...

    sdm = StreamDependencyManager()
    sdm.build(user_func)
    await sdm.execute(consumer_record)
    ```
    """

    container: Container

    def __init__(self, container: Optional[Container] = None):
        self.container = container or Container()

    def _register_framework_deps(self):
        """Register with the container types that belong to kstreams.

        These types can be injectable things available on startup.
        But also they can be things created on a per connection basis.
        They must be available at the time of executing the users' function.

        For example:

        - App
        - ConsumerRecord
        - Consumer
        - KafkaBackend

        Here we are just "registering", that's why we use `bind_by_type`.
        And eventually, when "executing", we should inject all the values we
        want to be "available".
        """
        # wire=False: the record itself is supplied at execution time via
        # `values=`, never constructed by the container.
        self.container.bind(
            bind_by_type(
                Dependant(ConsumerRecord, scope="consumer_record", wire=False),
                ConsumerRecord,
            )
        )
        # NEXT: Add Consumer as a dependency, so it can be injected.

    def build(self, user_fn: Callable[..., Any]) -> None:
        """Build the dependency graph for the given function.

        Args:
            user_fn: user defined function, using allowed kstreams params
        """
        self._register_framework_deps()

        self.solved_user_fn = self.container.solve(
            Dependant(user_fn, scope="consumer_record"),
            scopes=["consumer_record"],
        )

    async def execute(self, consumer_record: ConsumerRecord) -> Any:
        """Execute the dependency graph with external values.

        Args:
            consumer_record: A kafka record containing `value`, `headers`, etc.
        """
        with self.container.enter_scope("consumer_record") as state:
            return await self.container.execute_async(
                self.solved_user_fn,
                values={ConsumerRecord: consumer_record},
                executor=AsyncExecutor(),
                state=state,
            )
from typing import Optional, TypeVar

from kstreams.binders.api import BinderMarker
from kstreams.binders.header import HeaderMarker
from kstreams.typing import Annotated


def Header(
    *, alias: Optional[str] = None, convert_underscores: bool = True
) -> BinderMarker:
    """Construct another type from the headers of a kafka record.

    Usage:

    ```python
    from kstream import Header, Annotated

    def user_fn(event_type: Annotated[str, Header(alias="EventType")]):
        ...
    ```
    """
    return BinderMarker(
        extractor_marker=HeaderMarker(
            alias=alias,
            convert_underscores=convert_underscores,
        )
    )


T = TypeVar("T")

FromHeader = Annotated[T, Header(alias=None)]
FromHeader.__doc__ = """General purpose convenient header type.

Use `Annotated` to provide custom params.
"""
"""Remove this file when python3.8 support is dropped."""
import sys

if sys.version_info >= (3, 9):
    from typing import Annotated as Annotated  # noqa: F401
else:
    from typing_extensions import Annotated as Annotated  # noqa: F401
+category = "dev" +optional = false +python-versions = ">=3.6" + +[package.dependencies] +python-dateutil = ">=2.4" + [[package]] name = "fastapi" version = "0.75.2" @@ -235,6 +261,14 @@ python-dateutil = ">=2.8.1" [package.extras] dev = ["twine", "markdown", "flake8", "wheel"] +[[package]] +name = "graphlib2" +version = "0.4.6" +description = "Rust port of the Python stdlib graphlib modules" +category = "main" +optional = false +python-versions = ">=3.7" + [[package]] name = "griffe" version = "0.22.0" @@ -928,7 +962,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest- [metadata] lock-version = "1.1" python-versions = "^3.8" -content-hash = "c37f15d935a8e484529cdb190e17433ce3aceec670297b66d07b627069bd7d37" +content-hash = "68d41b2e7b88f6066cef45c7eda8ec71936540d6189bfc0cee6a582a3ecc0afd" [metadata.files] aiokafka = [ @@ -1017,6 +1051,14 @@ decli = [ {file = "decli-0.5.2-py3-none-any.whl", hash = "sha256:d3207bc02d0169bf6ed74ccca09ce62edca0eb25b0ebf8bf4ae3fb8333e15ca0"}, {file = "decli-0.5.2.tar.gz", hash = "sha256:f2cde55034a75c819c630c7655a844c612f2598c42c21299160465df6ad463ad"}, ] +di = [ + {file = "di-0.70.3-py3-none-any.whl", hash = "sha256:5779ff1b605a56a98e0cf744bdac4bd0873370ad95bb4e10fd2ea95b25f508e7"}, + {file = "di-0.70.3.tar.gz", hash = "sha256:7689bc0b2468d3cfff456bba9ffc957ae16204204a67f697d34a39b21993d712"}, +] +faker = [ + {file = "Faker-14.2.0-py3-none-any.whl", hash = "sha256:e02c55a5b0586caaf913cc6c254b3de178e08b031c5922e590fd033ebbdbfd02"}, + {file = "Faker-14.2.0.tar.gz", hash = "sha256:6db56e2c43a2b74250d1c332ef25fef7dc07dcb6c5fab5329dd7b4467b8ed7b9"}, +] fastapi = [] flake8 = [ {file = "flake8-4.0.1-py2.py3-none-any.whl", hash = "sha256:479b1304f72536a55948cb40a32dce8bb0ffe3501e26eaf292c7e60eb5e0428d"}, @@ -1029,6 +1071,33 @@ ghp-import = [ {file = "ghp-import-2.1.0.tar.gz", hash = "sha256:9c535c4c61193c2df8871222567d7fd7e5014d835f97dc7b7439069e2413d343"}, {file = "ghp_import-2.1.0-py3-none-any.whl", hash = 
"sha256:8337dd7b50877f163d4c0289bc1f1c7f127550241988d568c1db512c4324a619"}, ] +graphlib2 = [ + {file = "graphlib2-0.4.6-cp37-abi3-macosx_10_7_x86_64.whl", hash = "sha256:9b830d8be636daf4b746cfa0a71c787e4c607e7c48d14681e0df53d109928107"}, + {file = "graphlib2-0.4.6-cp37-abi3-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:31a3ced8907c449c088f2ab1b0bed871f90704f81fb9cd6e1d82fe4dad302161"}, + {file = "graphlib2-0.4.6-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a14bba640889ff20924384231008a1bfcbe9f8300ccd7dbeac16d7e4c14493ff"}, + {file = "graphlib2-0.4.6-cp37-abi3-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:40c29119df2402380b7f9eb7cfecac7ab8b671a08d7496c1601cefb27a02d7da"}, + {file = "graphlib2-0.4.6-cp37-abi3-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:a5d53a2463eb88177c4ef0bb2895e4b634f2ad181fbcebbaf0dbd1ed30894167"}, + {file = "graphlib2-0.4.6-cp37-abi3-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:6f0877595efaa35535fed1ac1bd984efafbd95a2c659e4801a6f38216f9471a3"}, + {file = "graphlib2-0.4.6-cp37-abi3-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:17a9933e57d2f6a5e841b866c14b150f5ad9cd57f2cc9a93f350910716501293"}, + {file = "graphlib2-0.4.6-cp37-abi3-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:11890ad25fa90e84cbc5e0b856efd15da984ca4978d62fd34bfb3adce097702e"}, + {file = "graphlib2-0.4.6-cp37-abi3-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:8dd60ba6cd73755fa5634573ebb0d9bc5445348d34da6114f8587542381ae894"}, + {file = "graphlib2-0.4.6-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:4982247cb04c0b31b0d531263eb6294ed8bc1db9ccb52992c3f7e8c35bdda29b"}, + {file = "graphlib2-0.4.6-cp37-abi3-musllinux_1_2_armv7l.whl", hash = "sha256:01f25c6b349847dd5a9098a4602161f50457b4c0d8cee0c0f28787b69d255834"}, + {file = "graphlib2-0.4.6-cp37-abi3-musllinux_1_2_i686.whl", hash = 
"sha256:b874c18d873c5f08020fb9b13dda46d0f0efe05daf33304f23568993b99daba1"}, + {file = "graphlib2-0.4.6-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:a69eafdcfc568451a8f2395dee5f8f302ff23a847dd998aba9e21b63daf0a886"}, + {file = "graphlib2-0.4.6-cp37-abi3-win32.whl", hash = "sha256:60dde7893f951988c7d65c729477866618904d1c2038e0a6875d515cae974145"}, + {file = "graphlib2-0.4.6-cp37-abi3-win_amd64.whl", hash = "sha256:5f2cd98b628e7660b9579ac60e49e20e71b79bd4901149a33c8d1246d06d7864"}, + {file = "graphlib2-0.4.6-pp37-pypy37_pp73-macosx_10_7_x86_64.whl", hash = "sha256:c3a8e6a9e7c968cbe50ddf2a851eb51ab683c2874c57332b8d63b72d3bb27f03"}, + {file = "graphlib2-0.4.6-pp37-pypy37_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e7766aed186430727641ad02981a178651d4ed1d39600eb7acb9d5a1463e33a7"}, + {file = "graphlib2-0.4.6-pp37-pypy37_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:ce16a44a58be7b5e6c723d96fbf2aa34e15df1c31f2d101303b8eb78590676ee"}, + {file = "graphlib2-0.4.6-pp38-pypy38_pp73-macosx_10_7_x86_64.whl", hash = "sha256:7d9187428126b95a11fc2364f95a516071a581e75c3df84c49b557b518160c7a"}, + {file = "graphlib2-0.4.6-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:aa4cc08842c6de16998bc4b6f8414e284c183f2636fa6ead420ddfa900fb7cf4"}, + {file = "graphlib2-0.4.6-pp38-pypy38_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:01b35f8dd6a5dba8eedfe88cd202fc7db19109b98e6ad07f0eb2151f5b9bcf39"}, + {file = "graphlib2-0.4.6-pp39-pypy39_pp73-macosx_10_7_x86_64.whl", hash = "sha256:1eabf5ea43a7e0229610b32a21fd67887b8254f623b06c4ff7fb9da24d52fae7"}, + {file = "graphlib2-0.4.6-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:00412566ad83b8c5d354fc8af5cc9d2db169d2b53a86b4dfc1397951712c9713"}, + {file = "graphlib2-0.4.6-pp39-pypy39_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:80b23463413fa71f915156efcbba1787bd5af88887ef592af6c43dc3a9f16639"}, + 
@pytest.fixture
def fake():
    """A `Faker` instance for generating random test data."""
    return Faker()


@pytest.fixture()
def rand_consumer_record(fake: Faker):
    """A random consumer record.

    Factory fixture: call the returned function to build a
    `ConsumerRecord`. Any field may be pinned by passing it explicitly;
    everything else gets a random (truthy) default per call.
    """

    def generate(
        topic: Optional[str] = None,
        headers: Optional[Sequence[Tuple[str, bytes]]] = None,
        partition: Optional[int] = None,
        offset: Optional[int] = None,
        timestamp: Optional[int] = None,
        timestamp_type: Optional[int] = None,
        key: Optional[Any] = None,
        value: Optional[Any] = None,
        checksum: Optional[int] = None,
        serialized_key_size: Optional[int] = None,
        serialized_value_size: Optional[int] = None,
    ) -> ConsumerRecord:
        return ConsumerRecord(
            topic=topic or fake.slug(),
            headers=headers or tuple(),
            partition=partition or fake.pyint(max_value=10),
            offset=offset or fake.pyint(max_value=99999999),
            timestamp=timestamp or fake.unix_time(),
            timestamp_type=timestamp_type or 1,
            key=key or fake.pystr(),
            value=value or fake.pystr().encode(),
            # BUG FIX: checksum is declared Optional[int]; the default was
            # fake.pystr(), which generated a string.
            checksum=checksum or fake.pyint(max_value=99999999),
            serialized_key_size=serialized_key_size or fake.pyint(max_value=10),
            serialized_value_size=serialized_value_size or fake.pyint(max_value=10),
        )

    return generate
@pytest.mark.asyncio
async def test_headers_alias(rand_consumer_record):
    """`Header(alias=...)` matches the aliased header name exactly."""
    # BUG FIX: call the factory fixture instead of mutating the function
    # object and passing the function itself as the record.
    record = rand_consumer_record(headers=(("EventType", "hello"),))

    # BUG FIX: annotation was `int` while the asserted value is the
    # string "hello".
    async def user_fn(event_type: Annotated[str, Header(alias="EventType")]) -> str:
        return event_type

    stream_manager = StreamDependencyManager()
    stream_manager.build(user_fn=user_fn)
    header_content = await stream_manager.execute(record)
    assert header_content == "hello"


@pytest.mark.asyncio
async def test_headers_convert_underscores(rand_consumer_record):
    """With `convert_underscores=False` the parameter name is used verbatim."""
    record = rand_consumer_record(headers=(("event_type", "hello"),))

    async def user_fn(
        event_type: Annotated[str, Header(convert_underscores=False)]
    ) -> str:
        return event_type

    stream_manager = StreamDependencyManager()
    stream_manager.build(user_fn=user_fn)
    header_content = await stream_manager.execute(record)
    assert header_content == "hello"