Skip to content

Commit

Permalink
fix: should reply the error when fail to shoot (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 authored Dec 4, 2023
1 parent 92a9646 commit 1928d03
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 3 deletions.
9 changes: 6 additions & 3 deletions src/greptimedb_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,16 @@ do_shoot(State0, Requests0, Pending0, N, Channel) ->
case greptime_v_1_greptime_database_client:handle_requests(Ctx, #{channel => Channel}) of
{ok, Stream} ->
shoot(Stream, Req, State1, [ReplyTo]);
_Err ->
State0
Error ->
reply(ReplyTo, Error),
State1

end
catch
E:R:S ->
logger:error("[GreptimeDB] failed to shoot(pending=~0p,channel=~0p): ~0p ~0p ~p", [N, Channel, E, R, S]),
State0
reply(ReplyTo, R),
State1
end.


Expand Down
47 changes: 47 additions & 0 deletions test/greptimedb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ all() ->
[t_write,
t_write_stream,
t_insert_requests,
t_write_failure,
t_write_batch,
t_bench_perf,
t_write_stream,
Expand Down Expand Up @@ -106,6 +107,52 @@ t_insert_requests(_) ->
end,
ok.

t_write_failure(_) ->
Metric = <<"temperatures">>,
Points =
[#{fields => #{<<"temperature">> => 1},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverA">>,
<<"qos">> => greptimedb_values:int64_value(0),
<<"region">> => <<"hangzhou">>},
timestamp => 1619775142098},
#{fields => #{<<"temperature">> => 2},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverB">>,
<<"qos">> => greptimedb_values:int64_value(1),
<<"region">> => <<"ningbo">>,
<<"to">> => <<"kafka">>},
timestamp => 1619775143098}],
Options =
%% the port 5001 is invalid
[{endpoints, [{http, "localhost", 5001}]},
{pool, greptimedb_client_pool},
{pool_size, 5},
{pool_type, random},
{auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}],

{ok, Client} = greptimedb:start_client(Options),
false = greptimedb:is_alive(Client),
{error, _} = greptimedb:write(Client, Metric, Points),

%% async write
Ref = make_ref(),
TestPid = self(),
ResultCallback = {fun(Reply) -> TestPid ! {{Ref, reply}, Reply} end, []},

ok = greptimedb:async_write(Client, Metric, Points, ResultCallback),
receive
{{Ref, reply}, {error, Error}} ->
ct:print("write failure: ~p", [Error]);
{{Ref, reply}, _Other} ->
?assert(false)
end,

greptimedb:stop_client(Client),
ok.

t_write(_) ->
Metric = <<"temperatures">>,
Points =
Expand Down

0 comments on commit 1928d03

Please sign in to comment.