2020/11/15
Note: the source code quoted here is from Flink 1.11.0.
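Checkpointing is the core of Flink's fault-tolerance mechanism. During a checkpoint, Flink operators take a snapshot of their state (snapshotting) and store it in the state backend. When an operator recovers from a failure, it restores its pre-failure state from that snapshot, which is how stream processing achieves Exactly-Once or At-Least-Once semantics. State snapshots are triggered by Barriers in the data stream: at runtime the sources periodically inject Barriers into the stream, and an operator starts snapshotting its state when it receives a Barrier. Flink 1.11 added Unaligned Checkpointing, which lets an operator with multiple inputs run its checkpoint before the Barriers have arrived on all of its inputs. The theory behind Flink's checkpointing is explained in detail in Lightweight Asynchronous Snapshots for Distributed Dataflows. The mechanism is inspired by the Chandy-Lamport algorithm (and with Unaligned Checkpointing it is even closer to Chandy-Lamport).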
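The barrier mechanism above can be illustrated with a toy model. The sketch below is a minimal, self-contained Java simulation of aligned barrier handling at an operator with several inputs, under simplifying assumptions (single thread, one checkpoint in flight). The class AlignedOperatorSketch and its methods are hypothetical and are not Flink's alignment code. Once an input has delivered barrier n, its further records are held back until barriers from all inputs have arrived; only then is the snapshot taken and the held-back records released.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of aligned checkpoint barriers at an operator with several inputs (not Flink code). */
class AlignedOperatorSketch {
    private final int numInputs;
    private final Set<Integer> blockedInputs = new HashSet<>();    // inputs that already delivered the current barrier
    private final List<String> bufferedRecords = new ArrayList<>(); // records held back from blocked inputs
    final List<String> processed = new ArrayList<>();               // records the operator has processed, in order
    final List<Long> snapshots = new ArrayList<>();                 // checkpoint IDs for which a snapshot was taken

    AlignedOperatorSketch(int numInputs) {
        this.numInputs = numInputs;
    }

    void onRecord(int input, String record) {
        if (blockedInputs.contains(input)) {
            bufferedRecords.add(record); // this input is already past the barrier: hold its records back
        } else {
            processed.add(record);
        }
    }

    void onBarrier(int input, long checkpointId) {
        blockedInputs.add(input);
        if (blockedInputs.size() == numInputs) {
            // all barriers arrived: the state now reflects exactly the pre-barrier records
            snapshots.add(checkpointId);
            blockedInputs.clear();
            processed.addAll(bufferedRecords); // release the post-barrier records
            bufferedRecords.clear();
        }
    }

    public static void main(String[] args) {
        AlignedOperatorSketch op = new AlignedOperatorSketch(2);
        op.onRecord(0, "a1");
        op.onBarrier(0, 1L);  // input 0 is now blocked
        op.onRecord(0, "a2"); // buffered, belongs after checkpoint 1
        op.onRecord(1, "b1"); // input 1 has not seen the barrier yet
        op.onBarrier(1, 1L);  // alignment complete -> snapshot 1
        System.out.println(op.processed + " " + op.snapshots); // [a1, b1, a2] [1]
    }
}
```

Unaligned checkpointing removes exactly this waiting: the snapshot is taken as soon as the first barrier arrives, and the records this model holds back would instead be stored as part of the checkpoint.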
Checkpointing is started in the JobManager by CheckpointCoordinator#triggerCheckpoint(CheckpointProperties, String, boolean, boolean). If the job calls env.enableCheckpointing(long interval), a timer task calls triggerCheckpoint(boolean) at the configured interval, which delegates to that method. A user can also trigger checkpointing manually by taking a savepoint (triggerSavepointInternal).
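The two trigger paths can be pictured with a small model: a timer task that calls triggerCheckpoint(true) at a fixed interval, and a one-off non-periodic call for a savepoint. The sketch uses only the JDK's ScheduledExecutorService; TriggerSketch and its methods are hypothetical stand-ins, not Flink classes, and the real coordinator additionally routes every call through CheckpointRequestDecider.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of periodic vs. manual checkpoint triggering (not Flink code). */
class TriggerSketch {
    final List<String> triggered = new CopyOnWriteArrayList<>(); // thread-safe: written by the timer thread
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    /** Stand-in for CheckpointCoordinator#triggerCheckpoint(boolean isPeriodic). */
    void triggerCheckpoint(boolean isPeriodic) {
        triggered.add(isPeriodic ? "periodic" : "savepoint");
    }

    /** Roughly what a checkpoint interval amounts to: a fixed-rate timer task. */
    void startPeriodic(long intervalMs) {
        timer.scheduleAtFixedRate(() -> triggerCheckpoint(true), intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    /** A user-requested savepoint is a single, non-periodic trigger. */
    void triggerSavepoint() {
        triggerCheckpoint(false);
    }

    void shutdown() {
        timer.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        TriggerSketch sketch = new TriggerSketch();
        sketch.triggerSavepoint();
        sketch.startPeriodic(20);
        Thread.sleep(100);
        sketch.shutdown();
        System.out.println(sketch.triggered); // savepoint first, then several periodic triggers
    }
}
```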
// CheckpointCoordinator.class, line 483
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(boolean isPeriodic) {
return triggerCheckpoint(checkpointProperties, null, isPeriodic, false);
}
@VisibleForTesting
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
CheckpointProperties props,
@Nullable String externalSavepointLocation,
boolean isPeriodic,
boolean advanceToEndOfTime) {
if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
return FutureUtils.completedExceptionally(new IllegalArgumentException(
"Only synchronous savepoints are allowed to advance the watermark to MAX."));
}
CheckpointTriggerRequest request = new CheckpointTriggerRequest(props, externalSavepointLocation, isPeriodic, advanceToEndOfTime);
requestDecider
.chooseRequestToExecute(request, isTriggering, lastCheckpointCompletionRelativeTime)
.ifPresent(this::startTriggeringCheckpoint);
return request.onCompletionPromise;
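}

A checkpointing request is first handed to CheckpointRequestDecider#chooseRequestToExecute, which decides which of the queued requests to actually run. The decider always picks the highest-priority request in the queue: savepoint requests rank above requests issued by the periodic checkpoint timer, and non-periodic requests rank above periodic ones. If the queue is already full (maxQueuedRequests, 1000 by default) and even its lowest-priority element is non-periodic, every queued request is user-submitted, so the new request is failed with TOO_MANY_CHECKPOINT_REQUESTS and an empty result is returned. Otherwise the new request is enqueued, and if that overflows the queue, the lowest-priority request is evicted and failed instead. Even then, a queued request only runs if no trigger is in progress, fewer than maxConcurrentCheckpointAttempts checkpoints are pending, and minPauseBetweenCheckpoints has elapsed since the last completion. Forced requests bypass the last two checks, and a periodic request that arrives too early is failed while the timer is rescheduled.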
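The priority order and the overflow rule can be reproduced in a few lines. This is a simplified model of the comparator and the enqueue step of CheckpointRequestDecider, with a hypothetical Request class and a small capacity instead of DEFAULT_MAX_QUEUED_REQUESTS = 1000. The isForce ordering and the concurrency and min-pause checks are omitted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Simplified model of CheckpointRequestDecider's queue (not the Flink class). */
class DeciderSketch {
    static final class Request {
        final String name;
        final boolean savepoint;
        final boolean periodic;
        final long timestamp;

        Request(String name, boolean savepoint, boolean periodic, long timestamp) {
            this.name = name;
            this.savepoint = savepoint;
            this.periodic = periodic;
            this.timestamp = timestamp;
        }
    }

    // Savepoints first, then non-periodic before periodic, then older before newer,
    // with identity hash codes as the final tie-breaker (Flink also orders by isForce()).
    static final Comparator<Request> ORDER = (r1, r2) -> {
        if (r1.savepoint != r2.savepoint) return r1.savepoint ? -1 : 1;
        if (r1.periodic != r2.periodic) return r1.periodic ? 1 : -1;
        if (r1.timestamp != r2.timestamp) return Long.compare(r1.timestamp, r2.timestamp);
        return Integer.compare(System.identityHashCode(r1), System.identityHashCode(r2));
    };

    final NavigableSet<Request> queue = new TreeSet<>(ORDER);
    final List<String> rejected = new ArrayList<>();
    private final int maxQueued;

    DeciderSketch(int maxQueued) {
        this.maxQueued = maxQueued;
    }

    /** Mirrors the enqueue part of chooseRequestToExecute(newRequest, ...). */
    void offer(Request r) {
        if (queue.size() >= maxQueued && !queue.last().periodic) {
            rejected.add(r.name); // queue full of user requests: drop the newcomer
            return;
        }
        queue.add(r);
        if (queue.size() > maxQueued) {
            rejected.add(queue.pollLast().name); // evict the lowest-priority request
        }
    }
}
```

With a capacity of 2, offering periodic p1, savepoint s1, savepoint s2 and periodic p2 in that order first evicts p1 (the full queue's last element is periodic, so s2 is accepted) and then rejects p2 outright (the full queue now holds only savepoints).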
CheckpointRequestDecider
// CheckpointRequestDecider.class, line 96
class CheckpointRequestDecider {
private static final Logger LOG = LoggerFactory.getLogger(CheckpointRequestDecider.class);
private static final int LOG_TIME_IN_QUEUE_THRESHOLD_MS = 100;
private static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;
private final int maxConcurrentCheckpointAttempts;
private final Consumer<Long> rescheduleTrigger;
private final Clock clock;
private final long minPauseBetweenCheckpoints;
private final Supplier<Integer> pendingCheckpointsSizeSupplier;
private final Object lock;
@GuardedBy("lock")
private final NavigableSet<CheckpointTriggerRequest> queuedRequests = new TreeSet<>(checkpointTriggerRequestsComparator());
private final int maxQueuedRequests;
CheckpointRequestDecider(
int maxConcurrentCheckpointAttempts,
Consumer<Long> rescheduleTrigger,
Clock clock,
long minPauseBetweenCheckpoints,
Supplier<Integer> pendingCheckpointsSizeSupplier,
Object lock) {
this(
maxConcurrentCheckpointAttempts,
rescheduleTrigger,
clock,
minPauseBetweenCheckpoints,
pendingCheckpointsSizeSupplier,
lock,
DEFAULT_MAX_QUEUED_REQUESTS
);
}
CheckpointRequestDecider(
int maxConcurrentCheckpointAttempts,
Consumer<Long> rescheduleTrigger,
Clock clock,
long minPauseBetweenCheckpoints,
Supplier<Integer> pendingCheckpointsSizeSupplier,
Object lock,
int maxQueuedRequests) {
Preconditions.checkArgument(maxConcurrentCheckpointAttempts > 0);
Preconditions.checkArgument(maxQueuedRequests > 0);
this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
this.rescheduleTrigger = rescheduleTrigger;
this.clock = clock;
this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
this.pendingCheckpointsSizeSupplier = pendingCheckpointsSizeSupplier;
this.lock = lock;
this.maxQueuedRequests = maxQueuedRequests;
}
Optional<CheckpointTriggerRequest> chooseRequestToExecute(CheckpointTriggerRequest newRequest, boolean isTriggering, long lastCompletionMs) {
synchronized (lock) {
if (queuedRequests.size() >= maxQueuedRequests && !queuedRequests.last().isPeriodic) {
// there are only non-periodic (ie user-submitted) requests enqueued - retain them and drop the new one
newRequest.completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
return Optional.empty();
} else {
queuedRequests.add(newRequest);
if (queuedRequests.size() > maxQueuedRequests) {
queuedRequests.pollLast().completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
}
Optional<CheckpointTriggerRequest> request = chooseRequestToExecute(isTriggering, lastCompletionMs);
request.ifPresent(CheckpointRequestDecider::logInQueueTime);
return request;
}
}
}
Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean isTriggering, long lastCompletionMs) {
synchronized (lock) {
Optional<CheckpointTriggerRequest> request = chooseRequestToExecute(isTriggering, lastCompletionMs);
request.ifPresent(CheckpointRequestDecider::logInQueueTime);
return request;
}
}
/**
* Choose the next {@link CheckpointTriggerRequest request} to execute based on the provided candidate and the
* current state. Acquires a lock and may update the state.
* @return request to execute, if any.
*/
private Optional<CheckpointTriggerRequest> chooseRequestToExecute(boolean isTriggering, long lastCompletionMs) {
Preconditions.checkState(Thread.holdsLock(lock));
if (isTriggering || queuedRequests.isEmpty()) {
return Optional.empty();
}
if (pendingCheckpointsSizeSupplier.get() >= maxConcurrentCheckpointAttempts) {
return Optional.of(queuedRequests.first())
.filter(CheckpointTriggerRequest::isForce)
.map(unused -> queuedRequests.pollFirst());
}
long nextTriggerDelayMillis = nextTriggerDelayMillis(lastCompletionMs);
if (nextTriggerDelayMillis > 0) {
return onTooEarly(nextTriggerDelayMillis);
}
return Optional.of(queuedRequests.pollFirst());
}
private Optional<CheckpointTriggerRequest> onTooEarly(long nextTriggerDelayMillis) {
CheckpointTriggerRequest first = queuedRequests.first();
if (first.isForce()) {
return Optional.of(queuedRequests.pollFirst());
} else if (first.isPeriodic) {
queuedRequests.pollFirst().completeExceptionally(new CheckpointException(MINIMUM_TIME_BETWEEN_CHECKPOINTS));
rescheduleTrigger.accept(nextTriggerDelayMillis);
return Optional.empty();
} else {
return Optional.empty();
}
}
private long nextTriggerDelayMillis(long lastCheckpointCompletionRelativeTime) {
return lastCheckpointCompletionRelativeTime - clock.relativeTimeMillis() + minPauseBetweenCheckpoints;
}
@VisibleForTesting
@Deprecated
PriorityQueue<CheckpointTriggerRequest> getTriggerRequestQueue() {
synchronized (lock) {
return new PriorityQueue<>(queuedRequests);
}
}
void abortAll(CheckpointException exception) {
Preconditions.checkState(Thread.holdsLock(lock));
while (!queuedRequests.isEmpty()) {
queuedRequests.pollFirst().completeExceptionally(exception);
}
}
int getNumQueuedRequests() {
synchronized (lock) {
return queuedRequests.size();
}
}
private static Comparator<CheckpointTriggerRequest> checkpointTriggerRequestsComparator() {
return (r1, r2) -> {
if (r1.props.isSavepoint() != r2.props.isSavepoint()) {
return r1.props.isSavepoint() ? -1 : 1;
} else if (r1.isForce() != r2.isForce()) {
return r1.isForce() ? -1 : 1;
} else if (r1.isPeriodic != r2.isPeriodic) {
return r1.isPeriodic ? 1 : -1;
} else if (r1.timestamp != r2.timestamp) {
return Long.compare(r1.timestamp, r2.timestamp);
} else {
return Integer.compare(identityHashCode(r1), identityHashCode(r2));
}
};
}
private static void logInQueueTime(CheckpointTriggerRequest request) {
if (LOG.isInfoEnabled()) {
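// note: request.timestamp is taken when the request is created, so this difference is negative
// and the INFO log below never fires; currentTimeMillis() - request.timestamp would be the time in queue
long timeInQueue = request.timestamp - currentTimeMillis();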
if (timeInQueue > LOG_TIME_IN_QUEUE_THRESHOLD_MS) {
LOG.info("checkpoint request time in queue: {}", timeInQueue);
}
}
}
}

Once a request has been chosen, CheckpointCoordinator#startTriggeringCheckpoint starts the checkpoint:
// CheckpointCoordinator.class, line 506
private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
try {
synchronized (lock) {
preCheckGlobalState(request.isPeriodic);
}
final Execution[] executions = getTriggerExecutions();
final Map<ExecutionAttemptID, ExecutionVertex> ackTasks = getAckTasks();
// we will actually trigger this checkpoint!
Preconditions.checkState(!isTriggering);
isTriggering = true;
final long timestamp = System.currentTimeMillis();
final CompletableFuture<PendingCheckpoint> pendingCheckpointCompletableFuture =
initializeCheckpoint(request.props, request.externalSavepointLocation)
.thenApplyAsync(
(checkpointIdAndStorageLocation) -> createPendingCheckpoint(
timestamp,
request.props,
ackTasks,
request.isPeriodic,
checkpointIdAndStorageLocation.checkpointId,
checkpointIdAndStorageLocation.checkpointStorageLocation,
request.getOnCompletionFuture()),
timer);
final CompletableFuture<?> masterStatesComplete = pendingCheckpointCompletableFuture
.thenCompose(this::snapshotMasterState);
final CompletableFuture<?> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
.thenComposeAsync((pendingCheckpoint) ->
OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
coordinatorsToCheckpoint, pendingCheckpoint, timer),
timer);
FutureUtils.assertNoException(
CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
.handleAsync(
(ignored, throwable) -> {
final PendingCheckpoint checkpoint =
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
Preconditions.checkState(
checkpoint != null || throwable != null,
"Either the pending checkpoint needs to be created or an error must have been occurred.");
if (throwable != null) {
// the initialization might not be finished yet
if (checkpoint == null) {
onTriggerFailure(request, throwable);
} else {
onTriggerFailure(checkpoint, throwable);
}
} else {
if (checkpoint.isDiscarded()) {
onTriggerFailure(
checkpoint,
new CheckpointException(
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
checkpoint.getFailureCause()));
} else {
// no exception, no discarding, everything is OK
final long checkpointId = checkpoint.getCheckpointId();
snapshotTaskState(
timestamp,
checkpointId,
checkpoint.getCheckpointStorageLocation(),
request.props,
executions,
request.advanceToEndOfTime);
coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
onTriggerSuccess();
}
}
return null;
},
timer)
.exceptionally(error -> {
if (!isShutdown()) {
throw new CompletionException(error);
} else if (error instanceof RejectedExecutionException) {
LOG.debug("Execution rejected during shutdown");
} else {
LOG.warn("Error encountered during shutdown", error);
}
return null;
}));
} catch (Throwable throwable) {
onTriggerFailure(request, throwable);
}
}

Starting a checkpoint is fully asynchronous and consists of the following steps:
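1. initializeCheckpoint initializes the checkpoint, and createPendingCheckpoint then builds a PendingCheckpoint (a checkpoint that has started but has not yet been acknowledged by all tasks).
2. snapshotMasterState snapshots the master state.
3. (Concurrently with step 2, in no fixed order) OperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion notifies the operator coordinators and triggers their checkpointing (only sources implemented with the new FLIP-27 Source API have a coordinator, SourceCoordinator). This shuts the OperatorEventValve.
4. Once steps 2 and 3 have both succeeded, snapshotTaskState uses RPC to have the task executor of each source task call Task#triggerCheckpointBarrier, which injects a Barrier into the data stream.
5. OperatorCoordinatorCheckpointContext#afterSourceBarrierInjection is called on every operator coordinator's checkpoint context to reopen the OperatorEventValve.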
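The ordering constraints between these steps come from how the futures are chained. The hedged sketch below reproduces only that chaining with plain CompletableFuture: step 1 produces the pending checkpoint, steps 2 and 3 both depend on it, and steps 4 and 5 run only after both have finished. The stage labels written to the log are hypothetical, and the failure handling done in handleAsync is left out.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Models the future chaining in startTriggeringCheckpoint (labels are hypothetical). */
class TriggerPipelineSketch {
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService timer = Executors.newSingleThreadExecutor(); // plays the coordinator's timer executor
        try {
            // Step 1: determine ID + storage location, then build the pending checkpoint.
            CompletableFuture<Long> pending = CompletableFuture
                .supplyAsync(() -> { log.add("init"); return 42L; })
                .thenApplyAsync(id -> { log.add("pending"); return id; }, timer);

            // Step 2 (master hooks) and step 3 (operator coordinators) both wait only for step 1.
            CompletableFuture<Void> masters = pending.thenAccept(id -> log.add("masterState"));
            CompletableFuture<Void> coordinators = pending.thenAcceptAsync(id -> log.add("coordinators"), timer);

            // Steps 4 and 5 run only after both 2 and 3 have completed.
            CompletableFuture.allOf(masters, coordinators)
                .thenRunAsync(() -> {
                    log.add("injectBarriers");
                    log.add("reopenValves");
                }, timer)
                .join();
        } finally {
            timer.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        // init, pending, then masterState/coordinators in either order, then injectBarriers, reopenValves
        System.out.println(run());
    }
}
```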
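Checkpoint initialization has two parts: initializeCheckpoint determines the checkpoint ID and storage location (CheckpointIdAndStorageLocation, a POJO), and createPendingCheckpoint creates a PendingCheckpoint instance that holds the checkpoint's metadata for the whole checkpointing process. PendingCheckpoint contains the following metadata: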
PendingCheckpoint instance variables
// PendingCheckpoint.class, line 93
private final Object lock = new Object();
private final JobID jobId;
private final long checkpointId;
private final long checkpointTimestamp;
private final Map<OperatorID, OperatorState> operatorStates;
private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;
private final Set<OperatorID> notYetAcknowledgedOperatorCoordinators;
private final List<MasterState> masterStates;
private final Set<String> notYetAcknowledgedMasterStates;
private final Set<ExecutionAttemptID> acknowledgedTasks;
private final CheckpointProperties props;
private final CheckpointStorageLocation targetLocation;
private final CompletableFuture<CompletedCheckpoint> onCompletionPromise;
private final Executor executor;
private int numAcknowledgedTasks;
private boolean discarded;
@Nullable
private PendingCheckpointStats statsCallback;
private volatile ScheduledFuture<?> cancellerHandle;
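private CheckpointException failureCause;

Most of these fields are self-explanatory from their names, but operatorStates deserves a closer look. The OperatorState it holds is not the Operator State discussed in the first two posts of this State series. It is the physical state of an operator: the Operator State and Keyed State snapshots of each of its parallel subtask instances at runtime, plus the coordinator state if there is one and, for unaligned checkpoints, the in-flight input channel and result subpartition state: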
OperatorState and OperatorSubtaskState instance variables
// OperatorState.class, line 46
private final OperatorID operatorID;
private final Map<Integer, OperatorSubtaskState> operatorSubtaskStates;
@Nullable
private ByteStreamStateHandle coordinatorState;
private final int parallelism;
private final int maxParallelism;
// OperatorSubtaskState.class, line 67
@Nonnull
private final StateObjectCollection<OperatorStateHandle> managedOperatorState;
/**
* Snapshot written using {@link org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream}.
*/
@Nonnull
private final StateObjectCollection<OperatorStateHandle> rawOperatorState;
/**
* Snapshot from {@link org.apache.flink.runtime.state.KeyedStateBackend}.
*/
@Nonnull
private final StateObjectCollection<KeyedStateHandle> managedKeyedState;
/**
* Snapshot written using {@link org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream}.
*/
@Nonnull
private final StateObjectCollection<KeyedStateHandle> rawKeyedState;
@Nonnull
private final StateObjectCollection<InputChannelStateHandle> inputChannelState;
@Nonnull
private final StateObjectCollection<ResultSubpartitionStateHandle> resultSubpartitionState;
/**
* The state size. This is also part of the deserialized state handle.
* We store it here in order to not deserialize the state handle when
* gathering stats.
*/
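private final long stateSize;

snapshotMasterState calls triggerCheckpoint on every master hook. When a source's UDF implements the ExternallyInducedSource interface, building the job graph calls WithMasterCheckpointHook#createMasterTriggerRestoreHook to create a master hook, which is registered with the CheckpointCoordinator. An ExternallyInducedSource does not take a checkpoint when the system triggers checkpointing; instead, it decides when to trigger a checkpoint based on the data/elements it receives from the external system. So when Flink checkpoints, it uses the hook to ask the external system to prepare a checkpoint record/element, while the moment the source actually checkpoints is still determined by the data it receives. (If the external system does produce the checkpoint record right away, the source's actual checkpoint will not lag far behind Flink's checkpoint trigger.)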
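A rough model of this round trip: the coordinator calls a hook for a checkpoint ID, the hook asks the external system to append a checkpoint marker, and the source only snapshots when that marker actually shows up in its input. Everything here (ExternallyInducedSketch, ExternalSystem, triggerHook, drainSource) is hypothetical; it is not the MasterTriggerRestoreHook or ExternallyInducedSource API, only an illustration of why the source's checkpoint time is driven by the data.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of an externally induced source and its master hook (not Flink's API). */
class ExternallyInducedSketch {
    /** Stand-in for an external system that can be asked to put a checkpoint marker into its stream. */
    static final class ExternalSystem {
        final Deque<String> stream = new ArrayDeque<>();

        void requestMarker(long checkpointId) {
            stream.addLast("CHECKPOINT-" + checkpointId); // marker goes after data already queued
        }
    }

    /** Plays the role of a master hook, invoked on the JobManager side when a checkpoint is triggered. */
    static CompletableFuture<Void> triggerHook(ExternalSystem external, long checkpointId) {
        external.requestMarker(checkpointId);
        return CompletableFuture.completedFuture(null); // hook is done; the marker is still in flight
    }

    /** The source emits records and snapshots only when it reads a marker. */
    static List<String> drainSource(ExternalSystem external) {
        List<String> events = new ArrayList<>();
        while (!external.stream.isEmpty()) {
            String element = external.stream.pollFirst();
            if (element.startsWith("CHECKPOINT-")) {
                events.add("snapshot " + element.substring("CHECKPOINT-".length()));
            } else {
                events.add("emit " + element);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        ExternalSystem external = new ExternalSystem();
        external.stream.addLast("r1");
        triggerHook(external, 7L).join();
        external.stream.addLast("r2");
        System.out.println(drainSource(external)); // [emit r1, snapshot 7, emit r2]
    }
}
```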
snapshotMasterState
// CheckpointCoordinator.class, line 696
private CompletableFuture<Void> snapshotMasterState(PendingCheckpoint checkpoint) {
if (masterHooks.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
final long checkpointID = checkpoint.getCheckpointId();
final long timestamp = checkpoint.getCheckpointTimestamp();
final CompletableFuture<Void> masterStateCompletableFuture = new CompletableFuture<>();
for (MasterTriggerRestoreHook<?> masterHook : masterHooks.values()) {
MasterHooks
.triggerHook(masterHook, checkpointID, timestamp, executor)
.whenCompleteAsync(
(masterState, throwable) -> {
try {
synchronized (lock) {
if (masterStateCompletableFuture.isDone()) {
return;
}
if (checkpoint.isDiscarded()) {
throw new IllegalStateException(
"Checkpoint " + checkpointID + " has been discarded");
}
if (throwable == null) {
checkpoint.acknowledgeMasterState(
masterHook.getIdentifier(), masterState);
if (checkpoint.areMasterStatesFullyAcknowledged()) {
masterStateCompletableFuture.complete(null);
}
} else {
masterStateCompletableFuture.completeExceptionally(throwable);
}
}
} catch (Throwable t) {
masterStateCompletableFuture.completeExceptionally(t);
}
},
timer);
}
return masterStateCompletableFuture;
}

OperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion triggers checkpointing on all operator coordinators (which live in the JobManager):
OperatorCoordinatorCheckpoints
// OperatorCoordinatorCheckpoints.class
final class OperatorCoordinatorCheckpoints {
public static CompletableFuture<CoordinatorSnapshot> triggerCoordinatorCheckpoint(
final OperatorCoordinatorCheckpointContext coordinatorContext,
final long checkpointId) throws Exception {
final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
coordinatorContext.checkpointCoordinator(checkpointId, checkpointFuture);
return checkpointFuture.thenApply(
(state) -> new CoordinatorSnapshot(
coordinatorContext, new ByteStreamStateHandle(coordinatorContext.operatorId().toString(), state))
);
}
public static CompletableFuture<AllCoordinatorSnapshots> triggerAllCoordinatorCheckpoints(
final Collection<OperatorCoordinatorCheckpointContext> coordinators,
final long checkpointId) throws Exception {
final Collection<CompletableFuture<CoordinatorSnapshot>> individualSnapshots = new ArrayList<>(coordinators.size());
for (final OperatorCoordinatorCheckpointContext coordinator : coordinators) {
final CompletableFuture<CoordinatorSnapshot> checkpointFuture = triggerCoordinatorCheckpoint(coordinator, checkpointId);
individualSnapshots.add(checkpointFuture);
}
return FutureUtils.combineAll(individualSnapshots).thenApply(AllCoordinatorSnapshots::new);
}
public static CompletableFuture<Void> triggerAndAcknowledgeAllCoordinatorCheckpoints(
final Collection<OperatorCoordinatorCheckpointContext> coordinators,
final PendingCheckpoint checkpoint,
final Executor acknowledgeExecutor) throws Exception {
final CompletableFuture<AllCoordinatorSnapshots> snapshots =
triggerAllCoordinatorCheckpoints(coordinators, checkpoint.getCheckpointId());
return snapshots
.thenAcceptAsync(
(allSnapshots) -> {
try {
acknowledgeAllCoordinators(checkpoint, allSnapshots.snapshots);
}
catch (Exception e) {
throw new CompletionException(e);
}
},
acknowledgeExecutor);
}
public static CompletableFuture<Void> triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
final Collection<OperatorCoordinatorCheckpointContext> coordinators,
final PendingCheckpoint checkpoint,
final Executor acknowledgeExecutor) throws CompletionException {
try {
return triggerAndAcknowledgeAllCoordinatorCheckpoints(coordinators, checkpoint, acknowledgeExecutor);
} catch (Exception e) {
throw new CompletionException(e);
}
}
// ------------------------------------------------------------------------
private static void acknowledgeAllCoordinators(PendingCheckpoint checkpoint, Collection<CoordinatorSnapshot> snapshots) throws CheckpointException {
for (final CoordinatorSnapshot snapshot : snapshots) {
final PendingCheckpoint.TaskAcknowledgeResult result =
checkpoint.acknowledgeCoordinatorState(snapshot.coordinator, snapshot.state);
if (result != PendingCheckpoint.TaskAcknowledgeResult.SUCCESS) {
final String errorMessage = "Coordinator state not acknowledged successfully: " + result;
final Throwable error = checkpoint.isDiscarded() ? checkpoint.getFailureCause() : null;
if (error != null) {
throw new CheckpointException(errorMessage, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, error);
} else {
throw new CheckpointException(errorMessage, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
}
}
}
}
// ------------------------------------------------------------------------
static final class AllCoordinatorSnapshots {
private final Collection<CoordinatorSnapshot> snapshots;
AllCoordinatorSnapshots(Collection<CoordinatorSnapshot> snapshots) {
this.snapshots = snapshots;
}
public Iterable<CoordinatorSnapshot> snapshots() {
return snapshots;
}
}
static final class CoordinatorSnapshot {
final OperatorInfo coordinator;
final ByteStreamStateHandle state;
CoordinatorSnapshot(OperatorInfo coordinator, ByteStreamStateHandle state) {
this.coordinator = coordinator;
this.state = state;
}
}
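}

First, OperatorCoordinatorCheckpointContext#checkpointCoordinator is called on each operator coordinator to obtain its acknowledgement. Each asynchronous result is then converted into a CoordinatorSnapshot, which holds the coordinator context and a ByteStreamStateHandle built from the operator ID string and the byte array returned by the coordinator. Finally, all snapshots are combined into AllCoordinatorSnapshots, and each one is acknowledged on the PendingCheckpoint via acknowledgeCoordinatorState. The coordinator side of the acknowledgement works as follows: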
// OperatorCoordinatorHolder.class, line 202
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
mainThreadExecutor.execute(() -> checkpointCoordinatorInternal(checkpointId, result));
}
// OperatorCoordinatorHolder.class, line 233
private void checkpointCoordinatorInternal(final long checkpointId, final CompletableFuture<byte[]> result) {
mainThreadExecutor.assertRunningInMainThread();
// synchronously!!!, with the completion, we need to shut the event valve
result.whenComplete((success, failure) -> {
if (failure != null) {
result.completeExceptionally(failure);
} else {
try {
eventValve.shutValve(checkpointId);
result.complete(success);
} catch (Exception e) {
result.completeExceptionally(e);
}
}
});
try {
eventValve.markForCheckpoint(checkpointId);
coordinator.checkpointCoordinator(checkpointId, result);
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
result.completeExceptionally(t);
globalFailureHandler.accept(t);
}
}

The actual order of execution here is:
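1. eventValve is marked with the checkpoint ID (markForCheckpoint).
2. The operator coordinator is asked to checkpoint (coordinator.checkpointCoordinator). In the normal case SourceCoordinator#checkpointCoordinator completes the CompletableFuture with a byte array holding its checkpoint data; otherwise the future completes exceptionally (and an exception thrown by the call also triggers a global failure).
3. When the future completes successfully, the whenComplete callback registered beforehand shuts the valve (shutValve).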
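OperatorEventValve controls the operator events sent from the OperatorCoordinator (JobManager side) to the OperatorEventHandler (operator side): it decides whether an event is actually sent or buffered in the valve, to be sent once the valve reopens.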
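The valve behaviour described here (mark, shut on acknowledgement, buffer while shut, release on reopen) fits in a small class. This is a hedged, single-threaded simplification of OperatorEventValve: it keeps one FIFO list per subtask and sends through a caller-supplied function, with only a basic checkpoint-ID check and none of the futures or locking of the real class. markForCheckpoint, shutValve and openValveAndUnmarkCheckpoint echo the methods quoted in this article; sendEvent and the BiConsumer sender are assumptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Simplified, single-threaded model of OperatorEventValve (not the Flink class). */
class ValveSketch {
    private static final long NO_CHECKPOINT = Long.MIN_VALUE;

    private final BiConsumer<String, Integer> eventSender; // (event, subtask) -> deliver to the operator
    private final Map<Integer, List<String>> blockedEvents = new LinkedHashMap<>(); // per-subtask FIFO
    private long currentCheckpointId = NO_CHECKPOINT;
    private boolean shut;

    ValveSketch(BiConsumer<String, Integer> eventSender) {
        this.eventSender = eventSender;
    }

    void markForCheckpoint(long checkpointId) {
        currentCheckpointId = checkpointId;
    }

    void shutValve(long checkpointId) {
        if (checkpointId != currentCheckpointId) {
            throw new IllegalStateException("Valve not marked for checkpoint " + checkpointId);
        }
        shut = true;
    }

    void sendEvent(String event, int subtask) {
        if (shut) {
            blockedEvents.computeIfAbsent(subtask, k -> new ArrayList<>()).add(event); // hold back
        } else {
            eventSender.accept(event, subtask);
        }
    }

    void openValveAndUnmarkCheckpoint() {
        currentCheckpointId = NO_CHECKPOINT;
        if (!shut) {
            return;
        }
        // release buffered events, preserving their order per subtask
        blockedEvents.forEach((subtask, events) -> events.forEach(e -> eventSender.accept(e, subtask)));
        blockedEvents.clear();
        shut = false;
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        ValveSketch valve = new ValveSketch((event, subtask) -> delivered.add(subtask + ":" + event));
        valve.sendEvent("e1", 0);             // valve open: delivered immediately
        valve.markForCheckpoint(5L);
        valve.shutValve(5L);                  // coordinator acknowledged checkpoint 5
        valve.sendEvent("e2", 0);             // buffered
        valve.sendEvent("e3", 1);             // buffered
        System.out.println(delivered);        // [0:e1]
        valve.openValveAndUnmarkCheckpoint(); // after barrier injection
        System.out.println(delivered);        // [0:e1, 0:e2, 1:e3]
    }
}
```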
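Once the master-state snapshot and the coordinator checkpoints have both completed successfully, the CheckpointCoordinator uses snapshotTaskState to tell the Source operators, via RPC, to inject Barriers into the data stream. Following the call stack down from snapshotTaskState, the code that actually injects the Barrier is SubtaskCheckpointCoordinatorImpl#checkpointState. (All checkpoint operations are initiated by a checkpoint coordinator: CheckpointCoordinator at the job level and SubtaskCheckpointCoordinator(Impl) at the task level.)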
CheckpointCoordinator#snapshotTaskState
→ Execution#triggerCheckpoint or Execution#triggerSynchronousSavepoint
→ Execution#triggerCheckpointHelper
→ RpcTaskManagerGateway#triggerCheckpoint
→ TaskExecutor#triggerCheckpoint
→ Task#triggerCheckpointBarrier
→ StreamTask#triggerCheckpointAsync
→ StreamTask#triggerCheckpoint (the operator checkpoint is initialized here)
→ StreamTask#performCheckpoint (operator checkpointing starts)
→ SubtaskCheckpointCoordinatorImpl#checkpointState
// SubtaskCheckpointCoordinatorImpl.class, line 216
public void checkpointState(
CheckpointMetaData metadata,
CheckpointOptions options,
CheckpointMetrics metrics,
OperatorChain<?, ?> operatorChain,
Supplier<Boolean> isCanceled) throws Exception {
checkNotNull(options);
checkNotNull(metrics);
// All of the following steps happen as an atomic step from the perspective of barriers and
// records/watermarks/timers/callbacks.
// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
// checkpoint alignments
if (lastCheckpointId >= metadata.getCheckpointId()) {
LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());
channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true);
checkAndClearAbortedStatus(metadata.getCheckpointId());
return;
}
// Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint if necessary.
lastCheckpointId = metadata.getCheckpointId();
if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
// broadcast cancel checkpoint marker to avoid downstream back-pressure due to checkpoint barrier align.
operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());
return;
}
// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
// The pre-barrier work should be nothing or minimal in the common case.
operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());
// Step (2): Send the checkpoint barrier downstream
operatorChain.broadcastEvent(
new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
options.isUnalignedCheckpoint());
// Step (3): Prepare to spill the in-flight buffers for input and output
if (options.isUnalignedCheckpoint()) {
prepareInflightDataSnapshot(metadata.getCheckpointId());
}
// Step (4): Take the state snapshot. This should be largely asynchronous, to not impact progress of the
// streaming topology
Map<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators());
try {
if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)) {
finishAndReportAsync(snapshotFutures, metadata, metrics, options);
} else {
cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
}
} catch (Exception ex) {
cleanup(snapshotFutures, metadata, metrics, ex);
throw ex;
}
}

The Barrier is emitted as follows:
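1. Out-of-order barriers are dropped: if lastCheckpointId is already greater than or equal to this checkpoint's ID, the channel state writer is aborted for it and the method returns. Otherwise the ID is recorded as lastCheckpointId. If this checkpoint has already been notified as aborted (its ID is in abortedCheckpointIds), a CancelCheckpointMarker is broadcast downstream instead of a Barrier, so that downstream operators do not block on barrier alignment. Note that although operatorChain.broadcastEvent goes through the same RecordWriterOutput as Watermarks and records, it takes a different path (StreamElements such as Watermarks and records go through the emit methods).
2. The Source and all operators chained with it are told to do their pre-barrier work (prepareSnapshotPreBarrier). The implementation in AbstractStreamOperator does nothing, and overrides are rare.
3. A CheckpointBarrier event is broadcast to downstream operators.
4. For an Unaligned Barrier, the in-flight data being sent is prepared for snapshotting (see the Unaligned Barrier chapter).
5. takeSnapshotSync snapshots the operator state (see the Snapshotting chapter); on success, finishAndReportAsync finishes the snapshot asynchronously and reports it. If this step declines or fails, the failed snapshot is cleaned up and the checkpoint failure is signalled to channelStateWriter.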
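The early-exit branches of checkpointState are easy to misread, so here is a hedged, single-threaded model of just its control flow: an out-of-order barrier is ignored, a checkpoint already notified as aborted produces a cancel marker instead of a barrier, and otherwise the barrier is broadcast before the snapshot is taken. SubtaskCheckpointSketch and its action strings are hypothetical; the real method also aborts channel state writing, spills in-flight buffers and runs most of the snapshot asynchronously.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of the control flow in SubtaskCheckpointCoordinatorImpl#checkpointState. */
class SubtaskCheckpointSketch {
    private long lastCheckpointId = -1;
    final Set<Long> abortedCheckpointIds = new HashSet<>(); // filled when a checkpoint is aborted before its barrier
    final List<String> actions = new ArrayList<>();

    void checkpointState(long checkpointId, boolean unaligned) {
        if (lastCheckpointId >= checkpointId) {                // out-of-order barrier
            actions.add("ignore out-of-order " + checkpointId);
            abortedCheckpointIds.remove(checkpointId);
            return;
        }
        lastCheckpointId = checkpointId;                       // step 0
        if (abortedCheckpointIds.remove(checkpointId)) {       // already aborted: tell downstream not to wait
            actions.add("broadcast CancelCheckpointMarker " + checkpointId);
            return;
        }
        actions.add("prepareSnapshotPreBarrier " + checkpointId);  // step 1
        actions.add("broadcast CheckpointBarrier " + checkpointId); // step 2
        if (unaligned) {
            actions.add("prepare in-flight data " + checkpointId);  // step 3
        }
        actions.add("snapshot " + checkpointId);                    // step 4
    }

    public static void main(String[] args) {
        SubtaskCheckpointSketch s = new SubtaskCheckpointSketch();
        s.checkpointState(1, true);
        s.abortedCheckpointIds.add(2L);
        s.checkpointState(2, false);
        s.actions.forEach(System.out::println);
    }
}
```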
After the Barrier has been injected, the CheckpointCoordinator calls OperatorCoordinatorCheckpointContext#afterSourceBarrierInjection for post-processing, which reopens the OperatorEventValve.
// OperatorCoordinatorHolder.class, line 265
public void afterSourceBarrierInjection(long checkpointId) {
// this method is commonly called by the CheckpointCoordinator's executor thread (timer thread).
// we ideally want the scheduler main-thread to be the one that sends the blocked events
// however, we need to react synchronously here, to maintain consistency and not allow
// another checkpoint injection in-between (unlikely, but possible).
// fortunately, the event-sending goes pretty much directly to the RPC gateways, which are
// thread safe.
// this will automatically be fixed once the checkpoint coordinator runs in the
// scheduler's main thread executor
eventValve.openValveAndUnmarkCheckpoint();
}
// OperatorEventValve.class, line 149
public void openValveAndUnmarkCheckpoint() {
final ArrayList<FuturePair> futures;
// send all events under lock, so that no new event can sneak between
synchronized (lock) {
currentCheckpointId = NO_CHECKPOINT;
if (!shut) {
return;
}
futures = new ArrayList<>(blockedEvents.size());
for (List<BlockedEvent> eventsForTask : blockedEvents.values()) {
for (BlockedEvent blockedEvent : eventsForTask) {
final CompletableFuture<Acknowledge> ackFuture = eventSender.apply(blockedEvent.event, blockedEvent.subtask);
futures.add(new FuturePair(blockedEvent.future, ackFuture));
}
}
blockedEvents.clear();
shut = false;
}
// apply the logic on the future outside the lock, to be safe
for (FuturePair pair : futures) {
FutureUtils.forward(pair.ackFuture, pair.originalFuture);
}
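}

Once the valve is reopened, OperatorEventValve sends all buffered events one by one, in order, while still holding the lock. Although the acknowledgements are handled asynchronously through CompletableFuture, Flink's underlying RPC mechanism preserves the order of the events.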
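The final loop in openValveAndUnmarkCheckpoint forwards each RPC acknowledgement future into the future the original sender received, outside the lock. The sketch below shows that forwarding pattern with plain CompletableFuture. ForwardSketch.forward is a hypothetical helper written to behave like the FutureUtils.forward call in the quoted code (propagate either the value or the exception); it is not Flink's implementation.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper mirroring the "forward one future into another" step. */
final class ForwardSketch {
    private ForwardSketch() {}

    /** Completes target with whatever source completes with, value or exception. */
    static <T> void forward(CompletableFuture<T> source, CompletableFuture<T> target) {
        source.whenComplete((value, error) -> {
            if (error != null) {
                target.completeExceptionally(error);
            } else {
                target.complete(value);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> original = new CompletableFuture<>(); // handed out when the event was buffered
        CompletableFuture<String> ack = new CompletableFuture<>();      // returned by the RPC send after reopening
        forward(ack, original);
        ack.complete("ACK");
        System.out.println(original.join()); // ACK
    }
}
```

Because the sends themselves happen in order under the lock, only the completion of the callers' futures is deferred; the event order seen by the operators is not affected by this forwarding.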