Flink:学习Flink
本文是对Flink官网文档中《Learn Flink》的学习笔记。对官方相应文章的章节进行了重新组织与调整,以使大纲更具结构性。
概述
略
参考
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/learn-flink/
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/
DataFrame API介绍
示例
1 | import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
流式程序执行原理
每个 Flink 应用都需要有执行环境,在该示例中为 env
。流式应用需要用到 StreamExecutionEnvironment
。
DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment
。当调用 env.execute()
时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。
注意,如果没有调用 execute(),应用就不会运行。
参考
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/learn-flink/datastream_api.html
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/datastream_api.html
数据管道&ETL
无状态的转换
我们注意到每种转换操作都需要定义相应的算子,如filter -> NYCFilter
、map ->Enrichment
、map -> NYCEnrichment
。
- map()
1 | DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...)); |
- flatmap()
1 | DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...)); |
有状态的转换
Keyed Streams
将一个流根据其中的一些属性来进行分区是十分有用的,这样我们可以使所有具有相同属性的事件分到相同的组里。例如,如果你想找到从每个网格单元出发的最远的出租车行程。按 SQL 查询的方式来考虑,这意味着要对 startCell
进行 GROUP BY 再排序,在 Flink 中这部分可以用 keyBy(KeySelector)
实现。
1 | rides |
隐式状态
keyBy()
后进行的聚合函数都作为隐式状态,如maxBy()
、reduce()
等函数。关于如何使用状态,参看文档:使用状态。
1 | minutesByStartCell |
Keyed State
1 | public static void main(String[] args) throws Exception { |
Connected Streams
略
参考
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/etl.html
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/learn-flink/etl.html
流式分析
事件时间和水印
代码示例
如果想要使用事件时间,需要给 Flink 提供一个时间戳提取器和 Watermark 生成器,Flink 将使用它们来跟踪事件时间的进度:
1 | DataStream<Event> stream = ... |
概念解析
事件时间
水印
该节主要说了,水印的由来与作用,大体归结为:
- 考虑到输出结果的准确性,需要缓存一段时间的数据,缓存自然就带来了延迟输出的问题;
- 需要一种策略来决定什么时候不再等具有某一时间戳的事件了,从而为该时间戳输出结果,这便是水印的作用。
Watermark(t)表示的是时间戳 t 的水印,它表示该水印出现以后,系统就不再等待事件时间小于或等于t的事件了。例如,当时间戳t=2或更大时间戳的水印出现后,事件流的排序器应停止等待,并输出 2 作为已经排序好的流。
窗口
代码范式
用 Flink 计算窗口分析取决于两个主要的抽象操作:
- Window Assigners,将事件分配给窗口(根据需要创建新的窗口对象);
- Window Functions,处理窗口内的数据;
1 | // 键控流 |
概念解析
Flink 的窗口 API 还具有 Triggers 和 Evictors 的概念,Triggers 确定何时调用窗口函数,而 Evictors 则可以删除在窗口中收集的元素。
窗口分配器
上图描绘窗口可以分为以下三类:
- 基于时间的窗口分配器:会话窗口属于该类;该类型窗口既可以处理
事件时间
,也可以处理处理时间
。 - 基于计数的窗口分配器:只有窗口内的事件数量到达窗口要求的数值时,这些窗口才会触发计算;
- 全局窗口分配器:将每个事件(相同的 key)分配给某一个指定的全局窗口。这只用于在你使用自定义触发器实现自定义窗口的场景。
窗口函数
处理窗口中的事件有3种基本的方式:
- 作为一个批次,窗口中内容以
Iterable
的形式传递给ProcessWindowFunction
; - 增量式地,当有事件被分配到窗口时,就会调用
ReduceFunction
或者AggregateFunction
; - 两者结合的方式,将
ReduceFunction
或者AggregateFunction
预聚合的增量计算结果在触发窗口时, 提供给ProcessWindowFunction
做全量计算。
ProcessWindowFunction 示例
1 | DataStream<SensorReading> input = ... |
这个例子中有一些值得关注的地方:
- 所有分配给窗口的事件都要被缓存在以键划分的Flink状态中,直到触发窗口为止。这个操作可能是相当昂贵的。
- Flink 会传递给
ProcessWindowFunction
一个Context
对象,这个对象内包含了一些窗口信息。
增量聚合示例
1 | DataStream<SensorReading> input = ... |
请注意 Iterable<SensorReading>
将只包含一个读数 – MyReducingMax
计算出的预先汇总的最大值。
延迟
延迟是相对于 watermarks 定义的。Watermark(t)
表示事件流的时间已经到达了 t; watermark 之后的时间戳 ≤ t 的任何事件都被称之为延迟事件。
代码示例
默认,当使用事件时间窗口时,晚到的事件会被丢掉。Flink 给了我们两个选择去控制这些事件。
旁侧输出
1 | OutputTag<Event> lateTag = new OutputTag<Event>("late"){}; |
允许内的延迟
你还可以指定 允许的延迟(allowed lateness) 间隔,在这个间隔时间内,延迟的事件还是会继续分配给窗口(该窗口的状态也同样被保留着),默认状态下,每个晚到事件都会导致窗口函数被再次调用(有时也称之为 late firing )。
默认,允许的延迟为 0。换句话说,watermark 之后的元素将被丢弃(或发送到侧输出流)。
1 | stream. |
延迟 VS 正确性
watermarks 给了开发者流处理的一种选择,它们使开发人员在开发应用程序时可以控制数据延迟和数据完整性之间的权衡。与批处理不同,批处理中的奢侈之处在于可以在产生任何结果之前完全了解输入,而使用流式传输,我们不被允许等待所有的事件都产生了,才输出排序好的数据,这与流相违背。
异想不到的事情
滑动窗口是通过复制来实现的
滑动窗口的分配器会创建很多的窗口对象,并将每个事件复制到每个相关的窗口中。例如,如果你每隔 15 分钟就有 24 小时的滑动窗口,那么每个事件会被复制到 4 * 24 = 96 个窗口中。
时间窗口会和时间对齐
仅仅因为你使用的是一小时长度的处理时间窗口、并且是在 12:05 开始运行的应用程序,但这并不意味着第一个窗口将在 1:05 关闭。第一个窗口将长 55 分钟,并在 1:00 关闭。
请注意,滑动窗口和滚动窗口分配器所采用的 offset 参数可用于改变窗口的对齐方式。有关详细的信息,请参见 滚动窗口 和 滑动窗口 。
window 后面可以接 window
1 | stream |
空的时间窗口不会输出结果
窗口只有当有事件分配给它们时才会被创建。所以,如果在特定的时间范围内没有事件,就不会有窗口,也就不会有输出结果。
延迟事件可能会促使合并
会话窗口的实现是基于窗口的一个抽象能力,窗口可以 合并。每个元素一开始都会被分配给一个新窗口,然而如果窗口之间的间隔足够小,窗口就会被合并。这样的话,延迟事件可以填补上分隔之前两个会话的这段间隔,从而促成一次合并(从而使得结果更加准确)。
参考
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/learn-flink/streaming_analytics.html
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/streaming_analytics.html
事件驱动的应用
笔者真的不太理解为什么Flink要将这一篇的标题定义为《事件驱动的应用》。Flink官方文档总给人一种很乱的感觉。这一篇是对于上一篇《流式分析》的补充。
Process函数
ProcessFunction
将事件处理与 Timer,State 结合在一起,使其成为流处理应用的强大构建模块。 这是使用 Flink 创建事件驱动应用程序的基础。它和 RichFlatMapFunction
十分相似, 但是增加了 Timer。
代码示例
以下代码采用了 TumblingEventTimeWindow
来计算每个小时内每个司机的小费总和:
1 | // 计算每个司机每小时的小费总和 |
使用 KeyedProcessFunction
去实现相同的操作更加直接且更有学习意义。 让我们开始用以下代码替换上面的代码:
1 | // 计算每个司机每小时的小费总和 |
注意事项:
- 有几种类型的 ProcessFunctions – 该例子中的是
KeyedProcessFunction
,还有CoProcessFunctions
、BroadcastProcessFunctions
等。 KeyedProcessFunction
是一种RichFunction
。作为RichFunction
,它可以访问使用 Managed Keyed State 所需的open
和getRuntimeContext
方法。- 有两个回调方法须要实现:
processElement
和onTimer
。每个输入事件都会调用processElement
方法; 当计时器触发时调用onTimer
。它们可以是基于事件时间的计时器,也可以是基于处理时间的计时器。 除此之外,processElement
和onTimer
都提供了一个上下文对象,该对象可用于与TimerService
交互。 这两个回调还传递了一个可用于发出结果的Collector
。
open()方法
1 | // 每个窗口都持有托管的 Keyed state 的入口,并且根据窗口的结束时间执行 keyed 策略。 |
由于票价事件(fare-event)可能会乱序到达,有时需要在计算输出前一个小时结果前,处理下一个小时的事件。 这样能够保证“乱序造成的延迟数据”得到正确处理(放到前一个小时中)。 实际上,如果 Watermark 延迟比窗口长度长得多,则可能有多个窗口同时打开,而不仅仅是两个。 此实现通过使用 MapState
来支持处理这一点,该 MapState
将每个窗口的结束时间戳映射到该窗口的小费总和。
processElement() 方法
1 | public void processElement( |
需要考虑的事项:
- 延迟的事件怎么处理?watermark 后面的事件(即延迟的)正在被删除。 如果你想做一些比这更高级的操作,可以考虑使用旁路输出(Side outputs),这将在下一节中解释。
- 本例使用一个
MapState
,其中 keys 是时间戳(timestamp),并为同一时间戳设置一个 Timer。 这是一种常见的模式;它使得在 Timer 触发时查找相关信息变得简单高效。
onTimer() 方法
1 | public void onTimer( |
注意:
- 传递给
onTimer
的OnTimerContext context
可用于确定当前 key。 - 我们的 pseudo-windows 在当前 Watermark 到达每小时结束时被触发,此时
onTimer
被调用。 这个onTimer
方法从sumOfTips
中删除相关的条目,这样做的后果是不可能照顾到延迟的事件。 这相当于在使用 Flink 的时间窗口时将 allowedLateness 设置为零。
性能考虑
Flink 提供了为 RocksDB 优化的 MapState
和 ListState
类型。 相对于 ValueState
,更建议使用 MapState
和 ListState
,因为使用 RocksDBStateBackend 的情况下, MapState
和 ListState
比 ValueState
性能更好。 RocksDBStateBackend 可以附加到 ListState
,而无需进行(反)序列化, 对于 MapState
,每个 key/value 都是一个单独的 RocksDB 对象,因此可以有效地访问和更新 MapState
。
旁路输出
有几个很好的理由希望从 Flink 算子输出多个流,比如对下面情况给出报告:
- 异常情况(exceptions)
- 格式错误的事件(malformed events)
- 延迟的事件(late events)
- operator 告警(operational alerts),如与外部服务的连接超时
旁路输出(Side outputs)是一种方便的方法。除了错误报告之外,旁路输出也是实现流的 n 路分割的好方法。
示例
现在你可以对上一节中忽略的延迟事件执行某些操作。
Side output channel 与 OutputTag<T>
相关联。这些标记拥有自己的名称,并与对应 DataStream 类型一致。
1 | private static final OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {}; |
上面显示的是一个静态 OutputTag<TaxiFare>
,当在 PseudoWindow
的 processElement
方法中发出延迟事件时,可以引用它:
1 | if (eventTime <= timerService.currentWatermark()) { |
以及当在作业的 main
中从该旁路输出访问流时:
1 | // 计算每个司机每小时的小费总和 |
或者,可以使用两个同名的 OutputTag 来引用同一个旁路输出,但如果这样做,它们必须具有相同的类型。
结束语
在本例中,你已经了解了如何使用 ProcessFunction
重新实现一个简单的时间窗口。 当然,如果 Flink 内置的窗口 API 能够满足你的开发需求,那么一定要优先使用它。但如果你发现自己在考虑用 Flink 的窗口做些错综复杂的事情,不要害怕自己动手。
ProcessFunctions
的另一个常见用例是清理过时 State。
参考
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/event_driven.html
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/learn-flink/event_driven.html
通过状态快照实现容错处理
State Backends
Flink 管理的状态存储在 state backend 中。Flink 有两种 state backend 的实现 – 一种基于 RocksDB 内嵌 key/value 存储将其工作状态保存在磁盘上的,另一种基于堆的 state backend,将其工作状态保存在 Java 的堆内存中。这种基于堆的 state backend 有两种类型:FsStateBackend,将其状态快照持久化到分布式文件系统;MemoryStateBackend,它使用 JobManager 的堆保存状态快照。
当使用基于堆的 state backend 保存状态时,访问和更新涉及在堆上读写对象。但是对于保存在 RocksDBStateBackend
中的对象,访问和更新涉及序列化和反序列化,所以会有更大的开销。但 RocksDB 的状态量仅受本地磁盘大小的限制。还要注意,只有 RocksDBStateBackend
能够进行增量快照,这对于具有大量变化缓慢状态的应用程序来说是大有裨益的。
所有这些 state backends 都能够异步执行快照,这意味着它们可以在不妨碍正在进行的流处理的情况下执行快照。
名称 | Working State | 状态备份 | 快照 |
---|---|---|---|
RocksDBStateBackend | 本地磁盘(tmp dir) | 分布式文件系统 | 全量 / 增量 |
• 支持大于内存大小的状态 • 经验法则:比基于堆的后端慢10倍 |
|||
FsStateBackend | JVM Heap | 分布式文件系统 | 全量 |
• 快速,需要大的堆内存 • 受限制于 GC |
|||
MemoryStateBackend | JVM Heap | JobManager JVM Heap | 全量 |
适用于小状态(本地)的测试和实验 |
State Snapshots
- 快照:指的是全局、一致性的Flink作业状态镜像的通用术语。一个快照包括:到每个数据源的指针(例如,文件或Kafka分区中的偏移量)、以及每个作业的有状态运算符的状态副本
- 检查点:是由 Flink 自动执行的快照,其目的是能够从故障中恢复。检查点可以是增量的,并为快速恢复进行了优化。
- 外部化的 Checkpoint:通常 checkpoints 不会被用户操纵。Flink 只保留作业运行时的最近的 n 个 checkpoints(n 可配置),并在作业取消时删除它们。但你可以将它们配置为保留,在这种情况下,你可以手动从中恢复。
- Savepoint:用户出于某种操作目的(例如有状态的重新部署/升级/缩放操作)手动(或 API 调用)触发的快照。Savepoints 始终是完整的,并且已针对操作灵活性进行了优化。