Segmentation faults (and the like) and failure to get assignments in multithreaded/asyncio pytest environment #1797
Comments
I've seen something similar in our testing setup, which uses async subscribers. I also run our tests within a Docker Compose application, and have never seen it occur there.
A coworker not on macOS has reported the issue as well, and it has also appeared in CI (GitHub Actions), so it's seemingly not macOS-specific.
The test is basically this:

```python
# (Event is presumably asyncio.Event; broker, mrid, url, async_der_client,
# log, and CloudEvent come from the surrounding test fixtures/imports.)
message_received_event = Event()

async def subscriber(event: CloudEvent):
    assert event.data
    if mrid in event.data.get("request_body"):
        message_received_event.set()

broker.subscriber(CONTROL_RESPONSES_TOPIC["name"], group_id="test_event_router")(subscriber)

# Start listening
await broker.start()

payload = get_control_response_payload(status=1, subject=mrid)
res = await async_der_client.post(url=url, content=payload)
assert res.status_code == 201

# Wait for the event to be set with a timeout to avoid hanging indefinitely
try:
    await asyncio.wait_for(message_received_event.wait(), timeout=5.0)
except asyncio.TimeoutError:
    log.error("Timed out waiting for message received event")

assert message_received_event.is_set()
await broker.close()
```
Reproduced it with
Description
I have a pytest suite that will:
i. Create one Consumer object per topic in a threadpool, all with auto.offset.reset=earliest
ii. Subscribe to them in a threadpool.
iii. Wait for each Consumer to receive an assignment.
iv. In a threadpool, create a bunch of Producers, send messages to the topics, and flush.
v. Wait for each Consumer to process the correct number of messages.
vi. Finally, close all the consumers, regardless of exceptions raised elsewhere.
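The steps above can be sketched roughly as follows. This is not the reporter's actual code: the factory-injection structure, names, and defaults are illustrative assumptions; with confluent_kafka, `make_consumer` would build a `Consumer` with `auto.offset.reset=earliest` and call `subscribe()`, and `make_producer` would build a `Producer`.

```python
from concurrent.futures import ThreadPoolExecutor

def wait_for_assignment(consumer, attempts=100, poll_timeout=0.1):
    """Step iii: poll until the consumer reports a partition assignment."""
    for _ in range(attempts):
        if consumer.assignment():
            return True
        consumer.poll(poll_timeout)
    return False

def run_flow(make_consumer, make_producer, topics, payload=b"payload"):
    """Steps i-vi, with consumer/producer factories injected for clarity."""
    with ThreadPoolExecutor() as pool:
        # Steps i-ii: one consumer per topic, created/subscribed in a pool.
        consumers = list(pool.map(make_consumer, topics))
    try:
        # Step iii: wait for every consumer to get an assignment.
        assert all(wait_for_assignment(c) for c in consumers)

        # Step iv: produce one message per topic in a pool, flushing each.
        def produce_one(topic):
            p = make_producer()
            p.produce(topic, payload)
            p.flush()

        with ThreadPoolExecutor() as pool:
            list(pool.map(produce_one, topics))

        # Step v: count the messages the consumers actually receive.
        received = 0
        for c in consumers:
            msg = c.poll(5.0)
            if msg is not None and not msg.error():
                received += 1
        return received
    finally:
        # Step vi: always close the consumers, regardless of exceptions.
        for c in consumers:
            c.close()
```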
This test suite is notoriously prone to segmentation faults and the like that crash the entire interpreter and are very disruptive.
I have heard that confluent_kafka is thread-safe, but I have not experienced that to be the case. And I'm open to the possibility that this is user error on my part. If so, please, show me the way.
The errors tend to happen after a first test case has run and during the second test case where the Consumer is attempting to monitor for assignments.
There are many different presentations (see the reproduction gists below).
Additionally, if the timeout argument to poll() is set, the consumer never appears to get an assignment at all.
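As an aside on observing assignments: confluent_kafka's `Consumer.subscribe()` accepts an `on_assign` callback that is invoked from within `poll()` when partitions are assigned, which gives an explicit signal instead of repeatedly checking `assignment()`. A minimal sketch, with the helper name being hypothetical:

```python
import threading

def make_assignment_waiter():
    """Return (on_assign, event): on_assign sets the event once
    a non-empty partition list is handed to the consumer."""
    assigned = threading.Event()

    def on_assign(consumer, partitions):
        # Called from inside consumer.poll() during a group rebalance.
        if partitions:
            assigned.set()

    return on_assign, assigned

# Hypothetical usage with a confluent_kafka Consumer:
#   on_assign, assigned = make_assignment_waiter()
#   consumer.subscribe(["my-topic"], on_assign=on_assign)
#   while not assigned.is_set():
#       consumer.poll(0.1)
```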
How to reproduce
Here's a fairly elaborate script that reliably reproduces the full range of errors I see, with a number of different knobs to tweak.
- Assignments never happening when `poll(1.0)`:
  https://gist.github.com/andreaimprovised/6221cba7c0be98ee3189dd517998bda3
- INTERNAL ERROR with 3 topics:
  https://gist.github.com/andreaimprovised/d80eedeea6ef7beb44fff228df1942da
- Segmentation fault with 12 topics:
  https://gist.github.com/andreaimprovised/5bc6acdc05fecb35d7cb7f20295c31f7
- Segmentation fault with just 1 topic and 1 message per test case:
  https://gist.github.com/andreaimprovised/1fe7b9f8be0d34d8a6dc40827c802934
Additional requirements:
I'm currently using Python 3.10.
Checklist
Please provide the following information:

- confluent-kafka-python and librdkafka version (`confluent_kafka.version()` and `confluent_kafka.libversion()`):

  ```
  In [2]: confluent_kafka.version()
  Out[2]: ('2.5.0', 33882112)
  ```

- Apache Kafka broker version: `confluentinc/cp-kafka:7.6.0`
- Client configuration: `{...}` (it's in the code)
- Operating system: I've seen this on darwin arm64 and linux x86_64.
- Provide client logs (with `'debug': '..'` as necessary): Here is an example
- Provide broker log excerpts: Hmmm, I'll try to figure out how to get these.
- Critical issue: It's not critical, but it depends on how critical you think automated test suites are.