Window Join
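Semantics: a window join computes the Cartesian product of the elements from the two streams that share the same key and fall into the same window.
Usage: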
```
stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction|FlatJoinFunction>)
```
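Notes:
- A window join has inner-join semantics: only pairs of elements that match on the key within the same window are passed to the apply() function; an element with no match on the other side produces no output.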
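The inner-join semantics can be made concrete with a small plain-Scala model that does not touch Flink. It is a sketch under assumptions: every element is taken to be already assigned to a window (identified by the window's start time), and `Elem` and `WindowJoinSketch` are hypothetical names. For each (window, key) group present on both sides it emits the Cartesian product; groups present on only one side emit nothing.

```scala
// Plain-Scala model of window-join semantics; it does not use the Flink API.
// Assumption: every element is already assigned to a window, identified by its start time.
object WindowJoinSketch {
  // Hypothetical element type: join key, window start, payload.
  final case class Elem[K, V](key: K, window: Long, value: V)

  // Emit f(l, r) for every pair sharing both window and key (Cartesian product per group).
  // Groups that exist on only one side emit nothing, i.e. inner-join semantics.
  def join[K, A, B, O](left: Seq[Elem[K, A]], right: Seq[Elem[K, B]])(f: (A, B) => O): Seq[O] = {
    val rightByGroup: Map[(Long, K), Seq[Elem[K, B]]] = right.groupBy(e => (e.window, e.key))
    for {
      l <- left
      r <- rightByGroup.getOrElse((l.window, l.key), Seq.empty)
    } yield f(l.value, r.value)
  }
}
```

Grouping one side by (window, key) first means each element of the other side only scans its own group instead of the whole input.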
Tumbling windows
Characteristics of tumbling windows:
- All windows have the same size, set by the size parameter;
- Windows do not overlap in time.
```scala
import org.apache.flink.streaming.api.scala._ // Scala DataStream API and implicit TypeInformation
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

// Pair up equal-key elements that fall into the same 2 ms tumbling window
orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply { (e1, e2) => s"$e1,$e2" }
```
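Tumbling-window assignment itself can be sketched in plain Scala. This is not Flink's `TumblingEventTimeWindows` code: it assumes a zero window offset, so each window is `[start, start + size)` with `start` a multiple of `size`, and `TumblingSketch` is a hypothetical name. Since every timestamp maps to exactly one start, windows never overlap.

```scala
// Plain-Scala model of tumbling event-time windows; it does not use the Flink API.
// Assumption: zero window offset, so windows are aligned to multiples of `size`.
object TumblingSketch {
  // Start of the single window [start, start + size) that contains `ts`.
  // floorMod keeps the alignment correct for negative timestamps too.
  def windowStart(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  // Group timestamps by the one window each of them falls into.
  def assign(timestamps: Seq[Long], size: Long): Map[Long, Seq[Long]] =
    timestamps.groupBy(windowStart(_, size))
}
```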
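Sliding windows
Characteristics of sliding windows:
- All windows have the same size, set by the size parameter;
- The slide parameter sets the interval between the starts of consecutive windows; when slide is smaller than size, windows overlap and one element can belong to several windows.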
```scala
import org.apache.flink.streaming.api.scala._ // Scala DataStream API and implicit TypeInformation
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

// 2 ms windows (size) that start every 1 ms (slide), so consecutive windows overlap
orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2), Time.milliseconds(1)))
    .apply { (e1, e2) => s"$e1,$e2" }
```
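Sliding-window assignment can be sketched the same way. `SlidingSketch` is hypothetical and not Flink's `SlidingEventTimeWindows` code; it assumes a zero offset and windows of the form `[start, start + size)`. Starting from the latest slide-aligned start at or before the timestamp, it steps back by `slide` while the window still covers the timestamp. With `size = 2` and `slide = 1`, as in the example above, every element lands in two windows.

```scala
// Plain-Scala model of sliding event-time windows; it does not use the Flink API.
// Assumptions: zero offset, size > 0, slide > 0; each window is [start, start + size).
object SlidingSketch {
  // All window starts whose window contains `ts`, latest first.
  def windowStarts(ts: Long, size: Long, slide: Long): Seq[Long] = {
    val lastStart = ts - Math.floorMod(ts, slide) // latest slide-aligned start <= ts
    // Step back by `slide` while [start, start + size) still covers ts.
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }
}
```

When `slide == size` the loop yields exactly one start per element, which is the tumbling case.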
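Session windows
Characteristics of session windows:
- Window size is not fixed;
- A session ends once no element has arrived for a fixed period (the gap), and the next element starts a new window.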
```scala
import org.apache.flink.streaming.api.scala._ // Scala DataStream API and implicit TypeInformation
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

// Sessions are separated by a gap of 1 ms without elements
orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply { (e1, e2) => s"$e1,$e2" }
```
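Session boundaries can be modelled by sorting timestamps and extending the current session while each element stays within `gap` of the previous one. `SessionSketch` is a hypothetical name, not Flink's `EventTimeSessionWindows` logic; it treats a session as `[first timestamp, last timestamp + gap)` and assumes a new session starts only when the distance to the previous element exceeds `gap`, so Flink's own window-merging rules are worth checking if exact boundary behavior matters.

```scala
// Plain-Scala model of event-time session windows; it does not use the Flink API.
// Assumption: a new session starts only when an element is MORE than `gap` after the
// previous one; Flink's exact boundary handling may differ.
object SessionSketch {
  // Returns sessions as [start, endTs) pairs, where endTs = last timestamp in the session + gap.
  def sessions(timestamps: Seq[Long], gap: Long): List[(Long, Long)] =
    timestamps.sorted.foldLeft(List.empty[(Long, Long)]) {
      // Still within the gap of the current session: extend it
      case ((start, endTs) :: rest, ts) if ts <= endTs => (start, ts + gap) :: rest
      // Otherwise open a new session for this element
      case (acc, ts) => (ts, ts + gap) :: acc
    }.reverse
}
```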
Interval Join
```scala
import org.apache.flink.streaming.api.scala._ // Scala DataStream API and implicit TypeInformation
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

// Join each orange element with the green elements of the same key whose
// timestamps lie in [orange.ts - 2 ms, orange.ts + 1 ms]
orangeStream
    .keyBy(elem => /* select key */)
    .intervalJoin(greenStream.keyBy(elem => /* select key */))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction[Integer, Integer, String] {
        override def processElement(left: Integer, right: Integer,
                                    ctx: ProcessJoinFunction[Integer, Integer, String]#Context,
                                    out: Collector[String]): Unit = {
            out.collect(s"$left,$right")
        }
    })
```
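An interval join pairs a left element `l` with every right element `r` of the same key whose timestamp satisfies `l.ts + lowerBound <= r.ts <= l.ts + upperBound`; with `between(Time.milliseconds(-2), Time.milliseconds(1))` that range is `[l.ts - 2, l.ts + 1]`, and Flink treats both bounds as inclusive by default. The plain-Scala sketch that follows only illustrates which pairs are produced: `IntervalJoinSketch` and `Ev` are hypothetical names, and it uses a brute-force nested loop rather than the buffering in keyed state that a streaming implementation needs.

```scala
// Plain-Scala model of interval-join semantics; it does not use the Flink API.
object IntervalJoinSketch {
  // Hypothetical event type: key, event timestamp (ms), payload.
  final case class Ev[K, V](key: K, ts: Long, value: V)

  // Pair l with every same-key r where l.ts + lower <= r.ts <= l.ts + upper
  // (both bounds inclusive). Brute-force nested loop, for illustration only.
  def join[K, A, B](left: Seq[Ev[K, A]], right: Seq[Ev[K, B]], lower: Long, upper: Long): Seq[(A, B)] =
    for {
      l <- left
      r <- right
      if l.key == r.key && r.ts >= l.ts + lower && r.ts <= l.ts + upper
    } yield (l.value, r.value)
}
```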
Window CoGroup
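Semantics: within a window, the elements of the two streams that share the same key are grouped together.
Notes:
- Compared with a window join, coGroup() hands over the original elements grouped by key, while join() computes the Cartesian product of the elements from both sides. coGroup() is also called for a key that occurs in only one of the streams within the window; the Iterable for the other stream is then empty, which is what lets coGroup() express outer joins.
- apply() takes an instance of a class implementing the interface CoGroupFunction<IN1, IN2, O>, which declares a single method, void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception. Here first is an Iterable over all elements of the first stream in the window that have the same key, and second is the corresponding Iterable over the elements of the second stream.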
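Usage:
```java
// The DataStream API keys coGroup inputs with KeySelectors; this assumes Tuple elements
// keyed by field 0 of the first stream and field 1 of the second
dataStream.coGroup(otherStream)
    .where(first -> first.f0)
    .equalTo(second -> second.f1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new CoGroupFunction() {...});
```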
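The coGroup contract, including the empty-Iterable case, can be modelled for the contents of a single window in plain Scala. `CoGroupSketch` is hypothetical and not Flink's API; as in `CoGroupFunction`, the user function receives the two groups for one key but not the key itself, and here it returns a `Seq` instead of writing to a `Collector`. Because the function still runs when one side is empty, it can express a left outer join, which a window `join()` cannot.

```scala
// Plain-Scala model of window coGroup semantics; it does not use the Flink API.
// Assumption: the input is the content of a single window, as (key, value) pairs.
object CoGroupSketch {
  // Call f once per key present in EITHER input; the group from the other input may be empty.
  def coGroup[K, A, B, O](first: Seq[(K, A)], second: Seq[(K, B)])(f: (Seq[A], Seq[B]) => Seq[O]): Seq[O] = {
    val keys = (first.map(_._1) ++ second.map(_._1)).distinct
    keys.flatMap { k =>
      f(first.filter(_._1 == k).map(_._2), second.filter(_._1 == k).map(_._2))
    }
  }

  // Example use: a left outer join on top of coGroup; unmatched left elements pair with None.
  def leftOuterJoin[K, A, B](first: Seq[(K, A)], second: Seq[(K, B)]): Seq[(A, Option[B])] =
    coGroup(first, second) { (as: Seq[A], bs: Seq[B]) =>
      if (bs.isEmpty) as.map(a => (a, Option.empty[B]))
      else for (a <- as; b <- bs) yield (a, Option(b))
    }
}
```

In `leftOuterJoin`, a key that exists only in the first input still reaches the function with an empty second group, so its elements are emitted with `None`; a key that exists only in the second input yields nothing because its first group is empty.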