Implement application-level retries #448

Open
adutra opened this issue Aug 10, 2022 · 1 comment

adutra (Contributor) commented Aug 10, 2022

The transition from driver 3.x to driver 4.x in DSBulk 1.5.0 had one unexpected consequence: client-side timeouts now apply to the whole statement execution. The driver docs say:

Unlike 3.x, the request timeout now spans the entire request. In other words, it's the maximum amount of time that session.execute will take, including any retry, speculative execution, etc.

What the docs don't say is that, because the timeout spans the whole session.execute call, a request that times out is no longer retried: when the timeout fires, the call fails. The driver-level solution to this problem is now to use speculative executions.
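The consequence can be illustrated with a small stdlib-only simulation (Java 9+). It does not use the driver and does not model its internals: `TimeoutScopeDemo` and `attempt` are hypothetical names, `CompletableFuture.orTimeout` plays the role of the request timeout, and `java.util.concurrent.TimeoutException` stands in for `DriverTimeoutException`. The simulated replica is slow on the first attempt and fast afterwards, so a per-attempt timeout followed by a retry succeeds, while a single timeout around the whole call simply fails.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

final class TimeoutScopeDemo {

  // Hypothetical replica: the first attempt answers after 500 ms, later attempts at once.
  static CompletableFuture<String> attempt(AtomicInteger attempts) {
    long latencyMs = attempts.incrementAndGet() == 1 ? 500 : 0;
    return CompletableFuture.supplyAsync(
        () -> "row", CompletableFuture.delayedExecutor(latencyMs, TimeUnit.MILLISECONDS));
  }

  // 3.x-like: the timeout applies to each attempt, so a timed-out attempt can be retried.
  // TimeoutException is a stand-in for the driver's DriverTimeoutException.
  static String perAttemptTimeout(AtomicInteger attempts, long timeoutMs, int maxRetries) {
    for (int retries = 0; ; retries++) {
      try {
        return attempt(attempts).orTimeout(timeoutMs, TimeUnit.MILLISECONDS).join();
      } catch (CompletionException e) {
        if (!(e.getCause() instanceof TimeoutException) || retries >= maxRetries) {
          throw e;
        }
      }
    }
  }

  // 4.x-like: one timeout bounds the whole execution; when it fires the call fails
  // outright and no retry decision is ever made.
  static String globalTimeout(AtomicInteger attempts, long timeoutMs) {
    return attempt(attempts).orTimeout(timeoutMs, TimeUnit.MILLISECONDS).join();
  }

  public static void main(String[] args) {
    AtomicInteger perAttempt = new AtomicInteger();
    String row = perAttemptTimeout(perAttempt, 200, 3);
    System.out.println(row + " after " + perAttempt.get() + " attempts");
    AtomicInteger global = new AtomicInteger();
    try {
      globalTimeout(global, 200);
    } catch (CompletionException e) {
      System.out.println("timed out after " + global.get() + " attempt");
    }
  }
}
```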

DSBulk users can enable speculative executions if they wish (they are disabled by default). But speculative executions are hard to tune and don't work with rate limiters (see #447), so I think it would be worth implementing a form of application-level retry when the statement execution fails.

Thanks to the Reactor framework, such a feature could be implemented easily with the retryWhen operator, e.g. in LoadWorkflow:

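```java
// Imports used by the snippet. WriteResult, EmptyWriteResult, executor, dryRun and
// writeConcurrency are DSBulk types/fields assumed to be available in LoadWorkflow.
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.cql.Statement;
import java.time.Duration;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

private Flux<WriteResult> executeStatements(Flux<? extends Statement<?>> stmts) {
  Retry spec = new Retry() {

    // Optional pause before each retry; null means "retry immediately".
    private final Duration delay = null;
    // Maximum number of re-executions per statement.
    private final long maxRetries = 3;

    @Override
    public Publisher<Boolean> generateCompanion(Flux<RetrySignal> retrySignals) {
      // One RetrySignal per failed attempt: emitting a value re-subscribes to the
      // statement execution, emitting an error gives up and propagates it.
      return retrySignals.flatMap(
          signal -> {
            Mono<Boolean> retryDecision;
            if (signal.totalRetries() < maxRetries
                && signal.failure() instanceof DriverTimeoutException) {
              retryDecision = Mono.just(true);
              if (delay != null) {
                retryDecision = retryDecision.delayElement(delay);
              }
            } else {
              // Not a client-side timeout, or retries exhausted: fail with the original error.
              retryDecision = Mono.error(signal.failure());
            }
            return retryDecision;
          });
    }
  };
  return dryRun
      ? stmts.map(EmptyWriteResult::new)
      : stmts.flatMap(
          statement -> Flux.from(executor.writeReactive(statement)).retryWhen(spec),
          writeConcurrency);
}
```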
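For comparison, the same policy (retry only on timeouts, at most maxRetries times, with an optional fixed delay) can be sketched without Reactor using only java.util.concurrent (Java 9+). This is an illustration under assumptions, not DSBulk code: `AppLevelRetry`, `executeWithRetry` and `attempt` are hypothetical names, the `Supplier` stands in for a call such as `executor.writeReactive(statement)`, and `java.util.concurrent.TimeoutException` stands in for the driver's `DriverTimeoutException`.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

final class AppLevelRetry {

  // Runs `operation`; if it fails with a TimeoutException (stand-in for the driver's
  // DriverTimeoutException), runs it again, up to maxRetries times, optionally waiting
  // `delay` first. Any other failure, or the last timeout, is propagated unchanged.
  static <T> CompletableFuture<T> executeWithRetry(
      Supplier<CompletableFuture<T>> operation, long maxRetries, Duration delay) {
    return attempt(operation, 0, maxRetries, delay);
  }

  private static <T> CompletableFuture<T> attempt(
      Supplier<CompletableFuture<T>> operation, long totalRetries, long maxRetries, Duration delay) {
    return operation.get()
        .handle((result, error) -> {
          if (error == null) {
            return CompletableFuture.completedFuture(result);
          }
          // Failures of dependent stages arrive wrapped in a CompletionException.
          Throwable cause =
              error instanceof CompletionException && error.getCause() != null
                  ? error.getCause()
                  : error;
          if (totalRetries >= maxRetries || !(cause instanceof TimeoutException)) {
            return CompletableFuture.<T>failedFuture(cause);
          }
          if (delay == null) {
            return attempt(operation, totalRetries + 1, maxRetries, delay);
          }
          return CompletableFuture.runAsync(
                  () -> {}, CompletableFuture.delayedExecutor(delay.toMillis(), TimeUnit.MILLISECONDS))
              .thenCompose(ignored -> attempt(operation, totalRetries + 1, maxRetries, delay));
        })
        // Flatten CompletableFuture<CompletableFuture<T>> into CompletableFuture<T>.
        .thenCompose(Function.identity());
  }
}
```

Because each retry calls the supplier again, every attempt is a new execution with its own full timeout, which is what the driver-level timeout no longer provides. Re-executing a write is only safe for idempotent statements; a counter update, for example, could be applied twice.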

adutra (Contributor, Author) commented Aug 10, 2022

Duplicate of #443, sorry for that.
