Skip to content

Commit

Permalink
Added sleep(0)
Browse files Browse the repository at this point in the history
  • Loading branch information
danyi1212 committed Jan 26, 2025
1 parent 4a15676 commit 1fa2fee
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 51 deletions.
87 changes: 36 additions & 51 deletions packages/opal-client/opal_client/tests/data_updater_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,43 +312,8 @@ async def test_client_get_initial_data(server):
await updater.stop()


DATA_UPDATE_1 = [JSONPatchAction(op="add", path="/", value={"user":"1"})]
DATA_UPDATE_2 = [JSONPatchAction(op="add", path="/", value={"user":"2"})]



ENTRIES = [
DataSourceEntry(
url="",
data=PATCH_DATA_UPDATE,
dst_path="test",
topics=DATA_TOPICS_TEST_RACE,
config = {"fetcher":"TestsFetchProvider", "timeout": 10}
),
DataSourceEntry(
url="",
data=PATCH_DATA_UPDATE,
dst_path="test",
topics=DATA_TOPICS_TEST_RACE,
config = {"fetcher":"TestsFetchProvider", "timeout": 1}
)
]

def trigger_update_race(entry: DataSourceEntry):
async def run():
# trigger an update
entries = [entry]
update = DataUpdate(reason="Test", entries=entries)
async with PubSubClient(
server_uri=UPDATES_URL,
extra_headers=[get_authorization_header(opal_client_config.CLIENT_TOKEN)],
) as client:
# Channel must be ready before we can publish on it
await asyncio.wait_for(client.wait_until_ready(), 5)
logging.info("Publishing data event")
await client.publish(DATA_TOPICS_TEST_RACE, data=update)

asyncio.run(run())
DATA_UPDATE_1 = [JSONPatchAction(op="add", path="/", value={"user": "1"})]
DATA_UPDATE_2 = [JSONPatchAction(op="add", path="/", value={"user": "2"})]


@pytest.mark.asyncio
Expand All @@ -366,18 +331,38 @@ async def test_data_updater_race(server):
)
# start the updater (terminate on exit)
await updater.start()
try:
proc_1 = multiprocessing.Process(target=trigger_update_race(entry=ENTRIES[0]), daemon=True)
proc_1.start()
await asyncio.sleep(3)
proc_2 = multiprocessing.Process(target=trigger_update_race(entry=ENTRIES[1]), daemon=True)
proc_2.start()
await asyncio.wait_for(policy_store.wait_for_data(), 60)
# cleanup
finally:
await updater.stop()
if proc_1:
proc_1.terminate()
if proc_2:
proc_2.terminate()

await updater.trigger_data_update(
DataUpdate(
id="alice",
reason="Alice",
entries=[
DataSourceEntry(
url="",
data=PATCH_DATA_UPDATE,
dst_path="test",
topics=DATA_TOPICS_TEST_RACE,
config={"fetcher": "TestsFetchProvider", "timeout": 10},
save_method="PUT",
),
],
)
)
await asyncio.sleep(3)
await updater.trigger_data_update(
DataUpdate(
id="bob",
reason="Bob",
entries=[
DataSourceEntry(
url="",
data=PATCH_DATA_UPDATE,
dst_path="test",
topics=DATA_TOPICS_TEST_RACE,
config={"fetcher": "TestsFetchProvider", "timeout": 1},
save_method="PUT",
),
],
)
)
# await asyncio.wait_for(policy_store.wait_for_data(), 60)
1 change: 1 addition & 0 deletions packages/opal-common/opal_common/async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ async def _handle_queue(self, handler: Coroutine):

async def start_queue_handling(self, handler: Coroutine):
self._handler_task = asyncio.create_task(self._handle_queue(handler))
await asyncio.sleep(0) # Let the task start

async def stop_queue_handling(self):
if self._handler_task:
Expand Down

0 comments on commit 1fa2fee

Please sign in to comment.