Skip to content

Commit

Permalink
feat: async write (#26)
Browse files Browse the repository at this point in the history
* chore: merge_values

* feat: first commit for async write

* feat: async write by timeout

* fix: dialyzer

* test: improve async write test

* test: revert tests

* test: forgot t_auth_error

* feat: adds spec and docs for greptimedb APIs

* feat: reduce timeout events
  • Loading branch information
killme2008 authored Aug 2, 2023
1 parent c1c6733 commit 4619677
Show file tree
Hide file tree
Showing 5 changed files with 347 additions and 30 deletions.
76 changes: 61 additions & 15 deletions src/greptimedb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-module(greptimedb).

-export([start_client/1, stop_client/1, write_batch/2, write/3, write_stream/1,
is_alive/1, is_alive/2, ddl/1]).
async_write/4, async_write_batch/3, is_alive/1, is_alive/2, ddl/1]).

-spec start_client(list()) ->
{ok, Client :: map()} |
Expand All @@ -38,6 +38,21 @@ start_client(Options0) ->
{error, Reason}
end.

%% @doc Write points to the metric table, return the result.
-spec write(Client, Metric, Points) -> {ok, term()} | {error, term()}
when Client :: map(),
Metric :: Table | {DbName, Table},
DbName :: atom() | binary() | list(),
Table :: atom() | binary() | list(),
Points :: [Point],
Point ::
#{tags => map(),
fields => map(),
timestamp => integer()}.
write(Client, Metric, Points) ->
write_batch(Client, [{Metric, Points}]).

%% @doc Write a batch of data points to the database, return the result.
-spec write_batch(Client, MetricAndPoints) -> {ok, term()} | {error, term()}
when Client :: map(),
MetricAndPoints :: [MetricAndPoint],
Expand All @@ -60,19 +75,7 @@ write_batch(Client, MetricAndPoints) ->
{error, R}
end.

-spec write(Client, Metric, Points) -> {ok, term()} | {error, term()}
when Client :: map(),
Metric :: Table | {DbName, Table},
DbName :: atom() | binary() | list(),
Table :: atom() | binary() | list(),
Points :: [Point],
Point ::
#{tags => map(),
fields => map(),
timestamp => integer()}.
write(Client, Metric, Points) ->
write_batch(Client, [{Metric, Points}]).

%% @doc Create a gRPC stream to write data, return the stream or an error.
-spec write_stream(Client) -> {ok, term()} | {error, term()} when Client :: map().
write_stream(Client) ->
try
Expand All @@ -83,6 +86,39 @@ write_stream(Client) ->
{error, R}
end.

%% @doc Send an async request to write points to the metric table. The callback is evaluated when an error happens or response is received.
-spec async_write(Client, Metric, Points, ResultCallback) -> ok | {error, term()}
when Client :: map(),
Metric :: Table | {DbName, Table},
DbName :: atom() | binary() | list(),
Table :: atom() | binary() | list(),
Points :: [Point],
Point ::
#{tags => map(),
fields => map(),
timestamp => integer()},
ResultCallback :: {function(), list()}.
async_write(Client, Metric, Points, ResultCallback) ->
async_write_batch(Client, [{Metric, Points}], ResultCallback).

%% @doc Send a batch of async request. The callback is evaluated when an error happens or response is received.
-spec async_write_batch(Client, MetricAndPoints, ResultCallback) -> ok | {error, term()}
when Client :: map(),
MetricAndPoints :: [MetricAndPoint],
MetricAndPoint :: {Metric, Points},
Metric :: Table | {DbName, Table},
DbName :: atom() | binary() | list(),
Table :: atom() | binary() | list(),
Points :: [Point],
Point ::
#{tags => map(),
fields => map(),
timestamp => integer()},
ResultCallback :: {function(), list()}.
async_write_batch(Client, MetricAndPoints, ResultCallback) ->
Request = greptimedb_encoder:insert_requests(Client, MetricAndPoints),
async_handle(Client, Request, ResultCallback).

ddl(_Client) ->
todo.

Expand Down Expand Up @@ -120,7 +156,17 @@ handle(#{pool := Pool} = _Client, Request) ->
ecpool:with_client(Pool, Fun)
catch
E:R:S ->
logger:error("[GreptimeDB] grpc write fail: ~0p ~0p ~0p", [E, R, S]),
logger:error("[GreptimeDB] grpc handle fail: ~0p ~0p ~0p", [E, R, S]),
{error, {E, R}}
end.

async_handle(#{pool := Pool} = _Client, Request, ResultCallback) ->
Fun = fun(Worker) -> greptimedb_worker:async_handle(Worker, Request, ResultCallback) end,
try
ecpool:with_client(Pool, Fun)
catch
E:R:S ->
logger:error("[GreptimeDB] grpc async_handle fail: ~0p ~0p ~0p", [E, R, S]),
{error, {E, R}}
end.

Expand Down
6 changes: 3 additions & 3 deletions src/greptimedb_encoder.erl
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ values_size(#{ts_microsecond_values := Values}) ->
values_size(#{ts_nanosecond_values := Values}) ->
length(Values).

merge_values(V1, V2) when map_size(V1) == 0 ->
V2;
merge_values(#{i8_values := V1} = L, #{i8_values := V2}) ->
L#{i8_values := [V2 | V1]};
merge_values(#{i16_values := V1} = L, #{i16_values := V2}) ->
Expand Down Expand Up @@ -138,7 +136,9 @@ merge_values(#{ts_millisecond_values := V1} = L, #{ts_millisecond_values := V2})
merge_values(#{ts_microsecond_values := V1} = L, #{ts_microsecond_values := V2}) ->
L#{ts_microsecond_values := [V2 | V1]};
merge_values(#{ts_nanosecond_values := V1} = L, #{ts_nanosecond_values := V2}) ->
L#{ts_nanosecond_values := [V2 | V1]}.
L#{ts_nanosecond_values := [V2 | V1]};
merge_values(V1, V2) when map_size(V1) == 0 ->
V2.

pad_null_mask(#{values := Values, null_mask := NullMask} = Column, RowCount) ->
ValuesSize = values_size(Values),
Expand Down
31 changes: 26 additions & 5 deletions src/greptimedb_stream.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
-module(greptimedb_stream).

-export([write/3, finish/1]).
-export([write/3, write_batch/2, write_request/2, finish/1]).

%% @doc write the points of the metric to the gRPC stream, returns the result.
-spec write(Stream, Metric, Points) -> {ok, term()} | {error, term()}
when Stream :: map(),
Metric :: Table | {DbName, Table},
Expand All @@ -13,20 +14,40 @@
fields => map(),
timestamp => integer()}.
write(Stream, Metric, Points) ->
write_batch(Stream, [{Metric, Points}]).

%% @doc Write a batch of data points to the gRPC stream, return the result.
-spec write_batch(Stream, MetricAndPoints) -> {ok, term()} | {error, term()}
when Stream :: map(),
MetricAndPoints :: [MetricAndPoint],
MetricAndPoint :: {Metric, Points},
Metric :: Table | {DbName, Table},
DbName :: atom() | binary() | list(),
Table :: atom() | binary() | list(),
Points :: [Point],
Point ::
#{tags => map(),
fields => map(),
timestamp => integer()}.
write_batch(Stream, MetricAndPoints) ->
Request = greptimedb_encoder:insert_requests(Stream, MetricAndPoints),
write_request(Stream, Request).

write_request(Stream, Request) ->
try
Request = greptimedb_encoder:insert_requests(Stream, [{Metric, Points}]),
grpcbox_client:send(Stream, Request)
catch
E:R:S ->
logger:error("[GreptimeDB] stream write ~0p failed: ~0p ~0p ~0p ~p",
[Metric, Points, E, R, S]),
logger:error("[GreptimeDB] stream write ~0p failed: ~0p ~0p ~p", [Request, E, R, S]),
{error, R}
end.

%% @doc Finish the gRPC stream and wait the result.
-spec finish(Stream :: map()) -> {ok, term()} | {error, term()}.
finish(Stream) ->
finish(Stream, 5000).
finish(Stream, 10_000).

%% @doc Finish the gRPC stream and wait the result with timeout in milliseconds.
-spec finish(Stream :: map(), Timeout :: integer()) -> {ok, term()} | {error, term()}.
finish(Stream, Timeout) ->
try
Expand Down
Loading

0 comments on commit 4619677

Please sign in to comment.