diff --git a/.github/workflows/erlang.yml b/.github/workflows/erlang.yml index 41e8e97..4272677 100644 --- a/.github/workflows/erlang.yml +++ b/.github/workflows/erlang.yml @@ -31,6 +31,11 @@ jobs: - name: Compile run: rebar3 compile - name: Run tests + env: + GT_TEST_HOST: ${{ secrets.GT_TEST_HOST }} + GT_TEST_DB: ${{ secrets.GT_TEST_DB }} + GT_TEST_USER: ${{ secrets.GT_TEST_USER }} + GT_TEST_PASSWD: ${{ secrets.GT_TEST_PASSWD }} run: | cat /tmp/greptimedb.log rebar3 ct -v --cover @@ -40,4 +45,8 @@ jobs: env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }} + GT_TEST_HOST: ${{ secrets.GT_TEST_HOST }} + GT_TEST_DB: ${{ secrets.GT_TEST_DB }} + GT_TEST_USER: ${{ secrets.GT_TEST_USER }} + GT_TEST_PASSWD: ${{ secrets.GT_TEST_PASSWD }} run: rebar3 as test coveralls send diff --git a/src/greptimedb_worker.erl b/src/greptimedb_worker.erl index e924009..2390dac 100644 --- a/src/greptimedb_worker.erl +++ b/src/greptimedb_worker.erl @@ -50,9 +50,11 @@ init(Args) -> logger:debug("[GreptimeDB] genserver has started (~w)~n", [self()]), Endpoints = proplists:get_value(endpoints, Args), + SslOptions = proplists:get_value(ssl_opts, Args, []), Options = proplists:get_value(grpc_options, Args, #{connect_timeout => ?CONNECT_TIMEOUT}), Channels = - lists:map(fun({Schema, Host, Port}) -> {Schema, Host, Port, []} end, Endpoints), + lists:map(fun({Scheme, Host, Port}) -> {Scheme, Host, Port, ssl_options(Scheme, SslOptions)} + end, Endpoints), Channel = list_to_atom(pid_to_list(self())), {ok, _} = grpcbox_channel_sup:start_child(Channel, Channels, Options), {ok, #state{channel = Channel, requests = #{ pending => queue:new(), pending_count => 0}}}. @@ -60,6 +62,7 @@ init(Args) -> handle_call({handle, Request}, _From, #state{channel = Channel} = State) -> Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond), Reply = greptime_v_1_greptime_database_client:handle(Ctx, Request, #{channel => Channel}), + logger:debug("[GreptimeDB] handle_call reply: ~w~n", [Reply]), case Reply of {ok, Resp, _} -> {reply, {ok, Resp}, State}; @@ -113,6 +116,19 @@ terminate(Reason, #state{channel = Channel} = State) -> %%%=================================================================== %%% Helper functions %%%=================================================================== +ssl_options(https, []) -> + %% https://www.erlang.org/doc/man/ssl#type-client_option + [ + {verify, verify_peer}, + {cacerts, public_key:cacerts_get()}, + %% hostname may be wildcard + {customize_hostname_check, [{match_fun,public_key:pkix_verify_hostname_match_fun(https)}]} + ]; + +ssl_options(_, SslOptions) -> + SslOptions. + + now_() -> erlang:system_time(millisecond). diff --git a/test/greptimedb_SUITE.erl b/test/greptimedb_SUITE.erl index 5e8b597..b75422a 100644 --- a/test/greptimedb_SUITE.erl +++ b/test/greptimedb_SUITE.erl @@ -15,6 +15,7 @@ all() -> t_bench_perf, t_write_stream, t_async_write_batch, + t_insert_greptime_cloud, t_auth_error]. %%[t_bench_perf]. @@ -110,23 +111,26 @@ t_insert_requests(_) -> t_insert_requests_with_timeunit(_) -> TsNano = 1705946037724448346, - Points = [#{fields => #{<<"temperature">> => 1}, - tags => - #{<<"from">> => <<"mqttx_4b963a8e">>, - <<"host">> => <<"serverA">>, - <<"qos">> => "0", - <<"device">> => <<"NO.1">>, - <<"region">> => <<"hangzhou">>}, - timestamp => TsNano}], + Points = + [#{fields => #{<<"temperature">> => 1}, + tags => + #{<<"from">> => <<"mqttx_4b963a8e">>, + <<"host">> => <<"serverA">>, + <<"qos">> => "0", + <<"device">> => <<"NO.1">>, + <<"region">> => <<"hangzhou">>}, + timestamp => TsNano}], AuthInfo = {basic, #{username => "test", password => "test"}}, Client = #{cli_opts => [{auth, AuthInfo}, {timeunit, second}]}, Metric = #{table => "Test", timeunit => nanosecond}, Request = greptimedb_encoder:insert_requests(Client, [{Metric, Points}]), #{header := #{dbname := _DbName, authorization := _Auth}, - request := {inserts, #{inserts := [#{columns := Columns}]}}} = Request, + request := {inserts, #{inserts := [#{columns := Columns}]}}} = + Request, {value, TimestampColumn} = lists:search(fun(C) -> maps:get(column_name, C) == <<"greptime_timestamp">> end, Columns), - ?assertEqual([TsNano], maps:get(timestamp_nanosecond_values, maps:get(values, TimestampColumn))). + ?assertEqual([TsNano], + maps:get(timestamp_nanosecond_values, maps:get(values, TimestampColumn))). t_write_failure(_) -> Metric = <<"temperatures">>, @@ -477,3 +481,46 @@ t_async_write_batch(_) -> greptimedb:stop_client(Client), ok. + +t_insert_greptime_cloud(_) -> + Host = os:getenv("GT_TEST_HOST"), + DbName = os:getenv("GT_TEST_DB"), + UserName = os:getenv("GT_TEST_USER"), + PassWd = os:getenv("GT_TEST_PASSWD"), + + if (Host == false) or (DbName == false) or (UserName == false) or (PassWd == false) -> + ct:print("Ignored t_insert_greptime_cloud..."), + ok; + true -> + ct:print("Running t_insert_greptime_cloud..."), + %% the endpoint scheme must be `https`. + Options = + [{endpoints, [{https, Host, 5001}]}, + {pool, greptimedb_client_pool}, + {pool_size, 5}, + {pool_type, random}, + {timeunit, ms}, + {auth, {basic, #{username => UserName, password => PassWd}}}], + {ok, Client} = greptimedb:start_client(Options), + Metric = {DbName, <<"temperatures">>}, + Points = + [#{fields => #{<<"temperature">> => 1}, + tags => + #{<<"from">> => <<"mqttx_4b963a8e">>, + <<"host">> => <<"serverA">>, + <<"qos">> => greptimedb_values:int64_value(0), + <<"region">> => <<"hangzhou">>}, + timestamp => 1619775142098}, + #{fields => #{<<"temperature">> => 2}, + tags => + #{<<"from">> => <<"mqttx_4b963a8e">>, + <<"host">> => <<"serverB">>, + <<"qos">> => greptimedb_values:int64_value(1), + <<"region">> => <<"ningbo">>, + <<"to">> => <<"kafka">>}, + timestamp => 1619775143098}], + {ok, #{response := {affected_rows, #{value := 2}}}} = + greptimedb:write(Client, Metric, Points), + greptimedb:stop_client(Client), + ok + end.