From 63f2eea613cef2584f49f81912de9a2aa9079d91 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Wed, 18 Dec 2024 18:08:40 +0100 Subject: [PATCH] Let retry policy to decide what to do with potentially executed non idempotent queries --- conn.go | 12 ++++++++++++ conn_test.go | 4 ++++ policies.go | 21 +++++++++++++++++++++ query_executor.go | 13 +++++++++++-- 4 files changed, 48 insertions(+), 2 deletions(-) diff --git a/conn.go b/conn.go index 0db60c5de..fff643e5b 100644 --- a/conn.go +++ b/conn.go @@ -1936,3 +1936,15 @@ func (e *QueryError) Error() string { func (e *QueryError) Unwrap() error { return e.err } + +type PotentiallyExecutedNotIdempotentError struct { + err error +} + +func (e *PotentiallyExecutedNotIdempotentError) Error() string { + return fmt.Sprintf("Potentially executed not idempotent query: %s", e.err.Error()) +} + +func (e *PotentiallyExecutedNotIdempotentError) Unwrap() error { + return e.err +} diff --git a/conn_test.go b/conn_test.go index 0d4a46885..67c665e07 100644 --- a/conn_test.go +++ b/conn_test.go @@ -456,6 +456,10 @@ func (t *testRetryPolicy) Attempt(qry RetryableQuery) bool { return qry.Attempts() <= t.NumRetries } func (t *testRetryPolicy) GetRetryType(err error) RetryType { + var executedErr *PotentiallyExecutedNotIdempotentError + if errors.As(err, &executedErr) { + return Rethrow + } return Retry } diff --git a/policies.go b/policies.go index ca89aecba..871266336 100644 --- a/policies.go +++ b/policies.go @@ -160,6 +160,10 @@ func (s *SimpleRetryPolicy) AttemptLWT(q RetryableQuery) bool { } func (s *SimpleRetryPolicy) GetRetryType(err error) RetryType { + var executedErr *PotentiallyExecutedNotIdempotentError + if errors.As(err, &executedErr) { + return Rethrow + } return RetryNextHost } @@ -168,6 +172,10 @@ func (s *SimpleRetryPolicy) GetRetryType(err error) RetryType { // even timeouts if other clients send statements touching the same // partition to the original node at the same time. func (s *SimpleRetryPolicy) GetRetryTypeLWT(err error) RetryType { + var executedErr *PotentiallyExecutedNotIdempotentError + if errors.As(err, &executedErr) { + return Rethrow + } return Retry } @@ -208,6 +216,10 @@ func getExponentialTime(min time.Duration, max time.Duration, attempts int) time } func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) RetryType { + var executedErr *PotentiallyExecutedNotIdempotentError + if errors.As(err, &executedErr) { + return Rethrow + } return RetryNextHost } @@ -216,6 +228,10 @@ func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) RetryType { // even timeouts if other clients send statements touching the same // partition to the original node at the same time. func (e *ExponentialBackoffRetryPolicy) GetRetryTypeLWT(err error) RetryType { + var executedErr *PotentiallyExecutedNotIdempotentError + if errors.As(err, &executedErr) { + return Rethrow + } return Retry } @@ -250,6 +266,11 @@ func (d *DowngradingConsistencyRetryPolicy) Attempt(q RetryableQuery) bool { } func (d *DowngradingConsistencyRetryPolicy) GetRetryType(err error) RetryType { + var executedErr *PotentiallyExecutedNotIdempotentError + if errors.As(err, &executedErr) { + return Rethrow + } + switch t := err.(type) { case *RequestErrUnavailable: if t.Alive > 0 { diff --git a/query_executor.go b/query_executor.go index 64b984168..51ff7c261 100644 --- a/query_executor.go +++ b/query_executor.go @@ -116,6 +116,7 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne var lastErr error var iter *Iter var conn *Conn + var potentiallyExecuted bool for selectedHost != nil { host := selectedHost.Info() if host == nil || !host.IsUp() { @@ -149,6 +150,9 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne // those errors represents logical errors, they should not count // toward removing a node from the pool selectedHost.Mark(nil) + if potentiallyExecuted && !qry.IsIdempotent() { + iter.err = &PotentiallyExecutedNotIdempotentError{err: iter.err} + } return iter default: selectedHost.Mark(iter.err) @@ -162,8 +166,13 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne lastErr = iter.err - if customErr, ok := iter.err.(*QueryError); ok && customErr.potentiallyExecuted && !qry.IsIdempotent() { - return iter + if customErr, ok := iter.err.(*QueryError); ok && customErr.potentiallyExecuted { + lastErr = customErr.err + potentiallyExecuted = true + } + + if potentiallyExecuted && !qry.IsIdempotent() { + lastErr = &PotentiallyExecutedNotIdempotentError{err: lastErr} } } }