Skip to content

Commit

Permalink
feat: supports gRPC insertion hints, close #42 (#43)
Browse files Browse the repository at this point in the history
* feat: supports gRPC insertion hints, close #42

* ci: update greptimedb to v0.9.3

* chore: refactor code

* docs: update readme
  • Loading branch information
killme2008 authored Sep 10, 2024
1 parent 73f5d11 commit 458f4dd
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/erlang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- uses: actions/checkout@v3
- name: Setup greptimedb
run: |
GREPTIMEDB_VER=v0.9.0
GREPTIMEDB_VER=v0.9.3
DOWNLOAD_URL=https://github.com/GreptimeTeam/greptimedb
curl -L ${DOWNLOAD_URL}/releases/download/${GREPTIMEDB_VER}/greptime-linux-amd64-${GREPTIMEDB_VER}.tar.gz -o /tmp/greptimedb-${GREPTIMEDB_VER}-linux-amd64.tar.gz
mkdir -p /tmp/greptimedb-download
Expand Down
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Start the client:
[{endpoints, [{http, "localhost", 4001}]},
{pool, greptimedb_client_pool},
{pool_size, 5},
{grpc_hints, #{}},
{pool_type, random},
{timeunit, ms}].
{ok, Client} = greptimedb:start_client(Options).
Expand Down Expand Up @@ -129,6 +130,12 @@ Stop the client:
greptimedb:stop_client(Client).
```

Check if the client is alive:

```erlang
true = greptimedb:is_alive(Client),
```

Connect GreptimeDB with authentication:

```erlang
Expand Down Expand Up @@ -205,6 +212,12 @@ A proper list contains:
* `endpoints`: List of the GreptimeDB server address in the form of `{http, host, port}`
* `pool`, `pool_size` etc.: the client pool settings
* `grpc_opts`: grpxbox [client options](https://github.com/tsloughter/grpcbox#defining-channels)
* `grpc_hints`: a map for GreptimeDB gRPC insertion hints, for example`#{ <<"append_mode">> => <<"true">> }` to enable append mode when creating tables automatically. Valid hints include:
* `append_mode`: `true` or `false` to enable append mode, default is `false`,
* `ttl`: time to live, the table `ttl` option,
* `merge_mode`: `last_row` or `last_non_null`, default is `last_row`,
* `auto_create_table`: `true` or `false`, whether to create tables automatically when writing data, default is `false`,
* More about these table options, please read the [doc](https://docs.greptime.com/reference/sql/create/#table-options).
* `ssl_opts`: when the endpoint scheme is `https`, the ssl options to use(`[]` by default).
* `auth`: authentication options, `{auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}` for example.
* `timeunit`: Timestamp unit, supports:
Expand Down
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{deps, [
{grpcbox, "0.17.1"},
{ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.7"}}}
{ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.9"}}}
]}.

{grpc, [{protos, "protos"}, {gpb_opts, [{module_name_suffix, "_pb"}]}]}.
Expand Down
25 changes: 16 additions & 9 deletions src/greptimedb_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
-export([start_link/1, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, async_handle/3]).
-export([connect/1]).

-record(state, {channel, requests}).
-record(state, {channel, requests, hints}).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
Expand All @@ -33,6 +33,7 @@
-define(CALL_TIMEOUT, 12_000).
-define(HEALTH_CHECK_TIMEOUT, 1_000).
-define(REQUEST_TIMEOUT, 10_000).
-define(GTDB_HINT_HEADER_PREFIX, <<"x-greptime-hint-">>).
-define(CONNECT_TIMEOUT, 5_000).
-define(ASYNC_BATCH_SIZE, 100).
-define(ASYNC_BATCH_TIMEOUT, 100).
Expand All @@ -50,17 +51,21 @@
init(Args) ->
logger:debug("[GreptimeDB] genserver has started (~w)~n", [self()]),
Endpoints = proplists:get_value(endpoints, Args),
Hints0 = proplists:get_value(grpc_hints, Args, #{}),
Hints = maps:fold(fun(Key, Value, Acc) ->
Acc#{<<?GTDB_HINT_HEADER_PREFIX/binary, Key/binary>> => Value}
end, #{}, Hints0),
SslOptions = proplists:get_value(ssl_opts, Args, []),
Options = proplists:get_value(grpc_opts, Args, #{connect_timeout => ?CONNECT_TIMEOUT}),
Channels =
lists:map(fun({Scheme, Host, Port}) -> {Scheme, Host, Port, ssl_options(Scheme, SslOptions)}
end, Endpoints),
Channel = list_to_atom(pid_to_list(self())),
{ok, _} = grpcbox_channel_sup:start_child(Channel, Channels, Options),
{ok, #state{channel = Channel, requests = #{ pending => queue:new(), pending_count => 0}}}.
{ok, #state{channel = Channel, hints = Hints, requests = #{ pending => queue:new(), pending_count => 0}}}.

handle_call({handle, Request}, _From, #state{channel = Channel} = State) ->
Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond),
handle_call({handle, Request}, _From, #state{channel = Channel, hints = Hints} = State) ->
Ctx = new_ctx(?REQUEST_TIMEOUT, Hints),
Reply = greptime_v_1_greptime_database_client:handle(Ctx, Request, #{channel => Channel}),
logger:debug("[GreptimeDB] handle_call reply: ~w~n", [Reply]),
case Reply of
Expand All @@ -71,9 +76,9 @@ handle_call({handle, Request}, _From, #state{channel = Channel} = State) ->
Err ->
{reply, Err, State}
end;
handle_call(health_check, _From, #state{channel = Channel} = State) ->
handle_call(health_check, _From, #state{channel = Channel, hints = Hints} = State) ->
Request = #{},
Ctx = ctx:with_deadline_after(?HEALTH_CHECK_TIMEOUT, millisecond),
Ctx = new_ctx(?HEALTH_CHECK_TIMEOUT, Hints),
Reply =
greptime_v_1_health_check_client:health_check(Ctx, Request, #{channel => Channel}),
case Reply of
Expand Down Expand Up @@ -128,6 +133,9 @@ ssl_options(https, []) ->
ssl_options(_, SslOptions) ->
SslOptions.

new_ctx(TimeoutMs, Values) ->
Ctx0 = ctx:with_values(#{md_outgoing_key => Values}),
ctx:with_deadline_after(Ctx0, TimeoutMs, millisecond).

now_() ->
erlang:system_time(millisecond).
Expand Down Expand Up @@ -206,11 +214,11 @@ do_shoot(#state{requests = #{pending := Pending0, pending_count := N} = Requests
do_shoot(State, _Force) ->
State.

do_shoot(State0, Requests0, Pending0, N, Channel) ->
do_shoot(#state{hints = Hints} = State0, Requests0, Pending0, N, Channel) ->
{{value, ?PEND_REQ(ReplyTo, Req)}, Pending} = queue:out(Pending0),
Requests = Requests0#{pending := Pending, pending_count := N - 1},
State1 = State0#state{requests = Requests},
Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond),
Ctx = new_ctx(?REQUEST_TIMEOUT, Hints),
try
case greptime_v_1_greptime_database_client:handle_requests(Ctx, #{channel => Channel}) of
{ok, Stream} ->
Expand All @@ -227,7 +235,6 @@ do_shoot(State0, Requests0, Pending0, N, Channel) ->
State1
end.


shoot(Stream, ?REQ(Req, _), #state{requests = #{pending_count := 0}} = State, ReplyToList) ->
%% Write the last request and finish stream
case greptimedb_stream:write_request(Stream, Req) of
Expand Down
2 changes: 2 additions & 0 deletions test/greptimedb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ t_write(_) ->
[{endpoints, [{http, "localhost", 4001}]},
{pool, greptimedb_client_pool},
{pool_size, 5},
%% enable append mode
{grpc_hints, #{<<"append_mode">> => <<"true">>}},
{pool_type, random},
{auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}],

Expand Down

0 comments on commit 458f4dd

Please sign in to comment.