Skip to content

Commit

Permalink
retries for GRPC streams to handle queued tasks in CE (#47)
Browse files Browse the repository at this point in the history
Co-authored-by: Maxim Kolomeychenko <[email protected]>
  • Loading branch information
mkolomeychenko and Maxim Kolomeychenko authored Nov 6, 2023
1 parent 97f4a4f commit f7077b0
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 31 deletions.
40 changes: 11 additions & 29 deletions agent/configs/timeouts_for_stateless.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,61 +2,46 @@
"__endless_stream_in": {
"class": "RetrierAlwaysYield",
"params": {
"retry_cnt": 30,
"retry_cnt": 100,
"wait_sec_first": 2,
"wait_sec_max": 2,
"timeout": [
4,
30
]
"timeout": [4, 30]
}
},
"__data_stream_in": {
"class": "RetrierConnTOYield",
"params": {
"retry_cnt": 5,
"retry_cnt": 100,
"wait_sec_first": 1,
"wait_sec_max": 4,
"timeout": [
4,
60
]
"timeout": [4, 60]
}
},
"__data_stream_out": {
"class": "RetrierConnTO",
"params": {
"retry_cnt": 5,
"retry_cnt": 100,
"wait_sec_first": 1,
"wait_sec_max": 4,
"timeout": [
4,
60
]
"timeout": [4, 60]
}
},
"__simple_request": {
"class": "RetrierConnTO",
"params": {
"retry_cnt": 5,
"retry_cnt": 100,
"wait_sec_first": 1,
"wait_sec_max": 4,
"timeout": [
4,
60
]
"timeout": [4, 60]
}
},
"Log": {
"class": "RetrierAlways",
"params": {
"retry_cnt": 1,
"retry_cnt": 100,
"wait_sec_first": 0,
"wait_sec_max": 0,
"timeout": [
4,
30
],
"timeout": [4, 30],
"swallow_exc": true
}
},
Expand All @@ -66,10 +51,7 @@
"retry_cnt": 1000000000,
"wait_sec_first": 2,
"wait_sec_max": 2,
"timeout": [
4,
15
]
"timeout": [4, 15]
}
}
}
12 changes: 10 additions & 2 deletions agent/worker/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,11 @@ def send_connect_info(self):

def get_new_task(self):
for task in self.api.get_endless_stream(
"GetNewTask", sly.api_proto.Task, sly.api_proto.Empty()
"GetNewTask",
sly.api_proto.Task,
sly.api_proto.Empty(),
server_fail_limit=200,
wait_server_sec=5,
):
task_msg = json.loads(task.data)
task_msg["agent_info"] = self.agent_info
Expand All @@ -233,7 +237,11 @@ def get_new_task(self):

def get_stop_task(self):
for task in self.api.get_endless_stream(
"GetStopTask", sly.api_proto.Id, sly.api_proto.Empty()
"GetStopTask",
sly.api_proto.Id,
sly.api_proto.Empty(),
server_fail_limit=200,
wait_server_sec=5,
):
stop_task_id = task.id
self.logger.info("GET_STOP_TASK", extra={"task_id": stop_task_id})
Expand Down

0 comments on commit f7077b0

Please sign in to comment.