From 959f8411e5f6390c435f33fc7525d366a887752f Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Sat, 23 Mar 2024 05:12:03 +0800 Subject: [PATCH] feat: supports ssl_opts (#39) * test: insert data to greptime cloud * ci: update greptimedb to v0.7.1 * chore: rename grpc_options to grpc_opts * feat: adds dbname option --- .github/workflows/erlang.yml | 11 +++++- README.md | 58 +++++++++++++++++++++++++++++- src/greptimedb_encoder.erl | 2 +- src/greptimedb_worker.erl | 20 +++++++++-- test/greptimedb_SUITE.erl | 68 ++++++++++++++++++++++++++++++------ 5 files changed, 144 insertions(+), 15 deletions(-) diff --git a/.github/workflows/erlang.yml b/.github/workflows/erlang.yml index 41e8e97..2a48901 100644 --- a/.github/workflows/erlang.yml +++ b/.github/workflows/erlang.yml @@ -21,7 +21,7 @@ jobs: - uses: actions/checkout@v3 - name: Setup greptimedb run: | - GREPTIMEDB_VER=v0.5.0 + GREPTIMEDB_VER=v0.7.1 DOWNLOAD_URL=https://github.com/GreptimeTeam/greptimedb curl -L ${DOWNLOAD_URL}/releases/download/${GREPTIMEDB_VER}/greptime-linux-amd64-${GREPTIMEDB_VER}.tar.gz -o /tmp/greptimedb-${GREPTIMEDB_VER}-linux-amd64.tar.gz mkdir -p /tmp/greptimedb-download @@ -31,6 +31,11 @@ jobs: - name: Compile run: rebar3 compile - name: Run tests + env: + GT_TEST_HOST: ${{ secrets.GT_TEST_HOST }} + GT_TEST_DB: ${{ secrets.GT_TEST_DB }} + GT_TEST_USER: ${{ secrets.GT_TEST_USER }} + GT_TEST_PASSWD: ${{ secrets.GT_TEST_PASSWD }} run: | cat /tmp/greptimedb.log rebar3 ct -v --cover @@ -40,4 +45,8 @@ jobs: env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }} + GT_TEST_HOST: ${{ secrets.GT_TEST_HOST }} + GT_TEST_DB: ${{ secrets.GT_TEST_DB }} + GT_TEST_USER: ${{ secrets.GT_TEST_USER }} + GT_TEST_PASSWD: ${{ secrets.GT_TEST_PASSWD }} run: rebar3 as test coveralls send diff --git a/README.md b/README.md index 4a34666..27c8ecc 100644 --- a/README.md +++ b/README.md @@ -143,6 +143,60 @@ Connect GreptimeDB with authentication: {ok, Client} = greptimedb:start_client(Options). ``` +## Write to GreptimeCloud + +[GreptimeCloud](https://greptime.com/product/cloud) is a fully-managed GreptimeDB as a service in the cloud. + +After you creating a service, you must have the following info via `connect`: +* `Host` the service host to connect, +* `Port`, gRPC port, default is `5001`, +* `Database`, the database to write, +* `Username`, the service username, +* `Password`, the service password. + +Connect to GreptimeCloud with authentication: + +```erlang + Host = ..., + Database = ..., + Username = ..., + Password = ..., + + Options = + [{endpoints, [{https, Host, 5001}]}, + {pool, greptimedb_client_pool}, + {pool_size, 5}, + {pool_type, random}, + {timeunit, ms}, + {dbname, Database}, + {auth, {basic, #{username => Username, password => Password }}}], + + {ok, Client} = greptimedb:start_client(Options), + + Metric = <"temperatures">>, + Points = + [#{fields => #{<<"temperature">> => 1}, + tags => + #{<<"from">> => <<"mqttx_4b963a8e">>, + <<"host">> => <<"serverA">>, + <<"qos">> => greptimedb_values:int64_value(0), + <<"region">> => <<"hangzhou">>}, + timestamp => 1619775142098}, + #{fields => #{<<"temperature">> => 2}, + tags => + #{<<"from">> => <<"mqttx_4b963a8e">>, + <<"host">> => <<"serverB">>, + <<"qos">> => greptimedb_values:int64_value(1), + <<"region">> => <<"ningbo">>, + <<"to">> => <<"kafka">>}, + timestamp => 1619775143098}], + + greptimedb:write(Client, Metric, Points). + +``` + +We change the endpoint scheme from `http` to `https` and set the `dbname` option. + ## APIs guide ### Client options @@ -151,13 +205,15 @@ A proper list contains: * `endpoints`: List of the GreptimeDB server address in the form of `{http, host, port}` * `pool`, `pool_size` etc.: the client pool settings -* `grpc_options`: grpxbox [client options](https://github.com/tsloughter/grpcbox#defining-channels) +* `grpc_opts`: grpxbox [client options](https://github.com/tsloughter/grpcbox#defining-channels) +* `ssl_opts`: when the endpoint scheme is `https`, the ssl options to use(`[]` by default). * `auth`: authentication options, `{auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}` for example. * `timeunit`: Timestamp unit, supports: * `ns` or `nanosecond` * `us` or `microsecond` * `ms` or `millisecond` * `s` or `second` +* `dbname`: the default database to write, `public` by default. Change it to the servce database name when connecting to GreptimeCloud. ### Write and datatypes The metric name can be a string or binary. If you want to set the database, the metric name can be set in the form of `{dbname, metric}`. The data will be written into `greptime-public` by default. diff --git a/src/greptimedb_encoder.erl b/src/greptimedb_encoder.erl index fa8dd7e..ee91490 100644 --- a/src/greptimedb_encoder.erl +++ b/src/greptimedb_encoder.erl @@ -58,7 +58,7 @@ metric(Options, Metric) -> metric_with_default(default_metric(Options), Metric). default_metric(Options) -> - #{dbname => ?DEFAULT_DBNAME, + #{dbname => proplists:get_value(dbname, Options, ?DEFAULT_DBNAME), timeunit => proplists:get_value(timeunit, Options, ms)}. %% table is required diff --git a/src/greptimedb_worker.erl b/src/greptimedb_worker.erl index e924009..4a8bac6 100644 --- a/src/greptimedb_worker.erl +++ b/src/greptimedb_worker.erl @@ -50,9 +50,11 @@ init(Args) -> logger:debug("[GreptimeDB] genserver has started (~w)~n", [self()]), Endpoints = proplists:get_value(endpoints, Args), - Options = proplists:get_value(grpc_options, Args, #{connect_timeout => ?CONNECT_TIMEOUT}), + SslOptions = proplists:get_value(ssl_opts, Args, []), + Options = proplists:get_value(grpc_opts, Args, #{connect_timeout => ?CONNECT_TIMEOUT}), Channels = - lists:map(fun({Schema, Host, Port}) -> {Schema, Host, Port, []} end, Endpoints), + lists:map(fun({Scheme, Host, Port}) -> {Scheme, Host, Port, ssl_options(Scheme, SslOptions)} + end, Endpoints), Channel = list_to_atom(pid_to_list(self())), {ok, _} = grpcbox_channel_sup:start_child(Channel, Channels, Options), {ok, #state{channel = Channel, requests = #{ pending => queue:new(), pending_count => 0}}}. @@ -60,6 +62,7 @@ init(Args) -> handle_call({handle, Request}, _From, #state{channel = Channel} = State) -> Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond), Reply = greptime_v_1_greptime_database_client:handle(Ctx, Request, #{channel => Channel}), + logger:debug("[GreptimeDB] handle_call reply: ~w~n", [Reply]), case Reply of {ok, Resp, _} -> {reply, {ok, Resp}, State}; @@ -113,6 +116,19 @@ terminate(Reason, #state{channel = Channel} = State) -> %%%=================================================================== %%% Helper functions %%%=================================================================== +ssl_options(https, []) -> + %% https://www.erlang.org/doc/man/ssl#type-client_option + [ + {verify, verify_peer}, + {cacerts, public_key:cacerts_get()}, + %% hostname may be wildcard + {customize_hostname_check, [{match_fun,public_key:pkix_verify_hostname_match_fun(https)}]} + ]; + +ssl_options(_, SslOptions) -> + SslOptions. + + now_() -> erlang:system_time(millisecond). diff --git a/test/greptimedb_SUITE.erl b/test/greptimedb_SUITE.erl index 5e8b597..cbd0b5c 100644 --- a/test/greptimedb_SUITE.erl +++ b/test/greptimedb_SUITE.erl @@ -15,6 +15,7 @@ all() -> t_bench_perf, t_write_stream, t_async_write_batch, + t_insert_greptime_cloud, t_auth_error]. %%[t_bench_perf]. @@ -110,23 +111,26 @@ t_insert_requests(_) -> t_insert_requests_with_timeunit(_) -> TsNano = 1705946037724448346, - Points = [#{fields => #{<<"temperature">> => 1}, - tags => - #{<<"from">> => <<"mqttx_4b963a8e">>, - <<"host">> => <<"serverA">>, - <<"qos">> => "0", - <<"device">> => <<"NO.1">>, - <<"region">> => <<"hangzhou">>}, - timestamp => TsNano}], + Points = + [#{fields => #{<<"temperature">> => 1}, + tags => + #{<<"from">> => <<"mqttx_4b963a8e">>, + <<"host">> => <<"serverA">>, + <<"qos">> => "0", + <<"device">> => <<"NO.1">>, + <<"region">> => <<"hangzhou">>}, + timestamp => TsNano}], AuthInfo = {basic, #{username => "test", password => "test"}}, Client = #{cli_opts => [{auth, AuthInfo}, {timeunit, second}]}, Metric = #{table => "Test", timeunit => nanosecond}, Request = greptimedb_encoder:insert_requests(Client, [{Metric, Points}]), #{header := #{dbname := _DbName, authorization := _Auth}, - request := {inserts, #{inserts := [#{columns := Columns}]}}} = Request, + request := {inserts, #{inserts := [#{columns := Columns}]}}} = + Request, {value, TimestampColumn} = lists:search(fun(C) -> maps:get(column_name, C) == <<"greptime_timestamp">> end, Columns), - ?assertEqual([TsNano], maps:get(timestamp_nanosecond_values, maps:get(values, TimestampColumn))). + ?assertEqual([TsNano], + maps:get(timestamp_nanosecond_values, maps:get(values, TimestampColumn))). t_write_failure(_) -> Metric = <<"temperatures">>, @@ -477,3 +481,47 @@ t_async_write_batch(_) -> greptimedb:stop_client(Client), ok. + +t_insert_greptime_cloud(_) -> + Host = os:getenv("GT_TEST_HOST"), + DbName = os:getenv("GT_TEST_DB"), + UserName = os:getenv("GT_TEST_USER"), + PassWd = os:getenv("GT_TEST_PASSWD"), + + if (Host == false) or (DbName == false) or (UserName == false) or (PassWd == false) -> + ct:print("Ignored t_insert_greptime_cloud..."), + ok; + true -> + ct:print("Running t_insert_greptime_cloud..."), + %% the endpoint scheme must be `https`. + Options = + [{endpoints, [{https, Host, 5001}]}, + {pool, greptimedb_client_pool}, + {pool_size, 5}, + {pool_type, random}, + {timeunit, ms}, + {dbname, DbName}, + {auth, {basic, #{username => UserName, password => PassWd}}}], + {ok, Client} = greptimedb:start_client(Options), + Metric = <<"temperatures">>, + Points = + [#{fields => #{<<"temperature">> => 1}, + tags => + #{<<"from">> => <<"mqttx_4b963a8e">>, + <<"host">> => <<"serverA">>, + <<"qos">> => greptimedb_values:int64_value(0), + <<"region">> => <<"hangzhou">>}, + timestamp => 1619775142098}, + #{fields => #{<<"temperature">> => 2}, + tags => + #{<<"from">> => <<"mqttx_4b963a8e">>, + <<"host">> => <<"serverB">>, + <<"qos">> => greptimedb_values:int64_value(1), + <<"region">> => <<"ningbo">>, + <<"to">> => <<"kafka">>}, + timestamp => 1619775143098}], + {ok, #{response := {affected_rows, #{value := 2}}}} = + greptimedb:write(Client, Metric, Points), + greptimedb:stop_client(Client), + ok + end.