Skip to content

Commit

Permalink
Let retry policy to decide what to do with potentially executed non i…
Browse files Browse the repository at this point in the history
…dempotent queries
  • Loading branch information
sylwiaszunejko committed Dec 20, 2024
1 parent a7433d8 commit 63f2eea
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 2 deletions.
12 changes: 12 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1936,3 +1936,15 @@ func (e *QueryError) Error() string {
func (e *QueryError) Unwrap() error {
return e.err
}

type PotentiallyExecutedNotIdempotentError struct {
err error
}

func (e *PotentiallyExecutedNotIdempotentError) Error() string {
return fmt.Sprintf("Potentially executed not idempotent query: %s", e.err.Error())
}

func (e *PotentiallyExecutedNotIdempotentError) Unwrap() error {
return e.err
}
4 changes: 4 additions & 0 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,10 @@ func (t *testRetryPolicy) Attempt(qry RetryableQuery) bool {
return qry.Attempts() <= t.NumRetries
}
func (t *testRetryPolicy) GetRetryType(err error) RetryType {
var executedErr *PotentiallyExecutedNotIdempotentError
if errors.As(err, &executedErr) {
return Rethrow
}
return Retry
}

Expand Down
21 changes: 21 additions & 0 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ func (s *SimpleRetryPolicy) AttemptLWT(q RetryableQuery) bool {
}

func (s *SimpleRetryPolicy) GetRetryType(err error) RetryType {
var executedErr *PotentiallyExecutedNotIdempotentError
if errors.As(err, &executedErr) {
return Rethrow
}
return RetryNextHost
}

Expand All @@ -168,6 +172,10 @@ func (s *SimpleRetryPolicy) GetRetryType(err error) RetryType {
// even timeouts if other clients send statements touching the same
// partition to the original node at the same time.
func (s *SimpleRetryPolicy) GetRetryTypeLWT(err error) RetryType {
var executedErr *PotentiallyExecutedNotIdempotentError
if errors.As(err, &executedErr) {
return Rethrow
}
return Retry
}

Expand Down Expand Up @@ -208,6 +216,10 @@ func getExponentialTime(min time.Duration, max time.Duration, attempts int) time
}

func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) RetryType {
var executedErr *PotentiallyExecutedNotIdempotentError
if errors.As(err, &executedErr) {
return Rethrow
}
return RetryNextHost
}

Expand All @@ -216,6 +228,10 @@ func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) RetryType {
// even timeouts if other clients send statements touching the same
// partition to the original node at the same time.
func (e *ExponentialBackoffRetryPolicy) GetRetryTypeLWT(err error) RetryType {
var executedErr *PotentiallyExecutedNotIdempotentError
if errors.As(err, &executedErr) {
return Rethrow
}
return Retry
}

Expand Down Expand Up @@ -250,6 +266,11 @@ func (d *DowngradingConsistencyRetryPolicy) Attempt(q RetryableQuery) bool {
}

func (d *DowngradingConsistencyRetryPolicy) GetRetryType(err error) RetryType {
var executedErr *PotentiallyExecutedNotIdempotentError
if errors.As(err, &executedErr) {
return Rethrow
}

switch t := err.(type) {
case *RequestErrUnavailable:
if t.Alive > 0 {
Expand Down
13 changes: 11 additions & 2 deletions query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne
var lastErr error
var iter *Iter
var conn *Conn
var potentiallyExecuted bool
for selectedHost != nil {
host := selectedHost.Info()
if host == nil || !host.IsUp() {
Expand Down Expand Up @@ -149,6 +150,9 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne
// those errors represents logical errors, they should not count
// toward removing a node from the pool
selectedHost.Mark(nil)
if potentiallyExecuted && !qry.IsIdempotent() {
iter.err = &PotentiallyExecutedNotIdempotentError{err: iter.err}
}
return iter
default:
selectedHost.Mark(iter.err)
Expand All @@ -162,8 +166,13 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne

lastErr = iter.err

if customErr, ok := iter.err.(*QueryError); ok && customErr.potentiallyExecuted && !qry.IsIdempotent() {
return iter
if customErr, ok := iter.err.(*QueryError); ok && customErr.potentiallyExecuted {
lastErr = customErr.err
potentiallyExecuted = true
}

if potentiallyExecuted && !qry.IsIdempotent() {
lastErr = &PotentiallyExecutedNotIdempotentError{err: lastErr}
}
}
}
Expand Down

0 comments on commit 63f2eea

Please sign in to comment.