diff --git a/README.md b/README.md index 11c9a03..28b8452 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,3 @@ -[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://github.com/GreptimeTeam/greptimedb-ingester-go/blob/main/LICENSE) [![Build Status](https://github.com/greptimeteam/greptimedb-ingester-go/actions/workflows/ci.yml/badge.svg)](https://github.com/GreptimeTeam/greptimedb-ingester-go/blob/main/.github/workflows/ci.yml) [![codecov](https://codecov.io/gh/GreptimeTeam/greptimedb-ingester-go/branch/main/graph/badge.svg?token=76KIKITADQ)](https://codecov.io/gh/GreptimeTeam/greptimedb-ingester-go) [![Go Reference](https://pkg.go.dev/badge/github.com/GreptimeTeam/greptimedb-ingester-go.svg)](https://pkg.go.dev/github.com/GreptimeTeam/greptimedb-ingester-go) @@ -7,7 +6,7 @@ NOTE: the project is still in its early stages. -Provide API for using GreptimeDB client in Go. +Provide API for using GreptimeDB ingester in Go. ## Installation @@ -17,32 +16,4 @@ go get -u github.com/GreptimeTeam/greptimedb-ingester-go ## Documentation -visit [docs](./docs) to get complete examples. You can also visit [Documentation][document] more details. - -## API reference - -### Datatype Supported - -- int8, int16, int32, int64, int -- uint8, uint16, uint32, uint64, uint -- float32, float64 -- bool -- []byte -- string -- time.Time - -### Customize metric Timestamp - -you can customize timestamp index via calling methods of [Metric][metric_doc] - -- `metric.SetTimePrecision(time.Microsecond)` -- `metric.SetTimestampAlias("timestamp")` - -## License - -This greptimedb-ingester-go uses the __Apache 2.0 license__ to strike a balance -between open contributions and allowing you to use the software however you want. - - -[document]: https://pkg.go.dev/github.com/GreptimeTeam/greptimedb-ingester-go -[metric_doc]: https://pkg.go.dev/github.com/GreptimeTeam/greptimedb-ingester-go#Metric +TODO diff --git a/client/client.go b/client/client.go index a7956e4..2fc469a 100644 --- a/client/client.go +++ b/client/client.go @@ -35,7 +35,7 @@ type Client struct { // New helps to create the greptimedb client, which will be responsible write data into GreptimeDB. func New(cfg *config.Config) (*Client, error) { - conn, err := grpc.Dial(cfg.GetGRPCAddr(), cfg.DialOptions...) + conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.DialOptions...) if err != nil { return nil, err } diff --git a/client/client_test.go b/client/client_test.go index bc227ff..2772f1f 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -18,7 +18,9 @@ import ( "context" "fmt" "log" + "math/rand" "strconv" + "strings" "testing" "time" @@ -35,9 +37,15 @@ import ( "github.com/GreptimeTeam/greptimedb-ingester-go/table/types" ) +//TODO(yuanbohan): +// unmatched length of columns in rows and columns in schema +// support pointer +// write pojo + var ( - tableName = "" - timezone = "UTC" + monitorTableName = "monitor" + datatypesTableName = "datatypes" + timezone = "UTC" database = "public" host = "127.0.0.1" @@ -81,7 +89,7 @@ type datatype struct { } func (datatype) TableName() string { - return tableName + return datatypesTableName } type monitor struct { @@ -95,7 +103,7 @@ type monitor struct { } func (monitor) TableName() string { - return tableName + return monitorTableName } type Mysql struct { @@ -155,6 +163,22 @@ func newClient() *Client { return client } +func randomId() int64 { + s := rand.NewSource(time.Now().UnixNano()) + r := rand.New(s) + return r.Int63() +} + +func getMonitorsIds(monitors []monitor) string { + ids := make([]string, 0) + + for _, monitor := range monitors { + ids = append(ids, strconv.Itoa(int(monitor.ID))) + } + + return fmt.Sprintf("(%s)", strings.Join(ids, ",")) +} + func newMysql() *Mysql { db := &Mysql{ Host: host, @@ -224,11 +248,10 @@ func init() { cli = newClient() db = newMysql() + streamClient = newStreamClient() } func TestInsertMonitors(t *testing.T) { - tableName = "test_insert_monitor" - loc, err := time.LoadLocation(timezone) assert.Nil(t, err) ts1 := time.Now().Add(-1 * time.Minute).UnixMilli() @@ -238,7 +261,7 @@ func TestInsertMonitors(t *testing.T) { monitors := []monitor{ { - ID: 1, + ID: randomId(), Host: "127.0.0.1", Memory: 1, Cpu: 1.0, @@ -247,7 +270,7 @@ func TestInsertMonitors(t *testing.T) { Running: true, }, { - ID: 2, + ID: randomId(), Host: "127.0.0.2", Memory: 2, Cpu: 2.0, @@ -257,7 +280,7 @@ func TestInsertMonitors(t *testing.T) { }, } - table, err := tbl.New(tableName) + table, err := tbl.New(monitorTableName) assert.Nil(t, err) assert.Nil(t, table.AddTagColumn("id", types.INT64)) @@ -281,7 +304,7 @@ func TestInsertMonitors(t *testing.T) { assert.Empty(t, resp.GetHeader().GetStatus().GetErrMsg()) assert.Equal(t, uint32(len(monitors)), resp.GetAffectedRows().GetValue()) - monitors_, err := db.Query(fmt.Sprintf("select * from %s where id in (1, 2) order by id asc", tableName)) + monitors_, err := db.Query(fmt.Sprintf("select * from %s where id in %s order by host asc", monitorTableName, getMonitorsIds(monitors))) assert.Nil(t, err) assert.Equal(t, len(monitors), len(monitors_)) @@ -292,14 +315,12 @@ func TestInsertMonitors(t *testing.T) { } func TestInsertMonitorWithNilFields(t *testing.T) { - tableName = "test_insert_monitor_with_nil_fields" - loc, err := time.LoadLocation(timezone) assert.Nil(t, err) ts := time.Now().Add(-1 * time.Minute).UnixMilli() time := time.UnixMilli(ts).In(loc) monitor := monitor{ - ID: 11, + ID: randomId(), Host: "127.0.0.1", Memory: 1, Cpu: 1.0, @@ -308,7 +329,7 @@ func TestInsertMonitorWithNilFields(t *testing.T) { Running: true, } - table, err := tbl.New(tableName) + table, err := tbl.New(monitorTableName) assert.Nil(t, err) assert.Nil(t, table.AddTagColumn("id", types.INT64)) @@ -328,7 +349,7 @@ func TestInsertMonitorWithNilFields(t *testing.T) { assert.Zero(t, resp.GetHeader().GetStatus().GetStatusCode()) assert.Empty(t, resp.GetHeader().GetStatus().GetErrMsg()) - monitors_, err := db.Query(fmt.Sprintf("select * from %s where id = %d", tableName, monitor.ID)) + monitors_, err := db.Query(fmt.Sprintf("select * from %s where id = %d", monitorTableName, monitor.ID)) assert.Nil(t, err) assert.Equal(t, 1, len(monitors_)) monitor_ := monitors_[0] @@ -343,9 +364,7 @@ func TestInsertMonitorWithNilFields(t *testing.T) { assert.Zero(t, monitor_.Temperature) } -func TestInsertMonitorWithAllDatatypes(t *testing.T) { - tableName = "test_insert_monitor_with_all_datatypes" - +func TestInsertAllDatatypes(t *testing.T) { loc, err := time.LoadLocation(timezone) assert.Nil(t, err) @@ -367,7 +386,7 @@ func TestInsertMonitorWithAllDatatypes(t *testing.T) { BINARY := []byte{1, 2, 3} STRING := "string" - table, err := tbl.New(tableName) + table, err := tbl.New(datatypesTableName) assert.Nil(t, err) assert.Nil(t, table.AddTagColumn("int8", types.INT8)) @@ -457,8 +476,3 @@ func TestInsertMonitorWithAllDatatypes(t *testing.T) { // MySQL protocol only supports microsecond precision for TIMESTAMP assert.EqualValues(t, time_.UnixNano()/1000, result.TIMESTAMP_NANOSECOND_INT.UnixNano()/1000) } - -//TODO(yuanbohan): -// unmatched length of columns in rows and columns in schema -// support pointer -// write pojo diff --git a/client/stream_client.go b/client/stream_client.go index bebf1c1..8baa793 100644 --- a/client/stream_client.go +++ b/client/stream_client.go @@ -32,7 +32,7 @@ type StreamClient struct { } func NewStreamClient(cfg *config.Config) (*StreamClient, error) { - conn, err := grpc.Dial(cfg.GetGRPCAddr(), cfg.DialOptions...) + conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.DialOptions...) if err != nil { return nil, err } diff --git a/client/stream_client_test.go b/client/stream_client_test.go index d7f7b8a..88ee57c 100644 --- a/client/stream_client_test.go +++ b/client/stream_client_test.go @@ -15,8 +15,100 @@ package client import ( + "context" + "fmt" + "log" "testing" + "time" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/GreptimeTeam/greptimedb-ingester-go/config" + tbl "github.com/GreptimeTeam/greptimedb-ingester-go/table" + "github.com/GreptimeTeam/greptimedb-ingester-go/table/types" +) + +var ( + streamClient *StreamClient ) +func newStreamClient() *StreamClient { + options := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + } + cfg := config.New(host). + WithPort(grpcPort). + WithDatabase(database). + WithDialOptions(options...) + + client, err := NewStreamClient(cfg) + if err != nil { + log.Fatalf("failed to create client: %s", err.Error()) + } + return client +} + func TestStreamInsert(t *testing.T) { + loc, err := time.LoadLocation(timezone) + assert.Nil(t, err) + ts1 := time.Now().Add(-1 * time.Minute).UnixMilli() + time1 := time.UnixMilli(ts1).In(loc) + ts2 := time.Now().Add(-2 * time.Minute).UnixMilli() + time2 := time.UnixMilli(ts2).In(loc) + + monitors := []monitor{ + { + ID: randomId(), + Host: "127.0.0.1", + Memory: 1, + Cpu: 1.0, + Temperature: -1, + Ts: time1, + Running: true, + }, + { + ID: randomId(), + Host: "127.0.0.2", + Memory: 2, + Cpu: 2.0, + Temperature: -2, + Ts: time2, + Running: true, + }, + } + + table, err := tbl.New(monitorTableName) + assert.Nil(t, err) + + assert.Nil(t, table.AddTagColumn("id", types.INT64)) + assert.Nil(t, table.AddTagColumn("host", types.STRING)) + assert.Nil(t, table.AddFieldColumn("memory", types.UINT64)) + assert.Nil(t, table.AddFieldColumn("cpu", types.FLOAT64)) + assert.Nil(t, table.AddFieldColumn("temperature", types.INT64)) + assert.Nil(t, table.AddFieldColumn("running", types.BOOLEAN)) + assert.Nil(t, table.AddTimestampColumn("ts", types.TIMESTAMP_MILLISECOND)) + + for _, monitor := range monitors { + err := table.AddRow(monitor.ID, monitor.Host, + monitor.Memory, monitor.Cpu, monitor.Temperature, monitor.Running, + monitor.Ts) + assert.Nil(t, err) + } + + err = streamClient.Send(context.Background(), table) + assert.Nil(t, err) + affected, err := streamClient.CloseAndRecv(context.Background()) + assert.EqualValues(t, 2, affected.GetValue()) + assert.Nil(t, err) + + monitors_, err := db.Query(fmt.Sprintf("select * from %s where id in %s order by host asc", monitorTableName, getMonitorsIds(monitors))) + assert.Nil(t, err) + + assert.Equal(t, len(monitors), len(monitors_)) + + for i, monitor_ := range monitors_ { + assert.Equal(t, monitors[i], monitor_) + } } diff --git a/config/config.go b/config/config.go index f0b4014..d707884 100644 --- a/config/config.go +++ b/config/config.go @@ -116,6 +116,6 @@ func (c *Config) BuildAuthHeader() *greptimepb.AuthHeader { } -func (c *Config) GetGRPCAddr() string { +func (c *Config) GetEndpoint() string { return fmt.Sprintf("%s:%d", c.Host, c.Port) }