Skip to content

Commit

Permalink
feat: Keeplive Options (#14)
Browse files Browse the repository at this point in the history
* refactor auth header

* keeplive options
  • Loading branch information
yuanbohan authored Feb 2, 2024
1 parent b66d47c commit 24d8f6e
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 99 deletions.
8 changes: 5 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/GreptimeTeam/greptimedb-ingester-go/config"
"github.com/GreptimeTeam/greptimedb-ingester-go/request"
"github.com/GreptimeTeam/greptimedb-ingester-go/request/header"
"github.com/GreptimeTeam/greptimedb-ingester-go/table"
)

Expand All @@ -35,7 +36,7 @@ type Client struct {

// New helps to create the greptimedb client, which will be responsible write data into GreptimeDB.
func New(cfg *config.Config) (*Client, error) {
conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.DialOptions...)
conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.Options().Build()...)
if err != nil {
return nil, err
}
Expand All @@ -45,9 +46,10 @@ func New(cfg *config.Config) (*Client, error) {
}

func (c *Client) Write(ctx context.Context, tables ...*table.Table) (*greptimepb.GreptimeResponse, error) {
req, err := request.New(tables...).Build(c.cfg)
header_ := header.New(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password)
request_, err := request.New(header_, tables...).Build()
if err != nil {
return nil, err
}
return c.client.Handle(ctx, req, c.cfg.CallOptions...)
return c.client.Handle(ctx, request_)
}
8 changes: 2 additions & 6 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
"github.com/ory/dockertest/v3"
dc "github.com/ory/dockertest/v3/docker"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"gorm.io/driver/mysql"
"gorm.io/gorm"

Expand All @@ -41,6 +39,7 @@ import (
// unmatched length of columns in rows and columns in schema
// support pointer
// write pojo
// timeout test

var (
monitorTableName = "monitor"
Expand Down Expand Up @@ -148,13 +147,10 @@ func (p *Mysql) AllDatatypes() ([]datatype, error) {
}

func newClient() *Client {
options := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
cfg := config.New(host).
WithPort(grpcPort).
WithDatabase(database).
WithDialOptions(options...)
WithKeepalive(30*time.Second, 5*time.Second)

client, err := New(cfg)
if err != nil {
Expand Down
13 changes: 8 additions & 5 deletions client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,24 @@ import (

"github.com/GreptimeTeam/greptimedb-ingester-go/config"
"github.com/GreptimeTeam/greptimedb-ingester-go/request"
"github.com/GreptimeTeam/greptimedb-ingester-go/request/header"
"github.com/GreptimeTeam/greptimedb-ingester-go/table"
)

// StreamClient is only for inserting
type StreamClient struct {
cfg *config.Config
cfg *config.Config

client greptimepb.GreptimeDatabase_HandleRequestsClient
}

func NewStreamClient(cfg *config.Config) (*StreamClient, error) {
conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.DialOptions...)
conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.Options().Build()...)
if err != nil {
return nil, err
}

client, err := greptimepb.NewGreptimeDatabaseClient(conn).HandleRequests(context.Background(), cfg.CallOptions...)
client, err := greptimepb.NewGreptimeDatabaseClient(conn).HandleRequests(context.Background())
if err != nil {
return nil, err
}
Expand All @@ -46,11 +48,12 @@ func NewStreamClient(cfg *config.Config) (*StreamClient, error) {
}

func (c *StreamClient) Send(ctx context.Context, tables ...*table.Table) error {
req, err := request.New(tables...).Build(c.cfg)
header_ := header.New(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password)
request_, err := request.New(header_, tables...).Build()
if err != nil {
return err
}
return c.client.Send(req)
return c.client.Send(request_)
}

func (c *StreamClient) CloseAndRecv(ctx context.Context) (*greptimepb.AffectedRows, error) {
Expand Down
7 changes: 1 addition & 6 deletions client/stream_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"time"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/GreptimeTeam/greptimedb-ingester-go/config"
tbl "github.com/GreptimeTeam/greptimedb-ingester-go/table"
Expand All @@ -35,13 +33,10 @@ var (
)

func newStreamClient() *StreamClient {
options := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
cfg := config.New(host).
WithPort(grpcPort).
WithDatabase(database).
WithDialOptions(options...)
WithKeepalive(30*time.Second, 5*time.Second)

client, err := NewStreamClient(cfg)
if err != nil {
Expand Down
61 changes: 19 additions & 42 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ package config

import (
"fmt"
"time"

greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1"
"google.golang.org/grpc"

gutil "github.com/GreptimeTeam/greptimedb-ingester-go/util"
"github.com/GreptimeTeam/greptimedb-ingester-go/config/options"
)

// Config is to define how the Client behaves.
Expand All @@ -40,25 +38,15 @@ type Config struct {
Password string
Database string // the default database

// DialOptions are passed to grpc.DialContext
// when a new gRPC connection is to be created.
DialOptions []grpc.DialOption

// CallOptions are passed to StreamClient
CallOptions []grpc.CallOption
keepaliveInterval time.Duration
keepaliveTimeout time.Duration
}

// New helps to init Config with host only
func New(host string) *Config {
return &Config{
Host: host,
Port: 4001,

DialOptions: []grpc.DialOption{
grpc.WithUserAgent("greptimedb-ingester-go"),
},

CallOptions: []grpc.CallOption{},
}
}

Expand All @@ -81,41 +69,30 @@ func (c *Config) WithAuth(username, password string) *Config {
return c
}

func (c *Config) WithDialOptions(options ...grpc.DialOption) *Config {
if c.DialOptions == nil {
c.DialOptions = []grpc.DialOption{}
}

c.DialOptions = append(c.DialOptions, options...)
func (c *Config) WithKeepalive(interval, timeout time.Duration) *Config {
c.keepaliveInterval = interval
c.keepaliveTimeout = timeout
return c
}

func (c *Config) WithCallOptions(options ...grpc.CallOption) *Config {
if c.CallOptions == nil {
c.CallOptions = []grpc.CallOption{}
}

c.CallOptions = append(c.CallOptions, options...)
return c
func (c *Config) GetEndpoint() string {
return fmt.Sprintf("%s:%d", c.Host, c.Port)
}

// buildAuthHeader only supports Basic Auth so far
func (c *Config) BuildAuthHeader() *greptimepb.AuthHeader {
if gutil.IsEmptyString(c.Username) || gutil.IsEmptyString(c.Password) {
func (c *Config) Options() *options.Options {
if c.keepaliveInterval == 0 && c.keepaliveTimeout == 0 {
return nil
}

return &greptimepb.AuthHeader{
AuthScheme: &greptimepb.AuthHeader_Basic{
Basic: &greptimepb.Basic{
Username: c.Username,
Password: c.Password,
},
},
keepalive := options.NewKeepaliveOptions()

if c.keepaliveInterval != 0 {
keepalive.WithInterval(c.keepaliveInterval)
}

}
if c.keepaliveTimeout != 0 {
keepalive.WithTimeout(c.keepaliveTimeout)
}

func (c *Config) GetEndpoint() string {
return fmt.Sprintf("%s:%d", c.Host, c.Port)
return options.New(keepalive)
}
103 changes: 103 additions & 0 deletions config/options/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2024 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package options

import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"

greptime "github.com/GreptimeTeam/greptimedb-ingester-go"
)

var (
uaOpt = grpc.WithUserAgent("greptimedb-ingester-go/" + greptime.Version)

// TODO(yuanbohan): SecurityOptions
insecureOpt = grpc.WithTransportCredentials(insecure.NewCredentials())

defaultKeepaliveInterval = 30 * time.Second
defaultKeepaliveTimeout = 5 * time.Second
)

type Options struct {
keepalive *KeepaliveOption
}

func New(keepalive *KeepaliveOption) *Options {
return &Options{
keepalive: keepalive,
}
}

func (o *Options) WithKeepalive(keepalive *KeepaliveOption) *Options {
o.keepalive = keepalive
return o
}

func (o *Options) Build() []grpc.DialOption {
options := []grpc.DialOption{uaOpt, insecureOpt}

if o == nil {
return options
}

if opt := o.keepalive.Build(); opt != nil {
options = append(options, *opt)
}

return options
}

type KeepaliveOption struct {
Interval time.Duration // default value is 30 seconds.
Timeout time.Duration // default value is 5 seconds.
}

func NewKeepaliveOptions() *KeepaliveOption {
return &KeepaliveOption{
Interval: defaultKeepaliveInterval,
Timeout: defaultKeepaliveTimeout,
}
}

func (o *KeepaliveOption) WithInterval(d time.Duration) *KeepaliveOption {
o.Interval = d
return o
}

func (o *KeepaliveOption) WithTimeout(d time.Duration) *KeepaliveOption {
o.Timeout = d
return o
}

func (o *KeepaliveOption) Build() *grpc.DialOption {
if o.Interval == 0 && o.Timeout == 0 {
return nil
}

param := keepalive.ClientParameters{PermitWithoutStream: true}
if o.Interval != 0 {
param.Time = o.Interval
}
if o.Timeout != 0 {
param.Timeout = o.Timeout
}
option := grpc.WithKeepaliveParams(param)

return &option
}
50 changes: 50 additions & 0 deletions request/header/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2024 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package header

import (
gpb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1"

"github.com/GreptimeTeam/greptimedb-ingester-go/util"
)

type Auth struct {
username string
password string
}

func newAuth(username, password string) Auth {
return Auth{
username: username,
password: password,
}
}

// buildAuthHeader only supports Basic Auth so far
func (a Auth) buildAuthHeader() *gpb.AuthHeader {
if util.IsEmptyString(a.username) || util.IsEmptyString(a.password) {
return nil
}

return &gpb.AuthHeader{
AuthScheme: &gpb.AuthHeader_Basic{
Basic: &gpb.Basic{
Username: a.username,
Password: a.password,
},
},
}

}
Loading

0 comments on commit 24d8f6e

Please sign in to comment.