【Flink状态管理(八)】Checkpoint:CheckpointBarrier对齐后Checkpoint的完成、通知与对学习状态管理源码的思考

文章目录

  • 一. 调用StreamTask执行Checkpoint操作
    • 1. 执行Checkpoint总体代码流程
      • 1.1. StreamTask.checkpointState()
      • 1.2. executeCheckpointing
      • 1.3. 将算子中的状态快照操作封装在OperatorSnapshotFutures中
      • 1.4. 算子状态进行快照
      • 1.5. 状态数据快照持久化
      • 二. CheckpointCoordinator管理Checkpoint
        • 1. Checkpoint执行完毕后的确认过程
        • 2. 触发并完成Checkpoint操作
        • 3. 通知CheckpointComplete给TaskExecutor
        • 三. 状态管理学习小结

          上文介绍了CheckpointBarrier的对齐操作,当CheckpointBarrier完成对齐操作后,接下来就是通过notifyCheckpoint()方法触发StreamTask节点的Checkpoint操作。

          一. 调用StreamTask执行Checkpoint操作

          如下代码,notifyCheckpoint()方法主要包含如下逻辑。

          > 1. 判断toNotifyOnCheckpoint不为空。
          > 2. 创建CheckpointMetaData和CheckpointMetrics实例,CheckpointMetaData用于存储
          > Checkpoint的元信息,CheckpointMetrics用于记录和监控Checkpoint监控指标。
          > 3. 触发StreamTask中算子的Checkpoint操作。
          protected void notifyCheckpoint(CheckpointBarrier checkpointBarrier, 
                                          long bufferedBytes, 
                                          long alignmentDurationNanos) throws Exception { if (toNotifyOnCheckpoint != null) { // 创建CheckpointMetaData对象用于存储Meta信息
                CheckpointMetaData checkpointMetaData =
                   new CheckpointMetaData(checkpointBarrier.getId(), 
                                          checkpointBarrier.getTimestamp());
                      // 创建CheckpointMetrics对象用于记录监控指标
                CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
                   .setBytesBufferedInAlignment(bufferedBytes)
                   .setAlignmentDurationNanos(alignmentDurationNanos);
                // 调用toNotifyOnCheckpoint.triggerCheckpointOnBarrier()方法触发Checkpoint
                  操作
                toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
                   checkpointMetaData,
                   checkpointBarrier.getCheckpointOptions(),
                   checkpointMetrics);
             }
          }
          

          注意:StreamTask是唯一实现了Checkpoint方法的子类,即只有StreamTask才能触发当前Task实例中的Checkpoint操作。

           

          接下来具体看Checkpoint执行细节

          1. 执行Checkpoint总体代码流程

          Checkpoint触发过程分为两种情况:一种是CheckpointCoordinator周期性地触发数据源节点中的Checkpoint操作;另一种是下游算子通过对齐CheckpointBarrier事件触发本节点算子的Checkpoint操作。

          不管是哪种方式触发Checkpoint,最终都是调用StreamTask.performCheckpoint()方法实现StreamTask实例中状态数据的持久化操作。

           

          在StreamTask.performCheckpoint()方法中,首先判断当前的Task是否运行正常,然后使用actionExecutor线程池执行Checkpoint操作,Checkpoint的实际执行过程如下。

          1. Checkpoint执行前的准备操作,让OperatorChain中所有的Operator执行Pre-barrier工作。
          2. 将CheckpointBarrier事件发送到下游的节点中。
          3. 算子状态数据进行快照

          执行checkpointState()方法,对StreamTask中OperatorChain的所有算子进行状态数据的快照操作,该过程为异步非阻塞过程,不影响数据的正常处理进程,执行完成后会返回True到CheckpointInputGate中。

          1. task挂掉情况处理:
          • 如果isRunning的条件为false,表明Task不在运行状态,此时需要给OperatorChain中的所有算子发送CancelCheckpointMarker消息,这里主要借助recordWriter.broadcastEvent(message)方法向下游算子进行事件广播。
          • 当且仅当OperatorChain中的算子还没有执行完Checkpoint操作的时候,下游的算子接收到CancelCheckpointMarker消息后会立即取消Checkpoint操作。
          private boolean performCheckpoint(
                CheckpointMetaData checkpointMetaData,
                CheckpointOptions checkpointOptions,
                CheckpointMetrics checkpointMetrics,
                boolean advanceToEndOfTime) throws Exception { LOG.debug("Starting checkpoint ({}) {} on task {}",
                       checkpointMetaData.getCheckpointId(), 
                       checkpointOptions.getCheckpointType(), 
                       getName());
             final long checkpointId = checkpointMetaData.getCheckpointId();
             if (isRunning) { // 使用actionExecutor执行Checkpoint逻辑
                actionExecutor.runThrowing(() -> { if (checkpointOptions.getCheckpointType().isSynchronous()) { setSynchronousSavepointId(checkpointId);
                       if (advanceToEndOfTime) { advanceToEndOfEventTime();
                      }
                   }
                   //Checkpoint操作的准备工作
                   operatorChain.prepareSnapshotPreBarrier(checkpointId);
                   //将checkpoint barrier发送到下游的stream中
                   operatorChain.broadcastCheckpointBarrier(
                         checkpointId,
                         checkpointMetaData.getTimestamp(),
                         checkpointOptions);
                   //对算子中的状态进行快照操作,此步骤是异步操作,
                   //不影响streaming拓扑中数据的正常处理
                   checkpointState(checkpointMetaData, checkpointOptions, 
                      checkpointMetrics);
                });
                return true;
             } else { // 如果Task处于其他状态,则向下游广播CancelCheckpointMarker消息
                actionExecutor.runThrowing(() -> { final CancelCheckpointMarker message = 
                       new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
                   recordWriter.broadcastEvent(message);
                });
                return false;
             }
          }
          

           

          1.1. StreamTask.checkpointState()

          接下来我们看StreamTask.checkpointState()方法的具体实现,如下代码。

          1. 创建CheckpointStateOutputStream实例。主要有如下两种实现类:
            • FsCheckpointStateOutputStream:文件类型系统
            • MemoryCheckpointOutputStream:内存的数据流输出。
            • 创建CheckpointingOperation实例,CheckpointingOperation封装了Checkpoint执行的具体操作流程,以及checkpointMetaData、checkpointOptions、storage和checkpointMetrics等Checkpoint执行过程中需要的环境配置信息。
            • 调用CheckpointingOperation.executeCheckpointing()方法执行Checkpoint操作。
          private void checkpointState(
                CheckpointMetaData checkpointMetaData,
                CheckpointOptions checkpointOptions,
                CheckpointMetrics checkpointMetrics) throws Exception { // 创建CheckpointStreamFactory实例
             CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorag
                eLocation(
                   checkpointMetaData.getCheckpointId(),
                   checkpointOptions.getTargetLocation());
               // 创建CheckpointingOperation实例
             CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
                this,
                checkpointMetaData,
                checkpointOptions,
                storage,
                checkpointMetrics);
             // 执行Checkpoint操作
             checkpointingOperation.executeCheckpointing();
          }
          

           

          1.2. executeCheckpointing

          如代码所示,CheckpointingOperation.executeCheckpointing()方法主要包含如下逻辑。

          1. 遍历所有StreamOperator算子,然后调用checkpointStreamOperator()方法为每个算子创建OperatorSnapshotFuture对象。这一步将所有算子的快照操作存储在OperatorSnapshotFutures集合中。
          2. 将OperatorSnapshotFutures存储到operatorSnapshotsInProgress的键值对集合中,其中Key为OperatorID,Value为该算子执行状态快照操作对应的OperatorSnapshotFutures对象
          3. 创建AsyncCheckpointRunnable线程对象,AsyncCheckpointRunnable实例中包含了创建好的OperatorSnapshotFutures集合。
          4. 调用StreamTask.asyncOperationsThreadPool线程池运行asyncCheckpointRunnable线程,执行operatorSnapshotsInProgress集合中算子的异步快照操作。
          public void executeCheckpointing() throws Exception { //通过算子创建执行快照操作的OperatorSnapshotFutures对象
             for (StreamOperator op : allOperators) { checkpointStreamOperator(op);
             }
             // 此处省略部分代码
             startAsyncPartNano = System.nanoTime();
             checkpointMetrics.setSyncDurationMillis(
                (startAsyncPartNano - startSyncPartNano) / 1_000_000);
             AsyncCheckpointRunnable asyncCheckpointRunnable = new 
                AsyncCheckpointRunnable(
                owner,
                operatorSnapshotsInProgress,
                checkpointMetaData,
                checkpointMetrics,
                startAsyncPartNano);
             // 注册Closeable操作
             owner.cancelables.registerCloseable(asyncCheckpointRunnable);
             // 执行asyncCheckpointRunnable
                   owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
           }
          

           

          1.3. 将算子中的状态快照操作封装在OperatorSnapshotFutures中

          如下代码,AbstractStreamOperator.snapshotState()方法将当前算子的状态快照操作封装在OperatorSnapshotFutures对象中,然后通过asyncOperationsThreadPool线程池异步触发所有的OperatorSnapshotFutures操作,方法主要步骤如下。

          1. 创建OperatorSnapshotFutures对象,封装当前算子对应的状态快照操作。
          2. 创建snapshotContext上下文对象,存储快照过程需要的上下文信息,并调用snapshotState()方法执行快照操作。

          snapshotState()方法由StreamOperator子类实现,例如在AbstractUdfStreamOperator中会调用StreamingFunctionUtils.snapshotFunctionState(context,getOperatorStateBackend(),

          userFunction)方法执行函数中的状态快照操作。

          1. 向snapshotInProgress中指定KeyedStateRawFuture和OperatorStateRawFuture,专门用于处理原生状态数据的快照操作。
          • 如果operatorStateBackend不为空,则将operatorStateBackend.snapshot()方法块设定到OperatorStateManagedFuture中,并注册到snapshotInProgress中等待执行。
          • 如果keyedStateBackend不为空,则将keyedStateBackend.snapshot()方法块设定到KeyedStateManagedFuture中,并注册到snapshotInProgress中等待执行。
          1. 返回创建的snapshotInProgress异步Future对象,snapshotInProgress中封装了当前算子需要执行的所有快照操作。
          public final OperatorSnapshotFutures snapshotState(long checkpointId, 
              long timestamp, 
              CheckpointOptions 
              checkpointOptions,
              CheckpointStreamFactory factory
              ) throws Exception { // 获取KeyGroupRange
             KeyGroupRange keyGroupRange = null != keyedStateBackend ?
                   keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_
                      RANGE;
                // 创建OperatorSnapshotFutures处理对象
             OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
                // 创建snapshotContext上下文对象
             StateSnapshotContextSynchronousImpl snapshotContext = 
             new StateSnapshotContextSynchronousImpl(
                checkpointId,
                timestamp,
                factory,
                keyGroupRange,
                getContainingTask().getCancelables());
             try { snapshotState(snapshotContext);
                // 设定KeyedStateRawFuture和OperatorStateRawFuture
                snapshotInProgress
                .setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
                snapshotInProgress
                .setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
                      // 如果operatorStateBackend不为空,设定OperatorStateManagedFuture
                if (null != operatorStateBackend) { snapshotInProgress.setOperatorStateManagedFuture(
                      operatorStateBackend
                      .snapshot(checkpointId, timestamp, factory, checkpointOptions));
                }
                // 如果keyedStateBackend不为空,设定KeyedStateManagedFuture
                if (null != keyedStateBackend) { snapshotInProgress.setKeyedStateManagedFuture(
                      keyedStateBackend
                      .snapshot(checkpointId, timestamp, factory, checkpointOptions));
                }
             } catch (Exception snapshotException) { // 此处省略部分代码
             }
             return snapshotInProgress;
          }
          

          这里可以看出,原生状态和管理状态的RunnableFuture对象会有所不同

          • RawState主要通过从snapshotContext中获取的RawFuture对象 管理状态的快照操作
          • ManagedState主要通过operatorStateBackend和keyedStateBackend进行状态的管理,并根据StateBackend的不同实现将状态数据写入内存或外部文件系统中。

           

          1.4. 算子状态进行快照

          我们知道所有的状态快照操作都会被封装到OperatorStateManagedFuture对象中,最终通过AsyncCheckpointRunnable线程触发执行。

          下面我们看AsyncCheckpointRunnable线程的定义。如代码所示,AsyncCheckpointRunnable.run()方法主要逻辑如下。

          1. 调用FileSystemSafetyNet.initializeSafetyNetForThread()方法为当前线程初始化文件系统安全网,确保数据能够正常写入。
          2. 创建TaskStateSnapshot实例:

          创建jobManagerTaskOperatorSubtaskStates和localTaskOperatorSubtaskStates对应的TaskStateSnapshot实例,其中jobManagerTaskOperatorSubtaskStates用于存储和记录发送给JobManager的Checkpoint数据,localTaskOperatorSubtaskStates用于存储TaskExecutor本地的状态数据。

          1. 执行所有状态快照线程操作

          遍历operatorSnapshotsInProgress集合,获取OperatorSnapshotFutures并创建OperatorSnapshotFinalizer实例,用于执行所有状态快照线程操作。在OperatorSnapshotFinalizerz中会调用FutureUtils.runIfNotDoneAndGet()方法执行KeyedState和OperatorState的快照操作。

          1. 从finalizedSnapshots中获取JobManagerOwnedState和TaskLocalState,分别存储在jobManagerTaskOperatorSubtaskStates和localTaskOperatorSubtaskStates集合中。
          2. 调用checkpointMetrics对象记录Checkpoint执行的时间并汇总到Metric监控系统中。
          3. 如果AsyncCheckpointState为COMPLETED状态,则调用reportCompletedSnapshotStates()方法向JobManager汇报Checkpoint的执行结果。
          4. 如果出现其他异常情况,则调用handleExecutionException()方法进行处理。
          public void run() { FileSystemSafetyNet.initializeSafetyNetForThread();
             try { // 创建TaskStateSnapshot
                TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
                   new TaskStateSnapshot(operatorSnapshotsInProgress.size());
                TaskStateSnapshot localTaskOperatorSubtaskStates =
                   new TaskStateSnapshot(operatorSnapshotsInProgress.size());
                for (Map.Entry entry : 
                     operatorSnapshotsInProgress.entrySet()) { OperatorID operatorID = entry.getKey();
                   OperatorSnapshotFutures snapshotInProgress = entry.getValue();
                   // 创建OperatorSnapshotFinalizer对象
                   OperatorSnapshotFinalizer finalizedSnapshots =
                      new OperatorSnapshotFinalizer(snapshotInProgress);
                   jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                      operatorID,
                      finalizedSnapshots.getJobManagerOwnedState());
                   localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                      operatorID,
                      finalizedSnapshots.getTaskLocalState());
                }
                final long asyncEndNanos = System.nanoTime();
                final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;
                checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
                if (asyncCheckpointState.compareAndSet(
                    CheckpointingOperation.AsyncCheckpointState.RUNNING,
                   CheckpointingOperation.AsyncCheckpointState.COMPLETED)) { reportCompletedSnapshotStates(
                      jobManagerTaskOperatorSubtaskStates,
                      localTaskOperatorSubtaskStates,
                      asyncDurationMillis);
                } else { LOG.debug("{} - asynchronous part of checkpoint {} could not be 
                      completed because it was closed before.",
                      owner.getName(),
                      checkpointMetaData.getCheckpointId());
                }
             } catch (Exception e) { handleExecutionException(e);
             } finally { owner.cancelables.unregisterCloseable(this);
                FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
             }
          }
          

          至此,算子状态数据快照的逻辑基本完成,算子中的托管状态主要借助KeyedStateBackend和OperatorStateBackend管理。

          KeyedStateBackend和OperatorStateBackend都实现了SnapshotStrategy接口,提供了状态快照的方法。SnapshotStrategy根据不同类型存储后端,主要有HeapSnapshotStrategy和RocksDBSnapshotStrategy两种类型。

           

          1.5. 状态数据快照持久化

          这里我们以HeapSnapshotStrategy为例,介绍在StateBackend中对状态数据进行状态快照持久化操作的步骤。如代码所示,

          HeapSnapshotStrategy.processSnapshotMetaInfoForAllStates()方法中定义了对KeyedState以及OperatorState的状态处理逻辑。

          1. 遍历每个StateSnapshotRestore。
          2. 调用StateSnapshotRestore.stateSnapshot()方法,此时会创建StateSnapshot对象。
          3. 将创建的StateSnapshot添加到metaInfoSnapshots和cowStateStableSnapshots集合中,完成堆内存存储类型KvState的快照操作。
          private void processSnapshotMetaInfoForAllStates(
             List metaInfoSnapshots,
             Map cowStateStableSnapshots,
             Map stateNamesToId,
             Map registeredStates,
             StateMetaInfoSnapshot.BackendStateType stateType) { for (Map.Entry kvState :
                  registeredStates.entrySet()) { final StateUID stateUid = StateUID.of(kvState.getKey(), stateType);
                stateNamesToId.put(stateUid, stateNamesToId.size());
                StateSnapshotRestore state = kvState.getValue();
                if (null != state) { final StateSnapshot stateSnapshot = state.stateSnapshot();
                   metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
                   cowStateStableSnapshots.put(stateUid, stateSnapshot);
                }
             }
          }
          

           

          二. CheckpointCoordinator管理Checkpoint

          1. Checkpoint执行完毕后的确认过程

          当StreamTask中所有的算子完成状态数据的快照操作后,Task实例会立即将TaskStateSnapshot消息发送到管理节点的CheckpointCoordinator中,并在CheckpointCoordinator中完成后续的操作。如图所示,Checkpoint执行完毕后的确认过程如下。

          1. 调用StreamTask.reportCompletedSnapshotStates

          当StreamTask中的所有算子都完成快照操作后,会调用StreamTask.reportCompletedSnapshotStates()方法将TaskStateSnapshot等Ack消息发送给TaskStateManager。TaskStateManager封装了CheckpointCoordinatorGateway,因此可以直接和CheckpointCoordinator组件进行RPC通信。

          1. 消息传递
          • 将消息传递给CheckpointCoordinatorGateway

            TaskStateManager通过CheckpointResponder.acknowledgeCheckpoint()方法将acknowledgedTaskStateSnapshot消息传递给CheckpointCoordinatorGateway接口实现者,实际上就是JobMasterRPC服务。

          • 消息传递给CheckpointCoordinator

            JobMaster接收到RpcCheckpointResponder返回的Ack消息后,会调用SchedulerNG.acknowledgeCheckpoint()方法将消息传递给调度器。调度器会将Ack消息封装成AcknowledgeCheckpoint,传递给CheckpointCoordinator组件继续处理。

          1. 管理PendingCheckpoint

          当CheckpointCoordinator接收到AcknowledgeCheckpoint后,会从pendingCheckpoints集合中获取对应的PendingCheckpoint,然后判断当前Checkpoint中是否收到AcknowledgedTasks集合所有的Task实例发送的Ack确认消息。

          如果notYetAcknowledgedTasks为空,则调用completePendingCheckpoint()方法完成当前PendingCheckpoint操作,并从pendingCheckpoints集合中移除当前的PendingCheckpoint。

          1. 添加CompletedCheckpoint:

          紧接着,PendingCheckpoint会转换成CompletedCheckpoint,此时CheckpointCoordinator会在completedCheckpointStore集合中添加CompletedCheckpoint。

          1. 通知Checkpoint操作结束。

          CheckpointCoordinator会遍历tasksToCommitTo集合中的ExecutionVertex节点并获取Execution对象,然后通过Execution向TaskManagerGateway发送CheckpointComplete消息,通知所有的Task实例本次Checkpoint操作结束。

          1. 通知同步

          当TaskExecutor接收到CheckpointComplete消息后,会从TaskSlotTable中获取对应的Task实例,向Task实例中发送CheckpointComplete消息。所有实现CheckpointListener监听器的组件或算子都会获取Checkpoint完成的消息,然后完成各自后续的处理操作。

           

          2. 触发并完成Checkpoint操作

          CheckpointCoordinator组件接收到Task实例的Ack消息(快照完成了?)后,会触发并完成Checkpoint操作。如代码PendingCheckpoint.finalizeCheckpoint()方法的具体实现如下。

          1)向sharedStateRegistry中注册operatorStates。
          2)结束pendingCheckpoint中的Checkpoint操作并生成CompletedCheckpoint。
          3)将completedCheckpoint添加到completedCheckpointStore中,
          4)从pendingCheckpoint中移除checkpointId对应的PendingCheckpoint,
          并触发队列中的Checkpoint请求。
          5)向所有的ExecutionVertex节点发送CheckpointComplete消息,
          通知Task实例本次Checkpoint操作完成。
          private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) 
             throws CheckpointException { final long checkpointId = pendingCheckpoint.getCheckpointId();
             final CompletedCheckpoint completedCheckpoint;
             // 首先向sharedStateRegistry中注册operatorStates
             Map operatorStates = 
                pendingCheckpoint.getOperatorStates();
             sharedStateRegistry.registerAll(operatorStates.values());
             // 对pendingCheckpoint中的Checkpoint做结束处理并生成CompletedCheckpoint
             try { try { completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
                   failureManager.handleCheckpointSuccess(pendingCheckpoint.
                      getCheckpointId());
                }
                catch (Exception e1) { // 如果出现异常则中止运行并抛出CheckpointExecution
                   if (!pendingCheckpoint.isDiscarded()) { failPendingCheckpoint(pendingCheckpoint,
                                             CheckpointFailureReason.FINALIZE_CHECKPOINT_
                                                  FAILURE, e1);
                   }
                   throw new CheckpointException("Could not finalize the pending 
                                                 checkpoint " +
                                                 checkpointId + '.',
                                                 CheckpointFailureReason
                                                 .FINALIZE_CHECKPOINT_FAILURE, e1);
                }
                // 当完成finalization后,PendingCheckpoint必须被丢弃
                Preconditions.checkState(pendingCheckpoint.isDiscarded() 
                                         && completedCheckpoint != null);
                // 将completedCheckpoint添加到completedCheckpointStore中
                try { completedCheckpointStore.addCheckpoint(completedCheckpoint);
                } catch (Exception exception) { // 如果completed checkpoint存储出现异常则进行清理
                   executor.execute(new Runnable() { @Override
                      public void run() { try { completedCheckpoint.discardOnFailedStoring();
                         } catch (Throwable t) { LOG.warn("Could not properly discard completed checkpoint {}.",
                                     completedCheckpoint.getCheckpointID(), t);
                         }
                      }
                   });
                   throw new CheckpointException("Could not complete the pending 
                                                 checkpoint " + 
                                                 checkpointId + '.', 
                                                 CheckpointFailureReason.
                                                 FINALIZE_CHECKPOINT_FAILURE, exception);
                }
             } finally { // 最后从pendingCheckpoints中移除checkpointId对应的PendingCheckpoint
                pendingCheckpoints.remove(checkpointId);
                // 触发队列中的Checkpoint请求
                triggerQueuedRequests();
             }
             // 记录checkpointId
             rememberRecentCheckpointId(checkpointId);
             // 清除之前的Checkpoints
             dropSubsumedCheckpoints(checkpointId);
             // 计算和前面Checkpoint操作之间的最低延时
             lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();
             LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", 
                      checkpointId, job,
                      completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
             // 通知所有的ExecutionVertex节点Checkpoint操作完成
             final long timestamp = completedCheckpoint.getTimestamp();
             for (ExecutionVertex ev : tasksToCommitTo) { Execution ee = ev.getCurrentExecutionAttempt();
                if (ee != null) { ee.notifyCheckpointComplete(checkpointId, timestamp);
                }
             }
          }
          

           

          3. 通知CheckpointComplete给TaskExecutor

          当TaskExecutor接收到来自CheckpointCoordinator的CheckpointComplete消息后,会调用Task.notifyCheckpointComplete()方法将消息传递到指定的Task实例中。Task线程会将CheckpointComplete消息通知给StreamTask中的算子。

          如下代码,

          /**
          将notifyCheckpointComplete()转换成RunnableWithException线程并提交到Mailbox中运行,且在MailboxExecutor线程模型中获取和执行的优先级是最高的。
          最终notifyCheckpointComplete()方法会在MailboxProcessor中运行。
          **/
          public Future notifyCheckpointCompleteAsync(long checkpointId) { return mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit(
                () -> notifyCheckpointComplete(checkpointId),
                "checkpoint %d complete", checkpointId);
          }
          

          继续具体看StreamTask.notifyCheckpointComplete(),如下代码:

          1)获取当前Task中算子链的算子,并发送Checkpoint完成的消息。
          2)获取TaskStateManager对象,向其通知Checkpoint完成消息,这里主要调用
          TaskLocalStateStore清理本地无用的Checkpoint数据。
          3)如果当前Checkpoint是同步的Savepoint操作,直接完成并终止当前Task实例,并调用
          resetSynchronousSavepointId()方法将syncSavepointId重置为空。
          private void notifyCheckpointComplete(long checkpointId) { try { boolean success = actionExecutor.call(() -> { if (isRunning) { LOG.debug("Notification of complete checkpoint for task {}", 
                         getName());
                      // 获取当前Task中operatorChain所有的Operator,并通知每个Operator 
                         Checkpoint执行成功的消息
                      for (StreamOperator operator : operatorChain.getAllOperators()) { if (operator != null) { operator.notifyCheckpointComplete(checkpointId);
                         }
                      }
                      return true;
                   } else { LOG.debug("Ignoring notification of complete checkpoint for 
                         not-running task {}", getName());
                      return true;
                   }
                });
                // 获取TaskStateManager,并通知Checkpoint执行完成的消息
                getEnvironment().getTaskStateManager().notifyCheckpointComplete(checkpointId);
                // 如果是同步的Savepoint操作,则直接完成当前Task
                if (success && isSynchronousSavepointId(checkpointId)) { finishTask();
                   // Reset to "notify" the internal synchronous savepoint mailbox loop.
                   resetSynchronousSavepointId();
                }
             } catch (Exception e) { handleException(new RuntimeException("Error while confirming checkpoint", e));
             }
          }
          

          算子接收到Checkpoint完成消息后,会根据自身需要进行后续的处理,默认在AbstractStreamOperator基本实现类中会通知keyedStateBackend进行后续操作。

          对于AbstractUdfStreamOperator实例,会判断当前userFunction是否实现了CheckpointListener,如果实现了,则向UserFucntion通知Checkpoint执行完成的信息

          例如在FlinkKafkaConsumerBase中会通过获取到的Checkpoint完成信息,将Offset提交至Kafka集群,确保消费的数据已经完成处理,详细实现可以参考FlinkKafkaConsumerBase.notifyCheckpointComplete()方法。

          public void notifyCheckpointComplete(long checkpointId) throws Exception { super.notifyCheckpointComplete(checkpointId);
             if (userFunction instanceof CheckpointListener) { ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
             }
          }
          

           

          三. 状态管理学习小结

          通过学习状态管理的源码,我们可以再来思考下如下几个场景问题,是不是有一点“庖丁解牛”的意思!

          flink中状态存在的意义是什么,涉及到哪些场景。

          1. 实时聚合:比如,计算过去一小时内的平均销售额。这时,你会需要使用到Flink的状态来存储过去一小时内的所有销售数据。
          2. 窗口操作:Flink SQL支持滚动窗口、滑动窗口、会话窗口等。这些窗口操作都需要Flink的状态来存储在窗口期限内的数据。
          3. 状态的持久化与任务恢复:实时任务挂掉之后,为了快速从上一个点恢复任务,可以使用savepoint和checkpoint。
          4. 多流join:Flink至少存储一个流中的数据,以便于在新的记录到来时进行匹配。

           

          其次通过学习Flink状态管理相关源码,可以进一步了解状态管理的细节操作,为解决更加复杂的问题打下理论基础

          1. 深入理解任务运行过程中,各算子状态的流转机制;
          2. 快速定位问题:在遇到实际问题时,能够快速反应出是哪块逻辑出现了问题;
          3. 应对故障:状态管理和Flink容错机制相关,可以了解Flink发生故障时如何保证状态的一致性和可恢复性
          4. 二次开发:可以自定义状态后端,或者拓展优化已有的例如RocksDB状态后端等;
          5. 性能优化:了解了Flink是如何有效的处理和管理状态,就可以优化任务性能,减少资源消耗。

           

          参考:《Flink设计与实现:核心原理与源码解析》–张利兵