Flink Joins (DataStream API)

(TODO)

Window Join

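Semantics of a window join: for elements of the two streams that fall into the same window and have the same key, compute the Cartesian product, i.e. every pairing of a left element with a right element.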

Usage:

stream.join(otherStream)
  .where(<KeySelector>)
  .equalTo(<KeySelector>)
  .window(<WindowAssigner>)
  .apply(<JoinFunction|FlatJoinFunction>)

Notes:

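  • A window join has inner-join semantics: only elements that find a partner on the other side (same key, same window) are passed to apply(); elements without a partner produce no output.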
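To make the inner-join and Cartesian-product semantics concrete, here is a minimal in-memory sketch in plain Scala. It does not use Flink at all; the names `WindowJoinSemantics`, `Elem` and `windowJoin` are hypothetical, and it assumes every element has already been assigned to a window, identified by the window's start timestamp.

```scala
// Hypothetical in-memory model of window-join semantics; this is not the Flink API.
object WindowJoinSemantics {
  // Assumption: each element is already assigned to a window,
  // identified here by the window's start timestamp.
  final case class Elem(key: String, windowStart: Long, value: Int)

  // Inner join: for every (key, window) present on BOTH sides, emit the
  // Cartesian product of the left and right groups. Elements without a
  // partner on the other side produce no output.
  def windowJoin(left: Seq[Elem], right: Seq[Elem]): Seq[(Int, Int)] = {
    val rightGroups = right.groupBy(e => (e.key, e.windowStart))
    for {
      l <- left
      r <- rightGroups.getOrElse((l.key, l.windowStart), Seq.empty[Elem])
    } yield (l.value, r.value)
  }
}
```

For example, with left elements 1 and 2 and right elements 10 and 20 all having key `a` in window 0, `windowJoin` yields the four pairs (1, 10), (1, 20), (2, 10), (2, 20); a left element with key `b`, or a right element with key `a` in window 2, has no partner and is dropped.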

Tumbling windows

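Characteristics of tumbling windows:

  • All windows have the same size, set by the size parameter;
  • Windows do not overlap in time, so each element belongs to exactly one window;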
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

// Pair up elements with equal keys that fall into the same 2 ms tumbling window
orangeStream.join(greenStream)
  .where(elem => /* select key */)
  .equalTo(elem => /* select key */)
  .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
  .apply { (e1, e2) => e1 + "," + e2 }
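The following sketch shows, under simplifying assumptions (no window offset, non-negative millisecond timestamps), how a tumbling assigner maps a timestamp to its window; `TumblingSketch` and `assign` are hypothetical names. It also illustrates a consequence for the 2 ms example above: two elements only 1 ms apart can still land in different windows and therefore never be joined.

```scala
// Hypothetical model of tumbling-window assignment (assumes offset 0 and ts >= 0).
object TumblingSketch {
  // Returns the half-open window [start, end) that contains `ts`.
  def assign(ts: Long, size: Long): (Long, Long) = {
    val start = ts - (ts % size)
    (start, start + size)
  }
}
```

With a 2 ms size, `TumblingSketch.assign(1, 2)` returns `(0, 2)` while `TumblingSketch.assign(2, 2)` returns `(2, 4)`, so elements at t=1 ms and t=2 ms end up in different windows.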

Sliding windows

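Characteristics of sliding windows:

  • All windows have the same size, set by the size parameter;
  • The slide parameter sets how far apart the starts of consecutive windows are; when slide is smaller than size, windows overlap and an element can belong to several windows;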
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

// 2 ms windows starting every 1 ms: each element belongs to two windows
orangeStream.join(greenStream)
  .where(elem => /* select key */)
  .equalTo(elem => /* select key */)
  .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
  .apply { (e1, e2) => e1 + "," + e2 }
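With a slide smaller than the size, every element is assigned to several windows. The sketch below enumerates them by taking the latest window start at or before the timestamp and stepping back by the slide while the window still covers the timestamp; as far as I understand, this mirrors Flink's sliding assigner, but treat it as an assumption. It assumes offset 0, and `SlidingSketch` is a hypothetical name.

```scala
// Hypothetical model of sliding-window assignment (assumes offset 0 and ts >= 0).
object SlidingSketch {
  // All half-open windows [start, start + size) that contain `ts`,
  // latest start first. Window starts may be negative for small ts.
  def assign(ts: Long, size: Long, slide: Long): List[(Long, Long)] = {
    val lastStart = ts - (ts % slide) // latest window start that is <= ts
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(start => start > ts - size) // the window must still cover ts
      .map(start => (start, start + size))
      .toList
  }
}
```

With size 2 ms and slide 1 ms as above, `SlidingSketch.assign(3, 2, 1)` gives `List((3, 5), (2, 4))`. One consequence for joins: two matching elements with the same timestamp share both windows, so the join emits that pair once per shared window, i.e. twice here.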

Session windows

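Characteristics of session windows:

  • Windows do not have a fixed size;
  • A window closes once no event arrives within a fixed period (the session gap); the next event then opens a new window;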
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

// A session for a key ends after a 1 ms gap without elements
orangeStream.join(greenStream)
  .where(elem => /* select key */)
  .equalTo(elem => /* select key */)
  .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
  .apply { (e1, e2) => e1 + "," + e2 }
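A session window join pairs elements with the same key that fall into the same session. The sketch below models how sessions form from the event timestamps of one key: each element opens a window [ts, ts + gap), and overlapping windows are merged. Since Flink builds the window join on top of coGroup, which windows the union of both inputs, the timestamps here would come from both streams together. Merging windows that merely touch is my reading of Flink's `TimeWindow` merge rule and should be treated as an assumption; `SessionSketch` is a hypothetical name.

```scala
// Hypothetical model of session-window merging for ONE key; not the Flink API.
object SessionSketch {
  // Each timestamp first gets its own window [ts, ts + gap); windows that
  // overlap or touch are merged. Returns the sessions ordered by start.
  def sessions(timestamps: Seq[Long], gap: Long): List[(Long, Long)] =
    timestamps.sorted
      .foldLeft(List.empty[(Long, Long)]) {
        // Extend the current session if the new window starts no later than it ends.
        case ((start, end) :: rest, ts) if ts <= end =>
          (start, math.max(end, ts + gap)) :: rest
        // Otherwise open a new session.
        case (acc, ts) => (ts, ts + gap) :: acc
      }
      .reverse
}
```

For timestamps 0, 1 and 5 ms with a 2 ms gap, this gives two sessions, [0, 3) and [5, 7), so elements at 0 ms and 5 ms would not be joined even if their keys match.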

Interval Join

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

// Join each orange element with the green elements of the same key whose
// timestamps lie in [orange.ts - 2 ms, orange.ts + 1 ms]
orangeStream
  .keyBy(elem => /* select key */)
  .intervalJoin(greenStream.keyBy(elem => /* select key */))
  .between(Time.milliseconds(-2), Time.milliseconds(1))
  .process(new ProcessJoinFunction[Integer, Integer, String] {
    override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
      out.collect(left + "," + right)
    }
  })
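An interval join pairs an orange element a with every green element b of the same key whose timestamp satisfies a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound; with between(-2 ms, 1 ms) that is the range [a.ts - 2, a.ts + 1], with both bounds inclusive by default. Unlike the window join there are no fixed windows: each element defines its own range, and the interval join works on event time only. The in-memory sketch below (hypothetical names, not the Flink API) checks that condition pair by pair.

```scala
// Hypothetical in-memory model of interval-join semantics; not the Flink API.
object IntervalJoinSketch {
  final case class Elem(key: String, ts: Long, value: Int)

  // Emit (left, right) for every pair with equal keys where
  // left.ts + lower <= right.ts <= left.ts + upper (both bounds inclusive).
  def intervalJoin(left: Seq[Elem], right: Seq[Elem], lower: Long, upper: Long): Seq[(Int, Int)] =
    for {
      l <- left
      r <- right
      if l.key == r.key && r.ts >= l.ts + lower && r.ts <= l.ts + upper
    } yield (l.value, r.value)
}
```

For a left element at 5 ms and bounds -2 and 1, right elements with the same key at 3 ms and 6 ms are matched (the bounds are inclusive), while those at 2 ms and 7 ms are not.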

Window CoGroup

Semantics: within a window, group together the elements of both streams that have the same key.

Notes:

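  • Semantically, coGroup() differs from a window join in that it hands the original elements to the function grouped per key, whereas join() computes the Cartesian product of the elements on both sides;
  • apply() expects an instance of a class implementing the interface CoGroupFunction<IN1, IN2, O>. The interface defines a single method, void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception, where first is an Iterable over all elements of the first stream in the window that share the current key, and second is an Iterable over all elements of the second stream in the window that share that key.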

Usage:

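// where()/equalTo() take KeySelectors here, not field positions
dataStream.coGroup(otherStream)
    .where(elem -> /* select key of dataStream */)
    .equalTo(elem -> /* select key of otherStream */)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new CoGroupFunction<IN1, IN2, O>() {...});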
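The difference from join() is easiest to see in a small model of a single window. In the sketch below (hypothetical names, not the Flink API), the function is called once per key present on either side, and one side may be empty; as far as I know this matches how Flink invokes CoGroupFunction, and it is what lets coGroup express outer joins, which join() cannot. `leftOuterJoin` shows one such use.

```scala
// Hypothetical in-memory model of coGroup within ONE window; not the Flink API.
object CoGroupSketch {
  final case class Elem(key: String, value: Int)

  // For every key that appears on either side, call `f` once with all left
  // values and all right values of that key; either Seq may be empty.
  def coGroup[O](left: Seq[Elem], right: Seq[Elem])(f: (Seq[Int], Seq[Int]) => Seq[O]): Seq[O] = {
    val keys = (left.map(_.key) ++ right.map(_.key)).distinct
    keys.flatMap { k =>
      f(left.filter(_.key == k).map(_.value), right.filter(_.key == k).map(_.value))
    }
  }

  // A left outer join built on coGroup: left values without a match get None.
  def leftOuterJoin(left: Seq[Elem], right: Seq[Elem]): Seq[(Int, Option[Int])] =
    coGroup[(Int, Option[Int])](left, right) { (ls, rs) =>
      if (rs.isEmpty) ls.map(l => (l, Option.empty[Int]))
      else for (l <- ls; r <- rs) yield (l, Option(r))
    }
}
```

For left elements a→1, b→2 and right elements a→10, a→20, c→30, `leftOuterJoin` returns (1, Some(10)), (1, Some(20)), (2, None); the unmatched right element c→30 is still passed to the function, but with an empty left side, so it produces nothing here.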
