Skip to content

Commit

Permalink
improvement(prettyCQL): make prettyCQL return an error
Browse files Browse the repository at this point in the history
everything that can produce a `Pretty CQL` for logging
now returns an error, to indicate bad types pass on to it.

Signed-off-by: Dusan Malusev <[email protected]>
  • Loading branch information
CodeLieutenant committed Oct 31, 2024
1 parent 084f2ca commit 8cebb9c
Show file tree
Hide file tree
Showing 11 changed files with 242 additions and 113 deletions.
55 changes: 46 additions & 9 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,17 @@ func validationJob(
case errors.Is(err, context.Canceled):
return nil
default:
query, prettyErr := stmt.PrettyCQL()
globalStatus.AddReadError(&joberror.JobError{
Timestamp: time.Now(),
StmtType: stmt.QueryType.ToString(),
Message: "Validation failed: " + err.Error(),
Query: stmt.PrettyCQL(),
Query: query,
})

if prettyErr != nil {
return prettyErr
}
}

if failFast && globalStatus.HasErrors() {
Expand Down Expand Up @@ -327,18 +332,31 @@ func ddl(
}
for _, ddlStmt := range ddlStmts.List {
if w := logger.Check(zap.DebugLevel, "ddl statement"); w != nil {
w.Write(zap.String("pretty_cql", ddlStmt.PrettyCQL()))
prettyCQL, prettyCQLErr := ddlStmt.PrettyCQL()
if prettyCQLErr != nil {
logger.Error("Failed! DDL PrettyCQL failed", zap.Error(prettyCQLErr))
} else {
w.Write(zap.String("pretty_cql", prettyCQL))
}
}

if err = s.Mutate(ctx, ddlStmt); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}

prettyCQL, prettyCQLErr := ddlStmt.PrettyCQL()
globalStatus.AddWriteError(&joberror.JobError{
Timestamp: time.Now(),
StmtType: ddlStmts.QueryType.ToString(),
Message: "DDL failed: " + err.Error(),
Query: ddlStmt.PrettyCQL(),
Query: prettyCQL,
})

if prettyCQLErr != nil {
logger.Error("Failed! DDL PrettyCQL failed", zap.Error(prettyCQLErr))
}

return err
}
globalStatus.WriteOps.Add(1)
Expand Down Expand Up @@ -378,22 +396,36 @@ func mutation(
}

if w := logger.Check(zap.DebugLevel, "mutation statement"); w != nil {
w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL()))
prettyCQL, prettyCQLErr := mutateStmt.PrettyCQL()
if prettyCQLErr != nil {
logger.Error("Failed! mutation PrettyCQL failed", zap.Error(prettyCQLErr))
} else {
w.Write(zap.String("pretty_cql", prettyCQL))
}
}
if err = s.Mutate(ctx, mutateStmt); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}

prettyCQL, prettyCQLErr := mutateStmt.PrettyCQL()
globalStatus.AddWriteError(&joberror.JobError{
Timestamp: time.Now(),
StmtType: mutateStmt.QueryType.ToString(),
Message: "Mutation failed: " + err.Error(),
Query: mutateStmt.PrettyCQL(),
Query: prettyCQL,
})
} else {
globalStatus.WriteOps.Add(1)
g.GiveOlds(mutateStmt.ValuesWithToken)

if prettyCQLErr != nil {
logger.Error("Failed! DDL PrettyCQL failed", zap.Error(prettyCQLErr))
}

return err
}

globalStatus.WriteOps.Add(1)
g.GiveOlds(mutateStmt.ValuesWithToken)

return nil
}

Expand All @@ -406,7 +438,12 @@ func validation(
logger *zap.Logger,
) error {
if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil {
w.Write(zap.String("pretty_cql", stmt.PrettyCQL()))
prettyCQL, prettyCQLErr := stmt.PrettyCQL()
if prettyCQLErr != nil {
logger.Error("Failed! validation PrettyCQL failed", zap.Error(prettyCQLErr))
} else {
w.Write(zap.String("pretty_cql", prettyCQL))
}
}

maxAttempts := 1
Expand Down
42 changes: 28 additions & 14 deletions pkg/stmtlogger/filelogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package stmtlogger

import (
"io"
"log"
"os"
"strconv"
Expand All @@ -37,11 +38,10 @@ type StmtToFile interface {
Close() error
}

type fileLogger struct {
fd *os.File
type logger struct {
fd io.Writer
activeChannel atomic.Pointer[loggerChan]
channel loggerChan
filename string
isFileNonOperational bool
}

Expand All @@ -52,7 +52,7 @@ type logRec struct {
ts time.Time
}

func (fl *fileLogger) LogStmt(stmt *typedef.Stmt) {
func (fl *logger) LogStmt(stmt *typedef.Stmt) {
ch := fl.activeChannel.Load()
if ch != nil {
*ch <- logRec{
Expand All @@ -61,7 +61,7 @@ func (fl *fileLogger) LogStmt(stmt *typedef.Stmt) {
}
}

func (fl *fileLogger) LogStmtWithTimeStamp(stmt *typedef.Stmt, ts time.Time) {
func (fl *logger) LogStmtWithTimeStamp(stmt *typedef.Stmt, ts time.Time) {
ch := fl.activeChannel.Load()
if ch != nil {
*ch <- logRec{
Expand All @@ -71,11 +71,15 @@ func (fl *fileLogger) LogStmtWithTimeStamp(stmt *typedef.Stmt, ts time.Time) {
}
}

func (fl *fileLogger) Close() error {
return fl.fd.Close()
func (fl *logger) Close() error {
if closer, ok := fl.fd.(io.Closer); ok {
return closer.Close()
}

return nil
}

func (fl *fileLogger) committer() {
func (fl *logger) committer() {
var err2 error

defer func() {
Expand All @@ -90,7 +94,13 @@ func (fl *fileLogger) committer() {
continue
}

_, err1 := fl.fd.Write([]byte(rec.stmt.PrettyCQL()))
query, err := rec.stmt.PrettyCQL()
if err != nil {
log.Printf("failed to pretty print query: %s", err)
continue
}

_, err1 := fl.fd.Write([]byte(query))
opType := rec.stmt.QueryType.OpType()
if rec.ts.IsZero() || !(opType == typedef.OpInsert || opType == typedef.OpUpdate || opType == typedef.OpDelete) {
_, err2 = fl.fd.Write([]byte(";\n"))
Expand All @@ -115,7 +125,8 @@ func (fl *fileLogger) committer() {
if err2 != nil {
err1 = err2
}
log.Printf("failed to write to file %q: %s", fl.filename, err1)

log.Printf("failed to write to writer %v", err1)
return
}
}
Expand All @@ -129,10 +140,13 @@ func NewFileLogger(filename string) (StmtToFile, error) {
return nil, err
}

out := &fileLogger{
filename: filename,
fd: fd,
channel: make(loggerChan, defaultChanSize),
return NewLogger(fd)
}

func NewLogger(w io.Writer) (StmtToFile, error) {
out := &logger{
fd: w,
channel: make(loggerChan, defaultChanSize),
}
out.activeChannel.Store(&out.channel)

Expand Down
16 changes: 12 additions & 4 deletions pkg/typedef/bag.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
package typedef

import (
"fmt"
"math"
"reflect"
"strings"

"github.com/pkg/errors"

"github.com/gocql/gocql"
"golang.org/x/exp/rand"

Expand Down Expand Up @@ -59,9 +60,11 @@ func (ct *BagType) CQLHolder() string {
return "?"
}

func (ct *BagType) CQLPretty(builder *strings.Builder, value any) {
type Tuple []any

func (ct *BagType) CQLPretty(builder *strings.Builder, value any) error {
if reflect.TypeOf(value).Kind() != reflect.Slice {
panic(fmt.Sprintf("set cql pretty, unknown type %v", ct))
return errors.Errorf("expected slice, got [%T]%v", value, value)
}

if ct.ComplexType == TYPE_SET {
Expand All @@ -75,11 +78,16 @@ func (ct *BagType) CQLPretty(builder *strings.Builder, value any) {
s := reflect.ValueOf(value)

for i := 0; i < s.Len(); i++ {
ct.ValueType.CQLPretty(builder, s.Index(i).Interface())
if err := ct.ValueType.CQLPretty(builder, s.Index(i).Interface()); err != nil {
return err
}

if i < s.Len()-1 {
builder.WriteRune(',')
}
}

return nil
}

func (ct *BagType) GenValue(r *rand.Rand, p *PartitionRangeConfig) []any {
Expand Down
4 changes: 2 additions & 2 deletions pkg/typedef/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Type interface {
Name() string
CQLDef() string
CQLHolder() string
CQLPretty(*strings.Builder, any)
CQLPretty(*strings.Builder, any) error
GenValue(*rand.Rand, *PartitionRangeConfig) []any
GenJSONValue(*rand.Rand, *PartitionRangeConfig) any
LenValue() int
Expand All @@ -37,7 +37,7 @@ type Type interface {

type Statement interface {
ToCql() (stmt string, names []string)
PrettyCQL() string
PrettyCQL() (string, error)
}

type Types []Type
Expand Down
Loading

0 comments on commit 8cebb9c

Please sign in to comment.