Job events performance #241
base: main

Conversation
This simulates 100 concurrent jobs, each with 1000 job events.
Events per second: 4238.37. All job events were successfully recorded.
```python
await bulk_job_instance_events.get()
for i in range(bulk_job_instance_events.qsize())
```
Shouldn't this reference the `queue` attribute?
```diff
- await bulk_job_instance_events.get()
- for i in range(bulk_job_instance_events.qsize())
+ await bulk_job_instance_events.queue.get()
+ for i in range(bulk_job_instance_events.queue.qsize())
```
Yes, I'll take a look at it.
```python
await bulk_job_instance_hosts.get()
for i in range(bulk_job_instance_hosts.qsize())
```
Shouldn't this also reference the `queue` attribute?
```diff
- await bulk_job_instance_hosts.get()
- for i in range(bulk_job_instance_hosts.qsize())
+ await bulk_job_instance_hosts.queue.get()
+ for i in range(bulk_job_instance_hosts.queue.qsize())
```
```python
bulk_job_instance_events = Batcher(insert_job_instance_events)
bulk_job_instance_hosts = Batcher(insert_job_instance_hosts)
```
Global dependency. These objects are used only within the scope of the websocket handler; it's unclear why they have to be global, and it's completely unnecessary.
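One way to avoid the module-level globals (a minimal sketch; this `Batcher` stub and the wiring are illustrative assumptions, not the PR's actual code) is to construct the batchers in the application's startup path and pass them to the handler explicitly:

```python
import asyncio

class Batcher:
    """Minimal stand-in for the PR's Batcher: it only owns a queue here."""
    def __init__(self, processor_fn):
        self.processor_fn = processor_fn
        self.queue = asyncio.Queue()

async def main():
    # Construct the batchers at startup instead of at module import time.
    events_batcher = Batcher(processor_fn=lambda batch: None)
    hosts_batcher = Batcher(processor_fn=lambda batch: None)

    async def websocket_handler(batchers):
        # The handler receives its dependencies explicitly rather than
        # reaching for module-level globals.
        await batchers["events"].queue.put({"event": "start"})

    await websocket_handler({"events": events_batcher, "hosts": hosts_batcher})
    return events_batcher.queue.qsize()

print(asyncio.run(main()))  # → 1
```

In a FastAPI app the same pattern would typically hang the batchers off application state during the lifespan hook instead of a dict, but the principle is identical: dependencies flow in, they are not reached out for.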
```python
def start(self, get_db_session_factory):
    self.get_db_session_factory = get_db_session_factory
    _ = asyncio.get_event_loop()
```
Why is this call needed?

- The `get_event_loop` function is deprecated for creating a new loop. See https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.get_event_loop
- If it was used to create an event loop, the event loop must be created when calling `asyncio.run`, not here.
- If it's used to get an existing running loop, it doesn't make sense (and in general you should use `asyncio.get_running_loop()` for that).
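For reference, a minimal sketch of the recommended pattern: let `asyncio.run()` own the loop, and call `asyncio.get_running_loop()` only inside a running coroutine, only when the loop object is actually needed:

```python
import asyncio

async def start():
    # Inside a coroutine the loop is guaranteed to be running, so
    # get_running_loop() is the correct, non-deprecated accessor.
    loop = asyncio.get_running_loop()
    return loop.is_running()

# asyncio.run() creates and closes the event loop; nothing else should.
print(asyncio.run(start()))  # → True
```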
```python
async def _batcher(self):
    while True:
        timeout = time.time() + self.batch_timeout
```
Use a monotonic timer, `time.monotonic()`, for calculating time intervals.
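A small sketch of why (`make_deadline` is a hypothetical helper, not part of the PR):

```python
import time

def make_deadline(batch_timeout: float) -> float:
    # time.monotonic() never jumps when the wall clock is adjusted
    # (NTP sync, DST, manual changes), so deadline arithmetic against
    # it stays correct; time.time() offers no such guarantee.
    return time.monotonic() + batch_timeout

remaining = make_deadline(0.5) - time.monotonic()
assert 0 < remaining <= 0.5
```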
```python
async with db_session_factory() as db:
    query = insert(models.job_instance_events).values(
        [
            await bulk_job_instance_events.get()
```
Because you're consuming a fixed and already known number of queue entries, you can use `queue.get_nowait()` here.
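A sketch of that drain pattern (assuming an `asyncio.Queue`; `drain` and `demo` are illustrative names):

```python
import asyncio

async def drain(queue: asyncio.Queue) -> list:
    # qsize() is sampled once, and get_nowait() never yields to the
    # event loop, so no other consumer can run between the count and
    # the gets: all counted items are guaranteed to be available.
    return [queue.get_nowait() for _ in range(queue.qsize())]

async def demo():
    queue = asyncio.Queue()
    for i in range(3):
        queue.put_nowait(i)
    return await drain(queue)

print(asyncio.run(demo()))  # → [0, 1, 2]
```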
```python
bulk_job_instance_events.start(
    app.dependency_overrides[get_db_session_factory]
)
bulk_job_instance_hosts.start(
    app.dependency_overrides[get_db_session_factory]
)
```
```diff
- bulk_job_instance_events.start(
-     app.dependency_overrides[get_db_session_factory]
- )
- bulk_job_instance_hosts.start(
-     app.dependency_overrides[get_db_session_factory]
- )
+ # FIXME: This is a hack. The database initialization should be refactored
+ # if the database session factory must be accessed outside of the request scope.
+ bulk_job_instance_events.start(
+     app.dependency_overrides[get_db_session_factory]()
+ )
+ bulk_job_instance_hosts.start(
+     app.dependency_overrides[get_db_session_factory]()
+ )
```
The bulk processor should depend on a factory, not on a wrapper used for the FastAPI DI mechanism.
At the moment there is a limitation: the `DatabaseProvider` is not accessible outside of the request context.
This should be refactored.
```python
_ = asyncio.get_event_loop()
asyncio.create_task(self._batcher())
```
```python
async def _batcher(self):
```
The logic here looks unintuitive and probably not performant either.
It runs a loop that checks the queue size at fixed intervals and then passes the queue to the function that consumes messages from it. The queue size is unlimited, which may lead to a back-pressure problem.
While this somehow works, because the queue size is known and queue access doesn't need synchronization, I'd rewrite it in a different manner:

- Use a limited queue size to avoid uncontrolled growth of the queue.
- The `processor_fn` callback should expect a full batch: a sequence (e.g. a list) of items to be processed.
- The Batcher consumes the queue with a timeout:
```python
# PSEUDOCODE #
while True:
    try:
        item = await asyncio.wait_for(queue.get(), timeout - elapsed_time)
    except TimeoutError:
        # flush
        continue
    batch.append(item)
    if len(batch) > max_batch_size:
        # flush
```
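Putting the three points together, the rewrite might look roughly like this (a sketch built on the review's assumptions: a bounded queue, a `processor_fn` that receives a whole batch, and size-or-timeout flushing; all names and defaults are illustrative):

```python
import asyncio
import time

class Batcher:
    def __init__(self, processor_fn, max_batch_size=100,
                 batch_timeout=1.0, max_queue_size=10_000):
        self.processor_fn = processor_fn      # called with a list of items
        self.max_batch_size = max_batch_size
        self.batch_timeout = batch_timeout
        # A bounded queue: put() blocks when full, providing back pressure.
        self.queue = asyncio.Queue(maxsize=max_queue_size)

    async def run(self):
        batch = []
        deadline = time.monotonic() + self.batch_timeout
        while True:
            remaining = deadline - time.monotonic()
            try:
                item = await asyncio.wait_for(self.queue.get(),
                                              timeout=max(0.0, remaining))
                batch.append(item)
            except asyncio.TimeoutError:
                pass  # flush interval elapsed with no new item
            if len(batch) >= self.max_batch_size or time.monotonic() >= deadline:
                if batch:
                    await self.processor_fn(batch)
                    batch = []
                deadline = time.monotonic() + self.batch_timeout
```

Flushing on either a full batch or an elapsed interval keeps latency bounded under light load while preserving throughput under heavy load.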
Improves the performance of processing job events from 200 per second to 4000 per second.