Flink入门系列3:状态管理

本篇是对官网 Stateful Stream ProcessingFault_ToleranceLearn Flink 三篇文章中有关状态管理的内容进行的再整理。

什么是状态

很多数据流中的运算每次看到的只是一个单独的事件(例如,事件parser),但有些运算会记录跨多个事件的信息(例如,窗口算子)。这样的运算被称为是有状态的

有状态的运算的几个示例:

  • 当应用程序查找某种事件模式的时候,状态保存的是所有到目前为止所遇到的事件序列;
  • 当每分钟/小时/天进行聚合的时候,状态保存的是中间聚合状态;
  • 当在一串数据点上训练机器学习模型时,状态保存的是当前版本的模型参数;
  • 当需要管理历史数据的时候,状态允许对过去发生的事件进行高效的访问;

Flink需要感知到状态,以便利用检查点(checkpoint)保存点(savepoint)使其能在发生故障的时候得到恢复。对状态的认识也允许对Flink应用进行重新伸缩规划,这意味着Flink负责将状态分发给并行实例。可查询的状态允许你在Flink应用运行的时候,从Flink之外对其进行访问。在使用状态当中,如果要使用状态,推荐阅读Flink的状态后端(State Backend)。Flink提供了不同的状态后端,状态后端决定着状态存储在哪以及是如何存储的。

Keyed State

Keyed状态被保存在一个内嵌的键/值数据库中。这种状态严格与有状态的算子所读取的流被一起分区、分发。因此,只有在keyed流上才可以访问到键/值状态(即,在keyed/partitioned数据交换之后),并且只能访问到与当前事件的key相关联的值。对流的键和状态进行对齐也保证了所有的状态更新都是本地运算,这样不需要事务开销就保证了一致性。对齐也让Flink透明地对状态重新分发以及调整流的分区。

Keyed State被进一步组织到Key Groups(如上图,ABY构成一个Key Groups)。Key Groups是Flink分发Keyed State的原子单元;Key Groups与定义的最大并行度是一致的。执行的时候,keyed算子的每个并行实例会操作来自一个或多个Key Gruops中键。

状态的持久化与容错

Flink利用流重放状态快照实现容错。

状态的持久化是通过快照实现的:

  • 快照(Snapshot):是对全局一致性的Flink作业状态镜像的通用术语。一个快照包括:每个数据源中的指针位置(例如,文件或Kafka分区中的偏移量)、以及每个有状态的算子通过处理这些位置之前的事件所构造出来的状态的一份状态拷贝。通俗地讲就是:每个数据源的指针位置 + 所有算子状态的一份拷贝
  • 检查点(Checkpoint):是由 Flink 自动执行的快照,其目的是能够从故障中恢复。检查点可以是增量的,它是为了快速恢复所进行的优化。
  • 外部检查点(Externalized Checkpoint):通常 checkpoints 是不需要由用户干预的。Flink 只保留作业运行时最近的 n 个 checkpoints(n 可配置),并在作业被取消时将它们删除。但你可以配置将它们保留下来,这样,你可以手动从它们进行恢复。
  • 保存点(Savepoint):出于某种运维的目的(例如有状态的重新部署/升级/缩放操作),由用户手动(或 API 调用)触发的快照。Savepoints 始终是完整的,并且针对操作灵活性进行了优化。保存点技术是依赖检查点的技术实现的,使用了检查点的程序都能从保存点恢复执行。保存点类似于检查点,不同的是保存点是人为触发的,并且当有新的检查点完成后,保存点是不会自动过期的。

状态快照如何工作

Flink容错机制的核心是获取分布式数据流和算子状态的一致性快照。获取这些快照的机制在“分布式数据流的轻量异步快照”中有描述。该论文受标准的Chandy-Lamport算法的启发,对Flink执行模型做了定制处理。

记住,checkpoint过程是异步的。分界线不会进行锁定,运算可以异步对它们的状态进行快照。

分界线(Barrier)

Flink分布式快照技术中的关键元素是流的分界线。这些分界线被插入到数据流中,作为数据流的一部分与记录一起流动。分界线不会反超过记录,它们严格按序流动。分界线将数据流中记录分隔成两组记录:一组记录属于当前快照,一组记录属于下一个快照。每个分隔线都携带了它所对应快照的ID。属于不同快照的分界线可以同时出现在流中,这意味着不同的快照可以同时发生。

对算子状态进行快照(Snapshot)

检查点n所包含的状态是这样的:状态是每个算子由消费分界线n前面所有事件所造成的。它强调全局性,它包含作业图中每个有状态算子的状态,而不是其中部分算子的状态;同时它强调一致性,算子状态都是由各个算子分界线n前面的事件所造成的,不会包含分界线n之后事件所造成的状态。

当TaskManager收到checkpoint协调器(它是JobManager中的一部分)的命令要开始快照的时候,它会让所有source记录下它们的偏移量、并让source向它们下游流中插入带有编号的分界线,并且这个偏移量会被发送给checkpoint协调器。

这些分界线接着会流向下游,当作业图(job graph)中的某个中间算子从它的所有输入流中都接收到了对于快照n的分界线,它会记录下其状态,并对此刻的状态进行快照,然后向checkpoint协调器对快照n进行确认,最后再向它的输出流发送快照n的分界线。一旦sink算子(流式DAG的终点)从它的所有输入流中都接收到了对于快照n的分界线,它同样会向checkpoint协调器对快照n进行确认。当所有sink都确认了一个快照之后,就认为完成了这次快照。

一旦快照n完成,作业就再也不会向source索要快照偏移量前面的记录了;因为此时,这些记录(以及由它们转换而来的派生记录)已经通过了整个数据流拓扑。

结果快照包括了:

  • 对于每一个并行流的source,快照开始时的偏移量/位置;
  • 对于每一个算子,指向作为快照的一部分被存储的状态的指针;

分界线对齐

有些算子接收多个输入流,这样的算子需要按快照分界线对输入流进行对齐。如下图显示的:

  • 当算子从其中一个输入流接收到了快照n的分界线,它将不能再往下处理该输入流的记录了,直到它从其它数据流中都收到了快照n的分界线。否则,会混合属于快照n和快照n+1的记录。
  • 报告了快照n分界线的输入流会被临时搁置。这些输入流中的记录不会被继续处理,但会被置于输入缓存中。如下左图,数值流已经报告了元素1前面的分界线,但英文字符流还没有报告相应的分界线,所以数值流中的元素123依次被缓存到了算子的输入缓存中。
  • 一旦从最后一个输入流收到了分界线n,该算子会发送出所有等待输出的记录(注意,这部分输出是前面输入流中属于快照n的记录所产生的,而不是前面输入流中被缓存下来的数据所产生的),然后也会发送快照n本身的分界线。如下中图,两个流中元素1和元素e前的分界线已经对齐了。如下右图,算子向下游发出两个流中元素1和元素e前的分界线。
  • 在这之后,该算子恢复处理来自所有输入流的记录,优先处理输入缓冲中的记录,再处理输入流中的记录。

状态后端(State Backend)

由 Flink 管理的 Keyed State 是一种分片的键/值存储,Keyed state中的每个状态项(每个key都有一个状态项) 的运行时副本都保存在负责该键的 TaskManager 本地中。另外,Operator state 也保存在机器节点本地。Flink 定期获取所有状态的快照,并将这些快照复制到更具持久性的位置,例如分布式文件系统。

如果发生故障,Flink 可以恢复应用程序的完整状态并继续处理,就如同没有出现过异常。

Flink 所管理的状态存储在状态后端 中。键/值状态所存储的实际数据结构取决于所选择的状态后端。Flink 有两种 状态后端的实现:a) 一种基于 RocksDB 内嵌 key/value 存储将其工作状态保存在磁盘上的,b) 另一种基于堆的状态后端,将其工作状态保存在 Java 的堆内存中。这种基于堆的状态后端有两种类型:FsStateBackend,将其状态快照持久化到分布式文件系统;MemoryStateBackend,它使用 JobManager 的堆保存状态快照。

当使用基于堆的状态后端保存状态时,访问和更新是在堆上读写对象。但是对于保存在 RocksDBStateBackend 中的对象,访问和更新会涉及到序列化和反序列化,所以会有更大的开销。但 RocksDB 的状态量仅受本地磁盘大小的限制。还要注意,只有 RocksDBStateBackend 能够进行增量快照,这对于具有大量变化缓慢状态的应用程序来说是大有裨益的。

除了定义保存状态的数据结构,状态后端也实现了对任何时间点的键/值状态进行快照、以及将快照作为检查点进行保存的逻辑。所有这些状态后端都能够异步执行快照,这意味着它们可以在不妨碍正在进行的流处理的情况下执行快照。

名称 运行时状态(Working State)的位置 状态备份(State Backup)的位置 快照
RocksDBStateBackend 本地磁盘(tmp dir) 分布式文件系统 全量 / 增量
• 支持大于内存大小的状态
• 经验法则:比基于堆的后端慢10倍
FsStateBackend JVM Heap 分布式文件系统 全量
• 快速,需要大的堆内存
• 受限制于 GC
MemoryStateBackend JVM Heap JobManager JVM Heap 全量
适用于小状态(本地)的测试和实验

恢复

这种机制下,进行恢复是很直接的:一旦发生故障,Flink选择最近一次完成的检查点k。然后,系统重新部署整个分布式数据流,将保存在检查点k的状态相应地分配给各个算子。source被置为从位置 Sk处读取。例如,在Kafka中,这意味着让消费者从偏移量Sk处开始拉取数据。

如果状态是增量式进行快照的,那么算子会从最近一次完整快照开始,然后使用一系列的增量快照来更新状态。

容错语义

当流处理应用程序发生错误的时候,结果可能会产生丢失或者重复。Flink 根据你为应用程序和集群的配置,可以产生以下结果:

  • Flink 不从快照中进行恢复(at most once
  • 没有任何丢失,但你可能会得到重复冗余的结果(at least once
  • 不丢失,也不冗余重复(exactly once

Flink 通过回退和重放 source 数据流从故障中恢复,理想情况是精确一次的语言,但这并意味着每个事件都是被精确一次处理。而指定是,每一个事件都会影响 Flink 所管理的状态精确一次

只有在需要提供精确一次的语义保证时才需要进行分界线对齐。如果不需要这种语义,可以通过配置 CheckpointingMode.AT_LEAST_ONCE 关闭分界线对齐来提高性能。

对齐的步骤可能会对streaming程序造成延迟。一般,这种额外的延迟近似有几毫秒,但是我们也观察到了会有延迟明显异常的情况。对于那些要求对所有数据都始终保持超低延迟的应用而言,Flink有一个开关可以控制在检查点当中跳过对流进行对齐。当一个算子观察到了来自每个输入流的检查点边界线的时候,还是会对获取检查点快照。

如果跳过对齐,即使有检查点n的边界线到达了算子,该算子仍会继续处理对应输入流的输入。这样的话,在检查点n的状态快照被获取之前,该算子同时处理了属于检查点n+1的数据元素。在恢复的时候,因为这些数据也被包含在了检查点n的状态快照中,当作为检查点n之后的数据的一部分被重放时,这些记录就发生了重复。

注意 对齐只发生在拥有多个前置数据项的算子(例如,join)、以及有多个发送方(例如,在流重分区/shuffle之后)的算子上。因此,对于只使用的是高度并行化的流运算(map()、flatMap()、filter()、…)的数据流,即使在至少一次的模型下也可以保证精确一次

端到端精确一次

为了实现端到端的精确一次,以便 sources 中的每个事件都仅精确一次对 sinks 产生影响,必须满足以下条件:

  1. sources 必须是可重放的,并且;
  2. sinks 必须是事务性的或幂等的;

批处理程序中的状态和容错

Flink将批处理程序作为流式程序的特例来执行,但流是有界的,即有限数量的元素。Dataset被内部当作流数据处理。因此,上面的概念就像它们适用于流式程序一样同样适用于批处理程序,几点例外:

  • 批处理程序的容错不使用检查点。通过完全对流进行重放进行恢复。这是可能的,因为输入是有界的。这将开销推到了恢复逻辑当中,而使正常的处理变得相对简单,因为避免了检查点。
  • DataSet API中的状态操作使用了简化了的内存/核外数据结构,而不是键/值索引。
  • DataSet API引入了特殊的同步(基于超集的)迭代,它们只适用于有界的流上。详细内容,查看迭代的文档

参考

  1. https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/concepts/stateful-stream-processing.html
  2. https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/#stateful-stream-processing
  3. https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/learn-flink/#%E6%9C%89%E7%8A%B6%E6%80%81%E6%B5%81%E5%A4%84%E7%90%86
  4. https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/learn-flink/fault_tolerance.html
  5. https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/fault_tolerance.html

延伸阅读

  1. https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html