From d7b855955e782388cb6236a728bc643bf82534aa Mon Sep 17 00:00:00 2001 From: yuanbohan Date: Fri, 12 Jan 2024 16:55:18 +0800 Subject: [PATCH] delete query logic --- .github/workflows/lint.yml | 43 ++ LICENSE | 2 +- README.md | 23 +- client.go | 46 +- client_test.go | 743 +------------------------ config.go => config/config.go | 16 +- doc.go | 54 -- doc_test.go | 153 ----- docs/gorm/mysql.md | 85 --- docs/gorm/postgres.md | 82 --- docs/insert.md | 116 ---- docs/promql.md | 117 ---- docs/streaming.md | 134 ----- errors.go => error/error.go | 4 +- go.mod | 2 +- header.go | 18 +- header_test.go | 13 +- insert.go | 23 +- metric.go => model/metric.go | 156 +----- metric_test.go => model/metric_test.go | 12 +- series.go => model/series.go | 14 +- series_test.go => model/series_test.go | 4 +- prom/api.go | 113 ---- prom/api_test.go | 47 -- query.go | 93 ---- query_promql.go | 184 ------ query_promql_test.go | 132 ----- query_sql.go | 50 -- request_test.go | 137 ----- stream_client.go | 10 +- stream_client_test.go | 103 +--- mask.go => util/mask.go | 10 +- mask_test.go => util/mask_test.go | 10 +- util.go => util/util.go | 30 +- util/util_test.go | 315 +++++++++++ util_test.go | 314 ----------- 36 files changed, 506 insertions(+), 2902 deletions(-) create mode 100644 .github/workflows/lint.yml rename config.go => config/config.go (89%) delete mode 100644 doc.go delete mode 100644 doc_test.go delete mode 100644 docs/gorm/mysql.md delete mode 100644 docs/gorm/postgres.md delete mode 100644 docs/insert.md delete mode 100644 docs/promql.md delete mode 100644 docs/streaming.md rename errors.go => error/error.go (97%) rename metric.go => model/metric.go (66%) rename metric_test.go => model/metric_test.go (97%) rename series.go => model/series.go (97%) rename series_test.go => model/series_test.go (99%) delete mode 100644 prom/api.go delete mode 100644 prom/api_test.go delete mode 100644 query.go delete mode 100644 query_promql.go delete mode 100644 query_promql_test.go delete mode 100644 query_sql.go delete mode 100644 request_test.go rename mask.go => util/mask.go (87%) rename mask_test.go => util/mask_test.go (87%) rename util.go => util/util.go (87%) create mode 100644 util/util_test.go delete mode 100644 util_test.go diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..3bd7c0f --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,43 @@ +name: golangci-lint +on: [push] + +permissions: + contents: read + # Optional: allow read access to pull request. Use with `only-new-issues` option. + # pull-requests: read +jobs: + golangci: + if: github.event.pull_request.draft == false + name: lint + runs-on: ubuntu-latest + steps: + - uses: actions/setup-go@v3 + with: + go-version: '1.20.x' + + - uses: actions/checkout@v3 + + - name: golangci-lint + uses: golangci/golangci-lint-action@v3 + with: + # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version + version: latest + + # Optional: working directory, useful for monorepos + # working-directory: somedir + + # Optional: golangci-lint command line arguments. + args: --timeout 10m -v + + # Optional: show only new issues if it's a pull request. The default value is `false`. + # only-new-issues: true + + # Optional: if set to true then the all caching functionality will be complete disabled, + # takes precedence over all other caching options. + # skip-cache: true + + # Optional: if set to true then the action don't cache or restore ~/go/pkg. + # skip-pkg-cache: true + + # Optional: if set to true then the action don't cache or restore ~/.cache/go-build. + # skip-build-cache: true diff --git a/LICENSE b/LICENSE index 819e2e3..a0ddf18 100644 --- a/LICENSE +++ b/LICENSE @@ -186,7 +186,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright 2023 Greptime Team + Copyright 2024 Greptime Team Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/README.md b/README.md index 326c780..893788f 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,16 @@ -[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://github.com/GreptimeTeam/greptimedb-client-go/blob/main/LICENSE) -[![Build Status](https://github.com/greptimeteam/greptimedb-client-go/actions/workflows/ci.yml/badge.svg)](https://github.com/GreptimeTeam/greptimedb-client-go/blob/main/.github/workflows/ci.yml) -[![codecov](https://codecov.io/gh/GreptimeTeam/greptimedb-client-go/branch/main/graph/badge.svg?token=76KIKITADQ)](https://codecov.io/gh/GreptimeTeam/greptimedb-client-go) -[![Go Reference](https://pkg.go.dev/badge/github.com/GreptimeTeam/greptimedb-client-go.svg)](https://pkg.go.dev/github.com/GreptimeTeam/greptimedb-client-go) -# GreptimeDB Go Client +[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://github.com/GreptimeTeam/greptimedb-ingester-go/blob/main/LICENSE) +[![Build Status](https://github.com/greptimeteam/greptimedb-ingester-go/actions/workflows/ci.yml/badge.svg)](https://github.com/GreptimeTeam/greptimedb-ingester-go/blob/main/.github/workflows/ci.yml) +[![codecov](https://codecov.io/gh/GreptimeTeam/greptimedb-ingester-go/branch/main/graph/badge.svg?token=76KIKITADQ)](https://codecov.io/gh/GreptimeTeam/greptimedb-ingester-go) +[![Go Reference](https://pkg.go.dev/badge/github.com/GreptimeTeam/greptimedb-ingester-go.svg)](https://pkg.go.dev/github.com/GreptimeTeam/greptimedb-ingester-go) + +# GreptimeDB Go Ingester Provide API for using GreptimeDB client in Go. ## Installation ```sh -go get -u github.com/GreptimeTeam/greptimedb-client-go +go get -u github.com/GreptimeTeam/greptimedb-ingester-go ``` ## Documentation @@ -18,7 +19,7 @@ visit [docs](./docs) to get complete examples. You can also visit [Documentation ## API reference -#### Datatype Supported +### Datatype Supported - int8, int16, int32, int64, int - uint8, uint16, uint32, uint64, uint @@ -28,7 +29,7 @@ visit [docs](./docs) to get complete examples. You can also visit [Documentation - string - time.Time -#### Customize metric Timestamp +### Customize metric Timestamp you can customize timestamp index via calling methods of [Metric][metric_doc] @@ -37,9 +38,9 @@ you can customize timestamp index via calling methods of [Metric][metric_doc] ## License -This greptimedb-client-go uses the __Apache 2.0 license__ to strike a balance +This greptimedb-ingester-go uses the __Apache 2.0 license__ to strike a balance between open contributions and allowing you to use the software however you want. -[document]: https://pkg.go.dev/github.com/GreptimeTeam/greptimedb-client-go -[metric_doc]: https://pkg.go.dev/github.com/GreptimeTeam/greptimedb-client-go#Metric +[document]: https://pkg.go.dev/github.com/GreptimeTeam/greptimedb-ingester-go +[metric_doc]: https://pkg.go.dev/github.com/GreptimeTeam/greptimedb-ingester-go#Metric diff --git a/client.go b/client.go index d47990a..747b383 100644 --- a/client.go +++ b/client.go @@ -1,4 +1,4 @@ -// Copyright 2023 Greptime Team +// Copyright 2024 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,13 +20,14 @@ import ( greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1" "github.com/apache/arrow/go/v13/arrow/flight" "google.golang.org/grpc" - "google.golang.org/protobuf/proto" + + "github.com/GreptimeTeam/greptimedb-ingester-go/config" ) // Client helps to Insert/Query data Into/From GreptimeDB. A Client is safe for concurrent // use by multiple goroutines,you can have one Client instance in your application. type Client struct { - cfg *Config + cfg *config.Config // For `query`, since unary calls have not been implemented for query and only do_get helps flightClient flight.Client @@ -39,13 +40,13 @@ type Client struct { } // NewClient helps to create the greptimedb client, which will be responsible Write/Read data To/From GreptimeDB -func NewClient(cfg *Config) (*Client, error) { - flightClient, err := flight.NewClientWithMiddleware(cfg.getGRPCAddr(), nil, nil, cfg.DialOptions...) +func NewClient(cfg *config.Config) (*Client, error) { + flightClient, err := flight.NewClientWithMiddleware(cfg.GetGRPCAddr(), nil, nil, cfg.DialOptions...) if err != nil { return nil, err } - conn, err := grpc.Dial(cfg.getGRPCAddr(), cfg.DialOptions...) + conn, err := grpc.Dial(cfg.GetGRPCAddr(), cfg.DialOptions...) if err != nil { return nil, err } @@ -69,36 +70,3 @@ func (c *Client) Insert(ctx context.Context, req InsertsRequest) (*greptimepb.Gr } return c.greptimeClient.Handle(ctx, request, c.cfg.CallOptions...) } - -// Query helps to retrieve data from greptimedb -func (c *Client) Query(ctx context.Context, req QueryRequest) (*Metric, error) { - request, err := req.buildGreptimeRequest(c.cfg) - if err != nil { - return nil, err - } - - b, err := proto.Marshal(request) - if err != nil { - return nil, err - } - sr, err := c.flightClient.DoGet(ctx, &flight.Ticket{Ticket: b}, c.cfg.CallOptions...) - if err != nil { - return nil, err - } - - reader, err := flight.NewRecordReader(sr) - if err != nil { - return nil, err - } - - return buildMetricFromReader(reader) -} - -// PromqlQuery helps to retrieve data from greptimedb via InstantQuery or RangeQuery -func (c *Client) PromqlQuery(ctx context.Context, req QueryRequest) (*greptimepb.PromqlResponse, error) { - request, err := req.buildPromqlRequest(c.cfg) - if err != nil { - return nil, err - } - return c.promqlClient.Handle(ctx, request, c.cfg.CallOptions...) -} diff --git a/client_test.go b/client_test.go index 22b232b..01df9ea 100644 --- a/client_test.go +++ b/client_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 Greptime Team +// Copyright 2024 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -30,6 +30,9 @@ import ( "github.com/stretchr/testify/assert" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + + "github.com/GreptimeTeam/greptimedb-ingester-go/config" + "github.com/GreptimeTeam/greptimedb-ingester-go/model" ) type monitor struct { @@ -111,7 +114,7 @@ func newClient(t *testing.T) *Client { options := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), } - cfg := NewCfg(host).WithPort(grpcPort).WithDatabase(database).WithDialOptions(options...) + cfg := config.New(host).WithPort(grpcPort).WithDatabase(database).WithDialOptions(options...) client, err := NewClient(cfg) assert.Nil(t, err) return client @@ -135,263 +138,31 @@ func TestInvalidClient(t *testing.T) { grpc.WithBlock(), grpc.WithTimeout(time.Second), } - cfg := NewCfg("invalid host").WithPort(grpcPort).WithDatabase(database).WithDialOptions(options...) + cfg := config.New("invalid host").WithPort(grpcPort).WithDatabase(database).WithDialOptions(options...) client, err := NewClient(cfg) assert.Nil(t, client) assert.NotNil(t, err) - cfg = NewCfg(host).WithPort(1111).WithDatabase(database).WithDialOptions(options...) + cfg = config.New(host).WithPort(1111).WithDatabase(database).WithDialOptions(options...) client, err = NewClient(cfg) assert.Nil(t, client) assert.NotNil(t, err) } func TestInsertAndQueryWithSql(t *testing.T) { - table := "test_insert_and_query_with_sql" - ts1 := time.Now().Add(-1 * time.Minute).UnixMilli() - ts2 := time.Now().Add(-2 * time.Minute).UnixMilli() - insertMonitors := []monitor{ - { - host: "127.0.0.1", - ts: time.UnixMilli(ts1), - memory: 21, - cpu: 0.81, - temperature: 21, - isAuthed: true, - }, - { - host: "127.0.0.2", - ts: time.UnixMilli(ts2), - memory: 22, - cpu: 0.82, - temperature: 22, - isAuthed: true, - }, - } - client := newClient(t) - - metric := Metric{} - metric.SetTimePrecision(time.Microsecond) - metric.SetTimestampAlias("ts") - - for _, monitor := range insertMonitors { - series := Series{} - series.AddTag("host", monitor.host) - - series.AddField("memory", monitor.memory) - series.AddField("cpu", monitor.cpu) - series.AddField("temperature", monitor.temperature) - series.AddField("is_authed", monitor.isAuthed) - - series.SetTimestamp(monitor.ts) - - metric.AddSeries(series) - } - - req := InsertRequest{} - req.WithTable(table).WithMetric(metric) - reqs := InsertsRequest{} - reqs.Append(req) - - resp, err := client.Insert(context.Background(), reqs) - assert.Nil(t, err) - assert.True(t, ParseRespHeader(resp).IsSuccess()) - assert.False(t, ParseRespHeader(resp).IsRateLimited()) - assert.Equal(t, uint32(len(insertMonitors)), resp.GetAffectedRows().GetValue()) - - // Query with metric - queryReq := QueryRequest{} - queryReq.WithSql("SELECT * FROM " + table) - resMetric, err := client.Query(context.Background(), queryReq) - assert.Nil(t, err) - assert.Equal(t, 2, len(resMetric.GetSeries())) - - queryMonitors := []monitor{} - for _, series := range resMetric.GetSeries() { - host, ok := series.GetString("host") - assert.True(t, ok) - temperature, ok := series.GetInt("temperature") - assert.True(t, ok) - memory, ok := series.GetUint("memory") - assert.True(t, ok) - cpu, ok := series.GetFloat("cpu") - assert.True(t, ok) - isAuthed, ok := series.GetBool("is_authed") - assert.True(t, ok) - - ts, ok := series.GetTimestamp("ts") - assert.True(t, ok) - - queryMonitors = append(queryMonitors, monitor{ - host: host, - ts: ts, - memory: memory, - cpu: cpu, - temperature: temperature, - isAuthed: isAuthed, - }) - } - - assert.Equal(t, insertMonitors, queryMonitors) - - // query but no data - queryReq = QueryRequest{} - queryReq.WithSql(fmt.Sprintf("SELECT * FROM %s WHERE host = 'not_exist'", table)).WithDatabase(database) - resMetric, err = client.Query(context.Background(), queryReq) - assert.Nil(t, err) - assert.Equal(t, 0, len(resMetric.GetSeries())) } func TestPrecisionSecond(t *testing.T) { - table := "test_precision_second" - client := newClient(t) - - nano := time.Unix(1677728740, 123456789) - micro := time.UnixMicro(nano.UnixMicro()) - milli := time.UnixMilli(nano.UnixMilli()) - sec := time.Unix(nano.Unix(), 0) - - series := Series{} - series.SetTimestamp(nano) - metric := Metric{} - metric.AddSeries(series) - // We set the precision as nanosecond - metric.SetTimePrecision(time.Nanosecond) - req := InsertRequest{} - req.WithTable(table).WithMetric(metric) - reqs := InsertsRequest{} - reqs.WithDatabase(database).Append(req) - - resp, err := client.Insert(context.Background(), reqs) - assert.Nil(t, err) - assert.True(t, ParseRespHeader(resp).IsSuccess()) - assert.False(t, ParseRespHeader(resp).IsRateLimited()) - assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue()) - queryReq := QueryRequest{} - queryReq.WithSql(fmt.Sprintf("SELECT * FROM %s", table)).WithDatabase(database) - resMetric, err := client.Query(context.Background(), queryReq) - assert.Nil(t, err) - assert.Equal(t, 1, len(resMetric.GetSeries())) - - resTime, ok := resMetric.GetSeries()[0].GetTimestamp("ts") - assert.True(t, ok) - // since the precision is second, others should not equal - assert.Equal(t, nano, resTime) - assert.NotEqual(t, sec, resTime) - assert.NotEqual(t, milli, resTime) - assert.NotEqual(t, micro, resTime) } func TestNilInColumn(t *testing.T) { - table := "test_nil_in_column" - - insertMonitors := []monitor{ - { - ts: time.UnixMicro(1677728740000001), - cpu: 0.45, - }, - { - ts: time.UnixMicro(1677728740012002), - memory: 28, - }, - } - - client := newClient(t) - - // Insert - metric := Metric{} - metric.SetTimePrecision(time.Microsecond) - - series1 := Series{} - series1.SetTimestamp(insertMonitors[0].ts) - series1.AddField("cpu", insertMonitors[0].cpu) - metric.AddSeries(series1) - - series2 := Series{} - series2.SetTimestamp(insertMonitors[1].ts) - series2.AddField("memory", insertMonitors[1].memory) - metric.AddSeries(series2) - req := InsertRequest{} - req.WithTable(table).WithMetric(metric) - reqs := InsertsRequest{} - reqs.WithDatabase(database).Append(req) - - resp, err := client.Insert(context.Background(), reqs) - assert.Nil(t, err) - assert.True(t, ParseRespHeader(resp).IsSuccess()) - assert.False(t, ParseRespHeader(resp).IsRateLimited()) - assert.Equal(t, uint32(len(insertMonitors)), resp.GetAffectedRows().GetValue()) - - // Query with metric - queryReq := QueryRequest{} - queryReq.WithSql(fmt.Sprintf("SELECT * FROM %s", table)).WithDatabase(database) - - resMetric, err := client.Query(context.Background(), queryReq) - assert.Nil(t, err) - assert.Equal(t, 2, len(resMetric.GetSeries())) - - resSeries0 := resMetric.GetSeries()[0] - ts, ok := resSeries0.GetTimestamp("ts") - assert.True(t, ok) - - assert.Equal(t, insertMonitors[0].ts, ts) - _, ok = resSeries0.Get("memory") - assert.False(t, ok) - cpu, ok := resSeries0.Get("cpu") - assert.True(t, ok) - assert.Equal(t, insertMonitors[0].cpu, cpu.(float64)) - - resSeries1 := resMetric.GetSeries()[1] - ts, ok = resSeries1.GetTimestamp("ts") - assert.True(t, ok) - - assert.Equal(t, insertMonitors[1].ts, ts) - memory, ok := resSeries1.Get("memory") - assert.True(t, ok) - assert.Equal(t, insertMonitors[1].memory, memory.(uint64)) - _, ok = resSeries1.Get("cpu") - assert.False(t, ok) } func TestNoNeedAuth(t *testing.T) { - table := "test_no_need_auth" - options := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - } - // Client can always connect to a no-auth database, even the usernames and passwords are wrong - cfg := NewCfg(host).WithPort(grpcPort).WithDatabase(database).WithAuth("user", "pwd").WithDialOptions(options...) - client, err := NewClient(cfg) - assert.Nil(t, err) - nano := time.Unix(1677728740, 123456789) - series := Series{} - series.SetTimestamp(time.Now()) - metric := Metric{} - metric.AddSeries(series) - - req := InsertRequest{} - req.WithTable(table).WithMetric(metric) - reqs := InsertsRequest{} - reqs.WithDatabase(database).Append(req) - resp, err := client.Insert(context.Background(), reqs) - assert.Nil(t, err) - assert.True(t, ParseRespHeader(resp).IsSuccess()) - assert.False(t, ParseRespHeader(resp).IsRateLimited()) - assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue()) - - queryReq := QueryRequest{} - queryReq.WithSql(fmt.Sprintf("SELECT * FROM %s", table)).WithDatabase(database) - resMetric, err := client.Query(context.Background(), queryReq) - assert.Nil(t, err) - assert.Equal(t, 1, len(resMetric.GetSeries())) - - resTime, ok := resMetric.GetSeries()[0].GetTimestamp("ts") - assert.True(t, ok) - // since the precision is second, others should not equal - assert.NotEqual(t, nano, resTime) } func TestInsertSameColumnWithDifferentType(t *testing.T) { @@ -399,10 +170,10 @@ func TestInsertSameColumnWithDifferentType(t *testing.T) { client := newClient(t) // insert at first - series := Series{} + series := model.Series{} series.AddIntTag("count", 1) series.SetTimestamp(time.Now()) - metric := Metric{} + metric := model.Metric{} metric.AddSeries(series) req := InsertRequest{} @@ -416,10 +187,10 @@ func TestInsertSameColumnWithDifferentType(t *testing.T) { assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue()) // insert again but with different type - series = Series{} + series = model.Series{} series.AddFloatTag("count", 1) series.SetTimestamp(time.Now()) - metric = Metric{} + metric = model.Metric{} metric.AddSeries(series) req = InsertRequest{} @@ -437,10 +208,10 @@ func TestInsertTimestampWithDifferentPrecision(t *testing.T) { client := newClient(t) // insert with Second precision at first - series := Series{} + series := model.Series{} series.AddIntTag("count", 1) series.SetTimestamp(time.Now()) - metric := Metric{} + metric := model.Metric{} metric.AddSeries(series) metric.SetTimePrecision(time.Second) @@ -455,10 +226,10 @@ func TestInsertTimestampWithDifferentPrecision(t *testing.T) { assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue()) // insert again but with different type - series = Series{} + series = model.Series{} series.AddIntTag("count", 1) series.SetTimestamp(time.Now()) - metric = Metric{} + metric = model.Metric{} metric.AddSeries(series) metric.SetTimePrecision(time.Millisecond) @@ -473,501 +244,17 @@ func TestInsertTimestampWithDifferentPrecision(t *testing.T) { } func TestGetNonMatchedTypeColumn(t *testing.T) { - table := "get_non_matched_type_column" - client := newClient(t) - - column := "count" - var val int64 = 1 - series := Series{} - series.AddIntTag(column, 1) // int64 type - series.SetTimestamp(time.Now()) - metric := Metric{} - metric.AddSeries(series) - metric.SetTimePrecision(time.Second) - req := InsertRequest{} - req.WithTable(table).WithMetric(metric) - reqs := InsertsRequest{} - reqs.WithDatabase(database).Append(req) - resp, err := client.Insert(context.Background(), reqs) - assert.Nil(t, err) - assert.True(t, ParseRespHeader(resp).IsSuccess()) - assert.False(t, ParseRespHeader(resp).IsRateLimited()) - assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue()) - - // Query with metric - queryReq := QueryRequest{} - queryReq.WithSql(fmt.Sprintf("SELECT * FROM %s", table)) - - resMetric, err := client.Query(context.Background(), queryReq) - assert.Nil(t, err) - assert.Equal(t, 1, len(resMetric.GetSeries())) - - // get non exist column - series = resMetric.GetSeries()[0] - - v, ok := series.Get(column) - assert.True(t, ok) - assert.Equal(t, val, v) - - v, ok = series.GetInt(column) - assert.True(t, ok) - assert.Equal(t, val, v) - - _, ok = series.GetUint(column) - assert.False(t, ok) - - _, ok = series.GetFloat(column) - assert.False(t, ok) - - _, ok = series.GetBool(column) - assert.False(t, ok) - - _, ok = series.GetString(column) - assert.False(t, ok) - - _, ok = series.GetBytes(column) - assert.False(t, ok) } func TestGetNotExistColumn(t *testing.T) { - table := "get_not_exist_column" - client := newClient(t) - - series := Series{} - series.AddIntTag("count", 1) - series.SetTimestamp(time.Now()) - metric := Metric{} - metric.AddSeries(series) - metric.SetTimePrecision(time.Second) - req := InsertRequest{} - req.WithTable(table).WithMetric(metric) - reqs := InsertsRequest{} - reqs.WithDatabase(database).Append(req) - resp, err := client.Insert(context.Background(), reqs) - assert.Nil(t, err) - assert.True(t, ParseRespHeader(resp).IsSuccess()) - assert.False(t, ParseRespHeader(resp).IsRateLimited()) - assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue()) - - // Query with metric - queryReq := QueryRequest{} - queryReq.WithSql(fmt.Sprintf("SELECT * FROM %s", table)) - - resMetric, err := client.Query(context.Background(), queryReq) - assert.Nil(t, err) - assert.Equal(t, 1, len(resMetric.GetSeries())) - - // get non exist column - series = resMetric.GetSeries()[0] - _, ok := series.Get("non_exist") - assert.False(t, ok) - - _, ok = series.GetInt("non_exist") - assert.False(t, ok) - - _, ok = series.GetUint("non_exist") - assert.False(t, ok) - - _, ok = series.GetFloat("non_exist") - assert.False(t, ok) - - _, ok = series.GetBool("non_exist") - assert.False(t, ok) - - _, ok = series.GetString("non_exist") - assert.False(t, ok) - - _, ok = series.GetBytes("non_exist") - assert.False(t, ok) } func TestDataTypes(t *testing.T) { - table := "test_data_types" - type datatype struct { - int64V int64 - int32V int32 - int16V int16 - int8V int8 - intV int - uint64V uint64 - uint32V uint32 - uint16V uint16 - uint8V uint8 - uintV uint - float64V float64 - float32V float32 - stringV string - byteV []byte - boolV bool - timeV time.Time - } - - data := datatype{ - int64V: 64, - int32V: 32, - int16V: 16, - int8V: 8, - intV: 64, - uint64V: 64, - uint32V: 32, - uint16V: 16, - uint8V: 8, - uintV: 64, - float64V: 64.0, - float32V: 32.0, - stringV: "string", - byteV: []byte("byte"), - boolV: true, - timeV: time.UnixMilli(1677728740012), - } - client := newClient(t) - - // Insert - metric := Metric{} - metric.SetTimestampAlias("time_v") - - series := Series{} - // int - assert.Nil(t, series.AddIntTag("int64_v_tag", data.int64V)) - assert.Nil(t, series.AddTag("int32_v_tag", data.int32V)) - assert.Nil(t, series.AddTag("int16_v_tag", data.int16V)) - assert.Nil(t, series.AddTag("int8_v_tag", data.int8V)) - assert.Nil(t, series.AddTag("int_v_tag", data.intV)) - assert.Nil(t, series.AddIntField("int64_v_field", data.int64V)) - assert.Nil(t, series.AddField("int32_v_field", data.int32V)) - assert.Nil(t, series.AddField("int16_v_field", data.int16V)) - assert.Nil(t, series.AddField("int8_v_field", data.int8V)) - assert.Nil(t, series.AddField("int_v_field", data.intV)) - - // uint - assert.Nil(t, series.AddUintTag("uint64_v_tag", data.uint64V)) - assert.Nil(t, series.AddTag("uint32_v_tag", data.uint32V)) - assert.Nil(t, series.AddTag("uint16_v_tag", data.uint16V)) - assert.Nil(t, series.AddTag("uint8_v_tag", data.uint8V)) - assert.Nil(t, series.AddTag("uint_v_tag", data.uintV)) - assert.Nil(t, series.AddUintField("uint64_v_field", data.uint64V)) - assert.Nil(t, series.AddField("uint32_v_field", data.uint32V)) - assert.Nil(t, series.AddField("uint16_v_field", data.uint16V)) - assert.Nil(t, series.AddField("uint8_v_field", data.uint8V)) - assert.Nil(t, series.AddField("uint_v_field", data.uintV)) - - // float - assert.Nil(t, series.AddFloatTag("float64_v_tag", data.float64V)) - assert.Nil(t, series.AddTag("float32_v_tag", data.float32V)) - assert.Nil(t, series.AddFloatField("float64_v_field", data.float64V)) - assert.Nil(t, series.AddField("float32_v_field", data.float32V)) - - // string - assert.Nil(t, series.AddStringTag("string_v_tag", data.stringV)) - assert.Nil(t, series.AddStringField("string_v_field", data.stringV)) - - assert.Nil(t, series.AddBytesTag("byte_v_tag", data.byteV)) - assert.Nil(t, series.AddBytesField("byte_v_field", data.byteV)) - - // bool - assert.Nil(t, series.AddBoolTag("bool_v_tag", data.boolV)) - assert.Nil(t, series.AddBoolField("bool_v_field", data.boolV)) - - assert.Nil(t, series.SetTimestamp(data.timeV)) - metric.AddSeries(series) - - req := InsertRequest{} - req.WithTable(table).WithMetric(metric) - reqs := InsertsRequest{} - reqs.WithDatabase(database).Append(req) - - resp, err := client.Insert(context.Background(), reqs) - assert.Nil(t, err) - assert.True(t, ParseRespHeader(resp).IsSuccess()) - assert.False(t, ParseRespHeader(resp).IsRateLimited()) - assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue()) - - // Query with metric - queryReq := QueryRequest{} - queryReq.WithSql(fmt.Sprintf("SELECT * FROM %s", table)).WithDatabase(database) - - resMetric, err := client.Query(context.Background(), queryReq) - assert.Nil(t, err) - assert.Equal(t, 1, len(resMetric.GetSeries())) - - series = resMetric.GetSeries()[0] - // int - int64V, ok := series.GetInt("int64_v_tag") - assert.True(t, ok) - int32V, ok := series.GetInt("int32_v_tag") - assert.True(t, ok) - int16V, ok := series.GetInt("int16_v_tag") - assert.True(t, ok) - int8V, ok := series.GetInt("int8_v_tag") - assert.True(t, ok) - intV, ok := series.GetInt("int_v_tag") - assert.True(t, ok) - - _, ok = series.GetInt("int64_v_field") - assert.True(t, ok) - _, ok = series.GetInt("int32_v_field") - assert.True(t, ok) - _, ok = series.GetInt("int16_v_field") - assert.True(t, ok) - _, ok = series.GetInt("int8_v_field") - assert.True(t, ok) - _, ok = series.GetInt("int_v_field") - assert.True(t, ok) - - // uint - uint64V, ok := series.GetUint("uint64_v_tag") - assert.True(t, ok) - uint32V, ok := series.GetUint("uint32_v_tag") - assert.True(t, ok) - uint16V, ok := series.GetUint("uint16_v_tag") - assert.True(t, ok) - uint8V, ok := series.GetUint("uint8_v_tag") - assert.True(t, ok) - uintV, ok := series.GetUint("uint_v_tag") - assert.True(t, ok) - - _, ok = series.GetUint("uint64_v_field") - assert.True(t, ok) - _, ok = series.GetUint("uint32_v_field") - assert.True(t, ok) - _, ok = series.GetUint("uint16_v_field") - assert.True(t, ok) - _, ok = series.GetUint("uint8_v_field") - assert.True(t, ok) - _, ok = series.GetUint("uint_v_field") - assert.True(t, ok) - - // float - float64V, ok := series.GetFloat("float64_v_tag") - assert.True(t, ok) - float32V, ok := series.GetFloat("float32_v_tag") - assert.True(t, ok) - - _, ok = series.GetFloat("float64_v_field") - assert.True(t, ok) - _, ok = series.GetFloat("float32_v_field") - assert.True(t, ok) - - // string - stringV, ok := series.GetString("string_v_tag") - assert.True(t, ok) - - _, ok = series.GetString("string_v_field") - assert.True(t, ok) - - // bytes - byteV, ok := series.GetBytes("byte_v_tag") - assert.True(t, ok) - - _, ok = series.GetBytes("byte_v_field") - assert.True(t, ok) - - // bool - boolV, ok := series.GetBool("bool_v_tag") - assert.True(t, ok) - - _, ok = series.GetBool("bool_v_field") - assert.True(t, ok) - - timeV, ok := series.GetTimestamp("time_v") - assert.True(t, ok) - - querydata := datatype{ - int64V: int64V, - int32V: int32(int32V), - int16V: int16(int16V), - int8V: int8(int8V), - intV: int(intV), - uint64V: uint64V, - uint32V: uint32(uint32V), - uint16V: uint16(uint16V), - uint8V: uint8(uint8V), - uintV: uint(uintV), - float64V: float64V, - float32V: float32(float32V), - stringV: stringV, - byteV: byteV, - boolV: boolV, - timeV: timeV, - } - assert.Equal(t, data, querydata) } func TestCreateTableInAdvance(t *testing.T) { - table := "create_datatypes_table_in_advance" - schema := "CREATE TABLE " + table + " (" + - " id varchar," + - " i64 bigint," + - " i32 int," + - " i16 smallint," + - " i8 tinyint," + - " u64 bigint unsigned," + - " u32 int unsigned," + - " u16 smallint unsigned," + - " u8 tinyint unsigned," + - " f32 float," + - " f64 double," + - " bool boolean," + - " bytes varbinary," + - " times TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP," + - " TIME INDEX (times)," + - " PRIMARY KEY(id))" - createTable(t, schema) - - type datatype struct { - id string - i64 int64 - i32 int32 - i16 int16 - i8 int8 - u64 uint64 - u32 uint32 - u16 uint16 - u8 uint8 - f64 float64 - f32 float32 - bool bool - bytes []byte - } - - now := time.Now() - data := datatype{ - id: "test", - i64: 64, - i32: 32, - i16: 16, - i8: 8, - u64: 64, - u32: 32, - u16: 16, - u8: 8, - f64: 64.0, - f32: 32.0, - bytes: []byte("byte"), - bool: true, - } - - client := newClient(t) - - series := Series{} - - // string - assert.Nil(t, series.AddTag("id", data.id)) - - // int - assert.Nil(t, series.AddField("i64", data.i64)) - assert.Nil(t, series.AddField("i32", data.i32)) - assert.Nil(t, series.AddField("i16", data.i16)) - assert.Nil(t, series.AddField("i8", data.i8)) - - // uint - assert.Nil(t, series.AddField("u64", data.u64)) - assert.Nil(t, series.AddField("u32", data.u32)) - assert.Nil(t, series.AddField("u16", data.u16)) - assert.Nil(t, series.AddField("u8", data.u8)) - - // float - assert.Nil(t, series.AddField("f64", data.f64)) - assert.Nil(t, series.AddField("f32", data.f32)) - - // []byte - assert.Nil(t, series.AddField("bytes", data.bytes)) - - // bool - assert.Nil(t, series.AddBoolField("bool", data.bool)) - - assert.Nil(t, series.SetTimestamp(now)) - - // Insert - metric := Metric{} - metric.SetTimestampAlias("times") - metric.SetTimePrecision(time.Second) - metric.AddSeries(series) - - req := InsertRequest{} - req.WithTable(table).WithMetric(metric) - reqs := InsertsRequest{} - reqs.WithDatabase(database).Append(req) - - resp, err := client.Insert(context.Background(), reqs) - assert.Nil(t, err) - assert.True(t, ParseRespHeader(resp).IsSuccess()) - assert.False(t, ParseRespHeader(resp).IsRateLimited()) - assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue()) - - // Query with metric - queryReq := QueryRequest{} - queryReq.WithSql(fmt.Sprintf("SELECT * FROM %s", table)).WithDatabase(database) - - resMetric, err := client.Query(context.Background(), queryReq) - assert.Nil(t, err) - assert.Equal(t, 1, len(resMetric.GetSeries())) - - series = resMetric.GetSeries()[0] - - // int - int64V, ok := series.Get("i64") - assert.True(t, ok) - int32V, ok := series.Get("i32") - assert.True(t, ok) - int16V, ok := series.Get("i16") - assert.True(t, ok) - int8V, ok := series.Get("i8") - assert.True(t, ok) - - // uint - uint64V, ok := series.Get("u64") - assert.True(t, ok) - uint32V, ok := series.Get("u32") - assert.True(t, ok) - uint16V, ok := series.Get("u16") - assert.True(t, ok) - uint8V, ok := series.Get("u8") - assert.True(t, ok) - - // float - float64V, ok := series.Get("f64") - assert.True(t, ok) - float32V, ok := series.Get("f32") - assert.True(t, ok) - - // string - stringV, ok := series.Get("id") - assert.True(t, ok) - - // []byte - byteV, ok := series.Get("bytes") - assert.True(t, ok) - - // bool - boolV, ok := series.Get("bool") - assert.True(t, ok) - - querydata := datatype{ - id: stringV.(string), - - i64: int64V.(int64), - i32: int32V.(int32), - i16: int16V.(int16), - i8: int8V.(int8), - - u64: uint64V.(uint64), - u32: uint32V.(uint32), - u16: uint16V.(uint16), - u8: uint8V.(uint8), - - f64: float64V.(float64), - f32: float32V.(float32), - - bytes: byteV.([]byte), - bool: boolV.(bool), - } - assert.Equal(t, data, querydata) - timeV, ok := series.GetTimestamp("times") - assert.True(t, ok) - assert.Equal(t, now.Unix(), timeV.Unix()) } diff --git a/config.go b/config/config.go similarity index 89% rename from config.go rename to config/config.go index b2b6067..a91eef6 100644 --- a/config.go +++ b/config/config.go @@ -1,4 +1,4 @@ -// Copyright 2023 Greptime Team +// Copyright 2024 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,13 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -package greptime +package config import ( "fmt" greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1" "google.golang.org/grpc" + + gutil "github.com/GreptimeTeam/greptimedb-ingester-go/util" ) // Config is to define how the Client behaves. @@ -46,8 +48,8 @@ type Config struct { CallOptions []grpc.CallOption } -// NewCfg helps to init Config with host only -func NewCfg(host string) *Config { +// New helps to init Config with host only +func New(host string) *Config { return &Config{ Host: host, Port: 4001, @@ -98,8 +100,8 @@ func (c *Config) WithCallOptions(options ...grpc.CallOption) *Config { } // buildAuthHeader only supports Basic Auth so far -func (c *Config) buildAuthHeader() *greptimepb.AuthHeader { - if isEmptyString(c.Username) || isEmptyString(c.Password) { +func (c *Config) BuildAuthHeader() *greptimepb.AuthHeader { + if gutil.IsEmptyString(c.Username) || gutil.IsEmptyString(c.Password) { return nil } @@ -114,6 +116,6 @@ func (c *Config) buildAuthHeader() *greptimepb.AuthHeader { } -func (c *Config) getGRPCAddr() string { +func (c *Config) GetGRPCAddr() string { return fmt.Sprintf("%s:%d", c.Host, c.Port) } diff --git a/doc.go b/doc.go deleted file mode 100644 index 0ec3c2a..0000000 --- a/doc.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package greptime provides API for using GreptimeDB client in Go. -// -// # Basic Insert and Query -// -// You can call [NewClient] with [Config] to init a concurrent safe [Client], and -// construct rows of data by [Metric] and [Series], call [Client.Insert] to insert -// [InsertsRequest] into greptimedb, and call [Client.Query] to retrieve data from -// greptimedb via [QueryRequest]. -// -// # Promql -// -// You can also call [Client.PromqlQuery] to retrieve data in []byte format, which -// is absolutely the same as Prometheus. You can choose [InstantPromql] or [RangePromql] -// to get vector or matrix result. -// -// # Series -// -// You don't need to create the table, it will be created automatically via [Series] fields. -// What you have to know about [Series] in advance: -// -// - Tag is like index, it helps you to retrieve data more efficiently -// - Field is like value, it can be used to analyze, calculate, aggregate, etc,. -// - Timestamp is required for timeseries data -// -// Once the schema is created automatically, it can not be changed by [Client], it -// will fail if the column type does not match -// -// # Metric -// -// [Metric] is like multiple [Series], it will check if all of the [Series] are valid: -// -// - the same column name in different series: data type MUST BE the same -// - Tag and Field MUST NOT contain the same column name -// - timestamp MUST NOT BE empty -// -// Also, [Metric] can set: -// -// - [Metric.SetTimePrecision] -// - [Metric.SetTimestampAlias] -package greptime diff --git a/doc_test.go b/doc_test.go deleted file mode 100644 index fe899a6..0000000 --- a/doc_test.go +++ /dev/null @@ -1,153 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package greptime_test - -import ( - "context" - "fmt" - "time" - - greptime "github.com/GreptimeTeam/greptimedb-client-go" - "github.com/GreptimeTeam/greptimedb-client-go/prom" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -var ( - client *greptime.Client - - monitorTable string = "monitor" -) - -func init() { - options := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - } - cfg := greptime.NewCfg("127.0.0.1"). - WithPort(4001). - WithDatabase("public"). - WithAuth("", ""). - WithDialOptions(options...). - WithCallOptions() - c, err := greptime.NewClient(cfg) - if err != nil { - panic("failed to init client") - } - client = c -} - -func constructInsertRequest(table string) greptime.InsertRequest { - series := greptime.Series{} - series.AddTag("region", "az") - series.AddStringTag("host", "localhost") - series.AddFloatField("cpu", 0.90) - series.AddField("memory", 1024) - series.SetTimestamp(time.Now()) - - metric := greptime.Metric{} - metric.AddSeries(series) - - insertRequest := greptime.InsertRequest{} - insertRequest.WithTable(table).WithMetric(metric) - - return insertRequest -} - -func ExampleInsert() { - insertsRequest := greptime.InsertsRequest{} - insertsRequest. - Append(constructInsertRequest(monitorTable)). - Append(constructInsertRequest("temperatures")) - - resp, err := client.Insert(context.Background(), insertsRequest) - if err != nil { - fmt.Printf("fail to insert, err: %+v\n", err) - return - } - fmt.Printf("AffectedRows: %d\n", resp.GetAffectedRows().GetValue()) -} - -func ExampleQueryViaSql() { - type Monitor struct { - region string - host string - cpu float64 - memory int64 - ts time.Time - } - - req := greptime.QueryRequest{} - req.WithSql("SELECT * FROM " + monitorTable) - - resMetric, err := client.Query(context.Background(), req) - if err != nil { - fmt.Printf("fail to query, err: %+v\n", err) - return - } - - monitors := []Monitor{} - for _, series := range resMetric.GetSeries() { - monitor := &Monitor{} - host, exist := series.Get("host") - if exist { - monitor.host = host.(string) - } - monitor.region, _ = series.GetString("region") - monitor.cpu, _ = series.GetFloat("cpu") - monitor.memory, _ = series.GetInt("memory") - monitor.ts, _ = series.GetTimestamp("ts") - monitors = append(monitors, *monitor) - } - fmt.Println(monitors) -} - -func ExampleQueryViaInstantPromql() { - promql := greptime.NewInstantPromql(monitorTable) - req := greptime.QueryRequest{} - req.WithInstantPromql(promql) - resp, err := client.PromqlQuery(context.Background(), req) - if err != nil { - fmt.Printf("failed to do instant promql query: %+v\n", err) - return - } - - result, err := prom.UnmarshalApiResponse(resp.GetBody()) - if err != nil { - fmt.Printf("failed to unmarshal instant promql, body: %s, err: %+v", string(resp.GetBody()), err) - return - } - fmt.Printf("%s:\n%+v\n", result.Type, result.Val) - -} - -func ExamplQueryViaRangePromql() { - end := time.Now() - start := end.Add(time.Duration(-15) * time.Second) - promql := greptime.NewRangePromql(monitorTable).WithStart(start).WithEnd(end).WithStep(time.Second) - req := greptime.QueryRequest{} - req.WithRangePromql(promql) - resp, err := client.PromqlQuery(context.Background(), req) - if err != nil { - fmt.Printf("failed to do range promql query: %+v\n", err) - return - } - - result, err := prom.UnmarshalApiResponse(resp.GetBody()) - if err != nil { - fmt.Printf("failed to unmarshal instant promql, body: %s, err: %+v", string(resp.GetBody()), err) - return - } - fmt.Printf("%s:\n%+v\n", result.Type, result.Val) -} diff --git a/docs/gorm/mysql.md b/docs/gorm/mysql.md deleted file mode 100644 index e202775..0000000 --- a/docs/gorm/mysql.md +++ /dev/null @@ -1,85 +0,0 @@ -Retrieving via MySQL -== - - -```go -package main - -import ( - "fmt" - "time" - - "gorm.io/driver/mysql" - "gorm.io/gorm" -) - -type Monitor struct { - ID int64 `gorm:"primaryKey"` - Host string `gorm:"column:host"` - Memory uint64 `gorm:"column:memory"` - Cpu float64 `gorm:"column:cpu"` - Temperature int64 `gorm:"column:temperature"` - Ts time.Time `gorm:"column:ts"` -} - -func (Monitor) TableName() string { - return "monitor" -} - -type Mysql struct { - Host string - Port string - User string - Password string - Database string - - DB *gorm.DB -} - -func (m *Mysql) Setup() error { - if m.DB != nil { - return nil - } - - dsn := fmt.Sprintf("tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", - m.Host, m.Port, m.Database) - if m.User != "" && m.Password != "" { - dsn = fmt.Sprintf("%s:%s@%s", m.User, m.Password, dsn) - } - db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{}) - if err != nil { - return err - } - - m.DB = db - return nil -} - -func (p *Mysql) AllMonitors() ([]Monitor, error) { - var monitors []Monitor - err := p.DB.Find(&monitors).Error - return monitors, err -} - -func main() { - mysql := &Mysql{ - Host: "127.0.0.1", - Port: "4002", - User: "", - Password: "", - Database: "public", - } - if err := mysql.Setup(); err != nil { - panic(err) - } - - all, err := mysql.AllMonitors() - if err != nil { - panic(err) - } - - for i, m := range all { - fmt.Printf("%d: %#v\n", i, m) - } -} -``` \ No newline at end of file diff --git a/docs/gorm/postgres.md b/docs/gorm/postgres.md deleted file mode 100644 index 9f2ea64..0000000 --- a/docs/gorm/postgres.md +++ /dev/null @@ -1,82 +0,0 @@ -Retrieving via PostgresQL -== - -```go -package main - -import ( - "fmt" - "time" - - "gorm.io/driver/postgres" - "gorm.io/gorm" -) - -type Monitor struct { - ID int64 `gorm:"primaryKey"` - Host string `gorm:"column:host"` - Memory uint64 `gorm:"column:memory"` - Cpu float64 `gorm:"column:cpu"` - Temperature int64 `gorm:"column:temperature"` - Ts time.Time `gorm:"column:ts"` -} - -func (Monitor) TableName() string { - return "monitor" -} - -type Postgres struct { - Host string - Port string - User string - Password string - Database string - - DB *gorm.DB -} - -// Setup is to init the DB, and SHOULD BE called only once -func (p *Postgres) Setup() error { - if p.DB != nil { - return nil - } - - dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=disable TimeZone=Asia/Shanghai", - p.Host, p.User, p.Password, p.Database, p.Port) - - db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) - if err != nil { - return err - } - - p.DB = db - return nil -} - -func (p *Postgres) AllMonitors() ([]Monitor, error) { - var monitors []Monitor - err := p.DB.Find(&monitors).Error - return monitors, err -} - -func main() { - pg := &Postgres{ - Host: "127.0.0.1", - Port: "4003", - User: "", - Password: "", - Database: "public", - } - if err := pg.Setup(); err != nil { - panic(err) - } - - all, err := pg.AllMonitors() - if err != nil { - panic(err) - } - for i, m := range all { - fmt.Printf("%d: %#v\n", i, m) - } -} -``` \ No newline at end of file diff --git a/docs/insert.md b/docs/insert.md deleted file mode 100644 index 4bec6b2..0000000 --- a/docs/insert.md +++ /dev/null @@ -1,116 +0,0 @@ -Inserting -== - -```go -package insert - -import ( - "context" - "fmt" - "strconv" - "time" - - gc "github.com/GreptimeTeam/greptimedb-client-go" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -type Monitor struct { - ID int64 - Host string - Memory uint64 - Cpu float64 - Temperature int64 - Ts time.Time -} - -type Greptime struct { - Host string - Port string - User string - Password string - Database string - - Client gc.Client -} - -func (g *Greptime) Setup() error { - options := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - } - - cfg := gc.NewCfg(g.Host). - WithDatabase(g.Database). - WithAuth(g.User, g.Password). - WithDialOptions(options...) - - if len(g.Port) > 0 { - port, err := strconv.Atoi(g.Port) - if err != nil { - return err - } - cfg.WithPort(port) - } - - cli, err := gc.NewClient(cfg) - if err != nil { - return err - } - - g.Client = *cli - return nil -} - -func (g *Greptime) Insert() error { - table := "monitor" - monitor := Monitor{ - ID: time.Now().UnixMicro(), - Host: "127.0.0.1", - Ts: time.Now(), - Memory: 21, - Cpu: 0.81, - Temperature: 21, - } - - series := gc.Series{} - series.AddTag("id", monitor.ID) - series.AddField("host", monitor.Host) - series.AddField("memory", monitor.Memory) - series.AddField("cpu", monitor.Cpu) - series.AddField("temperature", monitor.Temperature) - series.SetTimestamp(monitor.Ts) - - metric := gc.Metric{} - metric.SetTimePrecision(time.Microsecond) - metric.SetTimestampAlias("ts") - metric.AddSeries(series) - - req := gc.InsertRequest{} - req.WithTable(table).WithMetric(metric) - reqs := gc.InsertsRequest{} - reqs.Append(req) - - resp, err := g.Client.Insert(context.Background(), reqs) - fmt.Println(resp) - return err -} - -func main() { - greptimedb := &Greptime{ - Host: "127.0.0.1", - Port: "4001", - User: "", - Password: "", - Database: "public", - } - if err := greptimedb.Setup(); err != nil { - panic(err) - } - - if err := greptimedb.Insert(); err != nil { - panic(err) - } - - fmt.Println("insert success via greptimedb-client") -} -``` \ No newline at end of file diff --git a/docs/promql.md b/docs/promql.md deleted file mode 100644 index 6133410..0000000 --- a/docs/promql.md +++ /dev/null @@ -1,117 +0,0 @@ -Retrieving via PromQL -== - -```go -package main - -import ( - "context" - "fmt" - "strconv" - "time" - - gc "github.com/GreptimeTeam/greptimedb-client-go" - "github.com/GreptimeTeam/greptimedb-client-go/prom" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -var monitorTable = "monitor" - -type Greptime struct { - Host string - Port string - User string - Password string - Database string - - Client gc.Client -} - -func (g *Greptime) Setup() error { - options := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - } - - cfg := gc.NewCfg(g.Host). - WithDatabase(g.Database). - WithAuth(g.User, g.Password). - WithDialOptions(options...) - - if len(g.Port) > 0 { - port, err := strconv.Atoi(g.Port) - if err != nil { - return err - } - cfg.WithPort(port) - } - - cli, err := gc.NewClient(cfg) - if err != nil { - return err - } - - g.Client = *cli - return nil -} - -// the response format is in []byte, and is absolutely the same as Prometheus -func (g *Greptime) queryViaInstantPromql() { - promql := gc.NewInstantPromql(monitorTable) - req := gc.QueryRequest{} - req.WithInstantPromql(promql) - resp, err := g.Client.PromqlQuery(context.Background(), req) - if err != nil { - fmt.Printf("failed to do instant promql query: %+v\n", err) - return - } - - // you can use prom package to unmarshal the response as you want - result, err := prom.UnmarshalApiResponse(resp.GetBody()) - if err != nil { - fmt.Printf("failed to unmarshal instant promql, body: %s, err: %+v", string(resp.GetBody()), err) - return - } - fmt.Printf("%s:\n%+v\n", result.Type, result.Val) - -} - -// the response format is in []byte, and is absolutely the same as Prometheus -func (g *Greptime) queryViaRangePromql() { - end := time.Now() - start := end.Add(time.Duration(-15) * time.Second) // 15 seconds before - promql := gc.NewRangePromql(monitorTable).WithStart(start).WithEnd(end).WithStep(time.Second) - req := gc.QueryRequest{} - req.WithRangePromql(promql) - resp, err := g.Client.PromqlQuery(context.Background(), req) - if err != nil { - fmt.Printf("failed to do range promql query: %+v\n", err) - return - } - - // you can use prom package to unmarshal the response as you want - result, err := prom.UnmarshalApiResponse(resp.GetBody()) - if err != nil { - fmt.Printf("failed to unmarshal instant promql, body: %s, err: %+v", string(resp.GetBody()), err) - return - } - fmt.Printf("%s:\n%+v\n", result.Type, result.Val) -} - -func main() { - greptimedb := &Greptime{ - Host: "127.0.0.1", - Port: "4001", - User: "", - Password: "", - Database: "public", - } - if err := greptimedb.Setup(); err != nil { - panic(err) - } - - greptimedb.queryViaInstantPromql() - greptimedb.queryViaRangePromql() -} - -``` \ No newline at end of file diff --git a/docs/streaming.md b/docs/streaming.md deleted file mode 100644 index 17c8dee..0000000 --- a/docs/streaming.md +++ /dev/null @@ -1,134 +0,0 @@ -Streaming Inserting -== - -```go -package main - -import ( - "context" - "fmt" - "strconv" - "time" - - gc "github.com/GreptimeTeam/greptimedb-client-go" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -type Monitor struct { - ID int64 - Host string - Memory uint64 - Cpu float64 - Temperature int64 - Ts time.Time -} - -type Greptime struct { - Host string - Port string - User string - Password string - Database string - - StreamClient gc.StreamClient -} - -func mockData(size int) []Monitor { - monitors := make([]Monitor, 0, size) - for i := 0; i < size; i++ { - monitor := Monitor{ - ID: time.Now().UnixMicro(), - Host: "127.0.0.1", - Ts: time.Now(), - Memory: 21, - Cpu: 0.81, - Temperature: 21, - } - - monitors = append(monitors, monitor) - time.Sleep(time.Millisecond) - } - return monitors -} - -func (g *Greptime) Setup() error { - options := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - } - - cfg := gc.NewCfg(g.Host). - WithDatabase(g.Database). - WithAuth(g.User, g.Password). - WithDialOptions(options...) - - if len(g.Port) > 0 { - port, err := strconv.Atoi(g.Port) - if err != nil { - return err - } - cfg.WithPort(port) - } - - cli, err := gc.NewStreamClient(cfg) - if err != nil { - return err - } - - g.StreamClient = *cli - return nil -} - -func (g *Greptime) StreamInsert() error { - table := "monitor" - - size := 20 - insertMonitors := mockData(size) - metric := gc.Metric{} - for i, monitor := range insertMonitors { - series := gc.Series{} - series.AddTag("id", monitor.ID) - series.AddField("host", monitor.Host) - series.AddField("memory", monitor.Memory) - series.AddField("cpu", monitor.Cpu) - series.AddField("temperature", monitor.Temperature) - series.SetTimestamp(monitor.Ts) - metric.AddSeries(series) - - if len(metric.GetSeries()) > 0 { - if (i+1)%10 == 0 || i >= size-1 { - req := gc.InsertRequest{} - req.WithTable(table).WithMetric(metric) - - reqs := gc.InsertsRequest{} - reqs.Append(req) - - fmt.Printf("ready to send %d records\n", len(metric.GetSeries())) - if err := g.StreamClient.Send(context.Background(), reqs); err != nil { - fmt.Println(err) - } - metric = gc.Metric{} - } - } - } - _, err := g.StreamClient.CloseAndRecv(context.Background()) - return err -} - -func main() { - greptimedb := &Greptime{ - Host: "127.0.0.1", - Port: "4001", - User: "", - Password: "", - Database: "public", - } - if err := greptimedb.Setup(); err != nil { - panic(err) - } - - greptimedb.StreamInsert() - fmt.Println("stream insert success via greptimedb-client") -} - -``` \ No newline at end of file diff --git a/errors.go b/error/error.go similarity index 97% rename from errors.go rename to error/error.go index bde0286..0eae956 100644 --- a/errors.go +++ b/error/error.go @@ -1,4 +1,4 @@ -// Copyright 2023 Greptime Team +// Copyright 2024 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package greptime +package error import ( "errors" diff --git a/go.mod b/go.mod index 67d5fc2..266565a 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/GreptimeTeam/greptimedb-client-go +module github.com/GreptimeTeam/greptimedb-ingester-go go 1.20 diff --git a/header.go b/header.go index 0860a67..7ad7571 100644 --- a/header.go +++ b/header.go @@ -1,4 +1,4 @@ -// Copyright 2023 Greptime Team +// Copyright 2024 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,24 +16,28 @@ package greptime import ( greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1" + + "github.com/GreptimeTeam/greptimedb-ingester-go/config" + gerr "github.com/GreptimeTeam/greptimedb-ingester-go/error" + gutil "github.com/GreptimeTeam/greptimedb-ingester-go/util" ) type reqHeader struct { database string } -func (h *reqHeader) build(cfg *Config) (*greptimepb.RequestHeader, error) { - if isEmptyString(h.database) { +func (h *reqHeader) build(cfg *config.Config) (*greptimepb.RequestHeader, error) { + if gutil.IsEmptyString(h.database) { h.database = cfg.Database } - if isEmptyString(h.database) { - return nil, ErrEmptyDatabase + if gutil.IsEmptyString(h.database) { + return nil, gerr.ErrEmptyDatabase } header := &greptimepb.RequestHeader{ Dbname: h.database, - Authorization: cfg.buildAuthHeader(), + Authorization: cfg.BuildAuthHeader(), } return header, nil @@ -53,7 +57,7 @@ func (h RespHeader) IsRateLimited() bool { } func (h RespHeader) IsNil() bool { - return h.Code == 0 && isEmptyString(h.Msg) + return h.Code == 0 && gutil.IsEmptyString(h.Msg) } type getRespHeader interface { diff --git a/header_test.go b/header_test.go index 8b38867..3123a78 100644 --- a/header_test.go +++ b/header_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 Greptime Team +// Copyright 2024 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,23 +19,26 @@ import ( greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1" "github.com/stretchr/testify/assert" + + "github.com/GreptimeTeam/greptimedb-ingester-go/config" + gerr "github.com/GreptimeTeam/greptimedb-ingester-go/error" ) func TestHeaderBuild(t *testing.T) { h := &reqHeader{} - gh, err := h.build(&Config{}) - assert.ErrorIs(t, err, ErrEmptyDatabase) + gh, err := h.build(&config.Config{}) + assert.ErrorIs(t, err, gerr.ErrEmptyDatabase) assert.Nil(t, gh) - gh, err = h.build(&Config{Database: "database"}) + gh, err = h.build(&config.Config{Database: "database"}) assert.Nil(t, err) assert.Equal(t, &greptimepb.RequestHeader{ Dbname: "database", }, gh) h.database = "db_in_header" - gh, err = h.build(&Config{Database: "database"}) + gh, err = h.build(&config.Config{Database: "database"}) assert.Nil(t, err) assert.Equal(t, &greptimepb.RequestHeader{ Dbname: "db_in_header", diff --git a/insert.go b/insert.go index 2e09fc3..a9725bc 100644 --- a/insert.go +++ b/insert.go @@ -1,4 +1,4 @@ -// Copyright 2023 Greptime Team +// Copyright 2024 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,6 +16,11 @@ package greptime import ( greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1" + + "github.com/GreptimeTeam/greptimedb-ingester-go/config" + gerr "github.com/GreptimeTeam/greptimedb-ingester-go/error" + "github.com/GreptimeTeam/greptimedb-ingester-go/model" + gutil "github.com/GreptimeTeam/greptimedb-ingester-go/util" ) type InsertsRequest struct { @@ -42,14 +47,14 @@ func (r *InsertsRequest) Append(insert InsertRequest) *InsertsRequest { return r } -func (r InsertsRequest) build(cfg *Config) (*greptimepb.GreptimeRequest, error) { +func (r InsertsRequest) build(cfg *config.Config) (*greptimepb.GreptimeRequest, error) { header, err := r.header.build(cfg) if err != nil { return nil, err } if len(r.inserts) == 0 { - return nil, ErrEmptyInserts + return nil, gerr.ErrEmptyInserts } reqs := make([]*greptimepb.InsertRequest, 0, len(r.inserts)) @@ -75,7 +80,7 @@ func (r InsertsRequest) build(cfg *Config) (*greptimepb.GreptimeRequest, error) // InsertRequest insert metric to specified table. You can also specify the database in header. type InsertRequest struct { table string - metric Metric + metric model.Metric } func (r *InsertRequest) WithTable(table string) *InsertRequest { @@ -83,21 +88,21 @@ func (r *InsertRequest) WithTable(table string) *InsertRequest { return r } -func (r *InsertRequest) WithMetric(metric Metric) *InsertRequest { +func (r *InsertRequest) WithMetric(metric model.Metric) *InsertRequest { r.metric = metric return r } func (r *InsertRequest) RowCount() uint32 { - return uint32(len(r.metric.series)) + return uint32(len(r.metric.GetSeries())) } func (r *InsertRequest) build() (*greptimepb.InsertRequest, error) { - if isEmptyString(r.table) { - return nil, ErrEmptyTable + if gutil.IsEmptyString(r.table) { + return nil, gerr.ErrEmptyTable } - columns, err := r.metric.intoGreptimeColumn() + columns, err := r.metric.IntoGreptimeColumn() if err != nil { return nil, err } diff --git a/metric.go b/model/metric.go similarity index 66% rename from metric.go rename to model/metric.go index e604ca1..9d31aab 100644 --- a/metric.go +++ b/model/metric.go @@ -1,4 +1,4 @@ -// Copyright 2023 Greptime Team +// Copyright 2024 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,18 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -package greptime +package model import ( - "errors" "fmt" "math" "time" greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" - "github.com/apache/arrow/go/v13/arrow/flight" + + gerr "github.com/GreptimeTeam/greptimedb-ingester-go/error" + gutil "github.com/GreptimeTeam/greptimedb-ingester-go/util" ) // Metric represents multiple rows of data, and also Metric can specify @@ -50,125 +49,6 @@ func (m *Metric) GetSeries() []Series { return m.series } -func buildMetricFromReader(r *flight.Reader) (*Metric, error) { - metric := Metric{} - - if r == nil { - return nil, errors.New("Internal Error, empty reader pointer") - } - - fields := r.Schema().Fields() - for r.Next() { - record := r.Record() - for i := 0; i < int(record.NumRows()); i++ { - series := Series{} - for j := 0; j < int(record.NumCols()); j++ { - column := record.Column(j) - colVal, err := fromColumn(column, i) - if err != nil { - return nil, err - } - series.AddField(fields[j].Name, colVal) - } - if err := metric.AddSeries(series); err != nil { - return nil, err - } - } - } - - return &metric, nil -} - -func extractPrecision(field *arrow.Field) (time.Duration, error) { - if field == nil { - return 0, errors.New("field should not be empty") - } - dataType, ok := field.Type.(*arrow.TimestampType) - if !ok { - return 0, fmt.Errorf("unsupported arrow field type '%s'", field.Type.Name()) - } - switch dataType.Unit { - case arrow.Microsecond: - return time.Microsecond, nil - case arrow.Millisecond: - return time.Millisecond, nil - case arrow.Second: - return time.Second, nil - case arrow.Nanosecond: - return time.Nanosecond, nil - default: - return 0, fmt.Errorf("unsupported arrow type '%s'", field.Type.Name()) - } - -} - -// fromColumn retrieves arrow value from the column at idx position -func fromColumn(column arrow.Array, idx int) (any, error) { - if column.IsNull(idx) { - return nil, nil - } - switch typedColumn := column.(type) { - case *array.Int64: - return typedColumn.Value(idx), nil - case *array.Int32: - return typedColumn.Value(idx), nil - case *array.Int16: - return typedColumn.Value(idx), nil - case *array.Int8: - return typedColumn.Value(idx), nil - case *array.Uint64: - return typedColumn.Value(idx), nil - case *array.Uint32: - return typedColumn.Value(idx), nil - case *array.Uint16: - return typedColumn.Value(idx), nil - case *array.Uint8: - return typedColumn.Value(idx), nil - case *array.Float64: - return typedColumn.Value(idx), nil - case *array.Float32: - return typedColumn.Value(idx), nil - case *array.String: - return typedColumn.Value(idx), nil - case *array.Boolean: - return typedColumn.Value(idx), nil - case *array.Binary: - return typedColumn.Value(idx), nil - case *array.LargeBinary: - return typedColumn.Value(idx), nil - case *array.FixedSizeBinary: - return typedColumn.Value(idx), nil - case *array.Time32: - return typedColumn.Value(idx), nil - case *array.Time64: - return typedColumn.Value(idx), nil - case *array.Date32: - return typedColumn.Value(idx), nil - case *array.Date64: - return typedColumn.Value(idx), nil - case *array.Timestamp: - dataType, ok := column.DataType().(*arrow.TimestampType) - if !ok { - return nil, fmt.Errorf("unsupported arrow timestamp type '%T' for '%s'", typedColumn, column.DataType().Name()) - } - value := int64(typedColumn.Value(idx)) - switch dataType.Unit { - case arrow.Microsecond: - return time.UnixMicro(value), nil - case arrow.Millisecond: - return time.UnixMilli(value), nil - case arrow.Second: - return time.Unix(value, 0), nil - case arrow.Nanosecond: - return time.Unix(0, value), nil - default: - return nil, fmt.Errorf("unsupported arrow timestamp type '%T' for '%s'", typedColumn, column.DataType().Name()) - } - default: - return nil, fmt.Errorf("unsupported arrow type '%T' for '%s'", typedColumn, column.DataType().Name()) - } -} - // SetTimePrecision set precision for Metric. Valid durations include: // - time.Nanosecond // - time.Microsecond @@ -180,8 +60,8 @@ func fromColumn(column arrow.Array, idx int) (any, error) { // - once the precision has been set, it can not be changed // - insert will fail if precision does not match with the existing precision of the schema in greptimedb func (m *Metric) SetTimePrecision(precision time.Duration) error { - if !isValidPrecision(precision) { - return ErrInvalidTimePrecision + if !gutil.IsValidPrecision(precision) { + return gerr.ErrInvalidTimePrecision } m.timestampPrecision = precision return nil @@ -189,7 +69,7 @@ func (m *Metric) SetTimePrecision(precision time.Duration) error { // SetTimestampAlias helps to specify the timestamp column name, default is ts. func (m *Metric) SetTimestampAlias(alias string) error { - alias, err := toColumnName(alias) + alias, err := gutil.ToColumnName(alias) if err != nil { return err } @@ -242,9 +122,9 @@ func (m *Metric) AddSeries(s Series) error { return nil } -func (m *Metric) intoGreptimeColumn() ([]*greptimepb.Column, error) { +func (m *Metric) IntoGreptimeColumn() ([]*greptimepb.Column, error) { if len(m.series) == 0 { - return nil, ErrNoSeriesInMetric + return nil, gerr.ErrNoSeriesInMetric } result, err := m.intoDataColumns() @@ -267,7 +147,7 @@ func (m *Metric) nullMaskByteSize() int { // intoDataColumns does not contain timestamp semantic column func (m *Metric) intoDataColumns() ([]*greptimepb.Column, error) { - nullMasks := map[string]*mask{} + nullMasks := map[string]*gutil.Mask{} mappedCols := map[string]*greptimepb.Column{} for name, col := range m.columns { column := greptimepb.Column{ @@ -289,10 +169,10 @@ func (m *Metric) intoDataColumns() ([]*greptimepb.Column, error) { } else { nullMask, exist := nullMasks[name] if !exist { - nullMask = &mask{} + nullMask = &gutil.Mask{} nullMasks[name] = nullMask } - nullMask.set(uint(rowIdx)) + nullMask.Set(uint(rowIdx)) } } } @@ -312,7 +192,7 @@ func (m *Metric) intoDataColumns() ([]*greptimepb.Column, error) { } func (m *Metric) intoTimestampColumn() (*greptimepb.Column, error) { - datatype, err := precisionToDataType(m.timestampPrecision) + datatype, err := gutil.PrecisionToDataType(m.timestampPrecision) if err != nil { return nil, err } @@ -323,7 +203,7 @@ func (m *Metric) intoTimestampColumn() (*greptimepb.Column, error) { Values: &greptimepb.Column_Values{}, NullMask: nil, } - nullMask := mask{} + nullMask := gutil.Mask{} for _, s := range m.series { switch datatype { case greptimepb.ColumnDataType_TIMESTAMP_SECOND: @@ -337,7 +217,7 @@ func (m *Metric) intoTimestampColumn() (*greptimepb.Column, error) { } } - if b, err := nullMask.shrink(m.nullMaskByteSize()); err != nil { + if b, err := nullMask.Shrink(m.nullMaskByteSize()); err != nil { return nil, err } else { tsColumn.NullMask = b @@ -388,9 +268,9 @@ func setColumn(col *greptimepb.Column, val any) error { return nil } -func setNullMask(cols map[string]*greptimepb.Column, masks map[string]*mask, size int) error { +func setNullMask(cols map[string]*greptimepb.Column, masks map[string]*gutil.Mask, size int) error { for name, mask := range masks { - b, err := mask.shrink(size) + b, err := mask.Shrink(size) if err != nil { return err } diff --git a/metric_test.go b/model/metric_test.go similarity index 97% rename from metric_test.go rename to model/metric_test.go index 10b22f7..0091ee0 100644 --- a/metric_test.go +++ b/model/metric_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 Greptime Team +// Copyright 2024 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package greptime +package model import ( "testing" @@ -20,6 +20,8 @@ import ( greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1" "github.com/stretchr/testify/assert" + + gerr "github.com/GreptimeTeam/greptimedb-ingester-go/error" ) func TestMetric(t *testing.T) { @@ -103,7 +105,7 @@ func TestGreptimeColumn(t *testing.T) { assert.Nil(t, m.AddSeries(s1)) assert.Nil(t, m.AddSeries(s2)) - cols, err := m.intoGreptimeColumn() + cols, err := m.IntoGreptimeColumn() assert.Nil(t, err) assert.Equal(t, 9, len(cols)) @@ -300,13 +302,13 @@ func TestSetColumn(t *testing.T) { func TestSetTimePrecision(t *testing.T) { m := Metric{} err := m.SetTimePrecision(123) - assert.Equal(t, ErrInvalidTimePrecision, err) + assert.Equal(t, gerr.ErrInvalidTimePrecision, err) } func TestSetTimeAlias(t *testing.T) { m := Metric{} err := m.SetTimestampAlias("") - assert.Equal(t, ErrEmptyKey, err) + assert.Equal(t, gerr.ErrEmptyKey, err) } func TestGetTags(t *testing.T) { diff --git a/series.go b/model/series.go similarity index 97% rename from series.go rename to model/series.go index 9b7397d..4132c33 100644 --- a/series.go +++ b/model/series.go @@ -1,4 +1,4 @@ -// Copyright 2023 Greptime Team +// Copyright 2024 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,13 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -package greptime +package model import ( "fmt" "time" greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1" + + gutil "github.com/GreptimeTeam/greptimedb-ingester-go/util" ) type column struct { @@ -200,7 +202,7 @@ func (s *Series) GetTimestamp(key string) (time.Time, bool) { } func (s *Series) add(name string, val any, semantic greptimepb.SemanticType) error { - key, err := toColumnName(name) + key, err := gutil.ToColumnName(name) if err != nil { return err } @@ -209,13 +211,13 @@ func (s *Series) add(name string, val any, semantic greptimepb.SemanticType) err s.columns = map[string]column{} } - v, err := convert(val) + v, err := gutil.Convert(val) if err != nil { return fmt.Errorf("add tag err: %w", err) } newCol := column{ - typ: v.typ, + typ: v.Type, semantic: semantic, } if col, seen := s.columns[key]; seen { @@ -229,7 +231,7 @@ func (s *Series) add(name string, val any, semantic greptimepb.SemanticType) err if s.vals == nil { s.vals = map[string]any{} } - s.vals[key] = v.val + s.vals[key] = v.Val return nil } diff --git a/series_test.go b/model/series_test.go similarity index 99% rename from series_test.go rename to model/series_test.go index 1fa132e..be0f724 100644 --- a/series_test.go +++ b/model/series_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 Greptime Team +// Copyright 2024 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package greptime +package model import ( "fmt" diff --git a/prom/api.go b/prom/api.go deleted file mode 100644 index 31f76ec..0000000 --- a/prom/api.go +++ /dev/null @@ -1,113 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package prom - -import ( - "encoding/json" - "fmt" - "strings" - - "github.com/prometheus/common/model" -) - -// apiResponse implements error interface -type apiResponse struct { - Status string `json:"status"` - Data json.RawMessage `json:"data"` - - Type string `json:"errorType"` - Msg string `json:"error"` -} - -func (r *apiResponse) isError() bool { - return !strings.EqualFold(r.Status, "success") -} - -// IsRateLimited checkes if this error is caused by rate limit restriction -func (e *apiResponse) isRateLimited() bool { - return strings.EqualFold(e.Type, "RateLimited") -} - -func (e *apiResponse) Error() string { - return fmt.Sprintf("%s: %s", e.Type, e.Msg) -} - -func UnmarshalApiResponse(body []byte) (*QueryResult, error) { - var resp apiResponse - if err := json.Unmarshal(body, &resp); err != nil { - return nil, err - } - - if resp.isError() { - return nil, &resp - } - - var res QueryResult - if err := json.Unmarshal(resp.Data, &res); err != nil { - fmt.Printf("failed to unmarshal range promql, body:%s, err: %+v", resp.Data, err) - return nil, err - } - - return &res, nil -} - -func IsRateLimitedError(err error) bool { - e, ok := err.(*apiResponse) - return ok && e.isRateLimited() -} - -// QueryResult contains result data for a query. -type QueryResult struct { - Type model.ValueType `json:"resultType"` - Result any `json:"result"` - - // The decoded value. - Val model.Value -} - -func (qr *QueryResult) UnmarshalJSON(b []byte) error { - v := struct { - Type model.ValueType `json:"resultType"` - Result json.RawMessage `json:"result"` - }{} - - err := json.Unmarshal(b, &v) - if err != nil { - return err - } - - qr.Type = v.Type - - switch v.Type { - case model.ValScalar: - var sv model.Scalar - err = json.Unmarshal(v.Result, &sv) - qr.Val = &sv - - case model.ValVector: - var vv model.Vector - err = json.Unmarshal(v.Result, &vv) - qr.Val = vv - - case model.ValMatrix: - var mv model.Matrix - err = json.Unmarshal(v.Result, &mv) - qr.Val = mv - - default: - err = fmt.Errorf("unexpected value type %q with data: '%s'", v.Type.String(), string(b)) - } - return err -} diff --git a/prom/api_test.go b/prom/api_test.go deleted file mode 100644 index 91f816c..0000000 --- a/prom/api_test.go +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package prom - -import ( - "encoding/json" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestApiResponseError(t *testing.T) { - data, err := json.Marshal(map[string]any{"resultType": "", "result": []byte{}}) - assert.Nil(t, err) - - resp := apiResponse{ - Status: "error", - Data: data, - Type: "RateLimited", - Msg: "Read request banned for xxx until 1692722423416", - } - b, err := json.Marshal(resp) - assert.Nil(t, err) - res, err := UnmarshalApiResponse(b) - - assert.Nil(t, res) - assert.True(t, IsRateLimitedError(err)) - - e, ok := err.(*apiResponse) - assert.True(t, ok) - assert.Equal(t, "error", e.Status) - assert.NotEmpty(t, e.Data) - assert.Equal(t, "RateLimited", e.Type) - assert.Equal(t, "Read request banned for xxx until 1692722423416", e.Msg) -} diff --git a/query.go b/query.go deleted file mode 100644 index 8259077..0000000 --- a/query.go +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package greptime - -import ( - greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1" -) - -type query interface { - // buildGreptimeRequest helps to construct a normal request, and the response - // is in Metric and Series - buildGreptimeRequest(header *greptimepb.RequestHeader) (*greptimepb.GreptimeRequest, error) - - // buildPromqlRequest helps to construct a promql request, and the response - // is absolutely the same as Prometheus - buildPromqlRequest(header *greptimepb.RequestHeader) (*greptimepb.PromqlRequest, error) -} - -// QueryRequest helps to query data from greptimedb, and the response is in Metric. -// But if you expect the response format is the same as Prometheus, you should consider -// [PromqlRequest]. -// -// At least one of Sql, InstantPromql, RangePromql MUST be specified. -// If multiple fields are specified, the field specified later will be used -type QueryRequest struct { - header reqHeader - query query -} - -func NewQueryRequest() *QueryRequest { - return &QueryRequest{} -} - -// WithDatabase helps to specify different database from the default one. -func (r *QueryRequest) WithDatabase(database string) *QueryRequest { - r.header = reqHeader{ - database: database, - } - return r -} - -func (r *QueryRequest) WithSql(sql string) *QueryRequest { - r.query = &Sql{sql: sql} - return r -} - -func (r *QueryRequest) WithInstantPromql(instantPromql *InstantPromql) *QueryRequest { - r.query = instantPromql - return r -} - -func (r *QueryRequest) WithRangePromql(rangePromql *RangePromql) *QueryRequest { - r.query = rangePromql - return r -} - -func (r *QueryRequest) buildGreptimeRequest(cfg *Config) (*greptimepb.GreptimeRequest, error) { - header, err := r.header.build(cfg) - if err != nil { - return nil, err - } - - if r.query == nil { - return nil, ErrEmptyQuery - } - - return r.query.buildGreptimeRequest(header) -} - -func (r *QueryRequest) buildPromqlRequest(cfg *Config) (*greptimepb.PromqlRequest, error) { - header, err := r.header.build(cfg) - if err != nil { - return nil, err - } - - if r.query == nil { - return nil, ErrEmptyQuery - } - - return r.query.buildPromqlRequest(header) -} diff --git a/query_promql.go b/query_promql.go deleted file mode 100644 index afb063a..0000000 --- a/query_promql.go +++ /dev/null @@ -1,184 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package greptime - -import ( - "fmt" - "strconv" - "time" - - greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1" -) - -var ( - _ query = (*InstantPromql)(nil) - _ query = (*RangePromql)(nil) -) - -// InstantPromql helps to fire a request to greptimedb compatible with Prometheus instant query, -// you can visit [instant query] for detail. -// -// [instant query]: https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries -type InstantPromql struct { - Query string - Ts time.Time -} - -func NewInstantPromql(query string) *InstantPromql { - return &InstantPromql{query, time.Now()} -} - -// WithQuery helps to update the query -func (ip *InstantPromql) WithQuery(query string) *InstantPromql { - ip.Query = query - return ip -} - -// WithTime to specify the evaluation time. Default is now. -func (ip *InstantPromql) WithTime(ts time.Time) *InstantPromql { - ip.Ts = ts - return ip -} - -func (ip *InstantPromql) check() error { - if isEmptyString(ip.Query) { - return ErrEmptyPromql - } - - return nil -} - -func (ip *InstantPromql) buildGreptimeRequest(header *greptimepb.RequestHeader) (*greptimepb.GreptimeRequest, error) { - return nil, ErrNotImplemented -} - -func (ip *InstantPromql) buildPromqlRequest(header *greptimepb.RequestHeader) (*greptimepb.PromqlRequest, error) { - if err := ip.check(); err != nil { - return nil, err - } - - promql := &greptimepb.PromqlRequest_InstantQuery{ - InstantQuery: &greptimepb.PromInstantQuery{ - Query: ip.Query, - }, - } - - if !ip.Ts.IsZero() { - promql.InstantQuery.Time = fmt.Sprintf("%d", ip.Ts.Unix()) - } - - request := &greptimepb.PromqlRequest{ - Header: header, - Promql: promql, - } - - return request, nil -} - -// RangePromql helps to fire a request to greptimedb compatible with Prometheus range query, -// you can visit [range query] for detail. -// -// [range query]: https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries -type RangePromql struct { - Query string - Start time.Time - End time.Time - Step time.Duration -} - -func NewRangePromql(query string) *RangePromql { - return &RangePromql{ - Query: query, - } -} - -// WithQuery helps to update the query -func (rp *RangePromql) WithQuery(query string) *RangePromql { - rp.Query = query - return rp -} - -// WithStart helps to specify the start of the range -func (rp *RangePromql) WithStart(start time.Time) *RangePromql { - rp.Start = start - return rp -} - -// WithEnd helps to specify the end of the range -func (rp *RangePromql) WithEnd(end time.Time) *RangePromql { - rp.End = end - return rp -} - -// WithStep helps to specify the step of the range -func (rp *RangePromql) WithStep(step time.Duration) *RangePromql { - rp.Step = step - return rp -} - -func (rp *RangePromql) check() error { - if isEmptyString(rp.Query) { - return ErrEmptyPromql - } - - if rp.Start.IsZero() || rp.End.IsZero() { - return ErrEmptyRange - } - - if rp.Step <= 0 { - return ErrEmptyStep - } - - return nil -} - -func (rp *RangePromql) toGreptimedbPromRangeQuery() *greptimepb.PromRangeQuery { - return &greptimepb.PromRangeQuery{ - Query: rp.Query, - Start: fmt.Sprintf("%d", rp.Start.Unix()), - End: fmt.Sprintf("%d", rp.End.Unix()), - Step: strconv.FormatFloat(rp.Step.Seconds(), 'f', -1, 64), - } -} - -func (rp *RangePromql) buildGreptimeRequest(header *greptimepb.RequestHeader) (*greptimepb.GreptimeRequest, error) { - if err := rp.check(); err != nil { - return nil, err - } - - request := &greptimepb.GreptimeRequest_Query{ - Query: &greptimepb.QueryRequest{ - Query: &greptimepb.QueryRequest_PromRangeQuery{ - PromRangeQuery: rp.toGreptimedbPromRangeQuery(), - }, - }, - } - - return &greptimepb.GreptimeRequest{ - Header: header, - Request: request, - }, nil -} - -func (rp *RangePromql) buildPromqlRequest(header *greptimepb.RequestHeader) (*greptimepb.PromqlRequest, error) { - request := &greptimepb.PromqlRequest{ - Header: header, - Promql: &greptimepb.PromqlRequest_RangeQuery{ - RangeQuery: rp.toGreptimedbPromRangeQuery(), - }, - } - - return request, nil -} diff --git a/query_promql_test.go b/query_promql_test.go deleted file mode 100644 index 2651db3..0000000 --- a/query_promql_test.go +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package greptime - -import ( - "context" - "testing" - "time" - - "github.com/GreptimeTeam/greptimedb-client-go/prom" - "github.com/prometheus/common/model" - "github.com/stretchr/testify/assert" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -func getClient(t *testing.T) *Client { - options := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - } - cfg := NewCfg(host). - WithPort(grpcPort). - WithDatabase(database). - WithDialOptions(options...) - client, err := NewClient(cfg) - assert.Nil(t, err) - assert.NotNil(t, client) - return client -} - -func insert(t *testing.T, client *Client, table string, value float64, secs int64) { - series := Series{} - series.AddTag("host", "127.0.0.1") - series.SetTimestamp(time.Unix(secs, 0)) - series.AddField("val", value) - - metric := Metric{} - metric.AddSeries(series) - - insert := InsertRequest{} - insert.WithTable(table).WithMetric(metric) - - inserts := InsertsRequest{} - inserts.WithDatabase(database).Append(insert) - - resp, err := client.Insert(context.Background(), inserts) - assert.Nil(t, err) - assert.True(t, ParseRespHeader(resp).IsSuccess()) - assert.False(t, ParseRespHeader(resp).IsRateLimited()) - assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue()) -} - -func TestRangePromql(t *testing.T) { - table := "test_range_promql" - var secs int64 = 1677728740 - val := 0.45 - client := getClient(t) - insert(t, client, table, val, secs) - - rp := NewRangePromql(table).WithStart(time.Unix(secs, 0)).WithEnd(time.Unix(secs, 0)).WithStep(time.Second) - req := NewQueryRequest().WithRangePromql(rp).WithDatabase(database) - resp, err := client.PromqlQuery(context.Background(), *req) - - assert.Nil(t, err) - assert.True(t, ParseRespHeader(resp).IsSuccess()) - assert.False(t, ParseRespHeader(resp).IsRateLimited()) - - result, err := prom.UnmarshalApiResponse(resp.GetBody()) - assert.Nil(t, err) - assert.NotNil(t, result.Val) - - assert.Equal(t, model.ValMatrix, result.Val.Type()) - matrix, ok := result.Val.(model.Matrix) - assert.True(t, ok) - assert.Equal(t, 1, matrix.Len()) - - sample := matrix[0] - assert.Equal(t, table, string(sample.Metric["__name__"])) - assert.Equal(t, 1, len(sample.Values)) - assert.Equal(t, val, float64(sample.Values[0].Value)) -} - -func TestInstantPromql(t *testing.T) { - table := "test_instant_promql" - var secs int64 = 1677728740 - val := 0.45 - client := getClient(t) - insert(t, client, table, val, secs) - - promql := NewInstantPromql(table).WithTime(time.Unix(secs, 0)) - req := NewQueryRequest().WithInstantPromql(promql) - resp, err := client.PromqlQuery(context.Background(), *req) - - assert.Nil(t, err) - assert.True(t, ParseRespHeader(resp).IsSuccess()) - assert.False(t, ParseRespHeader(resp).IsRateLimited()) - - result, err := prom.UnmarshalApiResponse(resp.GetBody()) - assert.Nil(t, err) - assert.NotNil(t, result.Val) - - assert.Equal(t, model.ValVector, result.Val.Type()) - vectors, ok := result.Val.(model.Vector) - assert.True(t, ok) - assert.Equal(t, 1, len(vectors)) - vector := vectors[0] - - assert.Equal(t, table, string(vector.Metric["__name__"])) - assert.Equal(t, val, float64(vector.Value)) -} - -func TestRangePromqlEmptyStep(t *testing.T) { - rp := RangePromql{ - Query: "up", - Start: time.Unix(1677728740, 0), - End: time.Unix(1677728740, 0), - } - - assert.ErrorIs(t, rp.check(), ErrEmptyStep) -} diff --git a/query_sql.go b/query_sql.go deleted file mode 100644 index d3b1219..0000000 --- a/query_sql.go +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package greptime - -import ( - greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1" -) - -// Sql helps to fire a request to greptimedb in SQL. It can not be used -// as Promql Query -type Sql struct { - sql string -} - -var ( - _ query = (*Sql)(nil) -) - -func (s *Sql) buildGreptimeRequest(header *greptimepb.RequestHeader) (*greptimepb.GreptimeRequest, error) { - if isEmptyString(s.sql) { - return nil, ErrEmptySql - } - - request := &greptimepb.GreptimeRequest_Query{ - Query: &greptimepb.QueryRequest{ - Query: &greptimepb.QueryRequest_Sql{Sql: s.sql}, - }, - } - - return &greptimepb.GreptimeRequest{ - Header: header, - Request: request, - }, nil -} - -func (s *Sql) buildPromqlRequest(header *greptimepb.RequestHeader) (*greptimepb.PromqlRequest, error) { - return nil, ErrSqlInPromql -} diff --git a/request_test.go b/request_test.go deleted file mode 100644 index a5e5b1b..0000000 --- a/request_test.go +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package greptime - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestQueryBuildGreptimeRequest(t *testing.T) { - rb := &QueryRequest{} - request, err := rb.buildGreptimeRequest(&Config{}) - assert.Nil(t, request) - assert.ErrorIs(t, err, ErrEmptyDatabase) - - rb.WithDatabase("disk_usage") - request, err = rb.buildGreptimeRequest(&Config{}) - assert.Nil(t, request) - assert.ErrorIs(t, err, ErrEmptyQuery) - - // test Sql - rb.WithSql("select * from monitor") - request, err = rb.buildGreptimeRequest(&Config{}) - assert.NotNil(t, request) - assert.Nil(t, err) - - // test instant promql - rb.WithInstantPromql(NewInstantPromql("up == 0")) - request, err = rb.buildGreptimeRequest(&Config{}) - assert.Nil(t, request) - assert.ErrorIs(t, err, ErrNotImplemented) - - // test range promql - rp := &RangePromql{ - Query: "up == 0", - Start: time.Now(), - End: time.Now(), - Step: time.Second * 10, - } - rb.WithRangePromql(rp) - request, err = rb.buildGreptimeRequest(&Config{}) - assert.NotNil(t, request) - assert.Nil(t, err) -} - -func TestQueryBuildPromqlRequest(t *testing.T) { - rb := &QueryRequest{} - request, err := rb.buildPromqlRequest(&Config{}) - assert.Nil(t, request) - assert.ErrorIs(t, err, ErrEmptyDatabase) - - rb.WithDatabase("disk_usage") - request, err = rb.buildPromqlRequest(&Config{}) - assert.Nil(t, request) - assert.ErrorIs(t, err, ErrEmptyQuery) - - // test Sql - rb.WithSql("select * from monitor") - request, err = rb.buildPromqlRequest(&Config{}) - assert.Nil(t, request) - assert.ErrorIs(t, err, ErrSqlInPromql) - - // test instant promql - rb.WithInstantPromql(NewInstantPromql("up == 0")) - request, err = rb.buildPromqlRequest(&Config{}) - assert.NotNil(t, request) - assert.Nil(t, err) - - // test range promql - rp := &RangePromql{ - Query: "up == 0", - Start: time.Now(), - End: time.Now(), - Step: time.Second * 10, - } - rb.WithRangePromql(rp) - request, err = rb.buildPromqlRequest(&Config{}) - assert.NotNil(t, request) - assert.Nil(t, err) -} - -func TestInsertBuilder(t *testing.T) { - cfg := &Config{} - r := InsertRequest{} - - // empty table - req, err := r.build() - assert.Equal(t, ErrEmptyTable, err) - assert.Nil(t, req) - - // empty series - r.WithTable("monitor") - req, err = r.build() - assert.Equal(t, ErrNoSeriesInMetric, err) - assert.Nil(t, req) - - series := Series{} - series.AddTag("host", "fake host") - series.AddField("memory", 2.3) - series.SetTimestamp(time.Now()) - metric := Metric{} - metric.AddSeries(series) - r.WithMetric(metric) - - rs := InsertsRequest{} - - // empty database - reqs, err := rs.build(cfg) - assert.Equal(t, ErrEmptyDatabase, err) - assert.Nil(t, reqs) - - // empty inserts - rs.WithDatabase("public") - reqs, err = rs.build(cfg) - assert.Equal(t, ErrEmptyInserts, err) - assert.Nil(t, reqs) - - // normal - rs.Append(r) - reqs, err = rs.build(cfg) - assert.Nil(t, err) - assert.NotNil(t, reqs) -} diff --git a/stream_client.go b/stream_client.go index e3c4865..bf48d87 100644 --- a/stream_client.go +++ b/stream_client.go @@ -1,4 +1,4 @@ -// Copyright 2023 Greptime Team +// Copyright 2024 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,18 +19,20 @@ import ( greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1" "google.golang.org/grpc" + + "github.com/GreptimeTeam/greptimedb-ingester-go/config" ) // StreamClient is only for inserting type StreamClient struct { client greptimepb.GreptimeDatabase_HandleRequestsClient - cfg *Config + cfg *config.Config } // NewStreamClient helps to create a stream insert client. // If Client has performance issue, you can try the stream client. -func NewStreamClient(cfg *Config) (*StreamClient, error) { - conn, err := grpc.Dial(cfg.getGRPCAddr(), cfg.DialOptions...) +func NewStreamClient(cfg *config.Config) (*StreamClient, error) { + conn, err := grpc.Dial(cfg.GetGRPCAddr(), cfg.DialOptions...) if err != nil { return nil, err } diff --git a/stream_client_test.go b/stream_client_test.go index 9958a5d..6abb751 100644 --- a/stream_client_test.go +++ b/stream_client_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 Greptime Team +// Copyright 2024 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,109 +15,8 @@ package greptime import ( - "context" - "fmt" "testing" - "time" - - "github.com/stretchr/testify/assert" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) func TestStreamInsert(t *testing.T) { - table := "test_stream_insert" - genBatchInsertionData := func(size int) []monitor { - monitors := make([]monitor, 0, size) - for i := 0; i < size; i++ { - ts := time.Now().UnixMilli() - one := monitor{ - host: "127.0.0.1", - // default precision is millisecond, this conversion - // is to make the Equal assertion passed - ts: time.UnixMilli(ts), // you can directly use time.Now() - memory: 22, - cpu: 0.45, - temperature: -1, - isAuthed: true, - } - - monitors = append(monitors, one) - time.Sleep(time.Millisecond) - } - return monitors - } - - // Insert - options := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - } - cfg := NewCfg(host).WithPort(grpcPort).WithDatabase(database).WithDialOptions(options...).WithCallOptions() - streamClient, err := NewStreamClient(cfg) - assert.Nil(t, err) - - size := 10 - insertMonitors := genBatchInsertionData(size) - for _, monitor := range insertMonitors { - metric := Metric{} - - series := Series{} - series.AddTag("host", monitor.host) - series.SetTimestamp(monitor.ts) - series.AddField("memory", monitor.memory) - series.AddField("cpu", monitor.cpu) - series.AddField("temperature", monitor.temperature) - series.AddField("is_authed", monitor.isAuthed) - metric.AddSeries(series) - - req := InsertRequest{} - req.WithTable(table).WithMetric(metric) - - reqs := InsertsRequest{} - reqs.Append(req) - - err = streamClient.Send(context.Background(), reqs) - assert.Nil(t, err) - } - - affectedRows, err := streamClient.CloseAndRecv(context.Background()) - assert.Nil(t, err) - assert.Equal(t, uint32(size), affectedRows.Value) - - // Query with metric - queryReq := QueryRequest{} - queryReq.WithSql(fmt.Sprintf("SELECT * FROM %s", table)) - - client, err := NewClient(cfg) - assert.Nil(t, err) - resMetric, err := client.Query(context.Background(), queryReq) - assert.Nil(t, err) - assert.Equal(t, size, len(resMetric.GetSeries())) - - queryMonitors := []monitor{} - for _, series := range resMetric.GetSeries() { - host, ok := series.Get("host") - assert.True(t, ok) - - ts, ok := series.GetTimestamp("ts") - assert.True(t, ok) - - temperature, ok := series.Get("temperature") - assert.True(t, ok) - memory, ok := series.Get("memory") - assert.True(t, ok) - cpu, ok := series.Get("cpu") - assert.True(t, ok) - isAuthed, ok := series.Get("is_authed") - assert.True(t, ok) - queryMonitors = append(queryMonitors, monitor{ - host: host.(string), - ts: ts, - memory: memory.(uint64), - cpu: cpu.(float64), - temperature: temperature.(int64), - isAuthed: isAuthed.(bool), - }) - } - assert.Equal(t, insertMonitors, queryMonitors) } diff --git a/mask.go b/util/mask.go similarity index 87% rename from mask.go rename to util/mask.go index d686599..00858c4 100644 --- a/mask.go +++ b/util/mask.go @@ -1,4 +1,4 @@ -// Copyright 2023 Greptime Team +// Copyright 2024 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package greptime +package util import ( "bytes" @@ -22,19 +22,19 @@ import ( ) // Mask is help to set null bits. -type mask struct { +type Mask struct { bs bitset.BitSet } // set is to set which position is to be set to 1 -func (n *mask) set(idx uint) *mask { +func (n *Mask) Set(idx uint) *Mask { n.bs.Set(idx) return n } // shrink is to help to generate the bytes number the caller is interested // via LittleEndian -func (n *mask) shrink(bSize int) ([]byte, error) { +func (n *Mask) Shrink(bSize int) ([]byte, error) { if n.bs.Len() == 0 { return nil, nil } diff --git a/mask_test.go b/util/mask_test.go similarity index 87% rename from mask_test.go rename to util/mask_test.go index 5899486..29b8287 100644 --- a/mask_test.go +++ b/util/mask_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 Greptime Team +// Copyright 2024 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package greptime +package util import ( "testing" @@ -25,10 +25,10 @@ import ( // // []byte{1, 4} // LittleEndian func TestMask(t *testing.T) { - mask := mask{} - mask.set(0).set(10) + mask := Mask{} + mask.Set(0).Set(10) - b, err := mask.shrink(2) + b, err := mask.Shrink(2) assert.Nil(t, err) assert.Equal(t, []byte{1, 4}, b) } diff --git a/util.go b/util/util.go similarity index 87% rename from util.go rename to util/util.go index ce66e47..7cffdb9 100644 --- a/util.go +++ b/util/util.go @@ -1,4 +1,4 @@ -// Copyright 2023 Greptime Team +// Copyright 2024 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package greptime +package util import ( "fmt" @@ -21,18 +21,20 @@ import ( greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1" "github.com/stoewer/go-strcase" + + gerr "github.com/GreptimeTeam/greptimedb-ingester-go/error" ) -type value struct { - val any - typ greptimepb.ColumnDataType +type Value struct { + Val any + Type greptimepb.ColumnDataType } -func newValue(val any, typ greptimepb.ColumnDataType) *value { - return &value{val, typ} +func newValue(val any, typ greptimepb.ColumnDataType) *Value { + return &Value{val, typ} } -func convert(v any) (*value, error) { +func Convert(v any) (*Value, error) { switch t := v.(type) { case bool: return newValue(t, greptimepb.ColumnDataType_BOOLEAN), nil @@ -104,14 +106,14 @@ func convert(v any) (*value, error) { } } -func isValidPrecision(t time.Duration) bool { +func IsValidPrecision(t time.Duration) bool { return t == time.Second || t == time.Millisecond || t == time.Microsecond || t == time.Nanosecond } -func precisionToDataType(d time.Duration) (greptimepb.ColumnDataType, error) { +func PrecisionToDataType(d time.Duration) (greptimepb.ColumnDataType, error) { // if the precision has not been set, use default precision `time.Millisecond` if d == 0 { d = time.Millisecond @@ -126,19 +128,19 @@ func precisionToDataType(d time.Duration) (greptimepb.ColumnDataType, error) { case time.Nanosecond: return greptimepb.ColumnDataType_TIMESTAMP_NANOSECOND, nil default: - return 0, ErrInvalidTimePrecision + return 0, gerr.ErrInvalidTimePrecision } } -func isEmptyString(s string) bool { +func IsEmptyString(s string) bool { return len(strings.TrimSpace(s)) == 0 } -func toColumnName(s string) (string, error) { +func ToColumnName(s string) (string, error) { s = strings.TrimSpace(s) if len(s) == 0 { - return "", ErrEmptyKey + return "", gerr.ErrEmptyKey } if len(s) >= 100 { diff --git a/util/util_test.go b/util/util_test.go new file mode 100644 index 0000000..e08da62 --- /dev/null +++ b/util/util_test.go @@ -0,0 +1,315 @@ +// Copyright 2024 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "strings" + "testing" + "time" + + greptime "github.com/GreptimeTeam/greptime-proto/go/greptime/v1" + "github.com/stretchr/testify/assert" + + gerr "github.com/GreptimeTeam/greptimedb-ingester-go/error" +) + +func TestConvertValue(t *testing.T) { + // bool + var expectBool bool = true + val, err := Convert(expectBool) + assert.Nil(t, err) + assert.Equal(t, expectBool, val.Val) + assert.Equal(t, greptime.ColumnDataType_BOOLEAN, val.Type) + + // string + var expectString string = "string" + val, err = Convert(expectString) + assert.Nil(t, err) + assert.Equal(t, expectString, val.Val) + assert.Equal(t, greptime.ColumnDataType_STRING, val.Type) + + // bytes + var expectBytes []byte = []byte("bytes") + val, err = Convert(expectBytes) + assert.Nil(t, err) + assert.Equal(t, []byte("bytes"), val.Val) + assert.Equal(t, greptime.ColumnDataType_BINARY, val.Type) + + // float64 + var expectFloat64 float64 = float64(64.0) + val, err = Convert(expectFloat64) + assert.Nil(t, err) + assert.Equal(t, expectFloat64, val.Val) + assert.Equal(t, greptime.ColumnDataType_FLOAT64, val.Type) + + // float32 + var expectFloat32 float32 = float32(32.0) + val, err = Convert(expectFloat32) + assert.Nil(t, err) + assert.Equal(t, expectFloat32, val.Val) + assert.Equal(t, greptime.ColumnDataType_FLOAT32, val.Type) + + // uint + var originUint uint = uint(64) + var expectUint uint64 = uint64(64) + val, err = Convert(originUint) + assert.Nil(t, err) + assert.Equal(t, expectUint, val.Val) + assert.Equal(t, greptime.ColumnDataType_UINT64, val.Type) + + // uint64 + var expectUint64 uint64 = uint64(64) + val, err = Convert(expectUint64) + assert.Nil(t, err) + assert.Equal(t, expectUint64, val.Val) + assert.Equal(t, greptime.ColumnDataType_UINT64, val.Type) + + // uint32 + var expectUint32 uint32 = uint32(32) + val, err = Convert(expectUint32) + assert.Nil(t, err) + assert.Equal(t, expectUint32, val.Val) + assert.Equal(t, greptime.ColumnDataType_UINT32, val.Type) + + // uint16 + var expectUint16 uint16 = uint16(16) + val, err = Convert(expectUint16) + assert.Nil(t, err) + assert.Equal(t, expectUint16, val.Val) + assert.Equal(t, greptime.ColumnDataType_UINT16, val.Type) + + // uint8 + var expectUint8 uint8 = uint8(8) + val, err = Convert(expectUint8) + assert.Nil(t, err) + assert.Equal(t, expectUint8, val.Val) + assert.Equal(t, greptime.ColumnDataType_UINT8, val.Type) + + // int + var originInt int = int(64) + var expectInt int64 = int64(64) + val, err = Convert(originInt) + assert.Nil(t, err) + assert.Equal(t, expectInt, val.Val) + assert.Equal(t, greptime.ColumnDataType_INT64, val.Type) + + // int64 + var expectInt64 int64 = int64(64) + val, err = Convert(expectInt64) + assert.Nil(t, err) + assert.Equal(t, expectInt64, val.Val) + assert.Equal(t, greptime.ColumnDataType_INT64, val.Type) + + // int32 + var expectInt32 int32 = int32(32) + val, err = Convert(expectInt32) + assert.Nil(t, err) + assert.Equal(t, expectInt32, val.Val) + assert.Equal(t, greptime.ColumnDataType_INT32, val.Type) + + // int16 + var expectInt16 int16 = int16(16) + val, err = Convert(expectInt16) + assert.Nil(t, err) + assert.Equal(t, expectInt16, val.Val) + assert.Equal(t, greptime.ColumnDataType_INT16, val.Type) + + // int8 + var expectInt8 int8 = int8(8) + val, err = Convert(expectInt8) + assert.Nil(t, err) + assert.Equal(t, expectInt8, val.Val) + assert.Equal(t, greptime.ColumnDataType_INT8, val.Type) + + // time.Time + var originTime time.Time = time.UnixMilli(1677571339623) + // var expectTime int64 = int64(1677571339623) + val, err = Convert(originTime) + assert.Nil(t, err) + assert.Equal(t, originTime, val.Val) + assert.Equal(t, greptime.ColumnDataType_DATETIME, val.Type) + + // type not supported + _, err = Convert(time.April) + assert.NotNil(t, err) + _, err = Convert(map[string]any{}) + assert.NotNil(t, err) + _, err = Convert(func() {}) + assert.NotNil(t, err) + +} + +func TestConvertValuePtr(t *testing.T) { + // bool + var expectBool bool = true + val, err := Convert(&expectBool) + assert.Nil(t, err) + assert.Equal(t, expectBool, val.Val) + assert.Equal(t, greptime.ColumnDataType_BOOLEAN, val.Type) + + // string + var expectString string = "string" + val, err = Convert(&expectString) + assert.Nil(t, err) + assert.Equal(t, expectString, val.Val) + assert.Equal(t, greptime.ColumnDataType_STRING, val.Type) + + // bytes + var expectBytes []byte = []byte("bytes") + val, err = Convert(&expectBytes) + assert.Nil(t, err) + assert.Equal(t, []byte("bytes"), val.Val) + assert.Equal(t, greptime.ColumnDataType_BINARY, val.Type) + + // float64 + var expectFloat64 float64 = float64(64.0) + val, err = Convert(&expectFloat64) + assert.Nil(t, err) + assert.Equal(t, expectFloat64, val.Val) + assert.Equal(t, greptime.ColumnDataType_FLOAT64, val.Type) + + // float32 + var expectFloat32 float32 = float32(32.0) + val, err = Convert(&expectFloat32) + assert.Nil(t, err) + assert.Equal(t, expectFloat32, val.Val) + assert.Equal(t, greptime.ColumnDataType_FLOAT32, val.Type) + + // uint + var originUint uint = uint(64) + var expectUint uint64 = uint64(64) + val, err = Convert(&originUint) + assert.Nil(t, err) + assert.Equal(t, expectUint, val.Val) + assert.Equal(t, greptime.ColumnDataType_UINT64, val.Type) + + // uint64 + var expectUint64 uint64 = uint64(64) + val, err = Convert(&expectUint64) + assert.Nil(t, err) + assert.Equal(t, expectUint64, val.Val) + assert.Equal(t, greptime.ColumnDataType_UINT64, val.Type) + + // uint32 + var expectUint32 uint32 = uint32(32) + val, err = Convert(&expectUint32) + assert.Nil(t, err) + assert.Equal(t, expectUint32, val.Val) + assert.Equal(t, greptime.ColumnDataType_UINT32, val.Type) + + // uint16 + var expectUint16 uint16 = uint16(16) + val, err = Convert(&expectUint16) + assert.Nil(t, err) + assert.Equal(t, expectUint16, val.Val) + assert.Equal(t, greptime.ColumnDataType_UINT16, val.Type) + + // uint8 + var expectUint8 uint8 = uint8(8) + val, err = Convert(&expectUint8) + assert.Nil(t, err) + assert.Equal(t, expectUint8, val.Val) + assert.Equal(t, greptime.ColumnDataType_UINT8, val.Type) + + // int + var originInt int = int(64) + var expectInt int64 = int64(64) + val, err = Convert(&originInt) + assert.Nil(t, err) + assert.Equal(t, expectInt, val.Val) + assert.Equal(t, greptime.ColumnDataType_INT64, val.Type) + + // int64 + var expectInt64 int64 = int64(64) + val, err = Convert(&expectInt64) + assert.Nil(t, err) + assert.Equal(t, expectInt64, val.Val) + assert.Equal(t, greptime.ColumnDataType_INT64, val.Type) + + // int32 + var expectInt32 int32 = int32(32) + val, err = Convert(&expectInt32) + assert.Nil(t, err) + assert.Equal(t, expectInt32, val.Val) + assert.Equal(t, greptime.ColumnDataType_INT32, val.Type) + + // int16 + var expectInt16 int16 = int16(16) + val, err = Convert(&expectInt16) + assert.Nil(t, err) + assert.Equal(t, expectInt16, val.Val) + assert.Equal(t, greptime.ColumnDataType_INT16, val.Type) + + // int8 + var expectInt8 int8 = int8(8) + val, err = Convert(&expectInt8) + assert.Nil(t, err) + assert.Equal(t, expectInt8, val.Val) + assert.Equal(t, greptime.ColumnDataType_INT8, val.Type) + + // time.Time + var originTime time.Time = time.UnixMilli(1677571339623) + // var expectTime int64 = int64(1677571339623) + val, err = Convert(&originTime) + assert.Nil(t, err) + assert.Equal(t, originTime, val.Val) + assert.Equal(t, greptime.ColumnDataType_DATETIME, val.Type) + + // type not supported + _, err = Convert(&map[string]any{}) + assert.NotNil(t, err) +} + +func TestEmptyString(t *testing.T) { + assert.True(t, IsEmptyString("")) + assert.True(t, IsEmptyString(" ")) + assert.True(t, IsEmptyString(" ")) + assert.True(t, IsEmptyString("\t")) +} + +func TestColumnName(t *testing.T) { + key, err := ToColumnName("ts ") + assert.Nil(t, err) + assert.Equal(t, "ts", key) + + key, err = ToColumnName(" Ts") + assert.Nil(t, err) + assert.Equal(t, "ts", key) + + key, err = ToColumnName(" TS ") + assert.Nil(t, err) + assert.Equal(t, "ts", key) + + key, err = ToColumnName("DiskUsage ") + assert.Nil(t, err) + assert.Equal(t, "disk_usage", key) + + key, err = ToColumnName("Disk-Usage") + assert.Nil(t, err) + assert.Equal(t, "disk_usage", key) + + key, err = ToColumnName(" ") + assert.NotNil(t, err) + assert.Equal(t, "", key) + + key, err = ToColumnName(strings.Repeat("timestamp", 20)) + assert.NotNil(t, err) + assert.Equal(t, "", key) +} + +func TestPrecisionToDataType(t *testing.T) { + _, err := PrecisionToDataType(123) + assert.Equal(t, gerr.ErrInvalidTimePrecision, err) +} diff --git a/util_test.go b/util_test.go deleted file mode 100644 index f625065..0000000 --- a/util_test.go +++ /dev/null @@ -1,314 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package greptime - -import ( - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - greptime "github.com/GreptimeTeam/greptime-proto/go/greptime/v1" -) - -func TestConvertValue(t *testing.T) { - // bool - var expectBool bool = true - val, err := convert(expectBool) - assert.Nil(t, err) - assert.Equal(t, expectBool, val.val) - assert.Equal(t, greptime.ColumnDataType_BOOLEAN, val.typ) - - // string - var expectString string = "string" - val, err = convert(expectString) - assert.Nil(t, err) - assert.Equal(t, expectString, val.val) - assert.Equal(t, greptime.ColumnDataType_STRING, val.typ) - - // bytes - var expectBytes []byte = []byte("bytes") - val, err = convert(expectBytes) - assert.Nil(t, err) - assert.Equal(t, []byte("bytes"), val.val) - assert.Equal(t, greptime.ColumnDataType_BINARY, val.typ) - - // float64 - var expectFloat64 float64 = float64(64.0) - val, err = convert(expectFloat64) - assert.Nil(t, err) - assert.Equal(t, expectFloat64, val.val) - assert.Equal(t, greptime.ColumnDataType_FLOAT64, val.typ) - - // float32 - var expectFloat32 float32 = float32(32.0) - val, err = convert(expectFloat32) - assert.Nil(t, err) - assert.Equal(t, expectFloat32, val.val) - assert.Equal(t, greptime.ColumnDataType_FLOAT32, val.typ) - - // uint - var originUint uint = uint(64) - var expectUint uint64 = uint64(64) - val, err = convert(originUint) - assert.Nil(t, err) - assert.Equal(t, expectUint, val.val) - assert.Equal(t, greptime.ColumnDataType_UINT64, val.typ) - - // uint64 - var expectUint64 uint64 = uint64(64) - val, err = convert(expectUint64) - assert.Nil(t, err) - assert.Equal(t, expectUint64, val.val) - assert.Equal(t, greptime.ColumnDataType_UINT64, val.typ) - - // uint32 - var expectUint32 uint32 = uint32(32) - val, err = convert(expectUint32) - assert.Nil(t, err) - assert.Equal(t, expectUint32, val.val) - assert.Equal(t, greptime.ColumnDataType_UINT32, val.typ) - - // uint16 - var expectUint16 uint16 = uint16(16) - val, err = convert(expectUint16) - assert.Nil(t, err) - assert.Equal(t, expectUint16, val.val) - assert.Equal(t, greptime.ColumnDataType_UINT16, val.typ) - - // uint8 - var expectUint8 uint8 = uint8(8) - val, err = convert(expectUint8) - assert.Nil(t, err) - assert.Equal(t, expectUint8, val.val) - assert.Equal(t, greptime.ColumnDataType_UINT8, val.typ) - - // int - var originInt int = int(64) - var expectInt int64 = int64(64) - val, err = convert(originInt) - assert.Nil(t, err) - assert.Equal(t, expectInt, val.val) - assert.Equal(t, greptime.ColumnDataType_INT64, val.typ) - - // int64 - var expectInt64 int64 = int64(64) - val, err = convert(expectInt64) - assert.Nil(t, err) - assert.Equal(t, expectInt64, val.val) - assert.Equal(t, greptime.ColumnDataType_INT64, val.typ) - - // int32 - var expectInt32 int32 = int32(32) - val, err = convert(expectInt32) - assert.Nil(t, err) - assert.Equal(t, expectInt32, val.val) - assert.Equal(t, greptime.ColumnDataType_INT32, val.typ) - - // int16 - var expectInt16 int16 = int16(16) - val, err = convert(expectInt16) - assert.Nil(t, err) - assert.Equal(t, expectInt16, val.val) - assert.Equal(t, greptime.ColumnDataType_INT16, val.typ) - - // int8 - var expectInt8 int8 = int8(8) - val, err = convert(expectInt8) - assert.Nil(t, err) - assert.Equal(t, expectInt8, val.val) - assert.Equal(t, greptime.ColumnDataType_INT8, val.typ) - - // time.Time - var originTime time.Time = time.UnixMilli(1677571339623) - // var expectTime int64 = int64(1677571339623) - val, err = convert(originTime) - assert.Nil(t, err) - assert.Equal(t, originTime, val.val) - assert.Equal(t, greptime.ColumnDataType_DATETIME, val.typ) - - // type not supported - _, err = convert(time.April) - assert.NotNil(t, err) - _, err = convert(map[string]any{}) - assert.NotNil(t, err) - _, err = convert(func() {}) - assert.NotNil(t, err) - -} - -func TestConvertValuePtr(t *testing.T) { - // bool - var expectBool bool = true - val, err := convert(&expectBool) - assert.Nil(t, err) - assert.Equal(t, expectBool, val.val) - assert.Equal(t, greptime.ColumnDataType_BOOLEAN, val.typ) - - // string - var expectString string = "string" - val, err = convert(&expectString) - assert.Nil(t, err) - assert.Equal(t, expectString, val.val) - assert.Equal(t, greptime.ColumnDataType_STRING, val.typ) - - // bytes - var expectBytes []byte = []byte("bytes") - val, err = convert(&expectBytes) - assert.Nil(t, err) - assert.Equal(t, []byte("bytes"), val.val) - assert.Equal(t, greptime.ColumnDataType_BINARY, val.typ) - - // float64 - var expectFloat64 float64 = float64(64.0) - val, err = convert(&expectFloat64) - assert.Nil(t, err) - assert.Equal(t, expectFloat64, val.val) - assert.Equal(t, greptime.ColumnDataType_FLOAT64, val.typ) - - // float32 - var expectFloat32 float32 = float32(32.0) - val, err = convert(&expectFloat32) - assert.Nil(t, err) - assert.Equal(t, expectFloat32, val.val) - assert.Equal(t, greptime.ColumnDataType_FLOAT32, val.typ) - - // uint - var originUint uint = uint(64) - var expectUint uint64 = uint64(64) - val, err = convert(&originUint) - assert.Nil(t, err) - assert.Equal(t, expectUint, val.val) - assert.Equal(t, greptime.ColumnDataType_UINT64, val.typ) - - // uint64 - var expectUint64 uint64 = uint64(64) - val, err = convert(&expectUint64) - assert.Nil(t, err) - assert.Equal(t, expectUint64, val.val) - assert.Equal(t, greptime.ColumnDataType_UINT64, val.typ) - - // uint32 - var expectUint32 uint32 = uint32(32) - val, err = convert(&expectUint32) - assert.Nil(t, err) - assert.Equal(t, expectUint32, val.val) - assert.Equal(t, greptime.ColumnDataType_UINT32, val.typ) - - // uint16 - var expectUint16 uint16 = uint16(16) - val, err = convert(&expectUint16) - assert.Nil(t, err) - assert.Equal(t, expectUint16, val.val) - assert.Equal(t, greptime.ColumnDataType_UINT16, val.typ) - - // uint8 - var expectUint8 uint8 = uint8(8) - val, err = convert(&expectUint8) - assert.Nil(t, err) - assert.Equal(t, expectUint8, val.val) - assert.Equal(t, greptime.ColumnDataType_UINT8, val.typ) - - // int - var originInt int = int(64) - var expectInt int64 = int64(64) - val, err = convert(&originInt) - assert.Nil(t, err) - assert.Equal(t, expectInt, val.val) - assert.Equal(t, greptime.ColumnDataType_INT64, val.typ) - - // int64 - var expectInt64 int64 = int64(64) - val, err = convert(&expectInt64) - assert.Nil(t, err) - assert.Equal(t, expectInt64, val.val) - assert.Equal(t, greptime.ColumnDataType_INT64, val.typ) - - // int32 - var expectInt32 int32 = int32(32) - val, err = convert(&expectInt32) - assert.Nil(t, err) - assert.Equal(t, expectInt32, val.val) - assert.Equal(t, greptime.ColumnDataType_INT32, val.typ) - - // int16 - var expectInt16 int16 = int16(16) - val, err = convert(&expectInt16) - assert.Nil(t, err) - assert.Equal(t, expectInt16, val.val) - assert.Equal(t, greptime.ColumnDataType_INT16, val.typ) - - // int8 - var expectInt8 int8 = int8(8) - val, err = convert(&expectInt8) - assert.Nil(t, err) - assert.Equal(t, expectInt8, val.val) - assert.Equal(t, greptime.ColumnDataType_INT8, val.typ) - - // time.Time - var originTime time.Time = time.UnixMilli(1677571339623) - // var expectTime int64 = int64(1677571339623) - val, err = convert(&originTime) - assert.Nil(t, err) - assert.Equal(t, originTime, val.val) - assert.Equal(t, greptime.ColumnDataType_DATETIME, val.typ) - - // type not supported - _, err = convert(&map[string]any{}) - assert.NotNil(t, err) -} - -func TestEmptyString(t *testing.T) { - assert.True(t, isEmptyString("")) - assert.True(t, isEmptyString(" ")) - assert.True(t, isEmptyString(" ")) - assert.True(t, isEmptyString("\t")) -} - -func TestColumnName(t *testing.T) { - key, err := toColumnName("ts ") - assert.Nil(t, err) - assert.Equal(t, "ts", key) - - key, err = toColumnName(" Ts") - assert.Nil(t, err) - assert.Equal(t, "ts", key) - - key, err = toColumnName(" TS ") - assert.Nil(t, err) - assert.Equal(t, "ts", key) - - key, err = toColumnName("DiskUsage ") - assert.Nil(t, err) - assert.Equal(t, "disk_usage", key) - - key, err = toColumnName("Disk-Usage") - assert.Nil(t, err) - assert.Equal(t, "disk_usage", key) - - key, err = toColumnName(" ") - assert.NotNil(t, err) - assert.Equal(t, "", key) - - key, err = toColumnName(strings.Repeat("timestamp", 20)) - assert.NotNil(t, err) - assert.Equal(t, "", key) -} - -func TestPrecisionToDataType(t *testing.T) { - _, err := precisionToDataType(123) - assert.Equal(t, ErrInvalidTimePrecision, err) -}