
Job events performance #241

Open · wants to merge 8 commits into main
Conversation

benthomasson (Collaborator)

Improves the performance of processing job events from 200 per second to 4000 per second.

benthomasson (Collaborator, Author) commented Nov 5, 2022

This simulates 100 concurrent jobs, each with 1000 job events.

time ./job_events_cannon.py --websocket-address ws://localhost:8080/api/ws2 --job-id ca73cc33-4ede-465e-bd82-6ccaaa2915d6 -c 1000 --stdout test2 --workers 100 --verbose

Events per second: 4238.371749744774

All job events were successfully recorded:


eda_server=# select count(*) from job_instance_event;
 count  
--------
 100000


benthomasson marked this pull request as ready for review November 6, 2022 00:54
Comment on lines +241 to +242
await bulk_job_instance_events.get()
for i in range(bulk_job_instance_events.qsize())
Contributor

Shouldn't this reference the queue attribute?

Suggested change
-await bulk_job_instance_events.get()
-for i in range(bulk_job_instance_events.qsize())
+await bulk_job_instance_events.queue.get()
+for i in range(bulk_job_instance_events.queue.qsize())

Collaborator Author

Yes, I'll take a look at it.

Comment on lines +255 to +256
await bulk_job_instance_hosts.get()
for i in range(bulk_job_instance_hosts.qsize())
Contributor

Shouldn't this also reference the queue attribute?

Suggested change
-await bulk_job_instance_hosts.get()
-for i in range(bulk_job_instance_hosts.qsize())
+await bulk_job_instance_hosts.queue.get()
+for i in range(bulk_job_instance_hosts.queue.qsize())



bulk_job_instance_events = Batcher(insert_job_instance_events)
bulk_job_instance_hosts = Batcher(insert_job_instance_hosts)
Contributor

Global dependency. These objects are only used in the scope of the websocket handler; it's unclear why they have to be global, and it looks completely unnecessary.
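One way to avoid the module-level globals, sketched below with FastAPI's startup hook and app.state (illustrative wiring, not code from this PR; Batcher and the insert_* callbacks stand in for the names used here):

import asyncio
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.on_event("startup")
async def create_batchers() -> None:
    # Build the batchers at application startup and keep them on app.state,
    # so handlers read them from the application scope, not a global.
    app.state.bulk_job_instance_events = Batcher(insert_job_instance_events)
    app.state.bulk_job_instance_hosts = Batcher(insert_job_instance_hosts)

@app.websocket("/api/ws2")
async def websocket_handler(websocket: WebSocket) -> None:
    # The handler reaches the batcher through the app, with no global import.
    batcher = websocket.app.state.bulk_job_instance_events
    ...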


def start(self, get_db_session_factory):
    self.get_db_session_factory = get_db_session_factory
    _ = asyncio.get_event_loop()
Contributor

Why is this call needed?

  1. The get_event_loop function is deprecated for creating a new loop. See https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.get_event_loop
  2. If it was used to create an event loop, the event loop should be created by calling asyncio.run, not here.
  3. If it's used to get an existing running loop, it doesn't make sense (and in general for that you should use asyncio.get_running_loop(); see the sketch below).
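A minimal sketch of point 3, assuming start() is always called from within an already-running loop (illustrative, not this PR's code):

import asyncio

def start(self, get_db_session_factory):
    self.get_db_session_factory = get_db_session_factory
    # get_running_loop() raises RuntimeError when no loop is running,
    # surfacing misuse early instead of silently creating a fresh loop.
    loop = asyncio.get_running_loop()
    loop.create_task(self._batcher())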


async def _batcher(self):
    while True:
        timeout = time.time() + self.batch_timeout
Contributor

Use the monotonic timer time.monotonic() for calculating time intervals.
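For instance (a small illustrative helper, not code from this PR):

import time

def deadline_in(interval: float) -> float:
    # time.time() follows the wall clock and can jump backwards or forwards
    # when the system clock is adjusted (NTP sync, manual changes);
    # time.monotonic() only ever moves forward, so it is the right clock
    # for intervals and deadlines.
    return time.monotonic() + interval

def expired(deadline: float) -> bool:
    return time.monotonic() >= deadline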

async with db_session_factory() as db:
    query = insert(models.job_instance_events).values(
        [
            await bulk_job_instance_events.get()
Contributor

Because you're consuming a fixed, already-known number of queue entries, you can use queue.get_nowait() here.
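A sketch of that drain step as a standalone helper (illustrative, not this PR's code):

import asyncio

def drain(queue: asyncio.Queue) -> list:
    # qsize() is read once; because this is the only consumer and there is
    # no await between calls, get_nowait() can never raise QueueEmpty here.
    return [queue.get_nowait() for _ in range(queue.qsize())]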

Comment on lines +57 to +62
bulk_job_instance_events.start(
    app.dependency_overrides[get_db_session_factory]
)
bulk_job_instance_hosts.start(
    app.dependency_overrides[get_db_session_factory]
)
Contributor

Suggested change
-bulk_job_instance_events.start(
-    app.dependency_overrides[get_db_session_factory]
-)
-bulk_job_instance_hosts.start(
-    app.dependency_overrides[get_db_session_factory]
-)
+# FIXME: This is a hack. The database initialization should be refactored
+# if the database session factory must be accessed outside of the request scope.
+bulk_job_instance_events.start(
+    app.dependency_overrides[get_db_session_factory]()
+)
+bulk_job_instance_hosts.start(
+    app.dependency_overrides[get_db_session_factory]()
+)

The bulk processor should depend on a factory, not on a wrapper used for the FastAPI DI mechanism. At the moment there is a limitation: the DatabaseProvider is not accessible outside of the request context. This should be refactored; see the sketch below.
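A sketch of that shape, with the batcher typed against a plain session-factory callable (names and types are illustrative, not this PR's code):

from contextlib import AbstractAsyncContextManager
from typing import Callable

# The batcher depends only on "a callable that yields an async session
# context manager" and knows nothing about FastAPI's dependency_overrides.
SessionFactory = Callable[[], AbstractAsyncContextManager]

class Batcher:
    def start(self, session_factory: SessionFactory) -> None:
        self.session_factory = session_factory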

        _ = asyncio.get_event_loop()
        asyncio.create_task(self._batcher())

    async def _batcher(self):
Contributor

The logic here looks unintuitive and probably not performant either. It runs a loop that checks the queue size at fixed intervals and then passes the queue to the function that consumes messages from it. The queue size is unbounded, which may lead to a back-pressure problem.

While this somehow works, because the queue size is known and queue access doesn't need synchronization, I'd rewrite it in a different manner:

  1. Use a bounded queue size to avoid uncontrolled growth of the queue.
  2. The processor_fn callback should expect a full batch: a sequence (e.g. a list) of items to be processed.
  3. The Batcher consumes the queue with a timeout:
# PSEUDOCODE #

while True:
    try:
        item = await asyncio.wait_for(queue.get(), timeout - elapsed_time)
    except asyncio.TimeoutError:
        # flush the accumulated batch
        continue
    batch.append(item)
    if len(batch) >= max_batch_size:
        # flush the accumulated batch
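Expanded into a self-contained coroutine (illustrative names; flushing on both size and timeout as the points above describe):

import asyncio
import time

async def run_batcher(queue, processor_fn, max_batch_size, batch_timeout):
    """Collect queue items into batches; flush on size or timeout."""
    batch = []
    deadline = time.monotonic() + batch_timeout
    while True:
        try:
            item = await asyncio.wait_for(
                queue.get(), timeout=max(0.0, deadline - time.monotonic())
            )
        except asyncio.TimeoutError:
            if batch:
                await processor_fn(batch)   # flush a partial batch on timeout
                batch = []
            deadline = time.monotonic() + batch_timeout
            continue
        batch.append(item)
        if len(batch) >= max_batch_size:
            await processor_fn(batch)       # flush a full batch immediately
            batch = []
            deadline = time.monotonic() + batch_timeout

Creating the queue with asyncio.Queue(maxsize=...) then provides the back pressure asked for in point 1: producers await in put() whenever the database falls behind.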
