Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: supports ssl_opts #39

Merged
merged 4 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .github/workflows/erlang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- uses: actions/checkout@v3
- name: Setup greptimedb
run: |
GREPTIMEDB_VER=v0.5.0
GREPTIMEDB_VER=v0.7.1
DOWNLOAD_URL=https://github.com/GreptimeTeam/greptimedb
curl -L ${DOWNLOAD_URL}/releases/download/${GREPTIMEDB_VER}/greptime-linux-amd64-${GREPTIMEDB_VER}.tar.gz -o /tmp/greptimedb-${GREPTIMEDB_VER}-linux-amd64.tar.gz
mkdir -p /tmp/greptimedb-download
Expand All @@ -31,6 +31,11 @@ jobs:
- name: Compile
run: rebar3 compile
- name: Run tests
env:
GT_TEST_HOST: ${{ secrets.GT_TEST_HOST }}
GT_TEST_DB: ${{ secrets.GT_TEST_DB }}
GT_TEST_USER: ${{ secrets.GT_TEST_USER }}
GT_TEST_PASSWD: ${{ secrets.GT_TEST_PASSWD }}
run: |
cat /tmp/greptimedb.log
rebar3 ct -v --cover
Expand All @@ -40,4 +45,8 @@ jobs:
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }}
GT_TEST_HOST: ${{ secrets.GT_TEST_HOST }}
GT_TEST_DB: ${{ secrets.GT_TEST_DB }}
GT_TEST_USER: ${{ secrets.GT_TEST_USER }}
GT_TEST_PASSWD: ${{ secrets.GT_TEST_PASSWD }}
run: rebar3 as test coveralls send
58 changes: 57 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,60 @@ Connect GreptimeDB with authentication:
{ok, Client} = greptimedb:start_client(Options).
```

## Write to GreptimeCloud

[GreptimeCloud](https://greptime.com/product/cloud) is a fully-managed GreptimeDB as a service in the cloud.

After you creating a service, you must have the following info via `connect`:
* `Host` the service host to connect,
* `Port`, gRPC port, default is `5001`,
* `Database`, the database to write,
* `Username`, the service username,
* `Password`, the service password.

Connect to GreptimeCloud with authentication:

```erlang
Host = ...,
Database = ...,
Username = ...,
Password = ...,

Options =
[{endpoints, [{https, Host, 5001}]},
{pool, greptimedb_client_pool},
{pool_size, 5},
{pool_type, random},
{timeunit, ms},
{dbname, Database},
{auth, {basic, #{username => Username, password => Password }}}],

{ok, Client} = greptimedb:start_client(Options),

Metric = <"temperatures">>,
Points =
[#{fields => #{<<"temperature">> => 1},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverA">>,
<<"qos">> => greptimedb_values:int64_value(0),
<<"region">> => <<"hangzhou">>},
timestamp => 1619775142098},
#{fields => #{<<"temperature">> => 2},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverB">>,
<<"qos">> => greptimedb_values:int64_value(1),
<<"region">> => <<"ningbo">>,
<<"to">> => <<"kafka">>},
timestamp => 1619775143098}],

greptimedb:write(Client, Metric, Points).

```

We change the endpoint scheme from `http` to `https` and set the `dbname` option.

## APIs guide

### Client options
Expand All @@ -151,13 +205,15 @@ A proper list contains:

* `endpoints`: List of the GreptimeDB server address in the form of `{http, host, port}`
* `pool`, `pool_size` etc.: the client pool settings
* `grpc_options`: grpxbox [client options](https://github.com/tsloughter/grpcbox#defining-channels)
* `grpc_opts`: grpxbox [client options](https://github.com/tsloughter/grpcbox#defining-channels)
* `ssl_opts`: when the endpoint scheme is `https`, the ssl options to use(`[]` by default).
* `auth`: authentication options, `{auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}` for example.
* `timeunit`: Timestamp unit, supports:
* `ns` or `nanosecond`
* `us` or `microsecond`
* `ms` or `millisecond`
* `s` or `second`
* `dbname`: the default database to write, `public` by default. Change it to the servce database name when connecting to GreptimeCloud.

### Write and datatypes
The metric name can be a string or binary. If you want to set the database, the metric name can be set in the form of `{dbname, metric}`. The data will be written into `greptime-public` by default.
Expand Down
2 changes: 1 addition & 1 deletion src/greptimedb_encoder.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ metric(Options, Metric) ->
metric_with_default(default_metric(Options), Metric).

default_metric(Options) ->
#{dbname => ?DEFAULT_DBNAME,
#{dbname => proplists:get_value(dbname, Options, ?DEFAULT_DBNAME),
timeunit => proplists:get_value(timeunit, Options, ms)}.

%% table is required
Expand Down
20 changes: 18 additions & 2 deletions src/greptimedb_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,19 @@
init(Args) ->
logger:debug("[GreptimeDB] genserver has started (~w)~n", [self()]),
Endpoints = proplists:get_value(endpoints, Args),
Options = proplists:get_value(grpc_options, Args, #{connect_timeout => ?CONNECT_TIMEOUT}),
SslOptions = proplists:get_value(ssl_opts, Args, []),
Options = proplists:get_value(grpc_opts, Args, #{connect_timeout => ?CONNECT_TIMEOUT}),
Channels =
lists:map(fun({Schema, Host, Port}) -> {Schema, Host, Port, []} end, Endpoints),
lists:map(fun({Scheme, Host, Port}) -> {Scheme, Host, Port, ssl_options(Scheme, SslOptions)}
end, Endpoints),
Channel = list_to_atom(pid_to_list(self())),
{ok, _} = grpcbox_channel_sup:start_child(Channel, Channels, Options),
{ok, #state{channel = Channel, requests = #{ pending => queue:new(), pending_count => 0}}}.

handle_call({handle, Request}, _From, #state{channel = Channel} = State) ->
Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond),
Reply = greptime_v_1_greptime_database_client:handle(Ctx, Request, #{channel => Channel}),
logger:debug("[GreptimeDB] handle_call reply: ~w~n", [Reply]),
case Reply of
{ok, Resp, _} ->
{reply, {ok, Resp}, State};
Expand Down Expand Up @@ -113,6 +116,19 @@ terminate(Reason, #state{channel = Channel} = State) ->
%%%===================================================================
%%% Helper functions
%%%===================================================================
ssl_options(https, []) ->
%% https://www.erlang.org/doc/man/ssl#type-client_option
[
{verify, verify_peer},
{cacerts, public_key:cacerts_get()},
%% hostname may be wildcard
{customize_hostname_check, [{match_fun,public_key:pkix_verify_hostname_match_fun(https)}]}
];

ssl_options(_, SslOptions) ->
SslOptions.


now_() ->
erlang:system_time(millisecond).

Expand Down
68 changes: 58 additions & 10 deletions test/greptimedb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ all() ->
t_bench_perf,
t_write_stream,
t_async_write_batch,
t_insert_greptime_cloud,
t_auth_error].

%%[t_bench_perf].
Expand Down Expand Up @@ -110,23 +111,26 @@ t_insert_requests(_) ->

t_insert_requests_with_timeunit(_) ->
TsNano = 1705946037724448346,
Points = [#{fields => #{<<"temperature">> => 1},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverA">>,
<<"qos">> => "0",
<<"device">> => <<"NO.1">>,
<<"region">> => <<"hangzhou">>},
timestamp => TsNano}],
Points =
[#{fields => #{<<"temperature">> => 1},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverA">>,
<<"qos">> => "0",
<<"device">> => <<"NO.1">>,
<<"region">> => <<"hangzhou">>},
timestamp => TsNano}],
AuthInfo = {basic, #{username => "test", password => "test"}},
Client = #{cli_opts => [{auth, AuthInfo}, {timeunit, second}]},
Metric = #{table => "Test", timeunit => nanosecond},
Request = greptimedb_encoder:insert_requests(Client, [{Metric, Points}]),
#{header := #{dbname := _DbName, authorization := _Auth},
request := {inserts, #{inserts := [#{columns := Columns}]}}} = Request,
request := {inserts, #{inserts := [#{columns := Columns}]}}} =
Request,
{value, TimestampColumn} =
lists:search(fun(C) -> maps:get(column_name, C) == <<"greptime_timestamp">> end, Columns),
?assertEqual([TsNano], maps:get(timestamp_nanosecond_values, maps:get(values, TimestampColumn))).
?assertEqual([TsNano],
maps:get(timestamp_nanosecond_values, maps:get(values, TimestampColumn))).

t_write_failure(_) ->
Metric = <<"temperatures">>,
Expand Down Expand Up @@ -477,3 +481,47 @@ t_async_write_batch(_) ->

greptimedb:stop_client(Client),
ok.

t_insert_greptime_cloud(_) ->
Host = os:getenv("GT_TEST_HOST"),
DbName = os:getenv("GT_TEST_DB"),
UserName = os:getenv("GT_TEST_USER"),
PassWd = os:getenv("GT_TEST_PASSWD"),

if (Host == false) or (DbName == false) or (UserName == false) or (PassWd == false) ->
ct:print("Ignored t_insert_greptime_cloud..."),
ok;
true ->
ct:print("Running t_insert_greptime_cloud..."),
%% the endpoint scheme must be `https`.
Options =
[{endpoints, [{https, Host, 5001}]},
{pool, greptimedb_client_pool},
{pool_size, 5},
{pool_type, random},
{timeunit, ms},
{dbname, DbName},
{auth, {basic, #{username => UserName, password => PassWd}}}],
{ok, Client} = greptimedb:start_client(Options),
Metric = <<"temperatures">>,
Points =
[#{fields => #{<<"temperature">> => 1},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverA">>,
<<"qos">> => greptimedb_values:int64_value(0),
<<"region">> => <<"hangzhou">>},
timestamp => 1619775142098},
#{fields => #{<<"temperature">> => 2},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverB">>,
<<"qos">> => greptimedb_values:int64_value(1),
<<"region">> => <<"ningbo">>,
<<"to">> => <<"kafka">>},
timestamp => 1619775143098}],
{ok, #{response := {affected_rows, #{value := 2}}}} =
greptimedb:write(Client, Metric, Points),
greptimedb:stop_client(Client),
ok
end.
Loading