Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36455] Sinks retry synchronously [1.20] #25661

Open
wants to merge 6 commits into
base: release-1.20
Choose a base branch
from

Commits on Nov 15, 2024

  1. [FLINK-36368] Do not prematurely merge CommittableManager (apache#25405)

    When a sink contains a shuffle between writer and committer, a committer may receive committables coming from multiple subtasks. So far, we immediately merged them on receiving. However, that makes it later impossible to trace whether we received all messages from an upstream task.
    
    It also made rescaling of the committer awkward: during normal processing all committables of a committer have the same subtaskId as the committer. On downscale, these subtaskIds suddenly don't match and need to be replaced, which we solved by merging the SubtaskCommittableManagers.
    
    This commit decouples the collection of committables from changing the subtaskId for emission. Committables retain the upstream subtask id in the CommittableCollection, which survives serialization and deserialization. Only upon emission, we substitute the subtask id with the one of the emitting committer.
    
    This is, in particular, useful for a global committer, where all subtasks are collected. As a side fix, the new serialization also contains the numberOfSubtasks such that different checkpoints may have different degree of parallelism.
    
    The old approach probably has edge cases where scaling a UC would result in stalled pipelines because certain metadata doesn't match. This would not affect pipelines which  chain Writer/Committer (no channel state), Writer and Committer have same DOP (results in a Forward channel, which doesn't use UC for exactly these reasons), and a non-keyed shuffles (because they don't provide any guarantees). Since a keyed shuffle must use the subtask id of the committables, the new approach should be safe. However, since we disabled UC entirely for sinks to adhere to the contract of notifyCheckpointComplete, this shouldn't matter going forward. It's still important to consider these cases though for restoring from Flink 1 checkpoints.
    
    (cherry picked from commit 8dc212c)
    Update version to 3.4.0
    AHeise committed Nov 15, 2024
    Configuration menu
    Copy the full SHA
    876774b View commit details
    Browse the repository at this point in the history
  2. [FLINK-36379] Refactor sink test assertions

    Use more of the assertj native patterns to compare results.
    AHeise committed Nov 15, 2024
    Configuration menu
    Copy the full SHA
    29002fe View commit details
    Browse the repository at this point in the history
  3. [FLINK-36379] Optimize global committers

    Global committers used to trail a full checkpoint behind committer. That means that data appeared only after >2*checkpoint interval in the sinks that use it (e.g. delta). However, committables in the global committers are already part of the first checkpoint and are idempotent: On recovery, they are resend from the committer to the global committer. Thus, the global committer can actually be seen as stateless and doesn't need to conduct its own 2PC protocol.
    
    This commit lets the global committer collect all committables on input (as before) but immediately tries to commit when it has received all (deducible from CommitterSummary - which was always the original intent of that message). Thus, in most cases, GlobalCommitter ignores notifyCheckpointCompleted now as the state of the checkpoint can be inferred by received committables from upstream.
    
    There are special cases where a global committer is directly chained to a writer. In this case, the global committer does need to conduct a 2PC protocol in place of the committer. To differentiate these cases, the global committer now has its own transformation.
    
    (cherry picked from commit 67be29a)
    AHeise committed Nov 15, 2024
    Configuration menu
    Copy the full SHA
    ecf1253 View commit details
    Browse the repository at this point in the history
  4. [FLINK-36379] Optimize committers with UC disabled

    Without UCs, a committer doesn't need to do anything on #processInput except collecting. It emits only on notifyCheckpointCompleted (or endInput for batch).
    
    We can also harden some contracts:
    * NotifyCheckpointCompleted can assert that all committables are received.
    * Emit committables downstream only if all committables are finished.
    
    (cherry picked from commit 7f40ab9)
    AHeise committed Nov 15, 2024
    Configuration menu
    Copy the full SHA
    cfcf7a9 View commit details
    Browse the repository at this point in the history
  5. [FLINK-36455] Sinks retry synchronously

    Sinks so far retried asynchronously to increase commit throughput in case of temporary issues. However, the contract of notifyCheckpointCompleted states that checkpoints must be side-effect free meaning all transactions have to be committed on return of the PRC call.
    
    This commit retries a fixed number of times and then fails in notifyCheckpointCompleted.
    
    Note that sync retries significantly simplify the committable handling. This commit starts a few simplifications; the next commit clears up more.
    
    (cherry picked from commit bc0f241)
    AHeise committed Nov 15, 2024
    Configuration menu
    Copy the full SHA
    b27a037 View commit details
    Browse the repository at this point in the history
  6. [FLINK-36455] Fix PendingCommittable metric in sink

    We can only set the gauge once.
    
    (cherry picked from commit 21c344c)
    AHeise committed Nov 15, 2024
    Configuration menu
    Copy the full SHA
    47477d9 View commit details
    Browse the repository at this point in the history