Skip to content

Commit

Permalink
Upgrade pre-commit hooks + Don't log message value in kafka broker (#14)
Browse files Browse the repository at this point in the history
* Stop logging event/message in the kafka broker and upgrade pre-commit

- This might lead to data leaking into the logs. There's no reason for
the entire message json to be going to logs. At best we can figure out a way to just log the ID if needed.

- Handle generic ProducerError, instead of specific KafkaError
- Upgrade pre-commit hooks.
- Add further pre-commit changes

* Increase package version to 0.1.3
  • Loading branch information
sidmitra authored Jan 26, 2023
1 parent 38ea18d commit 3899156
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 44 deletions.
2 changes: 0 additions & 2 deletions .isort.cfg

This file was deleted.

32 changes: 14 additions & 18 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,6 @@ repos:
- id: pyupgrade
args: ["--py39-plus", "--keep-runtime-typing"]

- repo: https://github.com/asottile/seed-isort-config
rev: v2.2.0
hooks:
- id: seed-isort-config

- repo: https://github.com/pre-commit/mirrors-isort
rev: v5.10.1
hooks:
- id: isort
additional_dependencies:
- toml

- repo: https://github.com/ambv/black
rev: 22.12.0
hooks:
- id: black
args: [--line-length=88, --safe]

- repo: https://github.com/myint/autoflake
rev: v2.0.0
hooks:
Expand All @@ -52,6 +34,20 @@ repos:
args: [--extension-pkg-whitelist=confluent_kafka]
additional_dependencies: ["click", "confluent_kafka", "cotyledon", "pytest", "pytest_mock"]

- repo: https://github.com/pycqa/isort
rev: 5.11.4
hooks:
- id: isort
name: isort (python)
- id: isort
name: isort (pyi)
types: [pyi]

- repo: https://github.com/ambv/black
rev: 22.12.0
hooks:
- id: black

# - repo: https://github.com/pre-commit/mirrors-mypy
# rev: v0.902
# hooks:
Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,15 @@ You can run workers for all the receivers
```bash
eventbusk worker -A eventbus:bus
```

## Contributing

You can first setup the project locally as follows

```bash
git clone git@github.com:Airbase/eventbusk.git
cd eventbusk
poetry shell
poetry install --no-root
pre-commit install
```
6 changes: 2 additions & 4 deletions eventbusk/brokers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
import logging

from .base import BaseConsumer, BaseProducer, DeliveryCallBackT
from .dummy import Consumer as DummyConsumer
from .dummy import Producer as DummyProducer
from .kafka import Consumer as KafkaConsumer
from .kafka import Producer as KafkaProducer
from .dummy import Consumer as DummyConsumer, Producer as DummyProducer
from .kafka import Consumer as KafkaConsumer, Producer as KafkaProducer

logger = logging.getLogger(__name__)

Expand Down
2 changes: 1 addition & 1 deletion eventbusk/brokers/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ def produce( # pylint: disable=too-many-arguments
logger.info(
f"Producing message {value=}.",
extra={
"flush": True,
"topic": topic,
"value": value,
"flush": True,
},
)
# TODO: call # on_delivery
13 changes: 7 additions & 6 deletions eventbusk/brokers/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
from types import TracebackType
from typing import TYPE_CHECKING, Optional, Union

from confluent_kafka import Consumer as CConsumer # type: ignore
from confluent_kafka import KafkaError
from confluent_kafka import Producer as CProducer
from confluent_kafka import ( # type: ignore
Consumer as CConsumer,
KafkaError,
Producer as CProducer,
)

from ..exceptions import ProducerError
from .base import BaseBrokerURI, BaseConsumer, BaseProducer
Expand Down Expand Up @@ -215,12 +217,11 @@ def produce( # pylint: disable=too-many-arguments
"""
Sends the message to a Kafka topic
"""
logger.info(
logger.debug(
"Producing message.",
extra={
"topic": topic,
"value": value,
"flush": flush,
"topic": topic,
},
)
try:
Expand Down
10 changes: 4 additions & 6 deletions eventbusk/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
from functools import wraps
from typing import Callable, Union

from confluent_kafka import KafkaError # type: ignore

from .brokers import Consumer, DeliveryCallBackT, Producer
from .exceptions import AlreadyRegistered, ConsumerError, UnknownEvent
from .exceptions import AlreadyRegistered, ConsumerError, ProducerError, UnknownEvent

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -135,19 +133,19 @@ def send(
# TODO: Ensure unknown event throws a error.
topic = self._event_to_topic[event_fqn]
data = json.dumps(asdict(event), cls=EventJsonEncoder).encode("utf-8")

try:
self.producer.produce(
topic=topic, value=data, flush=flush, on_delivery=on_delivery
)
# TODO: Replace with ProducerError
except KafkaError as exc:
except ProducerError as exc:
if fail_silently:
logger.warning(
"Error producing event.",
extra={
"event": event_fqn,
"topic": topic,
"event_id": event.event_id,
"topic": topic,
},
exc_info=True,
)
Expand Down
Empty file removed eventbusk/events.py
Empty file.
11 changes: 10 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "eventbusk"
version = "0.1.2"
version = "0.1.3"
description = "Event bus with Kafka"
authors = ["Airbase Inc <[email protected]>"]

Expand All @@ -27,7 +27,16 @@ taskipy = "1.10.3"
eventbusk = "eventbusk.cli:cli"

[tool.isort]
combine_as_imports = true
profile = "black"
src_paths = ["eventbusk"]

[tool.black]
extend-exclude = ''
include = '\.pyi?$'
line-length = 88
safe = true
target-version = ['py39']

[tool.mypy]
python_version = "3.9"
Expand Down
16 changes: 10 additions & 6 deletions tests/test_brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@
from pytest_mock import MockerFixture

from eventbusk.brokers import Consumer, Producer
from eventbusk.brokers.dummy import BrokerURI as DummyBrokerURI
from eventbusk.brokers.dummy import Consumer as DummyConsumer
from eventbusk.brokers.dummy import Producer as DummyProducer
from eventbusk.brokers.kafka import BrokerURI as KafkaBrokerURI
from eventbusk.brokers.kafka import Consumer as KafkaConsumer
from eventbusk.brokers.kafka import Producer as KafkaProducer
from eventbusk.brokers.dummy import (
BrokerURI as DummyBrokerURI,
Consumer as DummyConsumer,
Producer as DummyProducer,
)
from eventbusk.brokers.kafka import (
BrokerURI as KafkaBrokerURI,
Consumer as KafkaConsumer,
Producer as KafkaProducer,
)
from eventbusk.exceptions import ProducerError


Expand Down

0 comments on commit 3899156

Please sign in to comment.