diff --git a/src/greptimedb_worker.erl b/src/greptimedb_worker.erl index 6aee681..763295f 100644 --- a/src/greptimedb_worker.erl +++ b/src/greptimedb_worker.erl @@ -195,13 +195,20 @@ do_shoot(State0, Requests0, Pending0, N, Channel) -> Requests = Requests0#{pending := Pending, pending_count := N - 1}, State1 = State0#state{requests = Requests}, Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond), - case greptime_v_1_greptime_database_client:handle_requests(Ctx, #{channel => Channel}) of - {ok, Stream} -> - shoot(Stream, Req, State1, [ReplyTo]); - _Err -> + try + case greptime_v_1_greptime_database_client:handle_requests(Ctx, #{channel => Channel}) of + {ok, Stream} -> + shoot(Stream, Req, State1, [ReplyTo]); + _Err -> + State0 + end + catch + E:R:S -> + logger:error("[GreptimeDB] failed to shoot(pending=~0p,channel=~0p): ~0p ~0p ~p", [N, Channel, E, R, S]), State0 end. + shoot(Stream, ?REQ(Req, _), #state{requests = #{pending_count := 0}} = State, ReplyToList) -> %% Write the last request and finish stream case greptimedb_stream:write_request(Stream, Req) of @@ -276,11 +283,17 @@ health_check(Pid) -> gen_server:call(Pid, health_check, ?HEALTH_CHECK_TIMEOUT). stream(Pid) -> - case gen_server:call(Pid, channel, ?CALL_TIMEOUT) of - {ok, Channel} -> - Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond), - greptime_v_1_greptime_database_client:handle_requests(Ctx, #{channel => Channel}); - Err -> Err + try + case gen_server:call(Pid, channel, ?CALL_TIMEOUT) of + {ok, Channel} -> + Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond), + greptime_v_1_greptime_database_client:handle_requests(Ctx, #{channel => Channel}); + Err -> Err + end + catch + E:R:S -> + logger:error("[GreptimeDB] failed to create stream for ~0p: ~0p ~0p ~p", [Pid, E, R, S]), + {error, R} end. ddl() ->