diff --git a/src/greptimedb_worker.erl b/src/greptimedb_worker.erl index 763295f..4fef10a 100644 --- a/src/greptimedb_worker.erl +++ b/src/greptimedb_worker.erl @@ -199,13 +199,16 @@ do_shoot(State0, Requests0, Pending0, N, Channel) -> case greptime_v_1_greptime_database_client:handle_requests(Ctx, #{channel => Channel}) of {ok, Stream} -> shoot(Stream, Req, State1, [ReplyTo]); - _Err -> - State0 + Error -> + reply(ReplyTo, Error), + State1 + end catch E:R:S -> logger:error("[GreptimeDB] failed to shoot(pending=~0p,channel=~0p): ~0p ~0p ~p", [N, Channel, E, R, S]), - State0 + reply(ReplyTo, R), + State1 end. diff --git a/test/greptimedb_SUITE.erl b/test/greptimedb_SUITE.erl index 0b209ba..fe6f249 100644 --- a/test/greptimedb_SUITE.erl +++ b/test/greptimedb_SUITE.erl @@ -9,6 +9,7 @@ all() -> [t_write, t_write_stream, t_insert_requests, + t_write_failure, t_write_batch, t_bench_perf, t_write_stream, @@ -106,6 +107,52 @@ t_insert_requests(_) -> end, ok. +t_write_failure(_) -> + Metric = <<"temperatures">>, + Points = + [#{fields => #{<<"temperature">> => 1}, + tags => + #{<<"from">> => <<"mqttx_4b963a8e">>, + <<"host">> => <<"serverA">>, + <<"qos">> => greptimedb_values:int64_value(0), + <<"region">> => <<"hangzhou">>}, + timestamp => 1619775142098}, + #{fields => #{<<"temperature">> => 2}, + tags => + #{<<"from">> => <<"mqttx_4b963a8e">>, + <<"host">> => <<"serverB">>, + <<"qos">> => greptimedb_values:int64_value(1), + <<"region">> => <<"ningbo">>, + <<"to">> => <<"kafka">>}, + timestamp => 1619775143098}], + Options = + %% the port 5001 is invalid + [{endpoints, [{http, "localhost", 5001}]}, + {pool, greptimedb_client_pool}, + {pool_size, 5}, + {pool_type, random}, + {auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}], + + {ok, Client} = greptimedb:start_client(Options), + false = greptimedb:is_alive(Client), + {error, _} = greptimedb:write(Client, Metric, Points), + + %% async write + Ref = make_ref(), + TestPid = self(), + ResultCallback = {fun(Reply) -> TestPid ! {{Ref, reply}, Reply} end, []}, + + ok = greptimedb:async_write(Client, Metric, Points, ResultCallback), + receive + {{Ref, reply}, {error, Error}} -> + ct:print("write failure: ~p", [Error]); + {{Ref, reply}, _Other} -> + ?assert(false) + end, + + greptimedb:stop_client(Client), + ok. + t_write(_) -> Metric = <<"temperatures">>, Points =