From 458f4dd9ee9c59ab1cd14213cda803f36540da59 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Tue, 10 Sep 2024 15:39:13 +0800 Subject: [PATCH] feat: supports gRPC insertion hints, close #42 (#43) * feat: supports gRPC insertion hints, close #42 * ci: update greptimedb to v0.9.3 * chore: refactor code * docs: update readme --- .github/workflows/erlang.yml | 2 +- README.md | 13 +++++++++++++ rebar.config | 2 +- src/greptimedb_worker.erl | 25 ++++++++++++++++--------- test/greptimedb_SUITE.erl | 2 ++ 5 files changed, 33 insertions(+), 11 deletions(-) diff --git a/.github/workflows/erlang.yml b/.github/workflows/erlang.yml index 4411af2..1907a6b 100644 --- a/.github/workflows/erlang.yml +++ b/.github/workflows/erlang.yml @@ -21,7 +21,7 @@ jobs: - uses: actions/checkout@v3 - name: Setup greptimedb run: | - GREPTIMEDB_VER=v0.9.0 + GREPTIMEDB_VER=v0.9.3 DOWNLOAD_URL=https://github.com/GreptimeTeam/greptimedb curl -L ${DOWNLOAD_URL}/releases/download/${GREPTIMEDB_VER}/greptime-linux-amd64-${GREPTIMEDB_VER}.tar.gz -o /tmp/greptimedb-${GREPTIMEDB_VER}-linux-amd64.tar.gz mkdir -p /tmp/greptimedb-download diff --git a/README.md b/README.md index f8851bf..1c9934e 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ Start the client: [{endpoints, [{http, "localhost", 4001}]}, {pool, greptimedb_client_pool}, {pool_size, 5}, + {grpc_hints, #{}}, {pool_type, random}, {timeunit, ms}]. {ok, Client} = greptimedb:start_client(Options). @@ -129,6 +130,12 @@ Stop the client: greptimedb:stop_client(Client). ``` +Check if the client is alive: + +```erlang + true = greptimedb:is_alive(Client), +``` + Connect GreptimeDB with authentication: ```erlang @@ -205,6 +212,12 @@ A proper list contains: * `endpoints`: List of the GreptimeDB server address in the form of `{http, host, port}` * `pool`, `pool_size` etc.: the client pool settings * `grpc_opts`: grpxbox [client options](https://github.com/tsloughter/grpcbox#defining-channels) +* `grpc_hints`: a map for GreptimeDB gRPC insertion hints, for example`#{ <<"append_mode">> => <<"true">> }` to enable append mode when creating tables automatically. Valid hints include: + * `append_mode`: `true` or `false` to enable append mode, default is `false`, + * `ttl`: time to live, the table `ttl` option, + * `merge_mode`: `last_row` or `last_non_null`, default is `last_row`, + * `auto_create_table`: `true` or `false`, whether to create tables automatically when writing data, default is `false`, + * More about these table options, please read the [doc](https://docs.greptime.com/reference/sql/create/#table-options). * `ssl_opts`: when the endpoint scheme is `https`, the ssl options to use(`[]` by default). * `auth`: authentication options, `{auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}` for example. * `timeunit`: Timestamp unit, supports: diff --git a/rebar.config b/rebar.config index 7a12a72..09f8e9f 100644 --- a/rebar.config +++ b/rebar.config @@ -1,6 +1,6 @@ {deps, [ {grpcbox, "0.17.1"}, - {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.7"}}} + {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.9"}}} ]}. {grpc, [{protos, "protos"}, {gpb_opts, [{module_name_suffix, "_pb"}]}]}. diff --git a/src/greptimedb_worker.erl b/src/greptimedb_worker.erl index 4a8bac6..d1d52fa 100644 --- a/src/greptimedb_worker.erl +++ b/src/greptimedb_worker.erl @@ -24,7 +24,7 @@ -export([start_link/1, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, async_handle/3]). -export([connect/1]). --record(state, {channel, requests}). +-record(state, {channel, requests, hints}). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -33,6 +33,7 @@ -define(CALL_TIMEOUT, 12_000). -define(HEALTH_CHECK_TIMEOUT, 1_000). -define(REQUEST_TIMEOUT, 10_000). +-define(GTDB_HINT_HEADER_PREFIX, <<"x-greptime-hint-">>). -define(CONNECT_TIMEOUT, 5_000). -define(ASYNC_BATCH_SIZE, 100). -define(ASYNC_BATCH_TIMEOUT, 100). @@ -50,6 +51,10 @@ init(Args) -> logger:debug("[GreptimeDB] genserver has started (~w)~n", [self()]), Endpoints = proplists:get_value(endpoints, Args), + Hints0 = proplists:get_value(grpc_hints, Args, #{}), + Hints = maps:fold(fun(Key, Value, Acc) -> + Acc#{<> => Value} + end, #{}, Hints0), SslOptions = proplists:get_value(ssl_opts, Args, []), Options = proplists:get_value(grpc_opts, Args, #{connect_timeout => ?CONNECT_TIMEOUT}), Channels = @@ -57,10 +62,10 @@ init(Args) -> end, Endpoints), Channel = list_to_atom(pid_to_list(self())), {ok, _} = grpcbox_channel_sup:start_child(Channel, Channels, Options), - {ok, #state{channel = Channel, requests = #{ pending => queue:new(), pending_count => 0}}}. + {ok, #state{channel = Channel, hints = Hints, requests = #{ pending => queue:new(), pending_count => 0}}}. -handle_call({handle, Request}, _From, #state{channel = Channel} = State) -> - Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond), +handle_call({handle, Request}, _From, #state{channel = Channel, hints = Hints} = State) -> + Ctx = new_ctx(?REQUEST_TIMEOUT, Hints), Reply = greptime_v_1_greptime_database_client:handle(Ctx, Request, #{channel => Channel}), logger:debug("[GreptimeDB] handle_call reply: ~w~n", [Reply]), case Reply of @@ -71,9 +76,9 @@ handle_call({handle, Request}, _From, #state{channel = Channel} = State) -> Err -> {reply, Err, State} end; -handle_call(health_check, _From, #state{channel = Channel} = State) -> +handle_call(health_check, _From, #state{channel = Channel, hints = Hints} = State) -> Request = #{}, - Ctx = ctx:with_deadline_after(?HEALTH_CHECK_TIMEOUT, millisecond), + Ctx = new_ctx(?HEALTH_CHECK_TIMEOUT, Hints), Reply = greptime_v_1_health_check_client:health_check(Ctx, Request, #{channel => Channel}), case Reply of @@ -128,6 +133,9 @@ ssl_options(https, []) -> ssl_options(_, SslOptions) -> SslOptions. +new_ctx(TimeoutMs, Values) -> + Ctx0 = ctx:with_values(#{md_outgoing_key => Values}), + ctx:with_deadline_after(Ctx0, TimeoutMs, millisecond). now_() -> erlang:system_time(millisecond). @@ -206,11 +214,11 @@ do_shoot(#state{requests = #{pending := Pending0, pending_count := N} = Requests do_shoot(State, _Force) -> State. -do_shoot(State0, Requests0, Pending0, N, Channel) -> +do_shoot(#state{hints = Hints} = State0, Requests0, Pending0, N, Channel) -> {{value, ?PEND_REQ(ReplyTo, Req)}, Pending} = queue:out(Pending0), Requests = Requests0#{pending := Pending, pending_count := N - 1}, State1 = State0#state{requests = Requests}, - Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond), + Ctx = new_ctx(?REQUEST_TIMEOUT, Hints), try case greptime_v_1_greptime_database_client:handle_requests(Ctx, #{channel => Channel}) of {ok, Stream} -> @@ -227,7 +235,6 @@ do_shoot(State0, Requests0, Pending0, N, Channel) -> State1 end. - shoot(Stream, ?REQ(Req, _), #state{requests = #{pending_count := 0}} = State, ReplyToList) -> %% Write the last request and finish stream case greptimedb_stream:write_request(Stream, Req) of diff --git a/test/greptimedb_SUITE.erl b/test/greptimedb_SUITE.erl index cbd0b5c..d22c017 100644 --- a/test/greptimedb_SUITE.erl +++ b/test/greptimedb_SUITE.erl @@ -200,6 +200,8 @@ t_write(_) -> [{endpoints, [{http, "localhost", 4001}]}, {pool, greptimedb_client_pool}, {pool_size, 5}, + %% enable append mode + {grpc_hints, #{<<"append_mode">> => <<"true">>}}, {pool_type, random}, {auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}],