Skip to content

Commit

Permalink
vstreamclient: framework for robust + simple usage
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Perkins <[email protected]>
  • Loading branch information
derekperkins committed Jan 10, 2025
1 parent 555f1d7 commit ecb3720
Show file tree
Hide file tree
Showing 7 changed files with 1,506 additions and 0 deletions.
84 changes: 84 additions & 0 deletions go/vt/vstreamclient/convert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package vstreamclient

import (
"fmt"
"reflect"
"time"

"vitess.io/vitess/go/sqltypes"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
)

// VStreamScanner allows for custom scan implementations
type VStreamScanner interface {
VStreamScan(fields []*querypb.Field, row []sqltypes.Value, rowEvent *binlogdatapb.RowEvent, rowChange *binlogdatapb.RowChange) error
}

// copyRowToStruct builds a customer from a row event
// TODO: this is very rudimentary mapping that only works for top-level fields
func copyRowToStruct(shard shardConfig, row []sqltypes.Value, vPtr reflect.Value) error {
for fieldName, m := range shard.fieldMap {
structField := reflect.Indirect(vPtr).FieldByIndex(m.structIndex)

switch m.kind {
case reflect.Bool:
rowVal, err := row[m.rowIndex].ToBool()
if err != nil {
return fmt.Errorf("error converting row value to bool for field %s: %w", fieldName, err)
}
structField.SetBool(rowVal)

case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
rowVal, err := row[m.rowIndex].ToInt64()
if err != nil {
return fmt.Errorf("error converting row value to int64 for field %s: %w", fieldName, err)
}
structField.SetInt(rowVal)

case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
rowVal, err := row[m.rowIndex].ToUint64()
if err != nil {
return fmt.Errorf("error converting row value to uint64 for field %s: %w", fieldName, err)
}
structField.SetUint(rowVal)

case reflect.Float32, reflect.Float64:
rowVal, err := row[m.rowIndex].ToFloat64()
if err != nil {
return fmt.Errorf("error converting row value to float64 for field %s: %w", fieldName, err)
}
structField.SetFloat(rowVal)

case reflect.String:
rowVal := row[m.rowIndex].ToString()
structField.SetString(rowVal)

case reflect.Struct:
switch m.structType.(type) {
case time.Time, *time.Time:
rowVal, err := row[m.rowIndex].ToTime()
if err != nil {
return fmt.Errorf("error converting row value to time.Time for field %s: %w", fieldName, err)
}
structField.Set(reflect.ValueOf(rowVal))
}

case reflect.Pointer,
reflect.Slice,
reflect.Array,
reflect.Invalid,
reflect.Uintptr,
reflect.Complex64,
reflect.Complex128,
reflect.Chan,
reflect.Func,
reflect.Interface,
reflect.Map,
reflect.UnsafePointer:
return fmt.Errorf("vstreamclient: unsupported field type: %s", m.kind.String())
}
}

return nil
}
115 changes: 115 additions & 0 deletions go/vt/vstreamclient/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package vstreamclient

import (
"fmt"
"time"

"vitess.io/vitess/go/sqlescape"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

var (
// DefaultMinFlushDuration is the default minimum duration between flushes, used if not explicitly
// set using WithMinFlushDuration. This can be safely modified if needed before calling New.
DefaultMinFlushDuration = 5 * time.Second

// DefaultMaxRowsPerFlush is the default number of rows to buffer per table, used if not explicitly
// set in the table configuration. This same number is also used to chunk rows when calling flush.
// This can be safely modified if needed before calling New.
DefaultMaxRowsPerFlush = 1000
)

// Option is a function that can be used to configure a VStreamClient
type Option func(v *VStreamClient) error

// WithMinFlushDuration sets the minimum duration between flushes. This is useful for ensuring that data
// isn't flushed too often, which can be inefficient. The default is 30 seconds.
func WithMinFlushDuration(d time.Duration) Option {
return func(v *VStreamClient) error {
if d <= 0 {
return fmt.Errorf("vstreamclient: minimum flush duration must be positive, got %s", d.String())
}

v.minFlushDuration = d
return nil
}
}

func WithHeartbeatSeconds(seconds int) Option {
return func(v *VStreamClient) error {
if seconds <= 0 {
return fmt.Errorf("vstreamclient: heartbeat seconds must be positive, got %d", seconds)
}

v.heartbeatSeconds = seconds
return nil
}
}

func WithStateTable(keyspace, table string) Option {
return func(v *VStreamClient) error {
shards, ok := v.shardsByKeyspace[keyspace]
if !ok {
return fmt.Errorf("vstreamclient: keyspace %s not found", keyspace)
}

// this could allow for shard pinning, but we can support that if it becomes useful
if len(shards) > 1 {
return fmt.Errorf("vstreamclient: keyspace %s is sharded, only unsharded keyspaces are supported", keyspace)
}

v.vgtidStateKeyspace = sqlescape.EscapeID(keyspace)
v.vgtidStateTable = sqlescape.EscapeID(table)
return nil
}
}

// DefaultFlags returns a default set of flags for a VStreamClient, safe to use in most cases, but can be customized
func DefaultFlags() *vtgatepb.VStreamFlags {
return &vtgatepb.VStreamFlags{
HeartbeatInterval: 1,
}
}

// WithFlags lets you manually control all the flag options, instead of using helper functions
func WithFlags(flags *vtgatepb.VStreamFlags) Option {
return func(v *VStreamClient) error {
v.flags = flags
return nil
}
}

// WithEventFunc provides for custom event handling functions for specific event types. Only one function
// can be registered per event type, and it is called before the default event handling function. Returning
// an error from the custom function will exit the stream before the default function is called.
func WithEventFunc(fn EventFunc, eventTypes ...binlogdatapb.VEventType) Option {
return func(v *VStreamClient) error {
if len(eventTypes) == 0 {
return fmt.Errorf("vstreamclient: no event types provided")
}

if v.eventFuncs == nil {
v.eventFuncs = make(map[binlogdatapb.VEventType]EventFunc)
}

for _, eventType := range eventTypes {
if _, ok := v.eventFuncs[eventType]; ok {
return fmt.Errorf("vstreamclient: event type %s already has a function", eventType.String())
}

v.eventFuncs[eventType] = fn
}

return nil
}
}

// WithStartingVGtid sets the starting VGtid for the VStreamClient. This is useful for resuming a stream from a
// specific point, vs what might be stored in the state table.
func WithStartingVGtid(vgtid *binlogdatapb.VGtid) Option {
return func(v *VStreamClient) error {
v.latestVgtid = vgtid
return nil
}
}
Loading

0 comments on commit ecb3720

Please sign in to comment.