diff --git a/AUTHORS b/AUTHORS index a6eeeabc4..fd76bebd2 100644 --- a/AUTHORS +++ b/AUTHORS @@ -142,3 +142,4 @@ Dmitry Kropachev Oliver Boyle Jackson Fleming Sylwia Szunejko +Karol BaryƂa \ No newline at end of file diff --git a/doc.go b/doc.go index 6739d98e4..98b74e6fc 100644 --- a/doc.go +++ b/doc.go @@ -321,6 +321,8 @@ // execution. // // Idempotent queries are retried in case of errors based on the configured RetryPolicy. +// If the query is LWT and the configured RetryPolicy additionally implements LWTRetryPolicy +// interface, then the policy will be cast to LWTRetryPolicy and used this way. // // Queries can be retried even before they fail by setting a SpeculativeExecutionPolicy. The policy can // cause the driver to retry on a different node if the query is taking longer than a specified delay even before the diff --git a/policies.go b/policies.go index fb50cdfd0..00ca3fe64 100644 --- a/policies.go +++ b/policies.go @@ -156,6 +156,14 @@ type RetryPolicy interface { GetRetryType(error) RetryType } +// LWTRetryPolicy is a similar interface to RetryPolicy +// If a query is recognized as an LWT query and its RetryPolicy satisfies this +// interface, then this interface will be used instead of RetryPolicy. +type LWTRetryPolicy interface { + AttemptLWT(RetryableQuery) bool + GetRetryTypeLWT(error) RetryType +} + // SimpleRetryPolicy has simple logic for attempting a query a fixed number of times. // // See below for examples of usage: @@ -175,10 +183,22 @@ func (s *SimpleRetryPolicy) Attempt(q RetryableQuery) bool { return q.Attempts() <= s.NumRetries } +func (s *SimpleRetryPolicy) AttemptLWT(q RetryableQuery) bool { + return s.Attempt(q) +} + func (s *SimpleRetryPolicy) GetRetryType(err error) RetryType { return RetryNextHost } +// Retrying on a different host is fine for normal (non-LWT) queries, +// but in case of LWTs it will cause Paxos contention and possibly +// even timeouts if other clients send statements touching the same +// partition to the original node at the same time. +func (s *SimpleRetryPolicy) GetRetryTypeLWT(err error) RetryType { + return Retry +} + // ExponentialBackoffRetryPolicy sleeps between attempts type ExponentialBackoffRetryPolicy struct { NumRetries int @@ -193,6 +213,10 @@ func (e *ExponentialBackoffRetryPolicy) Attempt(q RetryableQuery) bool { return true } +func (e *ExponentialBackoffRetryPolicy) AttemptLWT(q RetryableQuery) bool { + return e.Attempt(q) +} + // used to calculate exponentially growing time func getExponentialTime(min time.Duration, max time.Duration, attempts int) time.Duration { if min <= 0 { @@ -215,6 +239,14 @@ func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) RetryType { return RetryNextHost } +// Retrying on a different host is fine for normal (non-LWT) queries, +// but in case of LWTs it will cause Paxos contention and possibly +// even timeouts if other clients send statements touching the same +// partition to the original node at the same time. +func (e *ExponentialBackoffRetryPolicy) GetRetryTypeLWT(err error) RetryType { + return Retry +} + // DowngradingConsistencyRetryPolicy: Next retry will be with the next consistency level // provided in the slice // diff --git a/policies_test.go b/policies_test.go index dd4969a96..2cf2fdcb1 100644 --- a/policies_test.go +++ b/policies_test.go @@ -422,6 +422,14 @@ func TestSimpleRetryPolicy(t *testing.T) { } } +func TestLWTSimpleRetryPolicy(t *testing.T) { + ebrp := &SimpleRetryPolicy{NumRetries: 2} + // Verify that SimpleRetryPolicy implements both interfaces + var _ RetryPolicy = ebrp + var lwt_rt LWTRetryPolicy = ebrp + assertEqual(t, "retry type of LWT policy", lwt_rt.GetRetryTypeLWT(nil), Retry) +} + func TestExponentialBackoffPolicy(t *testing.T) { // test with defaults sut := &ExponentialBackoffRetryPolicy{NumRetries: 2} @@ -450,6 +458,14 @@ func TestExponentialBackoffPolicy(t *testing.T) { } } +func TestLWTExponentialBackoffPolicy(t *testing.T) { + ebrp := &ExponentialBackoffRetryPolicy{NumRetries: 2} + // Verify that ExponentialBackoffRetryPolicy implements both interfaces + var _ RetryPolicy = ebrp + var lwt_rt LWTRetryPolicy = ebrp + assertEqual(t, "retry type of LWT policy", lwt_rt.GetRetryTypeLWT(nil), Retry) +} + func TestDowngradingConsistencyRetryPolicy(t *testing.T) { q := &Query{cons: LocalQuorum, routingInfo: &queryRoutingInfo{}} diff --git a/query_executor.go b/query_executor.go index 7052b82c1..7bbe7f6ab 100644 --- a/query_executor.go +++ b/query_executor.go @@ -109,6 +109,9 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) { func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter NextHost) *Iter { selectedHost := hostIter() rt := qry.retryPolicy() + lwt_rt, use_lwt_rt := rt.(LWTRetryPolicy) + // We only want to apply LWT policy to LWT queries + use_lwt_rt = use_lwt_rt && qry.IsLWT() var lastErr error var iter *Iter @@ -145,14 +148,33 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne } // Exit if the query was successful - // or no retry policy defined or retry attempts were reached - if iter.err == nil || rt == nil || !rt.Attempt(qry) { + // or no retry policy defined + if iter.err == nil || rt == nil { return iter } + + // or retry policy decides to not retry anymore + if use_lwt_rt { + if !lwt_rt.AttemptLWT(qry) { + return iter + } + } else { + if !rt.Attempt(qry) { + return iter + } + } + lastErr = iter.err + var retry_type RetryType + if use_lwt_rt { + retry_type = lwt_rt.GetRetryTypeLWT(iter.err) + } else { + retry_type = rt.GetRetryType(iter.err) + } + // If query is unsuccessful, check the error with RetryPolicy to retry - switch rt.GetRetryType(iter.err) { + switch retry_type { case Retry: // retry on the same host continue