数仓实战系列:缓慢变化维度-Type2(历史拉链维度表)

缓慢变化维度是数仓中常见的问题,Type2类型的维度是处理数仓中追踪缓慢变化维度属性值最为强悍的技术手段。Type2通过添加新的维度记录来追踪维度信息的变更历史。本文通过一个示例介绍如何在实际当中使用历史拉链维度表解决缓慢变化维度的问题。

概述

TODO

拉链维度表中的列

有效日期

TODO

过期日期

参看《Data Warehouse Toolkit》第5章第3节【Type2: Add New Attribute】。

代理键

The type 2 response to slowly changing dimensions requires the use of surrogate keys, but you’re already using them anyhow, right? You certainly can’t use the operational natural key because there are multiple profile versions for the same natural key. It is not suicient to use the natural key with two or three version digits because you’d be vulnerable to the entire list of potential operational issues discussed in Chapter 3. Likewise, it is inadvisable to append an efective date to the otherwise primary key of the dimension table to uniquely identify each version. With the type 2 response, you create a new dimension row with a new single-column primary key to uniquely identify the new product profile. This single-column primary key establishes the linkage between the fact and dimension tables for a given set of product characteristics. There’s no need to create a confusing secondary join based on the dimension row’s efective or expiration dates.

We recognize some of you may be concerned about the administration of surrogate keys to support type 2 changes. In Chapter 19: ETL Subsystems and Techniques and Chapter 20: ETL System Design and Development Process and Tasks, we’ll discuss a workflow for managing surrogate keys and accommodating type 2 changes in more detail.

准备工作

业务系统

MySQL中创建商品表product

1
2
3
4
5
6
7
create table if not exists product(
product_id varchar(100), -- 商品编号
product_status varchar(100), -- 商品状态
created_time varchar(100), -- 商品创建时间
last_updated_time varchar(100), -- 商品最后修改时间
primary key (product_id)
)

数仓

Hive数仓中创建表t_ods_product和表 t_dim_product_his

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
create database demo;

create table if not exists demo.t_ods_product(
product_id string, -- 商品编号
product_status string, -- 商品状态
created_time string, -- 商品创建时间
last_updated_time string -- 商品最后修改时间
)
partitioned by (p_date string);

create table if not exists demo.t_dim_product_his(
product_id string, -- 商品编号
product_status string, -- 商品状态
created_time string, -- 商品创建时间
last_updated_time string, -- 商品最后修改时间
effective_date string, -- 生效日期
expiration_date string -- 失效日期
);

定义如下函数负责每日更新历史拉链表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Spark&Scala代码
def insertOrUpdateDimHis(day: String): Unit = {
val sql1 = "DROP TABLE IF EXISTS demo.t_dim_product_his_tmp"
val sql2 =
s"""
|CREATE TABLE demo.t_dim_product_his_tmp AS
|SELECT
| product_id, product_status, created_time, last_updated_time,
| effective_date, expiration_date
|FROM (
| SELECT
| dim.product_id, dim.product_status, dim.created_time, dim.last_updated_time,
| dim.effective_date,
| CASE
| WHEN ods.product_id IS NOT NULL AND dim.expiration_date > '$day' THEN '$day'
| ELSE dim.expiration_date
| END AS expiration_date
| FROM demo.t_dim_product_his dim
| LEFT JOIN (
| SELECT * FROM demo.t_ods_product WHERE p_date = '$day'
| ) ods ON (dim.product_id = ods.product_id)
| UNION ALL
| SELECT
| product_id, product_status, created_time, last_updated_time,
| last_updated_time AS effective_date,
| '9999-12-31' AS expiration_date
| FROM demo.t_ods_product
| WHERE p_date = '$day'
|) t
|ORDER BY product_id,effective_date
|""".stripMargin
println(sql2)

val sql3 = "INSERT OVERWRITE TABLE demo.t_dim_product_his SELECT * FROM demo.t_dim_product_his_tmp"

spark.sql(sql1)
spark.sql(sql2)
spark.sql(sql3)
}

以上insertOrUpdateDimHis()逻辑如下图所示,其中:

  • A部分为历史拉链表中product_id不存在于当日快照数据的记录,对于这部分记录保留拉链表中的expiration_date值;
  • B部分为历史拉链表中product_id存在于当日快照数据的记录,说明这部分记录的product_id所对应商品信息在当前日期发生了更改,需要将历史拉链表中这部分记录更新其expiration_date值为当前日期(注意,数据有效期限为[effective_date, expiration_date));
  • 当日快照中的数据为最新状态维度信息,记录的effective_date为对应的last_updated_timeexpiration_date9999-12-31
  • 将上述A、B更新后结果、以及当日快照数据进行合并形成最终的拉链表;

业务系统执行情况

下图为业务系统商品表product在2021年01月02日、2021年01月03日、2021年01月04日三天的快照记录。(具体实验数据参考

数仓系统执行情况

2021年01月03日

执行

将2021年01月02日新创建或更新过的商品从业务关系型数据库加载到t_ods_product表中p_date='2021-01-02'的分区中(注,dimProduct20210102代表的是使用Spark代码从业务系统加载2021-01-02的product表所创建的临时表):

1
2
3
-- HIVE SQL
insert overwrite table demo.t_ods_product partition (p_date='2021-01-02')
select * from dimProduct20210102 where created_time = '2021-01-02' or last_updated_time = '2021-01-02';

刷新历史拉链表:

1
2
// Spark&Scala代码
insertOrUpdateDimHis("2021-01-02")

结果

2021年01月04日

执行

将2021年01月03日新创建或更新过的商品从业务关系型数据库加载到t_ods_product表中p_date='2021-01-03'的分区中(注,dimProduct20210103代表的是使用Spark代码从业务系统加载2021-01-03的product表所创建的临时表):

1
2
3
-- HIVE SQL
insert overwrite table demo.t_ods_product partition (p_date='2021-01-03')
select * from dimProduct20210103 where created_time = '2021-01-03' or last_updated_time = '2021-01-03';

刷新历史拉链表:

1
2
// Spark&Scala代码
insertOrUpdateDimHis("2021-01-03")

结果

2021年01月05日

执行

将2021年01月03日新创建或更新过的商品从业务关系型数据库加载到t_ods_product表中p_date='2021-01-03'的分区中(注,dimProduct20210104代表的是使用Spark代码从业务系统加载2021-01-04的product表所创建的临时表):

1
2
3
-- HIVE SQL
insert overwrite table demo.t_ods_product partition (p_date='2021-01-04')
select * from dimProduct20210104 where created_time = '2021-01-04' or last_updated_time = '2021-01-04';

刷新历史拉链表:

1
2
// Spark&Scala代码
insertOrUpdateDimHis("2021-01-04")

结果

查询拉链表

查询商品表某一天的快照

查询商品表最新的快照

遗留问题

该示例拉链表中product_id是业务系统中product表的自然主键,并没有设计拉链表代理键的逻辑,后续会继续完善该篇。

  1. 2021年01月02日产品表快照
  2. 2021年01月03日产品表快照
  3. 2021年01月04日产品表快照
  4. 示例代码

参考

  1. https://www.pianshen.com/article/97501801493/
  2. https://blog.csdn.net/qq_44509920/article/details/106044647
  3. http://lxw1234.com/archives/2015/04/20.htm
  4. 《Data Warehouse Toolkit》第5章第3节