diff --git a/client/client.go b/client/client.go index 68789a0..b4bda08 100644 --- a/client/client.go +++ b/client/client.go @@ -36,7 +36,7 @@ type Client struct { // New helps to create the greptimedb client, which will be responsible write data into GreptimeDB. func New(cfg *config.Config) (*Client, error) { - conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.DialOptions...) + conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.Options().Build()...) if err != nil { return nil, err } @@ -46,10 +46,10 @@ func New(cfg *config.Config) (*Client, error) { } func (c *Client) Write(ctx context.Context, tables ...*table.Table) (*greptimepb.GreptimeResponse, error) { - header_ := header.New().WithDatabase(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password) - request_, err := request.New().WithTables(tables...).WithHeader(header_).Build() + header_ := header.New(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password) + request_, err := request.New(header_, tables...).Build() if err != nil { return nil, err } - return c.client.Handle(ctx, request_, c.cfg.CallOptions...) + return c.client.Handle(ctx, request_) } diff --git a/client/client_test.go b/client/client_test.go index 2772f1f..09862c7 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -27,8 +27,6 @@ import ( "github.com/ory/dockertest/v3" dc "github.com/ory/dockertest/v3/docker" "github.com/stretchr/testify/assert" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" "gorm.io/driver/mysql" "gorm.io/gorm" @@ -41,6 +39,7 @@ import ( // unmatched length of columns in rows and columns in schema // support pointer // write pojo +// timeout test var ( monitorTableName = "monitor" @@ -148,13 +147,10 @@ func (p *Mysql) AllDatatypes() ([]datatype, error) { } func newClient() *Client { - options := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - } cfg := config.New(host). WithPort(grpcPort). WithDatabase(database). - WithDialOptions(options...) + WithKeepalive(30*time.Second, 5*time.Second) client, err := New(cfg) if err != nil { diff --git a/client/stream_client.go b/client/stream_client.go index b26996f..9d26bcc 100644 --- a/client/stream_client.go +++ b/client/stream_client.go @@ -28,17 +28,18 @@ import ( // StreamClient is only for inserting type StreamClient struct { - cfg *config.Config + cfg *config.Config + client greptimepb.GreptimeDatabase_HandleRequestsClient } func NewStreamClient(cfg *config.Config) (*StreamClient, error) { - conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.DialOptions...) + conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.Options().Build()...) if err != nil { return nil, err } - client, err := greptimepb.NewGreptimeDatabaseClient(conn).HandleRequests(context.Background(), cfg.CallOptions...) + client, err := greptimepb.NewGreptimeDatabaseClient(conn).HandleRequests(context.Background()) if err != nil { return nil, err } @@ -47,8 +48,8 @@ func NewStreamClient(cfg *config.Config) (*StreamClient, error) { } func (c *StreamClient) Send(ctx context.Context, tables ...*table.Table) error { - header_ := header.New().WithDatabase(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password) - request_, err := request.New().WithTables(tables...).WithHeader(header_).Build() + header_ := header.New(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password) + request_, err := request.New(header_, tables...).Build() if err != nil { return err } diff --git a/client/stream_client_test.go b/client/stream_client_test.go index 88ee57c..63f809e 100644 --- a/client/stream_client_test.go +++ b/client/stream_client_test.go @@ -22,8 +22,6 @@ import ( "time" "github.com/stretchr/testify/assert" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" "github.com/GreptimeTeam/greptimedb-ingester-go/config" tbl "github.com/GreptimeTeam/greptimedb-ingester-go/table" @@ -35,13 +33,10 @@ var ( ) func newStreamClient() *StreamClient { - options := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - } cfg := config.New(host). WithPort(grpcPort). WithDatabase(database). - WithDialOptions(options...) + WithKeepalive(30*time.Second, 5*time.Second) client, err := NewStreamClient(cfg) if err != nil { diff --git a/config/config.go b/config/config.go index 7fe210f..cc96d2b 100644 --- a/config/config.go +++ b/config/config.go @@ -16,8 +16,9 @@ package config import ( "fmt" + "time" - "google.golang.org/grpc" + "github.com/GreptimeTeam/greptimedb-ingester-go/config/options" ) // Config is to define how the Client behaves. @@ -37,12 +38,8 @@ type Config struct { Password string Database string // the default database - // DialOptions are passed to grpc.DialContext - // when a new gRPC connection is to be created. - DialOptions []grpc.DialOption - - // CallOptions are passed to StreamClient - CallOptions []grpc.CallOption + keepaliveInterval time.Duration + keepaliveTimeout time.Duration } // New helps to init Config with host only @@ -50,12 +47,6 @@ func New(host string) *Config { return &Config{ Host: host, Port: 4001, - - DialOptions: []grpc.DialOption{ - grpc.WithUserAgent("greptimedb-ingester-go"), - }, - - CallOptions: []grpc.CallOption{}, } } @@ -78,24 +69,30 @@ func (c *Config) WithAuth(username, password string) *Config { return c } -func (c *Config) WithDialOptions(options ...grpc.DialOption) *Config { - if c.DialOptions == nil { - c.DialOptions = []grpc.DialOption{} - } - - c.DialOptions = append(c.DialOptions, options...) +func (c *Config) WithKeepalive(interval, timeout time.Duration) *Config { + c.keepaliveInterval = interval + c.keepaliveTimeout = timeout return c } -func (c *Config) WithCallOptions(options ...grpc.CallOption) *Config { - if c.CallOptions == nil { - c.CallOptions = []grpc.CallOption{} +func (c *Config) GetEndpoint() string { + return fmt.Sprintf("%s:%d", c.Host, c.Port) +} + +func (c *Config) Options() *options.Options { + if c.keepaliveInterval == 0 && c.keepaliveTimeout == 0 { + return nil } - c.CallOptions = append(c.CallOptions, options...) - return c -} + keepalive := options.NewKeepaliveOptions() -func (c *Config) GetEndpoint() string { - return fmt.Sprintf("%s:%d", c.Host, c.Port) + if c.keepaliveInterval != 0 { + keepalive.WithInterval(c.keepaliveInterval) + } + + if c.keepaliveTimeout != 0 { + keepalive.WithTimeout(c.keepaliveTimeout) + } + + return options.New(keepalive) } diff --git a/config/options/options.go b/config/options/options.go new file mode 100644 index 0000000..fbeccfb --- /dev/null +++ b/config/options/options.go @@ -0,0 +1,103 @@ +// Copyright 2024 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package options + +import ( + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" + + greptime "github.com/GreptimeTeam/greptimedb-ingester-go" +) + +var ( + uaOpt = grpc.WithUserAgent("greptimedb-ingester-go/" + greptime.Version) + + // TODO(yuanbohan): SecurityOptions + insecureOpt = grpc.WithTransportCredentials(insecure.NewCredentials()) + + defaultKeepaliveInterval = 30 * time.Second + defaultKeepaliveTimeout = 5 * time.Second +) + +type Options struct { + keepalive *KeepaliveOption +} + +func New(keepalive *KeepaliveOption) *Options { + return &Options{ + keepalive: keepalive, + } +} + +func (o *Options) WithKeepalive(keepalive *KeepaliveOption) *Options { + o.keepalive = keepalive + return o +} + +func (o *Options) Build() []grpc.DialOption { + options := []grpc.DialOption{uaOpt, insecureOpt} + + if o == nil { + return options + } + + if opt := o.keepalive.Build(); opt != nil { + options = append(options, *opt) + } + + return options +} + +type KeepaliveOption struct { + Interval time.Duration // default value is 30 seconds. + Timeout time.Duration // default value is 5 seconds. +} + +func NewKeepaliveOptions() *KeepaliveOption { + return &KeepaliveOption{ + Interval: defaultKeepaliveInterval, + Timeout: defaultKeepaliveTimeout, + } +} + +func (o *KeepaliveOption) WithInterval(d time.Duration) *KeepaliveOption { + o.Interval = d + return o +} + +func (o *KeepaliveOption) WithTimeout(d time.Duration) *KeepaliveOption { + o.Timeout = d + return o +} + +func (o *KeepaliveOption) Build() *grpc.DialOption { + if o.Interval == 0 && o.Timeout == 0 { + return nil + } + + param := keepalive.ClientParameters{PermitWithoutStream: true} + if o.Interval != 0 { + param.Time = o.Interval + } + if o.Timeout != 0 { + param.Timeout = o.Timeout + } + option := grpc.WithKeepaliveParams(param) + + return &option +} diff --git a/request/header/header.go b/request/header/header.go index 3e19383..a7dc47c 100644 --- a/request/header/header.go +++ b/request/header/header.go @@ -26,8 +26,10 @@ type Header struct { auth Auth } -func New() *Header { - return &Header{} +func New(database string) *Header { + return &Header{ + database: database, + } } func (h *Header) WithDatabase(database string) *Header { diff --git a/request/header/header_test.go b/request/header/header_test.go index 53f60e9..cd7ad90 100644 --- a/request/header/header_test.go +++ b/request/header/header_test.go @@ -37,6 +37,5 @@ func TestHeaderBuild(t *testing.T) { gh, err = h.WithAuth("user", "pass").Build() assert.Nil(t, err) - assert.Equal(t, &gpb.RequestHeader{Dbname: "public"}, gh) assert.NotNil(t, gh.Authorization) } diff --git a/request/request.go b/request/request.go index ad688a0..43aef3b 100644 --- a/request/request.go +++ b/request/request.go @@ -27,8 +27,11 @@ type Request struct { tables []*table.Table } -func New() *Request { - return &Request{} +func New(header *header.Header, tables ...*table.Table) *Request { + return &Request{ + header: header, + tables: tables, + } } func (r *Request) WithHeader(header *header.Header) *Request { diff --git a/version.go b/version.go new file mode 100644 index 0000000..9ac7b4b --- /dev/null +++ b/version.go @@ -0,0 +1,17 @@ +// Copyright 2024 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package greptime + +const Version = "v0.1.0" // THIS MUST BE THE SAME AS THE VERSION in GitHub release