Fix corner-case for pending request count decrements
Fixes: #1121

A remaining issue is that the current code does not catch all
exceptions, so the pending request gauge value can still leak in
those cases.

Signed-off-by: Eero Tamminen <[email protected]>
eero-t committed Mar 3, 2025
1 parent 2b5bccd commit be19c98
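
For context: schedule() increments a pending-request gauge on entry and must decrement it exactly once when the request completes. Below is a minimal sketch of that accounting, assuming a prometheus_client Gauge backs pending_update(); the metric name and class shape are illustrative, not the real OrchestratorMetrics:

    from prometheus_client import Gauge

    class OrchestratorMetricsSketch:
        def __init__(self):
            self.pending = Gauge(
                "megaservice_pending_requests",  # illustrative metric name
                "Requests scheduled but not yet completed",
            )

        def pending_update(self, increase: bool):
            # schedule() calls pending_update(True) on entry; every exit
            # path must call pending_update(False) exactly once, otherwise
            # the gauge leaks (the uncaught-exception case noted above).
            if increase:
                self.pending.inc()
            else:
                self.pending.dec()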
Showing 1 changed file with 8 additions and 2 deletions.

comps/cores/mega/orchestrator.py
@@ -124,6 +124,7 @@ def flow_to(self, from_service, to_service):
     async def schedule(self, initial_inputs: Dict | BaseModel, llm_parameters: LLMParams = LLMParams(), **kwargs):
         req_start = time.time()
         self.metrics.pending_update(True)
+        had_llm = False
 
         result_dict = {}
         runtime_graph = DAG()
@@ -183,6 +184,11 @@ def fake_stream(text):
                     )
                 )
             )
+            # check whether LLM query will be execute()d, i.e. whether request count
+            # would decrement once (async) processing of all the LLM tokens finishes
+            had_llm = had_llm or (
+                self.services[d_node].service_type in (ServiceType.LLM, ServiceType.LVM)
+            )
         nodes_to_keep = []
         for i in ind_nodes:
             nodes_to_keep.append(i)
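
The added flag accumulates across all downstream nodes queued for execution: once any of them is an LLM or LVM service, the request may stream tokens and the final gauge decrement has to wait for the token generator. A toy sketch of the accumulation pattern (the services map and ServiceType enum are simplified stand-ins for the comps types):

    from enum import Enum

    class ServiceType(Enum):  # simplified stand-in
        EMBEDDING = "embedding"
        LLM = "llm"
        LVM = "lvm"

    # toy map: node name -> service type (the real map holds service objects)
    services = {"embed0": ServiceType.EMBEDDING, "llm0": ServiceType.LLM}

    had_llm = False
    for d_node in services:
        # same membership test as the added lines above
        had_llm = had_llm or services[d_node] in (ServiceType.LLM, ServiceType.LVM)

    assert had_llm  # at least one LLM/LVM node will be execute()d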
@@ -194,7 +200,7 @@ def fake_stream(text):
             if node not in nodes_to_keep:
                 runtime_graph.delete_node_if_exists(node)
 
-        if not llm_parameters.stream:
+        if (not llm_parameters.stream) or (not had_llm):
             self.metrics.pending_update(False)
 
         return result_dict, runtime_graph
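
With the widened condition, schedule() itself now decrements the gauge both for non-streaming requests and for streaming requests whose graph contains no LLM/LVM node; only a streaming request that queued an LLM/LVM defers the decrement to the token generator in the next hunk. The decision as a self-contained sketch (the helper name is hypothetical):

    def decrement_in_schedule(stream: bool, had_llm: bool) -> bool:
        """True when schedule() itself owns the pending-gauge decrement."""
        return (not stream) or (not had_llm)

    assert decrement_in_schedule(stream=False, had_llm=True)      # non-streaming
    assert decrement_in_schedule(stream=True, had_llm=False)      # stream, no LLM/LVM
    assert not decrement_in_schedule(stream=True, had_llm=True)   # deferred to generate()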
@@ -309,7 +315,7 @@ def generate():
                     yield chunk
 
                 self.metrics.request_update(req_start)
-                self.metrics.pending_update(False)
+            self.metrics.pending_update(False)
 
         return (
             StreamingResponse(self.align_generator(generate(), **kwargs), media_type="text/event-stream"),
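
The final deleted/added pair differs only in indentation: pending_update(False) moves out one block level inside generate(), so the decrement is also reached when the block guarding the token loop is skipped. The guard itself is not visible in this excerpt; the runnable sketch below assumes an "if response:" check and stub metrics purely for illustration:

    import time

    class StubMetrics:  # stand-in so the sketch runs on its own
        def request_update(self, start): pass
        def pending_update(self, increase): print("pending_update:", increase)

    metrics, req_start, response = StubMetrics(), time.time(), []  # empty response

    def generate():
        if response:  # hypothetical guard, not shown in the diff context
            for chunk in response:
                yield chunk
            metrics.request_update(req_start)
        metrics.pending_update(False)  # dedented: runs even when the guard fails

    list(generate())  # prints "pending_update: False" even for an empty response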
