Skip to content

Commit

Permalink
keeplive options
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanbohan committed Feb 2, 2024
1 parent 61f6d0c commit ef3fcd2
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 53 deletions.
8 changes: 4 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Client struct {

// New helps to create the greptimedb client, which will be responsible write data into GreptimeDB.
func New(cfg *config.Config) (*Client, error) {
conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.DialOptions...)
conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.Options().Build()...)
if err != nil {
return nil, err
}
Expand All @@ -46,10 +46,10 @@ func New(cfg *config.Config) (*Client, error) {
}

func (c *Client) Write(ctx context.Context, tables ...*table.Table) (*greptimepb.GreptimeResponse, error) {
header_ := header.New().WithDatabase(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password)
request_, err := request.New().WithTables(tables...).WithHeader(header_).Build()
header_ := header.New(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password)
request_, err := request.New(header_, tables...).Build()
if err != nil {
return nil, err
}
return c.client.Handle(ctx, request_, c.cfg.CallOptions...)
return c.client.Handle(ctx, request_)
}
8 changes: 2 additions & 6 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
"github.com/ory/dockertest/v3"
dc "github.com/ory/dockertest/v3/docker"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"gorm.io/driver/mysql"
"gorm.io/gorm"

Expand All @@ -41,6 +39,7 @@ import (
// unmatched length of columns in rows and columns in schema
// support pointer
// write pojo
// timeout test

var (
monitorTableName = "monitor"
Expand Down Expand Up @@ -148,13 +147,10 @@ func (p *Mysql) AllDatatypes() ([]datatype, error) {
}

func newClient() *Client {
options := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
cfg := config.New(host).
WithPort(grpcPort).
WithDatabase(database).
WithDialOptions(options...)
WithKeepalive(30*time.Second, 5*time.Second)

client, err := New(cfg)
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,18 @@ import (

// StreamClient is only for inserting
type StreamClient struct {
cfg *config.Config
cfg *config.Config

client greptimepb.GreptimeDatabase_HandleRequestsClient
}

func NewStreamClient(cfg *config.Config) (*StreamClient, error) {
conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.DialOptions...)
conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.Options().Build()...)
if err != nil {
return nil, err
}

client, err := greptimepb.NewGreptimeDatabaseClient(conn).HandleRequests(context.Background(), cfg.CallOptions...)
client, err := greptimepb.NewGreptimeDatabaseClient(conn).HandleRequests(context.Background())
if err != nil {
return nil, err
}
Expand All @@ -47,8 +48,8 @@ func NewStreamClient(cfg *config.Config) (*StreamClient, error) {
}

func (c *StreamClient) Send(ctx context.Context, tables ...*table.Table) error {
header_ := header.New().WithDatabase(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password)
request_, err := request.New().WithTables(tables...).WithHeader(header_).Build()
header_ := header.New(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password)
request_, err := request.New(header_, tables...).Build()
if err != nil {
return err
}
Expand Down
7 changes: 1 addition & 6 deletions client/stream_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"time"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/GreptimeTeam/greptimedb-ingester-go/config"
tbl "github.com/GreptimeTeam/greptimedb-ingester-go/table"
Expand All @@ -35,13 +33,10 @@ var (
)

func newStreamClient() *StreamClient {
options := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
cfg := config.New(host).
WithPort(grpcPort).
WithDatabase(database).
WithDialOptions(options...)
WithKeepalive(30*time.Second, 5*time.Second)

client, err := NewStreamClient(cfg)
if err != nil {
Expand Down
51 changes: 24 additions & 27 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ package config

import (
"fmt"
"time"

"google.golang.org/grpc"
"github.com/GreptimeTeam/greptimedb-ingester-go/config/options"
)

// Config is to define how the Client behaves.
Expand All @@ -37,25 +38,15 @@ type Config struct {
Password string
Database string // the default database

// DialOptions are passed to grpc.DialContext
// when a new gRPC connection is to be created.
DialOptions []grpc.DialOption

// CallOptions are passed to StreamClient
CallOptions []grpc.CallOption
keepaliveInterval time.Duration
keepaliveTimeout time.Duration
}

// New helps to init Config with host only
func New(host string) *Config {
return &Config{
Host: host,
Port: 4001,

DialOptions: []grpc.DialOption{
grpc.WithUserAgent("greptimedb-ingester-go"),
},

CallOptions: []grpc.CallOption{},
}
}

Expand All @@ -78,24 +69,30 @@ func (c *Config) WithAuth(username, password string) *Config {
return c
}

func (c *Config) WithDialOptions(options ...grpc.DialOption) *Config {
if c.DialOptions == nil {
c.DialOptions = []grpc.DialOption{}
}

c.DialOptions = append(c.DialOptions, options...)
func (c *Config) WithKeepalive(interval, timeout time.Duration) *Config {
c.keepaliveInterval = interval
c.keepaliveTimeout = timeout
return c
}

func (c *Config) WithCallOptions(options ...grpc.CallOption) *Config {
if c.CallOptions == nil {
c.CallOptions = []grpc.CallOption{}
func (c *Config) GetEndpoint() string {
return fmt.Sprintf("%s:%d", c.Host, c.Port)
}

func (c *Config) Options() *options.Options {
if c.keepaliveInterval == 0 && c.keepaliveTimeout == 0 {
return nil
}

c.CallOptions = append(c.CallOptions, options...)
return c
}
keepalive := options.NewKeepaliveOptions()

func (c *Config) GetEndpoint() string {
return fmt.Sprintf("%s:%d", c.Host, c.Port)
if c.keepaliveInterval != 0 {
keepalive.WithInterval(c.keepaliveInterval)
}

if c.keepaliveTimeout != 0 {
keepalive.WithTimeout(c.keepaliveTimeout)
}

return options.New(keepalive)
}
103 changes: 103 additions & 0 deletions config/options/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2024 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package options

import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"

greptime "github.com/GreptimeTeam/greptimedb-ingester-go"
)

var (
uaOpt = grpc.WithUserAgent("greptimedb-ingester-go/" + greptime.Version)

// TODO(yuanbohan): SecurityOptions
insecureOpt = grpc.WithTransportCredentials(insecure.NewCredentials())

defaultKeepaliveInterval = 30 * time.Second
defaultKeepaliveTimeout = 5 * time.Second
)

type Options struct {
keepalive *KeepaliveOption
}

func New(keepalive *KeepaliveOption) *Options {
return &Options{
keepalive: keepalive,
}
}

func (o *Options) WithKeepalive(keepalive *KeepaliveOption) *Options {
o.keepalive = keepalive
return o
}

func (o *Options) Build() []grpc.DialOption {
options := []grpc.DialOption{uaOpt, insecureOpt}

if o == nil {
return options
}

if opt := o.keepalive.Build(); opt != nil {
options = append(options, *opt)
}

return options
}

type KeepaliveOption struct {
Interval time.Duration // default value is 30 seconds.
Timeout time.Duration // default value is 5 seconds.
}

func NewKeepaliveOptions() *KeepaliveOption {
return &KeepaliveOption{
Interval: defaultKeepaliveInterval,
Timeout: defaultKeepaliveTimeout,
}
}

func (o *KeepaliveOption) WithInterval(d time.Duration) *KeepaliveOption {
o.Interval = d
return o
}

func (o *KeepaliveOption) WithTimeout(d time.Duration) *KeepaliveOption {
o.Timeout = d
return o
}

func (o *KeepaliveOption) Build() *grpc.DialOption {
if o.Interval == 0 && o.Timeout == 0 {
return nil
}

param := keepalive.ClientParameters{PermitWithoutStream: true}
if o.Interval != 0 {
param.Time = o.Interval
}
if o.Timeout != 0 {
param.Timeout = o.Timeout
}
option := grpc.WithKeepaliveParams(param)

return &option
}
6 changes: 4 additions & 2 deletions request/header/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ type Header struct {
auth Auth
}

func New() *Header {
return &Header{}
func New(database string) *Header {
return &Header{
database: database,
}
}

func (h *Header) WithDatabase(database string) *Header {
Expand Down
1 change: 0 additions & 1 deletion request/header/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,5 @@ func TestHeaderBuild(t *testing.T) {

gh, err = h.WithAuth("user", "pass").Build()
assert.Nil(t, err)
assert.Equal(t, &gpb.RequestHeader{Dbname: "public"}, gh)
assert.NotNil(t, gh.Authorization)
}
7 changes: 5 additions & 2 deletions request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ type Request struct {
tables []*table.Table
}

func New() *Request {
return &Request{}
func New(header *header.Header, tables ...*table.Table) *Request {
return &Request{
header: header,
tables: tables,
}
}

func (r *Request) WithHeader(header *header.Header) *Request {
Expand Down
17 changes: 17 additions & 0 deletions version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2024 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package greptime

const Version = "v0.1.0" // THIS MUST BE THE SAME AS THE VERSION in GitHub release

0 comments on commit ef3fcd2

Please sign in to comment.