From 4619677de9b0b7a269ea1d136fe3a1394b822c94 Mon Sep 17 00:00:00 2001
From: dennis zhuang
Date: Wed, 2 Aug 2023 14:37:42 +0800
Subject: [PATCH] feat: async write (#26)

* chore: merge_values

* feat: first commit for async write

* feat: async write by timeout

* fix: dialyzer

* test: improve async write test

* test: revert tests

* test: forgot t_auth_error

* feat: adds spec and docs for greptimedb APIs

* feat: reduce timeout events
---
 src/greptimedb.erl         |  76 ++++++++++---
 src/greptimedb_encoder.erl |   6 +-
 src/greptimedb_stream.erl  |  31 +++++-
 src/greptimedb_worker.erl  | 212 +++++++++++++++++++++++++++++++++++--
 test/greptimedb_SUITE.erl  |  52 ++++++++-
 5 files changed, 347 insertions(+), 30 deletions(-)

diff --git a/src/greptimedb.erl b/src/greptimedb.erl
index 0943c05..dcb561e 100644
--- a/src/greptimedb.erl
+++ b/src/greptimedb.erl
@@ -15,7 +15,7 @@
 -module(greptimedb).
 
 -export([start_client/1, stop_client/1, write_batch/2, write/3, write_stream/1,
-         is_alive/1, is_alive/2, ddl/1]).
+         async_write/4, async_write_batch/3, is_alive/1, is_alive/2, ddl/1]).
 
 -spec start_client(list()) ->
               {ok, Client :: map()} |
@@ -38,6 +38,21 @@ start_client(Options0) ->
             {error, Reason}
     end.
 
+%% @doc Write points to the metric table, return the result.
+-spec write(Client, Metric, Points) -> {ok, term()} | {error, term()}
+    when Client :: map(),
+         Metric :: Table | {DbName, Table},
+         DbName :: atom() | binary() | list(),
+         Table :: atom() | binary() | list(),
+         Points :: [Point],
+         Point ::
+             #{tags => map(),
+               fields => map(),
+               timestamp => integer()}.
+write(Client, Metric, Points) ->
+    write_batch(Client, [{Metric, Points}]).
+
+%% @doc Write a batch of data points to the database, return the result.
 -spec write_batch(Client, MetricAndPoints) -> {ok, term()} | {error, term()}
     when Client :: map(),
          MetricAndPoints :: [MetricAndPoint],
@@ -60,19 +75,7 @@ write_batch(Client, MetricAndPoints) ->
             {error, R}
     end.
 
--spec write(Client, Metric, Points) -> {ok, term()} | {error, term()}
-    when Client :: map(),
-         Metric :: Table | {DbName, Table},
-         DbName :: atom() | binary() | list(),
-         Table :: atom() | binary() | list(),
-         Points :: [Point],
-         Point ::
-             #{tags => map(),
-               fields => map(),
-               timestamp => integer()}.
-write(Client, Metric, Points) ->
-    write_batch(Client, [{Metric, Points}]).
-
+%% @doc Create a gRPC stream to write data, return the stream or an error.
 -spec write_stream(Client) -> {ok, term()} | {error, term()} when Client :: map().
 write_stream(Client) ->
     try
@@ -83,6 +86,39 @@ write_stream(Client) ->
             {error, R}
     end.
 
+%% @doc Send an async request to write points to the metric table. The callback is evaluated when an error occurs or a response is received.
+-spec async_write(Client, Metric, Points, ResultCallback) -> ok | {error, term()}
+    when Client :: map(),
+         Metric :: Table | {DbName, Table},
+         DbName :: atom() | binary() | list(),
+         Table :: atom() | binary() | list(),
+         Points :: [Point],
+         Point ::
+             #{tags => map(),
+               fields => map(),
+               timestamp => integer()},
+         ResultCallback :: {function(), list()}.
+async_write(Client, Metric, Points, ResultCallback) ->
+    async_write_batch(Client, [{Metric, Points}], ResultCallback).
+
+%% @doc Send a batch of async write requests. The callback is evaluated when an error occurs or a response is received.
+-spec async_write_batch(Client, MetricAndPoints, ResultCallback) -> ok | {error, term()}
+    when Client :: map(),
+         MetricAndPoints :: [MetricAndPoint],
+         MetricAndPoint :: {Metric, Points},
+         Metric :: Table | {DbName, Table},
+         DbName :: atom() | binary() | list(),
+         Table :: atom() | binary() | list(),
+         Points :: [Point],
+         Point ::
+             #{tags => map(),
+               fields => map(),
+               timestamp => integer()},
+         ResultCallback :: {function(), list()}.
+async_write_batch(Client, MetricAndPoints, ResultCallback) ->
+    Request = greptimedb_encoder:insert_requests(Client, MetricAndPoints),
+    async_handle(Client, Request, ResultCallback).
+
 ddl(_Client) ->
     todo.
 
@@ -120,7 +156,17 @@ handle(#{pool := Pool} = _Client, Request) ->
         ecpool:with_client(Pool, Fun)
     catch
         E:R:S ->
-            logger:error("[GreptimeDB] grpc write fail: ~0p ~0p ~0p", [E, R, S]),
+            logger:error("[GreptimeDB] grpc handle fail: ~0p ~0p ~0p", [E, R, S]),
             {error, {E, R}}
     end.
+
+async_handle(#{pool := Pool} = _Client, Request, ResultCallback) ->
+    Fun = fun(Worker) -> greptimedb_worker:async_handle(Worker, Request, ResultCallback) end,
+    try
+        ecpool:with_client(Pool, Fun)
+    catch
+        E:R:S ->
+            logger:error("[GreptimeDB] grpc async_handle fail: ~0p ~0p ~0p", [E, R, S]),
+            {error, {E, R}}
+    end.
 
diff --git a/src/greptimedb_encoder.erl b/src/greptimedb_encoder.erl
index 804a179..9f5bb1f 100644
--- a/src/greptimedb_encoder.erl
+++ b/src/greptimedb_encoder.erl
@@ -101,8 +101,6 @@ values_size(#{ts_microsecond_values := Values}) ->
 values_size(#{ts_nanosecond_values := Values}) ->
     length(Values).
 
-merge_values(V1, V2) when map_size(V1) == 0 ->
-    V2;
 merge_values(#{i8_values := V1} = L, #{i8_values := V2}) ->
     L#{i8_values := [V2 | V1]};
 merge_values(#{i16_values := V1} = L, #{i16_values := V2}) ->
@@ -138,7 +136,9 @@ merge_values(#{ts_millisecond_values := V1} = L, #{ts_millisecond_values := V2})
 merge_values(#{ts_microsecond_values := V1} = L, #{ts_microsecond_values := V2}) ->
     L#{ts_microsecond_values := [V2 | V1]};
 merge_values(#{ts_nanosecond_values := V1} = L, #{ts_nanosecond_values := V2}) ->
-    L#{ts_nanosecond_values := [V2 | V1]}.
+    L#{ts_nanosecond_values := [V2 | V1]};
+merge_values(V1, V2) when map_size(V1) == 0 ->
+    V2.
 
 pad_null_mask(#{values := Values, null_mask := NullMask} = Column, RowCount) ->
     ValuesSize = values_size(Values),
diff --git a/src/greptimedb_stream.erl b/src/greptimedb_stream.erl
index 5c7bffa..19fa7ec 100644
--- a/src/greptimedb_stream.erl
+++ b/src/greptimedb_stream.erl
@@ -1,7 +1,8 @@
 -module(greptimedb_stream).
 
--export([write/3, finish/1]).
+-export([write/3, write_batch/2, write_request/2, finish/1]).
 
+%% @doc Write the points of the metric to the gRPC stream, return the result.
 -spec write(Stream, Metric, Points) -> {ok, term()} | {error, term()}
     when Stream :: map(),
          Metric :: Table | {DbName, Table},
@@ -13,20 +14,40 @@
               fields => map(),
               timestamp => integer()}.
 write(Stream, Metric, Points) ->
+    write_batch(Stream, [{Metric, Points}]).
+
+%% @doc Write a batch of data points to the gRPC stream, return the result.
+-spec write_batch(Stream, MetricAndPoints) -> {ok, term()} | {error, term()}
+    when Stream :: map(),
+         MetricAndPoints :: [MetricAndPoint],
+         MetricAndPoint :: {Metric, Points},
+         Metric :: Table | {DbName, Table},
+         DbName :: atom() | binary() | list(),
+         Table :: atom() | binary() | list(),
+         Points :: [Point],
+         Point ::
+             #{tags => map(),
+               fields => map(),
+               timestamp => integer()}.
+write_batch(Stream, MetricAndPoints) ->
+    Request = greptimedb_encoder:insert_requests(Stream, MetricAndPoints),
+    write_request(Stream, Request).
+
+write_request(Stream, Request) ->
     try
-        Request = greptimedb_encoder:insert_requests(Stream, [{Metric, Points}]),
         grpcbox_client:send(Stream, Request)
     catch
         E:R:S ->
-            logger:error("[GreptimeDB] stream write ~0p failed: ~0p ~0p ~0p ~p",
-                         [Metric, Points, E, R, S]),
+            logger:error("[GreptimeDB] stream write ~0p failed: ~0p ~0p ~p", [Request, E, R, S]),
             {error, R}
     end.
 
+%% @doc Finish the gRPC stream and wait for the result.
 -spec finish(Stream :: map()) -> {ok, term()} | {error, term()}.
 finish(Stream) ->
-    finish(Stream, 5000).
+    finish(Stream, 10_000).
 
+%% @doc Finish the gRPC stream and wait for the result, with a timeout in milliseconds.
 -spec finish(Stream :: map(), Timeout :: integer()) -> {ok, term()} | {error, term()}.
 finish(Stream, Timeout) ->
     try
diff --git a/src/greptimedb_worker.erl b/src/greptimedb_worker.erl
index 79a5a25..faecf98 100644
--- a/src/greptimedb_worker.erl
+++ b/src/greptimedb_worker.erl
@@ -21,15 +21,28 @@
 -include_lib("grpcbox/include/grpcbox.hrl").
 
 -export([handle/2, stream/1, ddl/0, health_check/1]).
--export([start_link/1, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
+-export([start_link/1, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, async_handle/3]).
 -export([connect/1]).
 
--record(state, {channel}).
+-record(state, {channel, requests}).
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
 
 -define(CALL_TIMEOUT, 12_000).
 -define(HEALTH_CHECK_TIMEOUT, 1_000).
 -define(REQUEST_TIMEOUT, 10_000).
 -define(CONNECT_TIMEOUT, 5_000).
+-define(ASYNC_BATCH_SIZE, 100).
+-define(ASYNC_BATCH_TIMEOUT, 100).
+-define(ASYNC_REQ(Req, ExpireAt, ResultCallback),
+        {async, Req, ExpireAt, ResultCallback}
+       ).
+-define(REQ(Req, ExpireAt),
+        {Req, ExpireAt}
+       ).
+-define(PEND_REQ(ReplyTo, Req), {ReplyTo, Req}).
 
 %% ===================================================================
 %% gen_server callbacks
@@ -42,7 +55,7 @@ init(Args) ->
         lists:map(fun({Schema, Host, Port}) -> {Schema, Host, Port, []} end, Endpoints),
     Channel = list_to_atom(pid_to_list(self())),
     {ok, _} = grpcbox_channel_sup:start_child(Channel, Channels, Options),
-    {ok, #state{channel = Channel}}.
+    {ok, #state{channel = Channel, requests = #{ pending => queue:new(), pending_count => 0}}}.
 
 handle_call({handle, Request}, _From, #state{channel = Channel} = State) ->
     Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond),
@@ -69,26 +82,185 @@ handle_call(health_check, _From, #state{channel = Channel} = State) ->
 handle_call(channel, _From, #state{channel = Channel} = State) ->
     {reply, {ok, Channel}, State}.
 
+handle_info(?ASYNC_REQ(Request, ExpireAt, ResultCallback), State0) ->
+    Req = ?REQ(Request, ExpireAt),
+    State1 = enqueue_req(ResultCallback, Req, State0),
+    State = maybe_shoot(State1, false),
+    noreply_state(State);
+
+handle_info(timeout, State0) ->
+    State = maybe_shoot(State0, true),
+    noreply_state(State);
+
+handle_info(Info, State) ->
+    logger:debug("~p unexpected_info: ~p, channel: ~p", [?MODULE, Info, State#state.channel]),
+
+    {noreply, State, ?ASYNC_BATCH_TIMEOUT}.
+
+
 start_link(Args) ->
     gen_server:start_link(?MODULE, Args, []).
 
 handle_cast(_Request, State) ->
     {noreply, State}.
 
-handle_info(_Info, State) ->
-    {noreply, State}.
-
 terminate(Reason, #state{channel = Channel} = State) ->
     logger:debug("[GreptimeDB] genserver has stopped (~w)~n", [self()]),
     grpcbox_channel:stop(Channel),
     {stop, Reason, State}.
 
+
+%%%===================================================================
+%%% Helper functions
+%%%===================================================================
+now_() ->
+    erlang:system_time(millisecond).
+
+
+fresh_expire_at(Timeout) ->
+    now_() + Timeout.
+
+enqueue_latest_fn(#{prioritise_latest := true}) ->
+    fun queue:in_r/2;
+enqueue_latest_fn(_) ->
+    fun queue:in/2.
+
+peek_oldest_fn(#{prioritise_latest := true}) ->
+    {fun queue:peek_r/1, fun queue:out_r/1};
+peek_oldest_fn(_) ->
+    {fun queue:peek/1, fun queue:out/1}.
+
+%% For async requests, we evaluate the result callback with {error, timeout}
+maybe_reply_timeout({F, A}) when is_function(F) ->
+    _ = erlang:apply(F, A ++ [{error, timeout}]),
+    ok;
+maybe_reply_timeout(_) ->
+    %% This is not a callback, but the gen_server:call's From
+    %% The caller should have already given up waiting for a reply,
+    %% so no need to call gen_server:reply(From, {error, timeout})
+    ok.
+
+reply({F, A}, Result) when is_function(F) ->
+    _ = erlang:apply(F, A ++ [Result]),
+    ok;
+reply(From, Result) ->
+    gen_server:reply(From, Result).
+
+noreply_state(#state{requests = #{pending_count := N}} = State) when N > 0 ->
+    {noreply, State, ?ASYNC_BATCH_TIMEOUT};
+
+noreply_state(State) ->
+    {noreply, State}.
+
+
+%%%===================================================================
+%%% Async requests queue functions
+%%%===================================================================
+enqueue_req(ReplyTo, Req, #state{requests = Requests0} = State) ->
+    #{
+      pending := Pending,
+      pending_count := PC
+     } = Requests0,
+    InFun = enqueue_latest_fn(Requests0),
+    NewPending = InFun(?PEND_REQ(ReplyTo, Req), Pending),
+    Requests = Requests0#{pending := NewPending, pending_count := PC + 1},
+    State#state{requests = drop_expired(Requests)}.
+
+
+%% Try to write requests
+maybe_shoot(#state{requests = Requests0, channel = Channel} = State0, Force) ->
+    State = State0#state{requests = drop_expired(Requests0)},
+    %% If the channel is down
+    ClientDown = is_pid(Channel) andalso (not is_process_alive(Channel)),
+    case ClientDown of
+        true ->
+            State;
+        false ->
+            do_shoot(State, Force)
+    end.
+
+
+do_shoot(#state{requests = #{pending := Pending0, pending_count := N} = Requests0, channel = Channel} = State0, _Force) when
+    N >= ?ASYNC_BATCH_SIZE ->
+    do_shoot(State0, Requests0, Pending0, N, Channel);
+
+do_shoot(#state{requests = #{pending := Pending0, pending_count := N} = Requests0, channel = Channel} = State0, true) when
+    N > 0 ->
+    do_shoot(State0, Requests0, Pending0, N, Channel);
+do_shoot(State, _Force) ->
+    State.
+
+do_shoot(State0, Requests0, Pending0, N, Channel) ->
+    {{value, ?PEND_REQ(ReplyTo, Req)}, Pending} = queue:out(Pending0),
+    Requests = Requests0#{pending := Pending, pending_count := N - 1},
+    State1 = State0#state{requests = Requests},
+    Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond),
+    {ok, Stream} = greptime_v_1_greptime_database_client:handle_requests(Ctx, #{channel => Channel}),
+    shoot(Stream, Req, State1, [ReplyTo]).
+
+
+shoot(Stream, ?REQ(Req, _), #state{requests = #{pending_count := 0}} = State, ReplyToList) ->
+    %% Write the last request and finish the stream
+    case greptimedb_stream:write_request(Stream, Req) of
+        ok ->
+            Result = greptimedb_stream:finish(Stream),
+            lists:foreach(fun(ReplyTo) ->
+                              reply(ReplyTo, Result)
+                          end, ReplyToList);
+        Error ->
+            lists:foreach(fun(ReplyTo) ->
+                              reply(ReplyTo, Error)
+                          end, ReplyToList)
+    end,
+    State;
+
+shoot(Stream, ?REQ(Req, _), #state{requests = #{pending := Pending0, pending_count := N} = Requests0} = State0, ReplyToList) ->
+    case greptimedb_stream:write_request(Stream, Req) of
+        ok ->
+            {{value, ?PEND_REQ(ReplyTo, NextReq)}, Pending} = queue:out(Pending0),
+            Requests = Requests0#{pending := Pending, pending_count := N - 1},
+            State1 = State0#state{requests = Requests},
+            shoot(Stream, NextReq, State1, [ReplyTo | ReplyToList]);
+        Error ->
+            lists:foreach(fun(ReplyTo) ->
+                              reply(ReplyTo, Error)
+                          end, ReplyToList),
+            State0
+    end.
+
+%% Continue dropping expired requests, to avoid the state RAM usage
+%% explosion if the gRPC client cannot keep up.
+drop_expired(#{pending_count := 0} = Requests) ->
+    Requests;
+drop_expired(Requests) ->
+    drop_expired(Requests, now_()).
+
+drop_expired(#{pending_count := 0} = Requests, _Now) ->
+    Requests;
+drop_expired(#{pending := Pending, pending_count := PC} = Requests, Now) ->
+    {PeekFun, OutFun} = peek_oldest_fn(Requests),
+    {value, ?PEND_REQ(ReplyTo, ?REQ(_, ExpireAt))} = PeekFun(Pending),
+    case is_integer(ExpireAt) andalso Now > ExpireAt of
+        true ->
+            {_, NewPendings} = OutFun(Pending),
+            NewRequests = Requests#{pending => NewPendings, pending_count => PC - 1},
+            ok = maybe_reply_timeout(ReplyTo),
+            drop_expired(NewRequests, Now);
+        false ->
+            Requests
+    end.
+
 %%%===================================================================
 %%% Public functions
 %%%===================================================================
 handle(Pid, Request) ->
     gen_server:call(Pid, {handle, Request}, ?CALL_TIMEOUT).
 
+async_handle(Pid, Request, ResultCallback) ->
+    ExpireAt = fresh_expire_at(?REQUEST_TIMEOUT),
+    _ = erlang:send(Pid, ?ASYNC_REQ(Request, ExpireAt, ResultCallback)),
+    ok.
+
 health_check(Pid) ->
     gen_server:call(Pid, health_check, ?HEALTH_CHECK_TIMEOUT).
@@ -105,3 +277,31 @@ ddl() ->
 %%%===================================================================
 connect(Options) ->
     start_link(Options).
+
+
+%%%===================================================================
+%%% Tests
+%%%===================================================================
+-ifdef(TEST).
+
+prioritise_latest_test() ->
+    Opts = #{prioritise_latest => true},
+    Seq = [1, 2, 3, 4],
+    In = enqueue_latest_fn(Opts),
+    {PeekOldest, OutOldest} = peek_oldest_fn(Opts),
+    Q = lists:foldl(fun(I, QIn) -> In(I, QIn) end, queue:new(), Seq),
+    ?assertEqual({value, 1}, PeekOldest(Q)),
+    ?assertMatch({{value, 1}, _}, OutOldest(Q)),
+    ?assertMatch({{value, 4}, _}, queue:out(Q)).
+
+prioritise_oldest_test() ->
+    Opts = #{prioritise_latest => false},
+    Seq = [1, 2, 3, 4],
+    In = enqueue_latest_fn(Opts),
+    {PeekOldest, OutOldest} = peek_oldest_fn(Opts),
+    Q = lists:foldl(fun(I, QIn) -> In(I, QIn) end, queue:new(), Seq),
+    ?assertEqual({value, 1}, PeekOldest(Q)),
+    ?assertMatch({{value, 1}, _}, OutOldest(Q)),
+    ?assertMatch({{value, 1}, _}, queue:out(Q)).
+
+-endif.
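A note on how the asynchronous path above behaves: async requests are mailed to the worker as ?ASYNC_REQ messages, queued in the requests map, and flushed once ?ASYNC_BATCH_SIZE (100) requests are pending or when the ?ASYNC_BATCH_TIMEOUT (100 ms) gen_server timeout fires; entries that outlive ?REQUEST_TIMEOUT are answered with {error, timeout} by drop_expired/2. The ResultCallback is a {Fun, Args} pair that the worker evaluates as erlang:apply(Fun, Args ++ [Result]) (see reply/2 above), so extra routing context can be carried in Args. A minimal sketch of that convention, where notify_owner/3 and make_callback/2 are illustrative names rather than part of the change:

%% Sketch only: the worker appends the write result to the Args list, so a
%% callback built as {fun notify_owner/3, [Owner, Ref]} is eventually invoked
%% as notify_owner(Owner, Ref, Result), where Result is {ok, Response} or
%% {error, Reason} (including {error, timeout} for expired requests).
notify_owner(Owner, Ref, Result) ->
    Owner ! {write_result, Ref, Result}.

make_callback(Owner, Ref) ->
    {fun notify_owner/3, [Owner, Ref]}.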
diff --git a/test/greptimedb_SUITE.erl b/test/greptimedb_SUITE.erl
index 60f00a4..53a07ae 100644
--- a/test/greptimedb_SUITE.erl
+++ b/test/greptimedb_SUITE.erl
@@ -6,7 +6,14 @@
 -include_lib("eunit/include/eunit.hrl").
 
 all() ->
-    [t_write, t_write_stream, t_insert_requests, t_write_batch, t_bench_perf, t_auth_error].
+    [t_write,
+     t_write_stream,
+     t_insert_requests,
+     t_write_batch,
+     t_bench_perf,
+     t_write_stream,
+     t_async_write_batch,
+     t_auth_error].
 
 %%[t_bench_perf].
 %%[t_insert_requests, t_bench_perf].
@@ -332,3 +339,46 @@ t_bench_perf(_) ->
               [Concurrency, Time, Rows, TPS]),
     greptimedb:stop_client(Client),
     ok.
+
+async_write(Client, StartMs) ->
+    Ref = make_ref(),
+    TestPid = self(),
+    ResultCallback = {fun(Reply) -> TestPid ! {{Ref, reply}, Reply} end, []},
+
+    Metric = <<"async_metrics">>,
+    Points = bench_points(StartMs, 10),
+
+    ok = greptimedb:async_write_batch(Client, [{Metric, Points}], ResultCallback),
+
+    Ref.
+
+recv(Ref) ->
+    receive
+        {{Ref, reply}, Reply} ->
+            ct:print("Reply ~w~n", [Reply])
+    end.
+
+t_async_write_batch(_) ->
+    Options =
+        [{endpoints, [{http, "localhost", 4001}]},
+         {pool, greptimedb_client_pool},
+         {pool_size, 8},
+         {pool_type, random},
+         {auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}],
+
+    {ok, Client} = greptimedb:start_client(Options),
+    true = greptimedb:is_alive(Client),
+
+    StartMs = 1690874475279,
+    %% Write once
+    Ref = async_write(Client, StartMs + 100000),
+    recv(Ref),
+
+    %% Write batches
+    N = 100,
+    Refs =
+        lists:map(fun(Num) -> async_write(Client, StartMs + Num * 10) end, lists:seq(1, N)),
+    lists:foreach(fun(Ref0) -> recv(Ref0) end, Refs),
+
+    greptimedb:stop_client(Client),
+    ok.
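For context, a caller-side sketch of the asynchronous API introduced by this patch, modeled on t_async_write_batch above. The module name, metric name, tag/field keys, and the assumption that a GreptimeDB server is reachable on localhost:4001 are illustrative, not part of the change:

%% Minimal usage sketch (assumed module name and data); it expects the
%% greptimedb application and its dependencies to be started, and a
%% GreptimeDB instance listening on localhost:4001.
-module(async_write_example).

-export([run/0]).

run() ->
    Options =
        [{endpoints, [{http, "localhost", 4001}]},
         {pool, greptimedb_client_pool},
         {pool_size, 8},
         {pool_type, random}],
    {ok, Client} = greptimedb:start_client(Options),
    Points =
        [#{tags => #{<<"host">> => <<"web-1">>},
           fields => #{<<"cpu">> => 0.75},
           timestamp => erlang:system_time(millisecond)}],
    Self = self(),
    %% The worker appends the result to the callback's argument list, so this
    %% fun of arity 1 receives {ok, Response} or {error, Reason}.
    Callback = {fun(Result) -> Self ! {write_result, Result} end, []},
    ok = greptimedb:async_write(Client, <<"cpu_metrics">>, Points, Callback),
    receive
        {write_result, Result} ->
            greptimedb:stop_client(Client),
            Result
    after 15000 ->
        greptimedb:stop_client(Client),
        {error, no_reply}
    end.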