Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37375][checkpointing] Checkpoint supports the Operator to cust… #26330

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;

/**
 * {@link StateSnapshotContext} implementation used for asynchronous snapshot state.
 *
 * <p>The context only carries the checkpoint id and the checkpoint timestamp. It provides no raw
 * state output streams: both raw-state accessors return {@code null}.
 */
public class StateSnapshotContextAsynchronousImpl implements StateSnapshotContext {

    /** Id of the checkpoint this context was created for. */
    private final long checkpointId;

    /** Timestamp of the checkpoint this context was created for. */
    private final long checkpointTimestamp;

    /**
     * Creates a context for the given checkpoint.
     *
     * @param checkpointId id of the checkpoint
     * @param checkpointTimestamp timestamp of the checkpoint
     */
    public StateSnapshotContextAsynchronousImpl(long checkpointId, long checkpointTimestamp) {
        this.checkpointTimestamp = checkpointTimestamp;
        this.checkpointId = checkpointId;
    }

    /** Returns the checkpoint id passed to the constructor. */
    @Override
    public long getCheckpointId() {
        return this.checkpointId;
    }

    /** Returns the checkpoint timestamp passed to the constructor. */
    @Override
    public long getCheckpointTimestamp() {
        return this.checkpointTimestamp;
    }

    /**
     * Always returns {@code null}; this context does not provide a raw keyed state stream.
     */
    @Override
    public KeyedStateCheckpointOutputStream getRawKeyedOperatorStateOutput() throws Exception {
        return null;
    }

    /**
     * Always returns {@code null}; this context does not provide a raw operator state stream.
     */
    @Override
    public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Exception {
        return null;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,16 @@ public interface CheckpointedFunction {
* @throws Exception Thrown, if state could not be created or restored.
*/
void initializeState(FunctionInitializationContext context) throws Exception;

/**
* This method is called when a snapshot for a checkpoint is requested. Execution of this method
* does not block the main thread, as it is performed in an asynchronous thread pool.
*
* @param context the context from which a snapshot of the operator is created
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what "drawing" means here. Maybe simplify it to "the context from which a snapshot of the operator is created".

* @throws Exception Thrown if asyncOperate fails to execute. A RetriableAsyncOperateException
* thrown by this method indicates that the current checkpoint will fail but the asynchronous
* operation can be retried in the next checkpoint. Any other exception indicates that both
* the current checkpoint and the task will fail.
*/
default void asyncOperate(FunctionSnapshotContext context) throws Exception {
    // Intentionally empty: the default implementation does nothing, so only functions that
    // override this method take part in the asynchronous operation.
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.checkpoint;

import org.apache.flink.annotation.Public;

/**
 * Exception for failures inside {@code asyncOperate} that should not fail the task.
 *
 * <p>By default, any exception thrown from {@code asyncOperate} causes the task to fail. Throwing
 * a {@code RetriableAsyncOperateException} instead fails only the current checkpoint; the
 * {@code asyncOperate} logic is attempted again with the next checkpoint.
 */
@Public
public class RetriableAsyncOperateException extends RuntimeException {

    private static final long serialVersionUID = 1L;

    /**
     * Creates the exception with a detail message.
     *
     * @param message description of the failure
     */
    public RetriableAsyncOperateException(String message) {
        super(message);
    }

    /**
     * Creates the exception wrapping an underlying cause.
     *
     * @param cause the failure that triggered this exception
     */
    public RetriableAsyncOperateException(Throwable cause) {
        super(cause);
    }

    /**
     * Creates the exception with a detail message and an underlying cause.
     *
     * @param message description of the failure
     * @param cause the failure that triggered this exception
     */
    public RetriableAsyncOperateException(String message, Throwable cause) {
        super(message, cause);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
Expand Down Expand Up @@ -72,6 +73,7 @@
import java.util.Collections;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.RunnableFuture;

import static org.apache.flink.util.Preconditions.checkState;

Expand Down Expand Up @@ -422,6 +424,20 @@ public OperatorSnapshotFutures snapshotState(
isAsyncStateProcessingEnabled());
}

/**
 * Hook for stream operators with state that want to participate in an asynchronous snapshot;
 * such operators need to override this method.
 *
 * <p>This default implementation performs no work of its own and simply returns {@code
 * DoneFuture.of(null)}.
 *
 * @param context context that provides information and means required for taking a snapshot
 * @return the future representing the asynchronous operation of this operator
 * @throws Exception Thrown if the asynchronous operation could not be set up; this fails the
 *     task. Throw a RetriableAsyncOperateException instead if only the checkpoint, and not the
 *     task, should fail.
 */
@Override
public RunnableFuture<Void> asyncOperate(StateSnapshotContext context) throws Exception {
    final RunnableFuture<Void> nothingToDo = DoneFuture.of(null);
    return nothingToDo;
}

/**
* Stream operators with state, which want to participate in a snapshot need to override this
* hook method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
Expand Down Expand Up @@ -68,6 +69,7 @@
import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.RunnableFuture;

import static org.apache.flink.util.Preconditions.checkState;

Expand Down Expand Up @@ -347,6 +349,20 @@ public OperatorSnapshotFutures snapshotState(
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    // Intentionally empty: this base implementation does nothing with the snapshot context.
}

/**
 * Hook for stream operators with state that want to participate in an asynchronous snapshot;
 * such operators need to override this method.
 *
 * <p>This default implementation performs no work of its own and simply returns {@code
 * DoneFuture.of(null)}.
 *
 * @param context context that provides information and means required for taking a snapshot
 * @return the future representing the asynchronous operation of this operator
 * @throws Exception Thrown if the asynchronous operation could not be set up; this fails the
 *     task. Throw a RetriableAsyncOperateException instead if only the checkpoint, and not the
 *     task, should fail.
 */
@Override
public RunnableFuture<Void> asyncOperate(StateSnapshotContext context) throws Exception {
    final RunnableFuture<Void> nothingToDo = DoneFuture.of(null);
    return nothingToDo;
}

/**
* Stream operators with state which can be restored need to override this hook method.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;

import java.util.concurrent.FutureTask;

import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -95,6 +97,11 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
context, getOperatorStateBackend(), userFunction);
}

/**
 * Obtains the asynchronous-operation future for the wrapped user function by delegating to
 * {@code StreamingFunctionUtils.getAsyncOperateFunctionStateFuture}.
 *
 * @param context the snapshot context handed on to the utility method
 * @return the future produced for {@code userFunction}
 * @throws Exception if the utility method fails
 */
@Override
public FutureTask<Void> asyncOperate(StateSnapshotContext context) throws Exception {
    final FutureTask<Void> userFunctionFuture =
            StreamingFunctionUtils.getAsyncOperateFunctionStateFuture(context, userFunction);
    return userFunctionFuture;
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

import org.apache.flink.shaded.guava33.com.google.common.io.Closer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.util.ArrayList;
Expand All @@ -42,6 +45,8 @@
/** Result of {@link StreamOperator#snapshotState}. */
public class OperatorSnapshotFutures {

private static final Logger LOG = LoggerFactory.getLogger(OperatorSnapshotFutures.class);

@Nonnull private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture;

@Nonnull private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture;
Expand All @@ -57,14 +62,17 @@ public class OperatorSnapshotFutures {
private Future<SnapshotResult<StateObjectCollection<OutputStateHandle>>>
resultSubpartitionStateFuture;

private RunnableFuture<Void> asyncOperateFuture;

/**
 * Creates an instance with placeholder futures only.
 *
 * <p>Delegates to the seven-argument constructor, passing {@code
 * DoneFuture.of(SnapshotResult.empty())} for each of the six state futures and {@code
 * DoneFuture.of(null)} for the asynchronous operation future.
 */
public OperatorSnapshotFutures() {
    this(
            DoneFuture.of(SnapshotResult.empty()),
            DoneFuture.of(SnapshotResult.empty()),
            DoneFuture.of(SnapshotResult.empty()),
            DoneFuture.of(SnapshotResult.empty()),
            DoneFuture.of(SnapshotResult.empty()),
            DoneFuture.of(SnapshotResult.empty()),
            // Seventh argument: the asyncOperate future, which carries no result value.
            DoneFuture.of(null));
}

public OperatorSnapshotFutures(
Expand All @@ -78,12 +86,35 @@ public OperatorSnapshotFutures(
@Nonnull
Future<SnapshotResult<StateObjectCollection<OutputStateHandle>>>
resultSubpartitionStateFuture) {
this(
keyedStateManagedFuture,
keyedStateRawFuture,
operatorStateManagedFuture,
operatorStateRawFuture,
inputChannelStateFuture,
resultSubpartitionStateFuture,
DoneFuture.of(null));
}

/**
 * Creates an instance from explicitly supplied futures, including the future of the operator's
 * asynchronous operation. Every argument is stored as given.
 *
 * @param keyedStateManagedFuture future stored as the managed keyed state future
 * @param keyedStateRawFuture future stored as the raw keyed state future
 * @param operatorStateManagedFuture future stored as the managed operator state future
 * @param operatorStateRawFuture future stored as the raw operator state future
 * @param inputChannelStateFuture future stored as the input channel state future
 * @param resultSubpartitionStateFuture future stored as the result subpartition state future
 * @param asyncOperateFuture future stored as the asynchronous operation future; unlike the other
 *     parameters it is not annotated {@code @Nonnull}
 */
public OperatorSnapshotFutures(
        @Nonnull RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture,
        @Nonnull RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture,
        @Nonnull RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture,
        @Nonnull RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture,
        @Nonnull
                Future<SnapshotResult<StateObjectCollection<InputStateHandle>>>
                        inputChannelStateFuture,
        @Nonnull
                Future<SnapshotResult<StateObjectCollection<OutputStateHandle>>>
                        resultSubpartitionStateFuture,
        RunnableFuture<Void> asyncOperateFuture) {
    // Asynchronous operation of the operator.
    this.asyncOperateFuture = asyncOperateFuture;

    // Channel state.
    this.resultSubpartitionStateFuture = resultSubpartitionStateFuture;
    this.inputChannelStateFuture = inputChannelStateFuture;

    // Operator state.
    this.operatorStateRawFuture = operatorStateRawFuture;
    this.operatorStateManagedFuture = operatorStateManagedFuture;

    // Keyed state.
    this.keyedStateRawFuture = keyedStateRawFuture;
    this.keyedStateManagedFuture = keyedStateManagedFuture;
}

@Nonnull
Expand Down Expand Up @@ -154,10 +185,25 @@ public void setResultSubpartitionStateFuture(
this.resultSubpartitionStateFuture = resultSubpartitionStateFuture;
}

/**
 * Returns the future of the operator's asynchronous operation as currently stored; the field is
 * not annotated {@code @Nonnull}.
 */
public RunnableFuture<Void> getAsyncOperateFuture() {
    return this.asyncOperateFuture;
}

/**
* Replaces the stored future of the operator's asynchronous operation.
*
* @param asyncOperateFuture the future to store; it is stored as given, without validation
*/
public void setAsyncOperateFuture(RunnableFuture<Void> asyncOperateFuture) {
this.asyncOperateFuture = asyncOperateFuture;
}

/**
* @return discarded state size (if available).
*/
public Tuple2<Long, Long> cancel() throws Exception {

// cancel async snapshot state task
if (asyncOperateFuture != null && !asyncOperateFuture.isDone()) {
asyncOperateFuture.cancel(true);
LOG.info("Async snapshot state task canceled.");
}

List<Tuple2<Future<? extends StateObject>, String>> pairs = new ArrayList<>();
pairs.add(new Tuple2<>(getKeyedStateManagedFuture(), "managed keyed"));
pairs.add(new Tuple2<>(getKeyedStateRawFuture(), "managed operator"));
Expand Down Expand Up @@ -194,7 +240,8 @@ public Future<?>[] getAllFutures() {
operatorStateManagedFuture,
operatorStateRawFuture,
inputChannelStateFuture,
resultSubpartitionStateFuture
resultSubpartitionStateFuture,
asyncOperateFuture
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContextAsynchronousImpl;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.util.CloseableIterable;
import org.apache.flink.util.IOUtils;
Expand All @@ -69,6 +70,7 @@
import java.io.IOException;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.RunnableFuture;

import static org.apache.flink.util.Preconditions.checkState;

Expand Down Expand Up @@ -199,6 +201,9 @@ public OperatorSnapshotFutures snapshotState(
new StateSnapshotContextSynchronousImpl(
checkpointId, timestamp, factory, keyGroupRange, closeableRegistry);

StateSnapshotContextAsynchronousImpl asyncSnapshotContext =
new StateSnapshotContextAsynchronousImpl(checkpointId, timestamp);

snapshotState(
streamOperator,
timeServiceManager,
Expand All @@ -209,6 +214,7 @@ public OperatorSnapshotFutures snapshotState(
factory,
snapshotInProgress,
snapshotContext,
asyncSnapshotContext,
isUsingCustomRawKeyedState,
useAsyncState);

Expand All @@ -226,6 +232,7 @@ void snapshotState(
CheckpointStreamFactory factory,
OperatorSnapshotFutures snapshotInProgress,
StateSnapshotContextSynchronousImpl snapshotContext,
StateSnapshotContextAsynchronousImpl asyncSnapshotContext,
boolean isUsingCustomRawKeyedState,
boolean useAsyncState)
throws CheckpointException {
Expand Down Expand Up @@ -263,6 +270,9 @@ void snapshotState(
}
streamOperator.snapshotState(snapshotContext);

snapshotInProgress.setAsyncOperateFuture(
streamOperator.asyncOperate(asyncSnapshotContext));

snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
snapshotInProgress.setOperatorStateRawFuture(
snapshotContext.getOperatorStateStreamFuture());
Expand Down Expand Up @@ -486,5 +496,7 @@ public interface CheckpointedStreamOperator {
/** State initialization hook; receives the {@code StateInitializationContext}. */
void initializeState(StateInitializationContext context) throws Exception;

/**
* Snapshot hook. During a snapshot the state handler calls this with the synchronous snapshot
* context before it calls {@code asyncOperate}.
*/
void snapshotState(StateSnapshotContext context) throws Exception;

/**
* Hook for the operator's asynchronous operation. During a snapshot the state handler calls
* this right after {@code snapshotState} and stores the returned future on the in-progress
* {@code OperatorSnapshotFutures} via {@code setAsyncOperateFuture}.
*/
RunnableFuture<Void> asyncOperate(StateSnapshotContext context) throws Exception;
}
}
Loading