Skip to content

Commit

Permalink
Add Kafka test
Browse files Browse the repository at this point in the history
Signed-off-by: Nijat Khanbabayev <[email protected]>
  • Loading branch information
NeejWeej committed Feb 7, 2025
1 parent 11377fb commit b1d9e48
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 5 deletions.
12 changes: 7 additions & 5 deletions csp/tests/adapters/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ def kafkabroker():


@pytest.fixture(scope="module", autouse=True)
def kafkaadapter(kafkabroker):
group_id = "group.id123"
_kafkaadapter = KafkaAdapterManager(
broker=kafkabroker, group_id=group_id, rd_kafka_conf_options={"allow.auto.create.topics": "true"}
)
def kafkaadapterkwargs(kafkabroker):
return dict(broker=kafkabroker, group_id="group.id123", rd_kafka_conf_options={"allow.auto.create.topics": "true"})


@pytest.fixture(scope="module", autouse=True)
def kafkaadapter(kafkaadapterkwargs):
_kafkaadapter = KafkaAdapterManager(**kafkaadapterkwargs)
return _kafkaadapter
20 changes: 20 additions & 0 deletions csp/tests/adapters/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,3 +386,23 @@ def graph():

# With bug this would deadlock
csp.run(graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=2), realtime=True)

@pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests")
def test_meta_field_map_extract_timestamp_from_field(self, kafkaadapterkwargs):
class SubData(csp.Struct):
msg: str
dt: datetime

kafkaadapter1 = KafkaAdapterManager(**kafkaadapterkwargs)

def graph_sub():
return kafkaadapter1.subscribe(
ts_type=SubData,
msg_mapper=RawTextMessageMapper(),
meta_field_map={"timestamp": "dt"},
topic="foobar",
extract_timestamp_from_field="dt",
)

with pytest.raises(ValueError):
csp.run(graph_sub, starttime=datetime.utcnow(), endtime=timedelta(seconds=2), realtime=True)

0 comments on commit b1d9e48

Please sign in to comment.