Skip to content

Commit

Permalink
row inserts request
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanbohan committed Jan 29, 2024
1 parent 0301cc9 commit e747ea3
Show file tree
Hide file tree
Showing 13 changed files with 55 additions and 740 deletions.
10 changes: 6 additions & 4 deletions client.go → client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package greptime
package client

import (
"context"
Expand All @@ -22,6 +22,7 @@ import (

"github.com/GreptimeTeam/greptimedb-ingester-go/config"
"github.com/GreptimeTeam/greptimedb-ingester-go/insert"
"github.com/GreptimeTeam/greptimedb-ingester-go/table"
)

// Client helps to Insert/Query data Into/From GreptimeDB. A Client is safe for concurrent
Expand All @@ -34,7 +35,7 @@ type Client struct {
}

// NewClient helps to create the greptimedb client, which will be responsible Write/Read data To/From GreptimeDB
func NewClient(cfg *config.Config) (*Client, error) {
func New(cfg *config.Config) (*Client, error) {
conn, err := grpc.Dial(cfg.GetGRPCAddr(), cfg.DialOptions...)
if err != nil {
return nil, err
Expand All @@ -48,8 +49,9 @@ func NewClient(cfg *config.Config) (*Client, error) {
}, nil
}

// Insert helps to insert multiple rows of multiple tables into greptimedb
func (c *Client) Insert(ctx context.Context, req insert.InsertsRequest) (*greptimepb.GreptimeResponse, error) {
func (c *Client) Write(ctx context.Context, tables ...*table.Table) (*greptimepb.GreptimeResponse, error) {
req := insert.RowInsertsRequest{}
req.AddTable(tables...)
request, err := req.Build(c.cfg)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion client_test.go → client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package greptime
package client

import (
"testing"
Expand Down
File renamed without changes.
File renamed without changes.
14 changes: 4 additions & 10 deletions error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,12 @@ import (
)

var (
ErrEmptyDatabase = errors.New("name of database should not be empty")
ErrEmptyTable = errors.New("name of table should not be be empty")
ErrEmptyInserts = errors.New("at least one insert is required in InsertsRequest")
ErrEmptyDatabaseName = errors.New("name of database should not be empty")
ErrEmptyTableName = errors.New("name of table should not be be empty")
ErrEmptyColumnName = errors.New("name of column should not be be empty")
ErrEmptyTables = errors.New("please add at least one record before sending insert request")
ErrEmptyTimestamp = errors.New("timestamp should not be empty")
ErrEmptyQuery = errors.New("query should not be empty, assign Sql, InstantPromql or RangePromql")
ErrEmptyKey = errors.New("key should not be empty")
ErrEmptySql = errors.New("sql is required in querying")
ErrEmptyPromql = errors.New("promql is required in promql querying")
ErrEmptyStep = errors.New("step is required in range promql")
ErrEmptyRange = errors.New("start and end is required in range promql")
ErrInvalidTimePrecision = errors.New("precision of timestamp is not valid")
ErrNoSeriesInMetric = errors.New("empty series in Metric")

ErrColumnNotSet = errors.New("column not set, please call AddColumn first")
)
12 changes: 6 additions & 6 deletions insert/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@ import (
greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1"

"github.com/GreptimeTeam/greptimedb-ingester-go/config"
gerr "github.com/GreptimeTeam/greptimedb-ingester-go/error"
gutil "github.com/GreptimeTeam/greptimedb-ingester-go/util"
err "github.com/GreptimeTeam/greptimedb-ingester-go/error"
"github.com/GreptimeTeam/greptimedb-ingester-go/util"
)

type reqHeader struct {
database string
}

func (h *reqHeader) build(cfg *config.Config) (*greptimepb.RequestHeader, error) {
if gutil.IsEmptyString(h.database) {
if util.IsEmptyString(h.database) {
h.database = cfg.Database
}

if gutil.IsEmptyString(h.database) {
return nil, gerr.ErrEmptyDatabase
if util.IsEmptyString(h.database) {
return nil, err.ErrEmptyDatabaseName
}

header := &greptimepb.RequestHeader{
Expand All @@ -57,7 +57,7 @@ func (h RespHeader) IsRateLimited() bool {
}

func (h RespHeader) IsNil() bool {
return h.Code == 0 && gutil.IsEmptyString(h.Msg)
return h.Code == 0 && util.IsEmptyString(h.Msg)
}

type getRespHeader interface {
Expand Down
2 changes: 1 addition & 1 deletion insert/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestHeaderBuild(t *testing.T) {
h := &reqHeader{}

gh, err := h.build(&config.Config{})
assert.ErrorIs(t, err, gerr.ErrEmptyDatabase)
assert.ErrorIs(t, err, gerr.ErrEmptyDatabaseName)
assert.Nil(t, gh)

gh, err = h.build(&config.Config{Database: "database"})
Expand Down
85 changes: 24 additions & 61 deletions insert/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,101 +15,64 @@
package insert

import (
greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1"
gpb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1"

"github.com/GreptimeTeam/greptimedb-ingester-go/config"
gerr "github.com/GreptimeTeam/greptimedb-ingester-go/error"
"github.com/GreptimeTeam/greptimedb-ingester-go/table"
gutil "github.com/GreptimeTeam/greptimedb-ingester-go/util"
)

type InsertsRequest struct {
header reqHeader
inserts []InsertRequest
type RowInsertsRequest struct {
header reqHeader
tables []*table.Table
}

// WithDatabase helps to specify different database from the default one.
func (r *InsertsRequest) WithDatabase(database string) *InsertsRequest {
func (r *RowInsertsRequest) IsTablesEmpty() bool {
return r.tables == nil || len(r.tables) == 0
}

func (r *RowInsertsRequest) WithDatabase(database string) *RowInsertsRequest {
r.header = reqHeader{
database: database,
}
return r
}

// Append will include one insert into this InsertsRequest
func (r *InsertsRequest) Append(insert InsertRequest) *InsertsRequest {
if r.inserts == nil {
r.inserts = make([]InsertRequest, 0)
func (r *RowInsertsRequest) AddTable(tables ...*table.Table) *RowInsertsRequest {
if r.tables == nil {
r.tables = make([]*table.Table, 0)
}

r.inserts = append(r.inserts, insert)

r.tables = append(r.tables, tables...)
return r
}

func (r InsertsRequest) Build(cfg *config.Config) (*greptimepb.GreptimeRequest, error) {
func (r RowInsertsRequest) Build(cfg *config.Config) (*gpb.GreptimeRequest, error) {
header, err := r.header.build(cfg)
if err != nil {
return nil, err
}

if len(r.inserts) == 0 {
return nil, gerr.ErrEmptyInserts
if r.IsTablesEmpty() {
return nil, gerr.ErrEmptyTables
}

reqs := make([]*greptimepb.InsertRequest, 0, len(r.inserts))
for _, insert := range r.inserts {
req, err := insert.build()
if err != nil {
return nil, err
reqs := make([]*gpb.RowInsertRequest, 0, len(r.tables))
for _, tbl := range r.tables {
req := &gpb.RowInsertRequest{
TableName: tbl.Schema.Name,
Rows: tbl.Rows,
}
reqs = append(reqs, req)
}

req := greptimepb.GreptimeRequest_Inserts{
Inserts: &greptimepb.InsertRequests{Inserts: reqs},
req := gpb.GreptimeRequest_RowInserts{
RowInserts: &gpb.RowInsertRequests{Inserts: reqs},
}

return &greptimepb.GreptimeRequest{
return &gpb.GreptimeRequest{
Header: header,
Request: &req,
}, nil

}

// InsertRequest insert metric to specified table. You can also specify the database in header.
type InsertRequest struct {
table string
metric table.Metric
}

func (r *InsertRequest) WithTable(table string) *InsertRequest {
r.table = table
return r
}

func (r *InsertRequest) WithMetric(metric table.Metric) *InsertRequest {
r.metric = metric
return r
}

func (r *InsertRequest) RowCount() uint32 {
return uint32(len(r.metric.GetSeries()))
}

func (r *InsertRequest) build() (*greptimepb.InsertRequest, error) {
if gutil.IsEmptyString(r.table) {
return nil, gerr.ErrEmptyTable
}

columns, err := r.metric.IntoGreptimeColumn()
if err != nil {
return nil, err
}

return &greptimepb.InsertRequest{
TableName: r.table,
Columns: columns,
RowCount: r.RowCount(),
}, nil
}
Loading

0 comments on commit e747ea3

Please sign in to comment.