Skip to content

Commit

Permalink
refactor(filelogger): make filelogger return a error instead of void
Browse files Browse the repository at this point in the history
Up until this point, we have been ignoring the errors that the filelogger, from now on, we will return the error and let the caller handle it. This makes the statement that needs to be executed, run first (both at oracle and at test cluster), after that we log the statement that was executed.
If the statement cannot be logged, operation will be aborted and the error will be returned.

Signed-off-by: Dusan Malusev <[email protected]>
  • Loading branch information
CodeLieutenant committed Nov 24, 2024
1 parent 536603b commit 7c5dda0
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 46 deletions.
11 changes: 6 additions & 5 deletions pkg/stmtlogger/filelogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (

type (
StmtToFile interface {
LogStmt(stmt *typedef.Stmt, ts ...time.Time)
LogStmt(stmt *typedef.Stmt, ts ...time.Time) error
Close() error
}

Expand Down Expand Up @@ -88,11 +88,10 @@ func NewLogger(w io.Writer) (StmtToFile, error) {
return out, nil
}

func (fl *logger) LogStmt(stmt *typedef.Stmt, ts ...time.Time) {
func (fl *logger) LogStmt(stmt *typedef.Stmt, ts ...time.Time) error {
buffer := fl.pool.Get().(*bytes.Buffer)
if err := stmt.PrettyCQLBuffered(buffer); err != nil {
log.Printf("failed to pretty print query: %s", err)
return
return err
}

opType := stmt.QueryType.OpType()
Expand All @@ -107,6 +106,8 @@ func (fl *logger) LogStmt(stmt *typedef.Stmt, ts ...time.Time) {
if fl.active.Load() {
fl.channel <- buffer
}

return nil
}

func (fl *logger) Close() error {
Expand Down Expand Up @@ -167,6 +168,6 @@ func (fl *logger) committer(ctx context.Context) {

type nopFileLogger struct{}

func (n *nopFileLogger) LogStmt(_ *typedef.Stmt, _ ...time.Time) {}
func (n *nopFileLogger) LogStmt(_ *typedef.Stmt, _ ...time.Time) error { return nil }

func (n *nopFileLogger) Close() error { return nil }
65 changes: 31 additions & 34 deletions pkg/store/cqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ package store

import (
"context"
"os"
"io"
"time"

errs "errors"

"github.com/gocql/gocql"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -47,71 +45,70 @@ func (cs *cqlStore) name() string {
return cs.system
}

func (cs *cqlStore) mutate(ctx context.Context, stmt *typedef.Stmt) (err error) {
var i int
for i = 0; i < cs.maxRetriesMutate; i++ {
// retry with new timestamp as list modification with the same ts
// will produce duplicated values, see https://github.com/scylladb/scylladb/issues/7937
err = cs.doMutate(ctx, stmt, time.Now())
if err == nil {
func (cs *cqlStore) mutate(ctx context.Context, stmt *typedef.Stmt) error {
for range cs.maxRetriesMutate {
if err := cs.doMutate(ctx, stmt); err == nil {
cs.ops.WithLabelValues(cs.system, opType(stmt)).Inc()
return nil
}

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(cs.maxRetriesMutateSleep):
}
}
if w := cs.logger.Check(zap.ErrorLevel, "failed to apply mutation"); w != nil {
w.Write(zap.Int("attempts", i), zap.Error(err))
}
return err

return errors.Errorf("failed to mutate after %d retries", cs.maxRetriesMutate)
}

func (cs *cqlStore) doMutate(ctx context.Context, stmt *typedef.Stmt, ts time.Time) error {
func (cs *cqlStore) doMutate(ctx context.Context, stmt *typedef.Stmt) error {
queryBody, _ := stmt.Query.ToCql()
query := cs.session.Query(queryBody, stmt.Values...).WithContext(ctx)
query := cs.session.Query(queryBody, stmt.Values...).WithContext(ctx).DefaultTimestamp(false)
defer query.Release()

if cs.useServerSideTimestamps {
query = query.DefaultTimestamp(false)
cs.stmtLogger.LogStmt(stmt)
} else {
query = query.WithTimestamp(ts.UnixNano() / 1000)
cs.stmtLogger.LogStmt(stmt, ts)
var ts time.Time

if !cs.useServerSideTimestamps {
ts = time.Now()
query = query.WithTimestamp(ts.UnixMicro())
}

if err := query.Exec(); err != nil {
if errs.Is(err, context.DeadlineExceeded) {
if w := cs.logger.Check(zap.DebugLevel, "deadline exceeded for mutation query"); w != nil {
w.Write(zap.String("system", cs.system), zap.String("query", queryBody), zap.Error(err))
}
}
if !ignore(err) {
return errors.Wrapf(err, "[cluster = %s, query = '%s']", cs.system, queryBody)
}
return errors.Wrapf(err, "[cluster = %s, query = '%s']", cs.system, queryBody)
}

if err := cs.stmtLogger.LogStmt(stmt, ts); err != nil {
return err
}

return nil
}

func (cs *cqlStore) load(ctx context.Context, stmt *typedef.Stmt) (result []map[string]any, err error) {
func (cs *cqlStore) load(ctx context.Context, stmt *typedef.Stmt) ([]map[string]any, error) {
cql, _ := stmt.Query.ToCql()
cs.stmtLogger.LogStmt(stmt)

query := cs.session.Query(cql, stmt.Values...).WithContext(ctx)
defer query.Release()

iter := query.Iter()
cs.ops.WithLabelValues(cs.system, opType(stmt)).Inc()
return loadSet(iter), iter.Close()

result := loadSet(iter)

if err := cs.stmtLogger.LogStmt(stmt); err != nil {
return nil, err
}

return result, iter.Close()
}

func (cs *cqlStore) close() error {
cs.session.Close()
return nil
}

func newSession(cluster *gocql.ClusterConfig, out *os.File) (*gocql.Session, error) {
func newSession(cluster *gocql.ClusterConfig, out io.Writer) (*gocql.Session, error) {
session, err := cluster.CreateSession()
if err != nil {
return nil, err
Expand Down
19 changes: 12 additions & 7 deletions pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package store
import (
"context"
"fmt"
"io"
"math/big"
"os"
"reflect"
"sort"
"sync"
Expand Down Expand Up @@ -71,7 +71,7 @@ type Config struct {
UseServerSideTimestamps bool
}

func New(schema *typedef.Schema, testCluster, oracleCluster *gocql.ClusterConfig, cfg Config, traceOut *os.File, logger *zap.Logger) (Store, error) {
func New(schema *typedef.Schema, testCluster, oracleCluster *gocql.ClusterConfig, cfg Config, traceOut io.Writer, logger *zap.Logger) (Store, error) {
ops := promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gemini_cql_requests",
Help: "How many CQL requests processed, partitioned by system and CQL query type aka 'method' (batch, delete, insert, update).",
Expand Down Expand Up @@ -132,15 +132,20 @@ type delegatingStore struct {
}

func (ds delegatingStore) Create(ctx context.Context, testBuilder, oracleBuilder *typedef.Stmt) error {
if ds.statementLogger != nil {
ds.statementLogger.LogStmt(testBuilder)
}
if err := mutate(ctx, ds.oracleStore, oracleBuilder); err != nil {
return errors.Wrap(err, "oracle failed store creation")
}

if err := mutate(ctx, ds.testStore, testBuilder); err != nil {
return errors.Wrap(err, "test failed store creation")
}

if ds.statementLogger != nil {
if err := ds.statementLogger.LogStmt(testBuilder); err != nil {
return errors.Wrap(err, "failed to log test create statement")
}
}

return nil
}

Expand Down Expand Up @@ -184,8 +189,8 @@ func (ds delegatingStore) Check(ctx context.Context, table *typedef.Table, stmt
wg.Add(1)

go func() {
defer wg.Done()
testRows, testErr = ds.testStore.load(ctx, stmt)
wg.Done()
}()
oracleRows, oracleErr = ds.oracleStore.load(ctx, stmt)
if oracleErr != nil {
Expand Down Expand Up @@ -258,7 +263,7 @@ func getStore(
clusterConfig *gocql.ClusterConfig,
cfg Config,
stmtLogFile string,
traceOut *os.File,
traceOut io.Writer,
logger *zap.Logger,
ops *prometheus.CounterVec,
) (out storeLoader, err error) {
Expand Down

0 comments on commit 7c5dda0

Please sign in to comment.