Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: supports gRPC insertion hints, close #42 #43

Merged
merged 4 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/erlang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- uses: actions/checkout@v3
- name: Setup greptimedb
run: |
GREPTIMEDB_VER=v0.9.0
GREPTIMEDB_VER=v0.9.3
DOWNLOAD_URL=https://github.com/GreptimeTeam/greptimedb
curl -L ${DOWNLOAD_URL}/releases/download/${GREPTIMEDB_VER}/greptime-linux-amd64-${GREPTIMEDB_VER}.tar.gz -o /tmp/greptimedb-${GREPTIMEDB_VER}-linux-amd64.tar.gz
mkdir -p /tmp/greptimedb-download
Expand Down
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Start the client:
[{endpoints, [{http, "localhost", 4001}]},
{pool, greptimedb_client_pool},
{pool_size, 5},
{grpc_hints, #{}},
{pool_type, random},
{timeunit, ms}].
{ok, Client} = greptimedb:start_client(Options).
Expand Down Expand Up @@ -129,6 +130,12 @@ Stop the client:
greptimedb:stop_client(Client).
```

Check if the client is alive:

```erlang
true = greptimedb:is_alive(Client),
```

Connect GreptimeDB with authentication:

```erlang
Expand Down Expand Up @@ -205,6 +212,12 @@ A proper list contains:
* `endpoints`: List of the GreptimeDB server address in the form of `{http, host, port}`
* `pool`, `pool_size` etc.: the client pool settings
* `grpc_opts`: grpxbox [client options](https://github.com/tsloughter/grpcbox#defining-channels)
* `grpc_hints`: a map for GreptimeDB gRPC insertion hints, for example`#{ <<"append_mode">> => <<"true">> }` to enable append mode when creating tables automatically. Valid hints include:
* `append_mode`: `true` or `false` to enable append mode, default is `false`,
* `ttl`: time to live, the table `ttl` option,
* `merge_mode`: `last_row` or `last_non_null`, default is `last_row`,
* `auto_create_table`: `true` or `false`, whether to create tables automatically when writing data, default is `false`,
* More about these table options, please read the [doc](https://docs.greptime.com/reference/sql/create/#table-options).
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
* `ssl_opts`: when the endpoint scheme is `https`, the ssl options to use(`[]` by default).
* `auth`: authentication options, `{auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}` for example.
* `timeunit`: Timestamp unit, supports:
Expand Down
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{deps, [
{grpcbox, "0.17.1"},
{ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.7"}}}
{ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.9"}}}
]}.

{grpc, [{protos, "protos"}, {gpb_opts, [{module_name_suffix, "_pb"}]}]}.
Expand Down
25 changes: 16 additions & 9 deletions src/greptimedb_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
-export([start_link/1, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, async_handle/3]).
-export([connect/1]).

-record(state, {channel, requests}).
-record(state, {channel, requests, hints}).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
Expand All @@ -33,6 +33,7 @@
-define(CALL_TIMEOUT, 12_000).
-define(HEALTH_CHECK_TIMEOUT, 1_000).
-define(REQUEST_TIMEOUT, 10_000).
-define(GTDB_HINT_HEADER_PREFIX, <<"x-greptime-hint-">>).
-define(CONNECT_TIMEOUT, 5_000).
-define(ASYNC_BATCH_SIZE, 100).
-define(ASYNC_BATCH_TIMEOUT, 100).
Expand All @@ -50,17 +51,21 @@
init(Args) ->
logger:debug("[GreptimeDB] genserver has started (~w)~n", [self()]),
Endpoints = proplists:get_value(endpoints, Args),
Hints0 = proplists:get_value(grpc_hints, Args, #{}),
Hints = maps:fold(fun(Key, Value, Acc) ->
Acc#{<<?GTDB_HINT_HEADER_PREFIX/binary, Key/binary>> => Value}
end, #{}, Hints0),
SslOptions = proplists:get_value(ssl_opts, Args, []),
Options = proplists:get_value(grpc_opts, Args, #{connect_timeout => ?CONNECT_TIMEOUT}),
Channels =
lists:map(fun({Scheme, Host, Port}) -> {Scheme, Host, Port, ssl_options(Scheme, SslOptions)}
end, Endpoints),
Channel = list_to_atom(pid_to_list(self())),
{ok, _} = grpcbox_channel_sup:start_child(Channel, Channels, Options),
{ok, #state{channel = Channel, requests = #{ pending => queue:new(), pending_count => 0}}}.
{ok, #state{channel = Channel, hints = Hints, requests = #{ pending => queue:new(), pending_count => 0}}}.

handle_call({handle, Request}, _From, #state{channel = Channel} = State) ->
Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond),
handle_call({handle, Request}, _From, #state{channel = Channel, hints = Hints} = State) ->
Ctx = new_ctx(?REQUEST_TIMEOUT, Hints),
Reply = greptime_v_1_greptime_database_client:handle(Ctx, Request, #{channel => Channel}),
logger:debug("[GreptimeDB] handle_call reply: ~w~n", [Reply]),
case Reply of
Expand All @@ -71,9 +76,9 @@ handle_call({handle, Request}, _From, #state{channel = Channel} = State) ->
Err ->
{reply, Err, State}
end;
handle_call(health_check, _From, #state{channel = Channel} = State) ->
handle_call(health_check, _From, #state{channel = Channel, hints = Hints} = State) ->
Request = #{},
Ctx = ctx:with_deadline_after(?HEALTH_CHECK_TIMEOUT, millisecond),
Ctx = new_ctx(?HEALTH_CHECK_TIMEOUT, Hints),
Reply =
greptime_v_1_health_check_client:health_check(Ctx, Request, #{channel => Channel}),
case Reply of
Expand Down Expand Up @@ -128,6 +133,9 @@ ssl_options(https, []) ->
ssl_options(_, SslOptions) ->
SslOptions.

new_ctx(TimeoutMs, Values) ->
Ctx0 = ctx:with_values(#{md_outgoing_key => Values}),
ctx:with_deadline_after(Ctx0, TimeoutMs, millisecond).

now_() ->
erlang:system_time(millisecond).
Expand Down Expand Up @@ -206,11 +214,11 @@ do_shoot(#state{requests = #{pending := Pending0, pending_count := N} = Requests
do_shoot(State, _Force) ->
State.

do_shoot(State0, Requests0, Pending0, N, Channel) ->
do_shoot(#state{hints = Hints} = State0, Requests0, Pending0, N, Channel) ->
{{value, ?PEND_REQ(ReplyTo, Req)}, Pending} = queue:out(Pending0),
Requests = Requests0#{pending := Pending, pending_count := N - 1},
State1 = State0#state{requests = Requests},
Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond),
Ctx = new_ctx(?REQUEST_TIMEOUT, Hints),
try
case greptime_v_1_greptime_database_client:handle_requests(Ctx, #{channel => Channel}) of
{ok, Stream} ->
Expand All @@ -227,7 +235,6 @@ do_shoot(State0, Requests0, Pending0, N, Channel) ->
State1
end.


shoot(Stream, ?REQ(Req, _), #state{requests = #{pending_count := 0}} = State, ReplyToList) ->
%% Write the last request and finish stream
case greptimedb_stream:write_request(Stream, Req) of
Expand Down
2 changes: 2 additions & 0 deletions test/greptimedb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ t_write(_) ->
[{endpoints, [{http, "localhost", 4001}]},
{pool, greptimedb_client_pool},
{pool_size, 5},
%% enable append mode
{grpc_hints, #{<<"append_mode">> => <<"true">>}},
{pool_type, random},
{auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}],

Expand Down
Loading