Skip to content

Commit aa5718c

Browse files
authored
feat: new write protocol implement (#207)
Signed-off-by: Young Xu <[email protected]>
1 parent 9862d2f commit aa5718c

9 files changed

+952
-1
lines changed

opengemini/client.go

+28
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@ import (
1818
"context"
1919
"crypto/tls"
2020
"log/slog"
21+
"strconv"
2122
"time"
2223

2324
"github.com/prometheus/client_golang/prometheus"
25+
26+
"github.com/openGemini/opengemini-client-go/proto"
2427
)
2528

2629
const (
@@ -33,6 +36,7 @@ const (
3336
type Codec string
3437

3538
type ContentType string
39+
3640
type CompressMethod string
3741

3842
const (
@@ -63,6 +67,9 @@ type Client interface {
6367
WriteBatchPoints(ctx context.Context, database string, bp []*Point) error
6468
// WriteBatchPointsWithRp write batch points with retention policy
6569
WriteBatchPointsWithRp(ctx context.Context, database string, rp string, bp []*Point) error
70+
// WriteByGrpc write batch record to assigned database.retention_policy by gRPC.
71+
// You'd better use NewWriteRequestBuilder to build req.
72+
WriteByGrpc(ctx context.Context, req *proto.WriteRequest) error
6673

6774
// CreateDatabase Create database
6875
CreateDatabase(database string) error
@@ -160,6 +167,8 @@ type Config struct {
160167
CustomMetricsLabels map[string]string
161168
// Logger structured logger for logging operations
162169
Logger *slog.Logger
170+
// GrpcConfig configuration information for write service by gRPC
171+
GrpcConfig *GrpcConfig
163172
}
164173

165174
// Address configuration for providing service.
@@ -170,6 +179,10 @@ type Address struct {
170179
Port int
171180
}
172181

182+
func (a *Address) String() string {
183+
return a.Host + ":" + strconv.Itoa(a.Port)
184+
}
185+
173186
// AuthType type of identity authentication.
174187
type AuthType int
175188

@@ -206,6 +219,21 @@ type RpConfig struct {
206219
IndexDuration string
207220
}
208221

222+
// GrpcConfig represents the configuration information for write service by gRPC.
223+
type GrpcConfig struct {
224+
// Addresses Configure the service endpoints for the openGemini grpc write service.
225+
// This parameter is required.
226+
Addresses []Address
227+
// AuthConfig configuration information for authentication.
228+
AuthConfig *AuthConfig
229+
// TlsConfig configuration information for tls.
230+
TlsConfig *tls.Config
231+
// CompressMethod determines the compress method used for data transmission.
232+
CompressMethod CompressMethod
233+
// Timeout default 30s
234+
Timeout time.Duration
235+
}
236+
209237
// NewClient Creates a openGemini client instance
210238
func NewClient(config *Config) (Client, error) {
211239
return newClient(config)

opengemini/client_impl.go

+11
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type client struct {
3939
prevIdx atomic.Int32
4040
dataChanMap syncx.Map[dbRp, chan *sendBatchWithCB]
4141
metrics *metrics
42+
rpcClient *writerClient
4243

4344
batchContext context.Context
4445
batchContextCancel context.CancelFunc
@@ -91,6 +92,13 @@ func newClient(c *Config) (Client, error) {
9192
} else {
9293
dbClient.logger = slog.Default()
9394
}
95+
if c.GrpcConfig != nil {
96+
rc, err := newWriterClient(c.GrpcConfig)
97+
if err != nil {
98+
return nil, errors.New("failed to create rpc client: " + err.Error())
99+
}
100+
dbClient.rpcClient = rc
101+
}
94102
dbClient.prevIdx.Store(-1)
95103
if len(c.Addresses) > 1 {
96104
// if there are multiple addresses, start the health check
@@ -106,6 +114,9 @@ func (c *client) Close() error {
106114
c.dataChanMap.Delete(key)
107115
return true
108116
})
117+
if c.rpcClient != nil {
118+
_ = c.rpcClient.Close()
119+
}
109120
c.cli.CloseIdleConnections()
110121
return nil
111122
}

opengemini/error.go

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ var (
2929
ErrNoAddress = errors.New("must have at least one address")
3030
ErrRetentionPolicy = errors.New("empty retention policy")
3131
ErrUnsupportedFieldValueType = errors.New("unsupported field value type")
32+
ErrEmptyRecord = errors.New("empty record")
3233
)
3334

3435
// checkDatabaseName checks if the database name is empty and returns an error if it is.

opengemini/query.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ import (
2222
"net/http"
2323
"time"
2424

25-
compressionPool "github.com/openGemini/opengemini-client-go/lib/pool"
2625
"github.com/vmihailenco/msgpack/v5"
26+
27+
compressionPool "github.com/openGemini/opengemini-client-go/lib/pool"
2728
)
2829

2930
const (

opengemini/query_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func TestQueryWithEpoch(t *testing.T) {
7070
assert.Equal(t, length, getTimestampLength(v))
7171
}
7272
}
73+
7374
func TestQueryWithMsgPack(t *testing.T) {
7475
c := testNewClient(t, &Config{
7576
Addresses: []Address{{

opengemini/record_builder.go

+256
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
// Copyright 2024 openGemini Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package opengemini
16+
17+
import (
18+
"errors"
19+
"fmt"
20+
"math/rand"
21+
"time"
22+
23+
"github.com/openGemini/opengemini-client-go/lib/record"
24+
"github.com/openGemini/opengemini-client-go/proto"
25+
)
26+
27+
var (
28+
_ WriteRequestBuilder = (*writeRequestBuilderImpl)(nil)
29+
random = rand.New(rand.NewSource(time.Now().UnixNano()))
30+
)
31+
32+
// RecordLine define an abstract record line structure.
33+
type RecordLine any
34+
35+
// RecordBuilder build record line, it is not thread safe
36+
type RecordBuilder interface {
37+
// NewLine start a new line, otherwise the added attributes will be in the default row
38+
NewLine() RecordBuilder
39+
// AddTag add a tag to the record.
40+
// If the key exists, it will be overwritten.
41+
// If the key is `time`, it will cause an error.
42+
// If the key is empty or the value is empty, it will be ignored.
43+
AddTag(key string, value string) RecordBuilder
44+
// AddTags add multiple tags to the record.
45+
// Each entry in the map represents a tag where the key is the tag name and the value is the tag value.
46+
AddTags(tags map[string]string) RecordBuilder
47+
// AddField add a field to the record.
48+
// If the key is empty, it will be ignored.
49+
// If the key is `time`, it will cause an error.
50+
// If the key already exists, its value will be overwritten.
51+
AddField(key string, value interface{}) RecordBuilder
52+
// AddFields add multiple fields to the record.
53+
// Each entry in the map represents a field where the key is the field name and the value is the field value.
54+
AddFields(fields map[string]interface{}) RecordBuilder
55+
// CompressMethod set compress method for request data.
56+
CompressMethod(method CompressMethod) RecordBuilder
57+
// Build specifies the time of the record.
58+
// If the time is not specified or zero value, the current time will be used.
59+
Build(timestamp int64) RecordLine
60+
}
61+
62+
type WriteRequestBuilder interface {
63+
// Authenticate configuration write request information for authentication.
64+
Authenticate(username, password string) WriteRequestBuilder
65+
// AddRecord append Record for WriteRequest, you'd better use NewRecordBuilder to build RecordLine.
66+
AddRecord(rlb ...RecordLine) WriteRequestBuilder
67+
// Build generate WriteRequest.
68+
Build() (*proto.WriteRequest, error)
69+
}
70+
71+
type fieldTuple struct {
72+
record.Field
73+
value interface{}
74+
}
75+
76+
type writeRequestBuilderImpl struct {
77+
database string
78+
retentionPolicy string
79+
username string
80+
password string
81+
transform transform
82+
err error
83+
}
84+
85+
func (r *writeRequestBuilderImpl) reset() {
86+
r.transform.reset()
87+
}
88+
89+
func (r *writeRequestBuilderImpl) Authenticate(username, password string) WriteRequestBuilder {
90+
r.username = username
91+
r.password = password
92+
return r
93+
}
94+
95+
func NewWriteRequestBuilder(database, retentionPolicy string) (WriteRequestBuilder, error) {
96+
if err := checkDatabaseName(database); err != nil {
97+
return nil, err
98+
}
99+
return &writeRequestBuilderImpl{database: database, retentionPolicy: retentionPolicy, transform: make(transform)}, nil
100+
}
101+
102+
func (r *writeRequestBuilderImpl) AddRecord(rlb ...RecordLine) WriteRequestBuilder {
103+
for _, lineBuilder := range rlb {
104+
lb, ok := lineBuilder.(*recordLineBuilderImpl)
105+
if !ok {
106+
continue
107+
}
108+
if lb.err != nil {
109+
r.err = errors.Join(r.err, lb.err)
110+
continue
111+
}
112+
err := r.transform.AppendRecord(lb)
113+
if err != nil {
114+
r.err = errors.Join(r.err, err)
115+
continue
116+
}
117+
}
118+
return r
119+
}
120+
121+
func (r *writeRequestBuilderImpl) Build() (*proto.WriteRequest, error) {
122+
defer r.reset()
123+
124+
if r.err != nil {
125+
return nil, r.err
126+
}
127+
128+
if r.database == "" {
129+
return nil, ErrEmptyDatabaseName
130+
}
131+
132+
if r.retentionPolicy == "" {
133+
r.retentionPolicy = "autogen"
134+
}
135+
136+
var req = &proto.WriteRequest{
137+
Database: r.database,
138+
RetentionPolicy: r.retentionPolicy,
139+
Username: r.username,
140+
Password: r.password,
141+
}
142+
143+
for mst, rawRecord := range r.transform {
144+
rec, err := rawRecord.toSrvRecords()
145+
if err != nil {
146+
return nil, fmt.Errorf("failed to convert records: %v", err)
147+
}
148+
var buff []byte
149+
buff, err = rec.Marshal(buff)
150+
if err != nil {
151+
return nil, fmt.Errorf("failed to marshal record: %v", err)
152+
}
153+
154+
req.Records = append(req.Records, &proto.Record{
155+
Measurement: mst,
156+
MinTime: rawRecord.MinTime,
157+
MaxTime: rawRecord.MaxTime,
158+
Block: buff,
159+
})
160+
}
161+
162+
return req, nil
163+
}
164+
165+
type recordLineBuilderImpl struct {
166+
measurement string
167+
tags []*fieldTuple
168+
fields []*fieldTuple
169+
timestamp int64
170+
compressMethod CompressMethod
171+
err error
172+
}
173+
174+
func (r *recordLineBuilderImpl) NewLine() RecordBuilder {
175+
return &recordLineBuilderImpl{measurement: r.measurement}
176+
}
177+
178+
func NewRecordBuilder(measurement string) (RecordBuilder, error) {
179+
if err := checkMeasurementName(measurement); err != nil {
180+
return nil, err
181+
}
182+
return &recordLineBuilderImpl{measurement: measurement}, nil
183+
}
184+
185+
func (r *recordLineBuilderImpl) CompressMethod(method CompressMethod) RecordBuilder {
186+
r.compressMethod = method
187+
return r
188+
}
189+
190+
func (r *recordLineBuilderImpl) AddTag(key string, value string) RecordBuilder {
191+
if key == "" {
192+
r.err = errors.Join(r.err, fmt.Errorf("miss tag name: %w", ErrEmptyName))
193+
return r
194+
}
195+
if key == record.TimeField {
196+
r.err = errors.Join(r.err, fmt.Errorf("tag name %s invalid: %w", key, ErrInvalidTimeColumn))
197+
return r
198+
}
199+
r.tags = append(r.tags, &fieldTuple{
200+
Field: record.Field{
201+
Name: key,
202+
Type: record.FieldTypeTag,
203+
},
204+
value: value,
205+
})
206+
return r
207+
}
208+
209+
func (r *recordLineBuilderImpl) AddTags(tags map[string]string) RecordBuilder {
210+
for key, value := range tags {
211+
r.AddTag(key, value)
212+
}
213+
return r
214+
}
215+
216+
func (r *recordLineBuilderImpl) AddField(key string, value interface{}) RecordBuilder {
217+
if key == "" {
218+
r.err = errors.Join(r.err, fmt.Errorf("miss field name: %w", ErrEmptyName))
219+
return r
220+
}
221+
if key == record.TimeField {
222+
r.err = errors.Join(r.err, fmt.Errorf("field name %s invalid: %w", key, ErrInvalidTimeColumn))
223+
return r
224+
}
225+
typ := record.FieldTypeUnknown
226+
switch value.(type) {
227+
case string:
228+
typ = record.FieldTypeString
229+
case float32, float64:
230+
typ = record.FieldTypeFloat
231+
case bool:
232+
typ = record.FieldTypeBoolean
233+
case int8, int16, int32, int64, uint8, uint16, uint32, uint64, int:
234+
typ = record.FieldTypeInt
235+
}
236+
r.fields = append(r.fields, &fieldTuple{
237+
Field: record.Field{
238+
Name: key,
239+
Type: typ,
240+
},
241+
value: value,
242+
})
243+
return r
244+
}
245+
246+
func (r *recordLineBuilderImpl) AddFields(fields map[string]interface{}) RecordBuilder {
247+
for key, value := range fields {
248+
r.AddField(key, value)
249+
}
250+
return r
251+
}
252+
253+
func (r *recordLineBuilderImpl) Build(t int64) RecordLine {
254+
r.timestamp = t
255+
return r
256+
}

0 commit comments

Comments
 (0)