Skip to content

Commit

Permalink
test: insert data to greptime cloud
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Mar 22, 2024
1 parent 17b1841 commit 6ce95b0
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 11 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/erlang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ jobs:
- name: Compile
run: rebar3 compile
- name: Run tests
env:
GT_TEST_HOST: ${{ secrets.GT_TEST_HOST }}
GT_TEST_DB: ${{ secrets.GT_TEST_DB }}
GT_TEST_USER: ${{ secrets.GT_TEST_USER }}
GT_TEST_PASSWD: ${{ secrets.GT_TEST_PASSWD }}
run: |
cat /tmp/greptimedb.log
rebar3 ct -v --cover
Expand All @@ -40,4 +45,8 @@ jobs:
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }}
GT_TEST_HOST: ${{ secrets.GT_TEST_HOST }}
GT_TEST_DB: ${{ secrets.GT_TEST_DB }}
GT_TEST_USER: ${{ secrets.GT_TEST_USER }}
GT_TEST_PASSWD: ${{ secrets.GT_TEST_PASSWD }}
run: rebar3 as test coveralls send
18 changes: 17 additions & 1 deletion src/greptimedb_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,19 @@
init(Args) ->
logger:debug("[GreptimeDB] genserver has started (~w)~n", [self()]),
Endpoints = proplists:get_value(endpoints, Args),
SslOptions = proplists:get_value(ssl_opts, Args, []),
Options = proplists:get_value(grpc_options, Args, #{connect_timeout => ?CONNECT_TIMEOUT}),
Channels =
lists:map(fun({Schema, Host, Port}) -> {Schema, Host, Port, []} end, Endpoints),
lists:map(fun({Scheme, Host, Port}) -> {Scheme, Host, Port, ssl_options(Scheme, SslOptions)}
end, Endpoints),
Channel = list_to_atom(pid_to_list(self())),
{ok, _} = grpcbox_channel_sup:start_child(Channel, Channels, Options),
{ok, #state{channel = Channel, requests = #{ pending => queue:new(), pending_count => 0}}}.

handle_call({handle, Request}, _From, #state{channel = Channel} = State) ->
Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond),
Reply = greptime_v_1_greptime_database_client:handle(Ctx, Request, #{channel => Channel}),
logger:debug("[GreptimeDB] handle_call reply: ~w~n", [Reply]),
case Reply of
{ok, Resp, _} ->
{reply, {ok, Resp}, State};
Expand Down Expand Up @@ -113,6 +116,19 @@ terminate(Reason, #state{channel = Channel} = State) ->
%%%===================================================================
%%% Helper functions
%%%===================================================================
ssl_options(https, []) ->
%% https://www.erlang.org/doc/man/ssl#type-client_option
[
{verify, verify_peer},
{cacerts, public_key:cacerts_get()},
%% hostname may be wildcard
{customize_hostname_check, [{match_fun,public_key:pkix_verify_hostname_match_fun(https)}]}
];

ssl_options(_, SslOptions) ->
SslOptions.


now_() ->
erlang:system_time(millisecond).

Expand Down
67 changes: 57 additions & 10 deletions test/greptimedb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ all() ->
t_bench_perf,
t_write_stream,
t_async_write_batch,
t_insert_greptime_cloud,
t_auth_error].

%%[t_bench_perf].
Expand Down Expand Up @@ -110,23 +111,26 @@ t_insert_requests(_) ->

t_insert_requests_with_timeunit(_) ->
TsNano = 1705946037724448346,
Points = [#{fields => #{<<"temperature">> => 1},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverA">>,
<<"qos">> => "0",
<<"device">> => <<"NO.1">>,
<<"region">> => <<"hangzhou">>},
timestamp => TsNano}],
Points =
[#{fields => #{<<"temperature">> => 1},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverA">>,
<<"qos">> => "0",
<<"device">> => <<"NO.1">>,
<<"region">> => <<"hangzhou">>},
timestamp => TsNano}],
AuthInfo = {basic, #{username => "test", password => "test"}},
Client = #{cli_opts => [{auth, AuthInfo}, {timeunit, second}]},
Metric = #{table => "Test", timeunit => nanosecond},
Request = greptimedb_encoder:insert_requests(Client, [{Metric, Points}]),
#{header := #{dbname := _DbName, authorization := _Auth},
request := {inserts, #{inserts := [#{columns := Columns}]}}} = Request,
request := {inserts, #{inserts := [#{columns := Columns}]}}} =
Request,
{value, TimestampColumn} =
lists:search(fun(C) -> maps:get(column_name, C) == <<"greptime_timestamp">> end, Columns),
?assertEqual([TsNano], maps:get(timestamp_nanosecond_values, maps:get(values, TimestampColumn))).
?assertEqual([TsNano],
maps:get(timestamp_nanosecond_values, maps:get(values, TimestampColumn))).

t_write_failure(_) ->
Metric = <<"temperatures">>,
Expand Down Expand Up @@ -477,3 +481,46 @@ t_async_write_batch(_) ->

greptimedb:stop_client(Client),
ok.

t_insert_greptime_cloud(_) ->
Host = os:getenv("GT_TEST_HOST"),
DbName = os:getenv("GT_TEST_DB"),
UserName = os:getenv("GT_TEST_USER"),
PassWd = os:getenv("GT_TEST_PASSWD"),

if (Host == false) or (DbName == false) or (UserName == false) or (PassWd == false) ->
ct:print("Ignored t_insert_greptime_cloud..."),
ok;
true ->
ct:print("Running t_insert_greptime_cloud..."),
%% the endpoint scheme must be `https`.
Options =
[{endpoints, [{https, Host, 5001}]},
{pool, greptimedb_client_pool},
{pool_size, 5},
{pool_type, random},
{timeunit, ms},
{auth, {basic, #{username => UserName, password => PassWd}}}],
{ok, Client} = greptimedb:start_client(Options),
Metric = {DbName, <<"temperatures">>},
Points =
[#{fields => #{<<"temperature">> => 1},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverA">>,
<<"qos">> => greptimedb_values:int64_value(0),
<<"region">> => <<"hangzhou">>},
timestamp => 1619775142098},
#{fields => #{<<"temperature">> => 2},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverB">>,
<<"qos">> => greptimedb_values:int64_value(1),
<<"region">> => <<"ningbo">>,
<<"to">> => <<"kafka">>},
timestamp => 1619775143098}],
{ok, #{response := {affected_rows, #{value := 2}}}} =
greptimedb:write(Client, Metric, Points),
greptimedb:stop_client(Client),
ok
end.

0 comments on commit 6ce95b0

Please sign in to comment.