From 7c5dda0b32f51920e2cb513c3cbf8ab385db6a4c Mon Sep 17 00:00:00 2001 From: Dusan Malusev Date: Sun, 24 Nov 2024 20:37:55 +0100 Subject: [PATCH] refactor(filelogger): make filelogger return a error instead of void Up until this point, we have been ignoring the errors that the filelogger, from now on, we will return the error and let the caller handle it. This makes the statement that needs to be executed, run first (both at oracle and at test cluster), after that we log the statement that was executed. If the statement cannot be logged, operation will be aborted and the error will be returned. Signed-off-by: Dusan Malusev --- pkg/stmtlogger/filelogger.go | 11 +++--- pkg/store/cqlstore.go | 65 +++++++++++++++++------------------- pkg/store/store.go | 19 +++++++---- 3 files changed, 49 insertions(+), 46 deletions(-) diff --git a/pkg/stmtlogger/filelogger.go b/pkg/stmtlogger/filelogger.go index b785ead..d367bb8 100644 --- a/pkg/stmtlogger/filelogger.go +++ b/pkg/stmtlogger/filelogger.go @@ -40,7 +40,7 @@ const ( type ( StmtToFile interface { - LogStmt(stmt *typedef.Stmt, ts ...time.Time) + LogStmt(stmt *typedef.Stmt, ts ...time.Time) error Close() error } @@ -88,11 +88,10 @@ func NewLogger(w io.Writer) (StmtToFile, error) { return out, nil } -func (fl *logger) LogStmt(stmt *typedef.Stmt, ts ...time.Time) { +func (fl *logger) LogStmt(stmt *typedef.Stmt, ts ...time.Time) error { buffer := fl.pool.Get().(*bytes.Buffer) if err := stmt.PrettyCQLBuffered(buffer); err != nil { - log.Printf("failed to pretty print query: %s", err) - return + return err } opType := stmt.QueryType.OpType() @@ -107,6 +106,8 @@ func (fl *logger) LogStmt(stmt *typedef.Stmt, ts ...time.Time) { if fl.active.Load() { fl.channel <- buffer } + + return nil } func (fl *logger) Close() error { @@ -167,6 +168,6 @@ func (fl *logger) committer(ctx context.Context) { type nopFileLogger struct{} -func (n *nopFileLogger) LogStmt(_ *typedef.Stmt, _ ...time.Time) {} +func (n *nopFileLogger) LogStmt(_ *typedef.Stmt, _ ...time.Time) error { return nil } func (n *nopFileLogger) Close() error { return nil } diff --git a/pkg/store/cqlstore.go b/pkg/store/cqlstore.go index cf6e0a0..0a8c4b7 100644 --- a/pkg/store/cqlstore.go +++ b/pkg/store/cqlstore.go @@ -16,11 +16,9 @@ package store import ( "context" - "os" + "io" "time" - errs "errors" - "github.com/gocql/gocql" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -47,63 +45,62 @@ func (cs *cqlStore) name() string { return cs.system } -func (cs *cqlStore) mutate(ctx context.Context, stmt *typedef.Stmt) (err error) { - var i int - for i = 0; i < cs.maxRetriesMutate; i++ { - // retry with new timestamp as list modification with the same ts - // will produce duplicated values, see https://github.com/scylladb/scylladb/issues/7937 - err = cs.doMutate(ctx, stmt, time.Now()) - if err == nil { +func (cs *cqlStore) mutate(ctx context.Context, stmt *typedef.Stmt) error { + for range cs.maxRetriesMutate { + if err := cs.doMutate(ctx, stmt); err == nil { cs.ops.WithLabelValues(cs.system, opType(stmt)).Inc() return nil } + select { case <-ctx.Done(): return ctx.Err() case <-time.After(cs.maxRetriesMutateSleep): } } - if w := cs.logger.Check(zap.ErrorLevel, "failed to apply mutation"); w != nil { - w.Write(zap.Int("attempts", i), zap.Error(err)) - } - return err + + return errors.Errorf("failed to mutate after %d retries", cs.maxRetriesMutate) } -func (cs *cqlStore) doMutate(ctx context.Context, stmt *typedef.Stmt, ts time.Time) error { +func (cs *cqlStore) doMutate(ctx context.Context, stmt *typedef.Stmt) error { queryBody, _ := stmt.Query.ToCql() - query := cs.session.Query(queryBody, stmt.Values...).WithContext(ctx) + query := cs.session.Query(queryBody, stmt.Values...).WithContext(ctx).DefaultTimestamp(false) defer query.Release() - if cs.useServerSideTimestamps { - query = query.DefaultTimestamp(false) - cs.stmtLogger.LogStmt(stmt) - } else { - query = query.WithTimestamp(ts.UnixNano() / 1000) - cs.stmtLogger.LogStmt(stmt, ts) + var ts time.Time + + if !cs.useServerSideTimestamps { + ts = time.Now() + query = query.WithTimestamp(ts.UnixMicro()) } if err := query.Exec(); err != nil { - if errs.Is(err, context.DeadlineExceeded) { - if w := cs.logger.Check(zap.DebugLevel, "deadline exceeded for mutation query"); w != nil { - w.Write(zap.String("system", cs.system), zap.String("query", queryBody), zap.Error(err)) - } - } - if !ignore(err) { - return errors.Wrapf(err, "[cluster = %s, query = '%s']", cs.system, queryBody) - } + return errors.Wrapf(err, "[cluster = %s, query = '%s']", cs.system, queryBody) + } + + if err := cs.stmtLogger.LogStmt(stmt, ts); err != nil { + return err } + return nil } -func (cs *cqlStore) load(ctx context.Context, stmt *typedef.Stmt) (result []map[string]any, err error) { +func (cs *cqlStore) load(ctx context.Context, stmt *typedef.Stmt) ([]map[string]any, error) { cql, _ := stmt.Query.ToCql() - cs.stmtLogger.LogStmt(stmt) + query := cs.session.Query(cql, stmt.Values...).WithContext(ctx) defer query.Release() iter := query.Iter() cs.ops.WithLabelValues(cs.system, opType(stmt)).Inc() - return loadSet(iter), iter.Close() + + result := loadSet(iter) + + if err := cs.stmtLogger.LogStmt(stmt); err != nil { + return nil, err + } + + return result, iter.Close() } func (cs *cqlStore) close() error { @@ -111,7 +108,7 @@ func (cs *cqlStore) close() error { return nil } -func newSession(cluster *gocql.ClusterConfig, out *os.File) (*gocql.Session, error) { +func newSession(cluster *gocql.ClusterConfig, out io.Writer) (*gocql.Session, error) { session, err := cluster.CreateSession() if err != nil { return nil, err diff --git a/pkg/store/store.go b/pkg/store/store.go index 51095b2..8eeb3c3 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -17,8 +17,8 @@ package store import ( "context" "fmt" + "io" "math/big" - "os" "reflect" "sort" "sync" @@ -71,7 +71,7 @@ type Config struct { UseServerSideTimestamps bool } -func New(schema *typedef.Schema, testCluster, oracleCluster *gocql.ClusterConfig, cfg Config, traceOut *os.File, logger *zap.Logger) (Store, error) { +func New(schema *typedef.Schema, testCluster, oracleCluster *gocql.ClusterConfig, cfg Config, traceOut io.Writer, logger *zap.Logger) (Store, error) { ops := promauto.NewCounterVec(prometheus.CounterOpts{ Name: "gemini_cql_requests", Help: "How many CQL requests processed, partitioned by system and CQL query type aka 'method' (batch, delete, insert, update).", @@ -132,15 +132,20 @@ type delegatingStore struct { } func (ds delegatingStore) Create(ctx context.Context, testBuilder, oracleBuilder *typedef.Stmt) error { - if ds.statementLogger != nil { - ds.statementLogger.LogStmt(testBuilder) - } if err := mutate(ctx, ds.oracleStore, oracleBuilder); err != nil { return errors.Wrap(err, "oracle failed store creation") } + if err := mutate(ctx, ds.testStore, testBuilder); err != nil { return errors.Wrap(err, "test failed store creation") } + + if ds.statementLogger != nil { + if err := ds.statementLogger.LogStmt(testBuilder); err != nil { + return errors.Wrap(err, "failed to log test create statement") + } + } + return nil } @@ -184,8 +189,8 @@ func (ds delegatingStore) Check(ctx context.Context, table *typedef.Table, stmt wg.Add(1) go func() { + defer wg.Done() testRows, testErr = ds.testStore.load(ctx, stmt) - wg.Done() }() oracleRows, oracleErr = ds.oracleStore.load(ctx, stmt) if oracleErr != nil { @@ -258,7 +263,7 @@ func getStore( clusterConfig *gocql.ClusterConfig, cfg Config, stmtLogFile string, - traceOut *os.File, + traceOut io.Writer, logger *zap.Logger, ops *prometheus.CounterVec, ) (out storeLoader, err error) {