Joining dimension data in Flink

(TODO)

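Preloading dimension data in the Driver

  • Implementation: use the Distributed Cache to ship a local dimension file to every TaskManager, then load it into memory and join against it.
  • Pros: omitted
  • Cons: only suitable for small dimension tables, and updated dimension data takes effect only after the job is restarted
  • Use cases: the dimension data is a file, the volume is small, and it is updated very rarely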

Example:

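import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.io.Source

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Register the dimension file; Flink copies it to every TaskManager
env.registerCachedFile("file:///{path}/product.txt", "product.txt")

// Order stream: (pid, timestamp)
val orderStream: DataStream[(Int, Long)] = ...

val resultStream = orderStream.flatMap(new RichFlatMapFunction[(Int, Long), (Int, String, Long)] {
  // In-memory dimension table: pid -> pname
  var dim: Map[Int, String] = Map()

  override def open(parameters: Configuration): Unit = {
    // Local copy of the cached file on this TaskManager
    val dimFile = getRuntimeContext.getDistributedCache.getFile("product.txt")
    val source = Source.fromFile(dimFile, "UTF-8")
    try {
      // Each line is expected to look like "pid,pname"
      for (line <- source.getLines()) {
        val dimArray = line.split(",")
        dim += dimArray(0).toInt -> dimArray(1)
      }
    } finally {
      source.close()
    }
  }

  override def flatMap(in: (Int, Long), out: Collector[(Int, String, Long)]): Unit = {
    // Left join: emit "N/A" when the pid has no matching dimension row
    out.collect((in._1, dim.getOrElse(in._1, "N/A"), in._2))
  }
})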
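The parsing step in open() assumes every line is a well-formed `pid,pname` pair, so a single blank or malformed line throws and fails the task. A minimal stdlib-only sketch of a more tolerant parser, assuming the same comma-separated layout (`parseDimLine` and `parseDimLines` are hypothetical helpers, not Flink API):

```scala
import scala.util.Try

// Hypothetical helper: parse one "pid,pname" line; None for blank or malformed lines.
def parseDimLine(line: String): Option[(Int, String)] =
  line.split(",", 2) match {
    case Array(pid, pname) => Try(pid.trim.toInt).toOption.map(id => id -> pname.trim)
    case _                 => None
  }

// Hypothetical helper: build the pid -> pname map, silently skipping bad lines.
def parseDimLines(lines: Iterator[String]): Map[Int, String] =
  lines.flatMap(line => parseDimLine(line).iterator).toMap
```

The loading loop in open() could then shrink to a single call on the cached file's lines. Splitting with a limit of 2 keeps any commas inside the product name.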

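Preloading dimension data in the operator

  • Implementation: define a class that extends RichFlatMapFunction, read the dimension data in open(), and load all of it into the memory of the flatMap operator. Adding a thread that re-reads the dimension data on a schedule also lets the in-memory copy be refreshed periodically.
  • Pros: simple to implement
  • Cons: only suitable for small dimension tables
  • Use cases: small dimension tables that change infrequently, where updates do not need to be reflected promptly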

Example:

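import java.sql.{Connection, Statement}

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory

class DimFlatMapFunction extends RichFlatMapFunction[(Int, String), (Int, String, String)] {
  import DimFlatMapFunction.LOG

  // In-memory dimension table: pid -> pname
  var dim: Map[Int, String] = Map()

  override def open(parameters: Configuration): Unit = {
    // initialize the connection
    val conn: Connection = ... ...
    var stat: Statement = null

    val sql = "select pid, pname from dim_product"

    try {
      stat = conn.createStatement()

      val resultSet = stat.executeQuery(sql)
      while (resultSet.next()) {
        val pId = resultSet.getInt("pid")
        val pName = resultSet.getString("pname")

        dim += pId -> pName
      }
    } catch {
      case e: Exception => LOG.error("Failed to load dim_product", e)
    } finally {
      // close stat (which also closes its ResultSet) and conn
      if (stat != null) stat.close()
      if (conn != null) conn.close()
    }
  }

  override def flatMap(in: (Int, String), collector: Collector[(Int, String, String)]): Unit = {
    // Inner join: records whose pid is missing from the dimension table are dropped
    dim.get(in._1).foreach(pName => collector.collect((in._1, in._2, pName)))
  }
}

object DimFlatMapFunction {
  // Kept in the companion object so the logger is not serialized with the function
  private val LOG = LoggerFactory.getLogger(classOf[DimFlatMapFunction])
}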
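The example above loads dim_product only once, in open(). The periodic refresh mentioned in the implementation bullet could be sketched with a single-threaded `ScheduledExecutorService` that reloads the table and swaps in a new immutable map atomically, so lookups never observe a half-built map. This is a stdlib-only sketch under assumptions: `RefreshingDim`, `loadDim` and the interval are hypothetical, and inside a Flink function it would be created in open() (not serialized with the function) and stopped in close().

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

// Hypothetical wrapper: holds an immutable pid -> pname snapshot and replaces it
// on a fixed schedule. loadDim stands in for the JDBC query in open().
class RefreshingDim(loadDim: () => Map[Int, String], intervalSeconds: Long) {
  private val current = new AtomicReference[Map[Int, String]](loadDim())
  private var scheduler: ScheduledExecutorService = _

  // Reload once; if the load fails, keep serving the previous snapshot.
  def refresh(): Unit =
    try current.set(loadDim())
    catch { case e: Exception => System.err.println(s"dim refresh failed: $e") }

  // Start the background thread that calls refresh() every intervalSeconds.
  def start(): Unit = {
    scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable { def run(): Unit = refresh() },
      intervalSeconds, intervalSeconds, TimeUnit.SECONDS)
  }

  // Read from the latest snapshot; no locking needed because the map is immutable.
  def lookup(pid: Int): Option[String] = current.get().get(pid)

  def stop(): Unit = if (scheduler != null) scheduler.shutdownNow()
}
```

Swapping whole snapshots keeps readers lock-free; the cost is that two copies of the table briefly coexist in memory during each reload, which matters given that this approach is already limited to small dimensions.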

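Joining dimension data in hot storage

  • Implementation: join the real-time stream against dimension data kept in hot storage, and cache dimension rows on the streaming side to reduce the load on the store. Concretely, import the dimension data into a hot store (Redis/Tair/HBase/ES), query it with Async I/O, and keep recently used dimension rows in an in-memory cache on the streaming side.
  • Pros: the dimension data is not bounded by the memory allocated to TaskManagers, so much larger dimension tables are supported
  • Cons: requires hot-storage resources, and dimension updates reach the results with a delay (introduced by importing into the hot store and by the cache)
  • Use cases: large dimension tables, where some delay in picking up dimension updates is acceptable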
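The caching half of this approach is a cache-aside lookup: check a local cache first and query the hot store only on a miss or after an entry expires. In Flink the store query would run inside an AsyncFunction (Async I/O) so that slow lookups do not block the task. The stdlib-only sketch below covers only the cache layer and assumes the store is reached through a hypothetical `queryStore: Int => Future[Option[String]]`; `DimLookupCache`, `maxEntries` and `ttlMillis` are likewise illustrative. The TTL is exactly where the update delay listed under cons comes from, and an access-ordered `java.util.LinkedHashMap` provides simple LRU eviction.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical cache-aside layer in front of a hot store (Redis/HBase/...).
class DimLookupCache(queryStore: Int => Future[Option[String]],
                     maxEntries: Int,
                     ttlMillis: Long,
                     now: () => Long = () => System.currentTimeMillis())
                    (implicit ec: ExecutionContext) {

  private case class Cached(value: Option[String], loadedAt: Long)

  // Access-ordered LinkedHashMap: evicts the least recently used entry when full.
  private val cache = new JLinkedHashMap[Int, Cached](16, 0.75f, true) {
    override def removeEldestEntry(eldest: JMap.Entry[Int, Cached]): Boolean =
      size() > maxEntries
  }

  def lookup(pid: Int): Future[Option[String]] = {
    val cached = cache.synchronized(Option(cache.get(pid)))
    cached match {
      // Fresh hit: answer from memory without touching the store.
      case Some(c) if now() - c.loadedAt < ttlMillis => Future.successful(c.value)
      // Miss or expired: query the store and cache the result, including "not found".
      case _ =>
        queryStore(pid).map { v =>
          cache.synchronized(cache.put(pid, Cached(v, now())))
          v
        }
    }
  }
}
```

In an AsyncFunction, asyncInvoke() would call lookup and complete its ResultFuture from the future's callback. Caching misses as well as hits keeps unknown pids from hammering the store, at the price of seeing newly added products only after the TTL expires.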

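Broadcasting dimension data

  • Implementation: send the dimension data to Kafka as the stream to be broadcast, and use Broadcast State to broadcast it to the downstream tasks that perform the join. Implementing the dimension stream as a custom Source instead makes it possible to broadcast dimension data from many kinds of sources.
  • Pros: dimension changes are reflected in the results immediately
  • Cons: the data is kept in memory (every parallel task holds a full copy), so only relatively small dimension tables are supported
  • Use cases: dimension changes must be picked up in real time, and the dimension data can be turned into a real-time stream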

Example:

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Order stream: (pid, timestamp)
val orderStream: DataStream[(Int, Long)] = ... ...
// Product stream: (pid, pname)
val dimStream: DataStream[(Int, String)] = ... ...

// 1. Define the broadcast state descriptor (pid -> pname)
val dimStateDesc = new MapStateDescriptor[Int, String]("ProductBroadcastState",
  classOf[Int],
  classOf[String])

// 2. Broadcast the dimension stream to all downstream tasks
val dimBroadcastStream = dimStream.broadcast(dimStateDesc)

// 3. Connect the order stream with the broadcast dimension stream
val resultStream = orderStream.connect(dimBroadcastStream)
  .process(new BroadcastProcessFunction[(Int, Long), (Int, String), (Int, String, Long)] {
    // 3.1 Broadcast side: insert or overwrite the product in the broadcast state
    override def processBroadcastElement(value: (Int, String),
        ctx: BroadcastProcessFunction[(Int, Long), (Int, String), (Int, String, Long)]#Context,
        out: Collector[(Int, String, Long)]): Unit = {
      val dimState = ctx.getBroadcastState(dimStateDesc)
      dimState.put(value._1, value._2)
    }

    // 3.2 Non-broadcast side: read-only lookup, emitting "N/A" for unknown pids
    override def processElement(value: (Int, Long),
        ctx: BroadcastProcessFunction[(Int, Long), (Int, String), (Int, String, Long)]#ReadOnlyContext,
        out: Collector[(Int, String, Long)]): Unit = {
      val dimState = ctx.getBroadcastState(dimStateDesc)
      if (dimState.contains(value._1)) {
        out.collect((value._1, dimState.get(value._1), value._2))
      } else {
        out.collect((value._1, "N/A", value._2))
      }
    }
  })
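Flink gives no ordering guarantee between the two inputs of a broadcast connection, so an order can be processed before its product row arrives and come out as "N/A". A stdlib-only simulation of a single parallel instance makes both that case and the immediate-update behaviour visible. The event types and `simulate` are hypothetical stand-ins for the inputs of processElement() and processBroadcastElement(), not Flink classes.

```scala
// Hypothetical event types standing in for the two inputs of the BroadcastProcessFunction.
sealed trait Event
final case class Order(pid: Int, ts: Long) extends Event                // non-broadcast side
final case class ProductUpdate(pid: Int, pname: String) extends Event   // broadcast side

// Replays interleaved events against one in-memory copy of the broadcast state.
def simulate(events: Seq[Event]): Seq[(Int, String, Long)] = {
  val state = scala.collection.mutable.Map.empty[Int, String]
  events.flatMap {
    // Like processBroadcastElement: update the state, emit nothing.
    case ProductUpdate(pid, pname) => state.put(pid, pname); Nil
    // Like processElement: look up the current state, fall back to "N/A".
    case Order(pid, ts) => List((pid, state.getOrElse(pid, "N/A"), ts))
  }
}
```

Results already emitted are not revised when the dimension changes later; only orders processed after an update see the new name.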
