diff --git a/pkg/stmtlogger/filelogger.go b/pkg/stmtlogger/filelogger.go index b785ead..d367bb8 100644 --- a/pkg/stmtlogger/filelogger.go +++ b/pkg/stmtlogger/filelogger.go @@ -40,7 +40,7 @@ const ( type ( StmtToFile interface { - LogStmt(stmt *typedef.Stmt, ts ...time.Time) + LogStmt(stmt *typedef.Stmt, ts ...time.Time) error Close() error } @@ -88,11 +88,10 @@ func NewLogger(w io.Writer) (StmtToFile, error) { return out, nil } -func (fl *logger) LogStmt(stmt *typedef.Stmt, ts ...time.Time) { +func (fl *logger) LogStmt(stmt *typedef.Stmt, ts ...time.Time) error { buffer := fl.pool.Get().(*bytes.Buffer) if err := stmt.PrettyCQLBuffered(buffer); err != nil { - log.Printf("failed to pretty print query: %s", err) - return + return err } opType := stmt.QueryType.OpType() @@ -107,6 +106,8 @@ func (fl *logger) LogStmt(stmt *typedef.Stmt, ts ...time.Time) { if fl.active.Load() { fl.channel <- buffer } + + return nil } func (fl *logger) Close() error { @@ -167,6 +168,6 @@ func (fl *logger) committer(ctx context.Context) { type nopFileLogger struct{} -func (n *nopFileLogger) LogStmt(_ *typedef.Stmt, _ ...time.Time) {} +func (n *nopFileLogger) LogStmt(_ *typedef.Stmt, _ ...time.Time) error { return nil } func (n *nopFileLogger) Close() error { return nil } diff --git a/pkg/store/cqlstore.go b/pkg/store/cqlstore.go index cf6e0a0..0a8c4b7 100644 --- a/pkg/store/cqlstore.go +++ b/pkg/store/cqlstore.go @@ -16,11 +16,9 @@ package store import ( "context" - "os" + "io" "time" - errs "errors" - "github.com/gocql/gocql" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -47,63 +45,62 @@ func (cs *cqlStore) name() string { return cs.system } -func (cs *cqlStore) mutate(ctx context.Context, stmt *typedef.Stmt) (err error) { - var i int - for i = 0; i < cs.maxRetriesMutate; i++ { - // retry with new timestamp as list modification with the same ts - // will produce duplicated values, see https://github.com/scylladb/scylladb/issues/7937 - err = cs.doMutate(ctx, stmt, time.Now()) - if err == nil { +func (cs *cqlStore) mutate(ctx context.Context, stmt *typedef.Stmt) error { + for range cs.maxRetriesMutate { + if err := cs.doMutate(ctx, stmt); err == nil { cs.ops.WithLabelValues(cs.system, opType(stmt)).Inc() return nil } + select { case <-ctx.Done(): return ctx.Err() case <-time.After(cs.maxRetriesMutateSleep): } } - if w := cs.logger.Check(zap.ErrorLevel, "failed to apply mutation"); w != nil { - w.Write(zap.Int("attempts", i), zap.Error(err)) - } - return err + + return errors.Errorf("failed to mutate after %d retries", cs.maxRetriesMutate) } -func (cs *cqlStore) doMutate(ctx context.Context, stmt *typedef.Stmt, ts time.Time) error { +func (cs *cqlStore) doMutate(ctx context.Context, stmt *typedef.Stmt) error { queryBody, _ := stmt.Query.ToCql() - query := cs.session.Query(queryBody, stmt.Values...).WithContext(ctx) + query := cs.session.Query(queryBody, stmt.Values...).WithContext(ctx).DefaultTimestamp(false) defer query.Release() - if cs.useServerSideTimestamps { - query = query.DefaultTimestamp(false) - cs.stmtLogger.LogStmt(stmt) - } else { - query = query.WithTimestamp(ts.UnixNano() / 1000) - cs.stmtLogger.LogStmt(stmt, ts) + var ts time.Time + + if !cs.useServerSideTimestamps { + ts = time.Now() + query = query.WithTimestamp(ts.UnixMicro()) } if err := query.Exec(); err != nil { - if errs.Is(err, context.DeadlineExceeded) { - if w := cs.logger.Check(zap.DebugLevel, "deadline exceeded for mutation query"); w != nil { - w.Write(zap.String("system", cs.system), zap.String("query", queryBody), zap.Error(err)) - } - } - if !ignore(err) { - return errors.Wrapf(err, "[cluster = %s, query = '%s']", cs.system, queryBody) - } + return errors.Wrapf(err, "[cluster = %s, query = '%s']", cs.system, queryBody) + } + + if err := cs.stmtLogger.LogStmt(stmt, ts); err != nil { + return err } + return nil } -func (cs *cqlStore) load(ctx context.Context, stmt *typedef.Stmt) (result []map[string]any, err error) { +func (cs *cqlStore) load(ctx context.Context, stmt *typedef.Stmt) ([]map[string]any, error) { cql, _ := stmt.Query.ToCql() - cs.stmtLogger.LogStmt(stmt) + query := cs.session.Query(cql, stmt.Values...).WithContext(ctx) defer query.Release() iter := query.Iter() cs.ops.WithLabelValues(cs.system, opType(stmt)).Inc() - return loadSet(iter), iter.Close() + + result := loadSet(iter) + + if err := cs.stmtLogger.LogStmt(stmt); err != nil { + return nil, err + } + + return result, iter.Close() } func (cs *cqlStore) close() error { @@ -111,7 +108,7 @@ func (cs *cqlStore) close() error { return nil } -func newSession(cluster *gocql.ClusterConfig, out *os.File) (*gocql.Session, error) { +func newSession(cluster *gocql.ClusterConfig, out io.Writer) (*gocql.Session, error) { session, err := cluster.CreateSession() if err != nil { return nil, err diff --git a/pkg/store/store.go b/pkg/store/store.go index 51095b2..8eeb3c3 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -17,8 +17,8 @@ package store import ( "context" "fmt" + "io" "math/big" - "os" "reflect" "sort" "sync" @@ -71,7 +71,7 @@ type Config struct { UseServerSideTimestamps bool } -func New(schema *typedef.Schema, testCluster, oracleCluster *gocql.ClusterConfig, cfg Config, traceOut *os.File, logger *zap.Logger) (Store, error) { +func New(schema *typedef.Schema, testCluster, oracleCluster *gocql.ClusterConfig, cfg Config, traceOut io.Writer, logger *zap.Logger) (Store, error) { ops := promauto.NewCounterVec(prometheus.CounterOpts{ Name: "gemini_cql_requests", Help: "How many CQL requests processed, partitioned by system and CQL query type aka 'method' (batch, delete, insert, update).", @@ -132,15 +132,20 @@ type delegatingStore struct { } func (ds delegatingStore) Create(ctx context.Context, testBuilder, oracleBuilder *typedef.Stmt) error { - if ds.statementLogger != nil { - ds.statementLogger.LogStmt(testBuilder) - } if err := mutate(ctx, ds.oracleStore, oracleBuilder); err != nil { return errors.Wrap(err, "oracle failed store creation") } + if err := mutate(ctx, ds.testStore, testBuilder); err != nil { return errors.Wrap(err, "test failed store creation") } + + if ds.statementLogger != nil { + if err := ds.statementLogger.LogStmt(testBuilder); err != nil { + return errors.Wrap(err, "failed to log test create statement") + } + } + return nil } @@ -184,8 +189,8 @@ func (ds delegatingStore) Check(ctx context.Context, table *typedef.Table, stmt wg.Add(1) go func() { + defer wg.Done() testRows, testErr = ds.testStore.load(ctx, stmt) - wg.Done() }() oracleRows, oracleErr = ds.oracleStore.load(ctx, stmt) if oracleErr != nil { @@ -258,7 +263,7 @@ func getStore( clusterConfig *gocql.ClusterConfig, cfg Config, stmtLogFile string, - traceOut *os.File, + traceOut io.Writer, logger *zap.Logger, ops *prometheus.CounterVec, ) (out storeLoader, err error) {