Skip to content

Commit

Permalink
feat: supports ssl_opts (#39)
Browse files Browse the repository at this point in the history
* test: insert data to greptime cloud

* ci: update greptimedb to v0.7.1

* chore: rename grpc_options to grpc_opts

* feat: adds dbname option
  • Loading branch information
killme2008 authored Mar 22, 2024
1 parent 17b1841 commit 959f841
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 15 deletions.
11 changes: 10 additions & 1 deletion .github/workflows/erlang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- uses: actions/checkout@v3
- name: Setup greptimedb
run: |
GREPTIMEDB_VER=v0.5.0
GREPTIMEDB_VER=v0.7.1
DOWNLOAD_URL=https://github.com/GreptimeTeam/greptimedb
curl -L ${DOWNLOAD_URL}/releases/download/${GREPTIMEDB_VER}/greptime-linux-amd64-${GREPTIMEDB_VER}.tar.gz -o /tmp/greptimedb-${GREPTIMEDB_VER}-linux-amd64.tar.gz
mkdir -p /tmp/greptimedb-download
Expand All @@ -31,6 +31,11 @@ jobs:
- name: Compile
run: rebar3 compile
- name: Run tests
env:
GT_TEST_HOST: ${{ secrets.GT_TEST_HOST }}
GT_TEST_DB: ${{ secrets.GT_TEST_DB }}
GT_TEST_USER: ${{ secrets.GT_TEST_USER }}
GT_TEST_PASSWD: ${{ secrets.GT_TEST_PASSWD }}
run: |
cat /tmp/greptimedb.log
rebar3 ct -v --cover
Expand All @@ -40,4 +45,8 @@ jobs:
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }}
GT_TEST_HOST: ${{ secrets.GT_TEST_HOST }}
GT_TEST_DB: ${{ secrets.GT_TEST_DB }}
GT_TEST_USER: ${{ secrets.GT_TEST_USER }}
GT_TEST_PASSWD: ${{ secrets.GT_TEST_PASSWD }}
run: rebar3 as test coveralls send
58 changes: 57 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,60 @@ Connect GreptimeDB with authentication:
{ok, Client} = greptimedb:start_client(Options).
```

## Write to GreptimeCloud

[GreptimeCloud](https://greptime.com/product/cloud) is a fully-managed GreptimeDB as a service in the cloud.

After you creating a service, you must have the following info via `connect`:
* `Host` the service host to connect,
* `Port`, gRPC port, default is `5001`,
* `Database`, the database to write,
* `Username`, the service username,
* `Password`, the service password.

Connect to GreptimeCloud with authentication:

```erlang
Host = ...,
Database = ...,
Username = ...,
Password = ...,

Options =
[{endpoints, [{https, Host, 5001}]},
{pool, greptimedb_client_pool},
{pool_size, 5},
{pool_type, random},
{timeunit, ms},
{dbname, Database},
{auth, {basic, #{username => Username, password => Password }}}],

{ok, Client} = greptimedb:start_client(Options),

Metric = <"temperatures">>,
Points =
[#{fields => #{<<"temperature">> => 1},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverA">>,
<<"qos">> => greptimedb_values:int64_value(0),
<<"region">> => <<"hangzhou">>},
timestamp => 1619775142098},
#{fields => #{<<"temperature">> => 2},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverB">>,
<<"qos">> => greptimedb_values:int64_value(1),
<<"region">> => <<"ningbo">>,
<<"to">> => <<"kafka">>},
timestamp => 1619775143098}],

greptimedb:write(Client, Metric, Points).

```

We change the endpoint scheme from `http` to `https` and set the `dbname` option.

## APIs guide

### Client options
Expand All @@ -151,13 +205,15 @@ A proper list contains:

* `endpoints`: List of the GreptimeDB server address in the form of `{http, host, port}`
* `pool`, `pool_size` etc.: the client pool settings
* `grpc_options`: grpxbox [client options](https://github.com/tsloughter/grpcbox#defining-channels)
* `grpc_opts`: grpxbox [client options](https://github.com/tsloughter/grpcbox#defining-channels)
* `ssl_opts`: when the endpoint scheme is `https`, the ssl options to use(`[]` by default).
* `auth`: authentication options, `{auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}` for example.
* `timeunit`: Timestamp unit, supports:
* `ns` or `nanosecond`
* `us` or `microsecond`
* `ms` or `millisecond`
* `s` or `second`
* `dbname`: the default database to write, `public` by default. Change it to the servce database name when connecting to GreptimeCloud.

### Write and datatypes
The metric name can be a string or binary. If you want to set the database, the metric name can be set in the form of `{dbname, metric}`. The data will be written into `greptime-public` by default.
Expand Down
2 changes: 1 addition & 1 deletion src/greptimedb_encoder.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ metric(Options, Metric) ->
metric_with_default(default_metric(Options), Metric).

default_metric(Options) ->
#{dbname => ?DEFAULT_DBNAME,
#{dbname => proplists:get_value(dbname, Options, ?DEFAULT_DBNAME),
timeunit => proplists:get_value(timeunit, Options, ms)}.

%% table is required
Expand Down
20 changes: 18 additions & 2 deletions src/greptimedb_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,19 @@
init(Args) ->
logger:debug("[GreptimeDB] genserver has started (~w)~n", [self()]),
Endpoints = proplists:get_value(endpoints, Args),
Options = proplists:get_value(grpc_options, Args, #{connect_timeout => ?CONNECT_TIMEOUT}),
SslOptions = proplists:get_value(ssl_opts, Args, []),
Options = proplists:get_value(grpc_opts, Args, #{connect_timeout => ?CONNECT_TIMEOUT}),
Channels =
lists:map(fun({Schema, Host, Port}) -> {Schema, Host, Port, []} end, Endpoints),
lists:map(fun({Scheme, Host, Port}) -> {Scheme, Host, Port, ssl_options(Scheme, SslOptions)}
end, Endpoints),
Channel = list_to_atom(pid_to_list(self())),
{ok, _} = grpcbox_channel_sup:start_child(Channel, Channels, Options),
{ok, #state{channel = Channel, requests = #{ pending => queue:new(), pending_count => 0}}}.

handle_call({handle, Request}, _From, #state{channel = Channel} = State) ->
Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond),
Reply = greptime_v_1_greptime_database_client:handle(Ctx, Request, #{channel => Channel}),
logger:debug("[GreptimeDB] handle_call reply: ~w~n", [Reply]),
case Reply of
{ok, Resp, _} ->
{reply, {ok, Resp}, State};
Expand Down Expand Up @@ -113,6 +116,19 @@ terminate(Reason, #state{channel = Channel} = State) ->
%%%===================================================================
%%% Helper functions
%%%===================================================================
ssl_options(https, []) ->
%% https://www.erlang.org/doc/man/ssl#type-client_option
[
{verify, verify_peer},
{cacerts, public_key:cacerts_get()},
%% hostname may be wildcard
{customize_hostname_check, [{match_fun,public_key:pkix_verify_hostname_match_fun(https)}]}
];

ssl_options(_, SslOptions) ->
SslOptions.


now_() ->
erlang:system_time(millisecond).

Expand Down
68 changes: 58 additions & 10 deletions test/greptimedb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ all() ->
t_bench_perf,
t_write_stream,
t_async_write_batch,
t_insert_greptime_cloud,
t_auth_error].

%%[t_bench_perf].
Expand Down Expand Up @@ -110,23 +111,26 @@ t_insert_requests(_) ->

t_insert_requests_with_timeunit(_) ->
TsNano = 1705946037724448346,
Points = [#{fields => #{<<"temperature">> => 1},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverA">>,
<<"qos">> => "0",
<<"device">> => <<"NO.1">>,
<<"region">> => <<"hangzhou">>},
timestamp => TsNano}],
Points =
[#{fields => #{<<"temperature">> => 1},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverA">>,
<<"qos">> => "0",
<<"device">> => <<"NO.1">>,
<<"region">> => <<"hangzhou">>},
timestamp => TsNano}],
AuthInfo = {basic, #{username => "test", password => "test"}},
Client = #{cli_opts => [{auth, AuthInfo}, {timeunit, second}]},
Metric = #{table => "Test", timeunit => nanosecond},
Request = greptimedb_encoder:insert_requests(Client, [{Metric, Points}]),
#{header := #{dbname := _DbName, authorization := _Auth},
request := {inserts, #{inserts := [#{columns := Columns}]}}} = Request,
request := {inserts, #{inserts := [#{columns := Columns}]}}} =
Request,
{value, TimestampColumn} =
lists:search(fun(C) -> maps:get(column_name, C) == <<"greptime_timestamp">> end, Columns),
?assertEqual([TsNano], maps:get(timestamp_nanosecond_values, maps:get(values, TimestampColumn))).
?assertEqual([TsNano],
maps:get(timestamp_nanosecond_values, maps:get(values, TimestampColumn))).

t_write_failure(_) ->
Metric = <<"temperatures">>,
Expand Down Expand Up @@ -477,3 +481,47 @@ t_async_write_batch(_) ->

greptimedb:stop_client(Client),
ok.

t_insert_greptime_cloud(_) ->
Host = os:getenv("GT_TEST_HOST"),
DbName = os:getenv("GT_TEST_DB"),
UserName = os:getenv("GT_TEST_USER"),
PassWd = os:getenv("GT_TEST_PASSWD"),

if (Host == false) or (DbName == false) or (UserName == false) or (PassWd == false) ->
ct:print("Ignored t_insert_greptime_cloud..."),
ok;
true ->
ct:print("Running t_insert_greptime_cloud..."),
%% the endpoint scheme must be `https`.
Options =
[{endpoints, [{https, Host, 5001}]},
{pool, greptimedb_client_pool},
{pool_size, 5},
{pool_type, random},
{timeunit, ms},
{dbname, DbName},
{auth, {basic, #{username => UserName, password => PassWd}}}],
{ok, Client} = greptimedb:start_client(Options),
Metric = <<"temperatures">>,
Points =
[#{fields => #{<<"temperature">> => 1},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverA">>,
<<"qos">> => greptimedb_values:int64_value(0),
<<"region">> => <<"hangzhou">>},
timestamp => 1619775142098},
#{fields => #{<<"temperature">> => 2},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverB">>,
<<"qos">> => greptimedb_values:int64_value(1),
<<"region">> => <<"ningbo">>,
<<"to">> => <<"kafka">>},
timestamp => 1619775143098}],
{ok, #{response := {affected_rows, #{value := 2}}}} =
greptimedb:write(Client, Metric, Points),
greptimedb:stop_client(Client),
ok
end.

0 comments on commit 959f841

Please sign in to comment.