Skip to content

Commit

Permalink
Merge branch 'feat/unary-stream' into dockerify
Browse files Browse the repository at this point in the history
  • Loading branch information
anomit committed Oct 28, 2024
2 parents 776007e + 8c076d3 commit cfd3e84
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 19 deletions.
7 changes: 7 additions & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ else
echo "Found LOCAL_COLLECTOR_PORT ${LOCAL_COLLECTOR_PORT}"
fi

if [ -z "$ENABLE_CRON_RESTART_LOCAL_COLLECTOR" ]; then
export ENABLE_CRON_RESTART_LOCAL_COLLECTOR=true
echo "ENABLE_CRON_RESTART_LOCAL_COLLECTOR not found in .env, setting to default value ${ENABLE_CRON_RESTART_LOCAL_COLLECTOR}"
else
echo "Found ENABLE_CRON_RESTART_LOCAL_COLLECTOR ${ENABLE_CRON_RESTART_LOCAL_COLLECTOR}"
fi

if [ "$MAX_STREAM_POOL_SIZE" ]; then
echo "Found MAX_STREAM_POOL_SIZE ${MAX_STREAM_POOL_SIZE}";
else
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ services:
- BLOCK_TIME=$BLOCK_TIME
- MAX_STREAM_POOL_SIZE=$MAX_STREAM_POOL_SIZE
- STREAM_POOL_HEALTH_CHECK_INTERVAL=$STREAM_POOL_HEALTH_CHECK_INTERVAL
- ENABLE_CRON_RESTART_LOCAL_COLLECTOR=$ENABLE_CRON_RESTART_LOCAL_COLLECTOR
command:
bash -c "bash server_autofill.sh && bash init_processes.sh"
networks:
Expand Down
31 changes: 17 additions & 14 deletions snapshotter/utils/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,20 +676,22 @@ async def send_message(self, msg, simulation=False):
Raises:
Exception: If failed to send the message.
"""
async with self._grpc_stub.SubmitSnapshot.open() as stream:
try:
await stream.send_message(msg)
self._logger.debug(f'Sent message to local collector: {msg}')
response = await stream.recv_message()
self._logger.debug(f'Received response from local collector for {msg}: {response}')
await stream.end()
self._logger.debug(f'gRPC stream ended for snapshot {msg}')
except (ConnectionResetError, grpclib.exceptions.StreamTerminatedError) as e:
pass # fail silently as this is intended for the stream to be closed right after sending the message
except asyncio.CancelledError:
self._logger.info('Task to send snapshot to local collector was asyncio cancelled! {}', msg)
else:
self._logger.info(f'Finalized snapshot submission to local collector without errors: {msg}')
try:
response = await self._grpc_stub.SubmitSnapshot(msg)
self._logger.debug(f'Sent message to local collector and received response: {response}')
except grpclib.GRPCError as e:
self._logger.error(f'gRPC error occurred while sending snapshot to local collector: {e}')
raise
except asyncio.CancelledError:
self._logger.info('Task to send snapshot to local collector was asyncio cancelled!')
raise
except Exception as e:
self._logger.error(f'Unexpected error occurred while sending snapshot to local collector: {e}')
raise
else:
self._logger.info(f'Successfully submitted snapshot to local collector: {msg}')

return response

async def _init_grpc(self):
"""
Expand Down Expand Up @@ -796,3 +798,4 @@ async def _cleanup_tasks(self):
)
task.cancel()
self._active_tasks.discard((task_start_time, task))

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ message SnapshotSubmission {

service Submission {
rpc SubmitSnapshotSimulation (stream SnapshotSubmission) returns (SubmissionResponse);
rpc SubmitSnapshot (stream SnapshotSubmission) returns (stream SubmissionResponse);
rpc SubmitSnapshot (SnapshotSubmission) returns (SubmissionResponse);
}

message SubmissionResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]:
),
'/submission.Submission/SubmitSnapshot': grpclib.const.Handler(
self.SubmitSnapshot,
grpclib.const.Cardinality.STREAM_STREAM,
grpclib.const.Cardinality.UNARY_UNARY,
submission_pb2.SnapshotSubmission,
submission_pb2.SubmissionResponse,
),
Expand All @@ -48,7 +48,7 @@ def __init__(self, channel: grpclib.client.Channel) -> None:
submission_pb2.SnapshotSubmission,
submission_pb2.SubmissionResponse,
)
self.SubmitSnapshot = grpclib.client.StreamStreamMethod(
self.SubmitSnapshot = grpclib.client.UnaryUnaryMethod(
channel,
'/submission.Submission/SubmitSnapshot',
submission_pb2.SnapshotSubmission,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit cfd3e84

Please sign in to comment.