jtLiBrain

任何伟大的事都不会一蹴而就,三分智慧,七分韧性

0%

《Hive优化系列1:MR&Hive如何控制map和reduce任务数量》一文中介绍了Hadoop和Hive中Map和Reduce任务数量是如何决定的,本文我们将介绍Spark中基于文件的表其物理计划中map任务的产生过程。

FileSourceScanExec

工作过程:

  1. 首先遍历每一个文件,根据计算出来的最大分片大小对可切分的文件进行切分;
  2. 对切分后的文件分片根据分片大小降序排序,根据计算出来的最大分片大小将小的文件分片合并分配到相应分区,进而便是map任务的个数;

createNonBucketedReadRDD()方法为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private def createNonBucketedReadRDD(
readFile: (PartitionedFile) => Iterator[InternalRow],
selectedPartitions: Array[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
val maxSplitBytes =
FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")

val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
// getPath() is very expensive so we only want to call it once in this block:
val filePath = file.getPath
val isSplitable = relation.fileFormat.isSplitable(
relation.sparkSession, relation.options, filePath)
PartitionedFileUtil.splitFiles(
sparkSession = relation.sparkSession,
file = file,
filePath = filePath,
isSplitable = isSplitable,
maxSplitBytes = maxSplitBytes,
partitionValues = partition.values
)
}
}.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

val partitions =
FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

// 使用分配的分区进而创建FileScanRDD
new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
}

切分文件

FilePartition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def maxSplitBytes(
sparkSession: SparkSession,
selectedPartitions: Seq[PartitionDirectory]): Long = {
// spark.sql.files.maxPartitionBytes
val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
// spark.sql.files.openCostInBytes
val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
// spark.sql.files.minPartitionNum
val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
.getOrElse(sparkSession.sparkContext.defaultParallelism)
// 计算目标分片大小
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / minPartitionNum

Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}

要点:

  1. 分片不会过大,由spark.sql.files.maxPartitionBytes控制;

  2. 分片不会过小,由spark.sql.files.openCostInBytes控制;

  3. 目标分片大小等于:${\sum_{i=1}^n{(文件大小_i+开销)_i}}\over{并行度}$;

对文件进行分区

FilePartition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def getFilePartitions(
sparkSession: SparkSession,
partitionedFiles: Seq[PartitionedFile],
maxSplitBytes: Long): Seq[FilePartition] = {
val partitions = new ArrayBuffer[FilePartition]
val currentFiles = new ArrayBuffer[PartitionedFile]
var currentSize = 0L

/** Close the current partition and move to the next. */
def closePartition(): Unit = {
if (currentFiles.nonEmpty) {
// Copy to a new Array.
val newPartition = FilePartition(partitions.size, currentFiles.toArray)
partitions += newPartition
}
currentFiles.clear()
currentSize = 0
}

val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
// 使用"Next Fit Decreasing"将文件分配到分区
partitionedFiles.foreach { file =>
// maxSplitBytes控制分区大小,达到指定阈值,则创建分区
// 注意,没有计入文件开销
if (currentSize + file.length > maxSplitBytes) {
closePartition()
}
// Add the given file to the current partition.
// 注意,当前文件在添加到当前分区时考虑了文件开销
currentSize += file.length + openCostInBytes
currentFiles += file
}
closePartition()
partitions.toSeq
}

模式分类

我们根据两条准则对模式进行分类:

  • 目的准则:即模式是用来完成什么工作的。a)创建型模式与对象的创建有关;b)结构型模式处理类或对象的组合;c)行为型模式对类或对象怎样交互和怎样分配职责进行描述;
  • 范围准则:指定模式主要是用于类还是对象。a)类模式处理类与子类之间的关系,这些关系通过继承建立,是静态的,在编译时便确定下来了;b)对象模式处理对象之间的关系,这些关系在运行时是可变化的;

创建型类模式将对象的部分创建工作延迟到子类,而创建型对象模式则将它延迟到另一个对象中;结构型类模式使用继承机制来组合类,而结构型对象模式则描述了对象的组装方式;行为型类模式使用继承描述算法和控制流,而行为型对象模式则描述一组对象怎样协作完成单个对象所无法完成的任务。

目的
创建型 结构型 行为型
范围 工厂方法Factory Method 适配器Adapter 解释器Interpreter
模版方法Template Method
对象 抽象工厂Abstract Factory
建造者Builder
原型Prototype
单例Singleton
适配器Adapter
桥接Bridge
组合Composite
装饰器Decorator
外观Facade
享元Fly weight
代理Proxy
责任链Chain of Responsibility
命令Command
迭代器Iterator
中介者Mediator
备忘录Memento
观察者Observer
状态State
策略Strategy
访问者Visitor

参考

  1. 《设计模式-可复用面向对象软件的基础》

窗口编程模型

  • Keyed Windows
1
2
3
4
5
6
7
8
9
stream
.keyBy(...) <- 仅 keyed 窗口需要
.window(...) <- 必填项:"assigner"
[.trigger(...)] <- 可选项:"trigger" (省略则使用默认 trigger)
[.evictor(...)] <- 可选项:"evictor" (省略则不使用 evictor)
[.allowedLateness(...)] <- 可选项:"lateness" (省略则为 0)
[.sideOutputLateData(...)] <- 可选项:"output tag" (省略则不对迟到数据使用 side output)
.reduce/aggregate/apply() <- 必填项:"function"
[.getSideOutput(...)] <- 可选项:"output tag"
  • Non-Keyed Windows
1
2
3
4
5
6
7
8
stream
.windowAll(...) <- 必填项:"assigner"
[.trigger(...)] <- 可选项:"trigger" (else default trigger)
[.evictor(...)] <- 可选项:"evictor" (else no evictor)
[.allowedLateness(...)] <- 可选项:"lateness" (else zero)
[.sideOutputLateData(...)] <- 可选项:"output tag" (else no side output for late data)
.reduce/aggregate/apply() <- 必填项:"function"
[.getSideOutput(...)] <- 可选项:"output tag"

窗口生命周期

备注:参考Flink代码WindowOperator和EvictingWindowOperator。

窗口分配器Window Assigner

WindowAssigner 负责将 stream 中的每个数据分发到一个或多个窗口中。

窗口函数Window Function

函数类型 说明 函数类 API
聚合函数 每一次有数据被分配到窗口时,都会调用 ReduceFunction 或者 AggregateFunction 来增量聚合。 ReduceFunction
AggregateFunction
WindowedStream:
  sum(), max(), min(), maxBy(), reduce(), aggregate()
窗口函数 ProcessWindowFunction 会缓存 Iterable 和窗口内容,供接下来全量的窗口计算。 ProcessWindowFunction
ProcessAllWindowFunction
WindowedStream:
  process(ProcessWindowFunction)
聚合+窗口函数 ProcessWindowFunction 与 ReduceFunction 或 AggregateFunction 搭配使用,使其能够在数据到达窗口的时候进行增量聚合。在窗口关闭时,ProcessWindowFunction 会得到聚合的结果。这样就可以做到增量聚合窗口的元素,并且可以从 ProcessWindowFunction中获得窗口的元数据。   WindowedStream:
  reduce(ReduceFunction,ProcessWindowFunction)
  aggregate(AggregateFunction,WindowFunction)

触发器Trigger

清除器Evictor

参考

  1. https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/
  2. https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/streaming_analytics/
  3. https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/datastream/event-time/generating_watermarks/

图结构:

1
2
3
4
5
6
7
8
Map<String, String[]> graph = new HashMap<String, String[]>(){{
put("A", new String[]{"B", "C"});
put("B", new String[]{"A", "C", "D"});
put("C", new String[]{"A", "B", "D", "E"});
put("D", new String[]{"B", "C", "E", "F"});
put("E", new String[]{"C", "D"});
put("F", new String[]{"D"});
}};

BFS

思路

广度优先搜索算法的基本思想是:优先在当前层进行搜索,搜索完全后,再进入与当前层关联的下一层进行搜索;直到搜索完所有节点进而终止。

依据广度优先的思路,对于我们的问题进行一次展开:

步骤1:假定选择顶点 A 做为起点,首先搜索 A;

步骤2:进入下一层,

步骤2.1:顶点 A 的邻接点 B 、 C ,搜索 B 、 C ;

步骤3:进入下一层,

步骤3.1:顶点 B 的邻接点 A 、 C 、 D , A 、 C 已经搜索过了,搜索 D ;

步骤3.2:顶点 C 的邻接点有 A 、 B 、 D 、 E , A 、 B 、 C 、 D 已经搜索过了,搜索 E ;

步骤4:进入下一层,

步骤4.1:3.1中 D 的邻接点只有 F 没有搜索,搜索 F ;

步骤4.2:3.2中 E 的邻接点都搜索过了,终止;

最后,一个可能的解是:A->B->C->D->E->F

代码

BFS实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void BFS(Map<String, String[]> graph, String start) {
Queue<String> queue = new ArrayDeque();
queue.offer(start);

Set<String> visited = new HashSet<>();
visited.add(start);

while (!queue.isEmpty()) {
String vertex = queue.poll();

for(String adVertex : graph.get(vertex)) {
if(!visited.contains(adVertex)) { // 存在有环图,入队前判断是否已经访问过
queue.offer(adVertex);
visited.add(adVertex);
}
}

System.out.print(vertex + " ");
}
}

求解的问题

  1. 可以用于求解图中两点间的最短路径;

DFS

思路

代码

DFS实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void DFS(Map<String, String[]> graph, String start) {
Deque<String> stack = new ArrayDeque();
stack.push(start);

Set<String> visited = new HashSet<>();
visited.add(start);

while (!stack.isEmpty()) {
String vertex = stack.pop();

for(String adVertex : graph.get(vertex)) {
if(!visited.contains(adVertex)) { // 存在有环图,入队前判断是否已经访问过
stack.push(adVertex);
visited.add(adVertex);
}
}

System.out.print(vertex + " ");
}
}

Dijkstra

参考

  1. https://www.bilibili.com/video/BV1Ks411575U/?spm_id_from=autoNext
  2. https://www.geeksforgeeks.org/breadth-first-search-or-bfs-for-a-graph/

聚合

TODO

高级聚合

GROUPING SETS

1
GROUP BY GROUPING SETS(分组集1[, 分组集2, ...])

对关键字 GROUPING SETS后面指定的多个分组集合中每个集合都进行一次分组操作,等价于按照每个分组集合单独进行聚合,然后再将这些聚合结果进行 UNION ALL

注意:为了兼容Hive,Spark也支持GROUP BY ... GROUPING SETS (...)这样的写法。GROUP BY 表达式通常被忽略,但是如果GROUP BY 表达式中包含有GROUPING SETS表达式以外的表达式,那么这些额外的表达式将被包含进分组表达式中且其值为NULL,例如SELECT a, b, c FROM ... GROUP BY a, b, c GROUPING SETS (a, b)

GROUPING SETS 等价表示
GROUP BY GROUPING SETS((a), (b))

GROUP BY GROUPING SETS(a, b)
GROUP BY a
UNION ALL
GROUP BY b
GROUP BY a,b GROUPING SETS(a, b) SELECT a, NULL ... GROUP BY a
UNION ALL
SELECT NULL, b ... GROUP BY b

ROLLUP

1
2
3
GROUP BY 分组表达式1[, 分组表达式2, ...] WITH ROLLUP

GROUP BY ROLLUP(分组集1[, 分组集2, ...])
RULLUP 等价表示
GROUP BY a, b WITH ROLLUP

GROUP BY ROLLUP(a, b)
GROUP BY GROUPING SETS((a, b), (a), ())

CUBE

1
2
3
GROUP BY 分组表达式1[, 分组表达式2, ...] WITH CUBE

GROUP BY CUBE(分组集1[, 分组集2, ...])
CUBE 等价表示
GROUP BY a, b WITH CUBE

GROUP BY CUBE(a,b)
GROUP BY GROUPING SETS((a,b), (a), (b), ())
GROUP BY CUBE(a, b, (a,c)) GROUP BY GROUPING SETS((a,b,c), (a,b), (a,c), (b,a,c), (a), (b), (a,c), ())

参考:

  1. https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-groupby.html
  2. https://cwiki.apache.org/confluence/display/Hive/LanguageManual+GroupBy#LanguageManualGroupBy-Multi-Group-ByInserts
  3. https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C+Grouping+and+Rollup

数据类型

日期时间类型

DateType

TimestampType

基础数据类型

参考类org.apache.spark.sql.catalyst.expressions.Cast中的castToTimestamp()castToDate()

字符串类型

数值类型

时间间隔类型

YearMonthIntervalType

DayTimeIntervalType

时区

可以通过以下三种方式来设置时区参数:

  • 区域ID所表示的时区:区域ID的格式必须是:area/city,例如,America/Los_AngelesAsia/Shanghai
  • 时区偏移量:时区偏移量的格式必须是:(+|-)HH(+|-)HH:mm(+|-)HH:mm:ss;例如-08+01:00-13:33:33
  • 也可以使用短名称UTC来表示+00:00,其他短名称不建议使用。

查看时区:

1
2
sql> select current_timezone();
sql> SET spark.sql.session.timeZone;

设置时区

TIMEZONE参数

1
2
3
4
5
-- 使用区域ID设置时区
sql> SET timezone = America/Los_Angeles;

-- 使用时区偏移量设置时区
sql> SET timezone = +08:00;

TIME ZONE关键字

1
2
3
4
5
6
7
8
9
10
11
12
-- 设置为系统默认时区
sql> SET TIME ZONE LOCAL;

-- 使用区域ID设置时区
sql> SET TIME ZONE 'America/Los_Angeles';

-- 使用时区偏移量设置时区
sql> SET TIME ZONE '+08:00';

-- 使用间隔量设置时区
sql> SET TIME ZONE INTERVAL 1 HOUR 30 MINUTES;
sql> SET TIME ZONE INTERVAL '08:30:00' HOUR TO SECOND;

spark.sql.session.timeZone参数

1
2
3
4
5
-- 使用区域ID设置时区
sql> SET spark.sql.session.timeZone = America/Los_Angeles;

-- 使用时区偏移量设置时区
sql> SET spark.sql.session.timeZone = +08:00;

函数

转换为UTC时间

unix_timestamp

函数 unix_timestamp(timeExp, String fmt)
返回值类型 bigint
描述 将当前系统时区表示的时间转换为UTC秒数
参数 timeExp:可选参数,Spark中该参数可以是date、timestamp、string类型,Hive中该参数是string类型。
fmt:可选参数,当timeExp不是string类型时,会忽略该参数。默认值为yyyy-MM-dd HH:mm:ss。参考日期时间格式

示例1:

1
sql> SELECT unix_timestamp();

示例2:入参格式串没有时区信息,默认使用系统时区

1
2
3
sql> SET TIME ZONE 'Asia/Shanghai';
sql> SELECT unix_timestamp('1970-01-01 08:00:00', 'yyyy-MM-dd HH:mm:ss');
0

示例3:入参格式串包含明确的时区信息,则使用入参中的时区

1
2
3
sql> SET TIME ZONE 'Asia/Shanghai';
sql> SELECT unix_timestamp('1970-01-01T08:00:00.000+0800', "yyyy-MM-dd'T'HH:mm:ss.SSSZ");
0

示例4:入参格式串包含明确的时区信息,则使用入参中的时区

1
2
3
sql> SET TIME ZONE 'Etc/UTC';
sql> SELECT unix_timestamp('1970-01-01T08:00:00.000+0800', "yyyy-MM-dd'T'HH:mm:ss.SSSZ");
0

to_unix_timestamp

该函数与unix_timestamp()函数功能类似,有以下几点需要注意:

  1. 该函数只是unix_timestamp()函数相比,unix_timestamp()函数有空函数的形式,返回当前时间的UTC时间;
  2. 该函数只有Spark支持,Hive不支持该函数;

时间与时区的运算

from_utc_timestamp

该函数理解起来比较困难,它实际的运算路径是:时间 –(等价变换)–> 目标时区 –(加/减时区差)–>结果时间,从语义上它表达是在时间加上某个时区相对于UTC偏移量后的时间。

函数 from_utc_timestamp(ts, string to_tz)
返回值类型 timestamp
描述 时间加上某个时区偏移量后的时间
参数 ts:UTC时间;Spark中可以是timestamp/date、时间字符串类型;Hive中可以是timestamp/date、tinyint/smallint/int/bigint、float/double、decimal。
to_tz:目标时区(从UTC到目标时区)

示例1:UTC 2016-08-31加上8小时(上海时区与UTC时区为8小时)后为UTC 2016-08-31T08:00:00.000

1
2
3
4
5
6
7
8
9
sql> SET TIME ZONE 'UTC';
sql> SELECT \
from_utc_timestamp('2016-08-31', 'UTC'), \
from_utc_timestamp('2016-08-31', 'Asia/Shanghai'), \
from_utc_timestamp('2016-08-31', 'Asia/Seoul');

2016-08-31T00:00:00.000+0000
2016-08-31T08:00:00.000+0000
2016-08-31T09:00:00.000+0000

示例2:

1
2
3
4
5
6
7
8
9
sql> SET TIME ZONE 'Asia/Shanghai';
sql> SELECT \
from_utc_timestamp('2016-08-31', 'UTC'), \
from_utc_timestamp('2016-08-31', 'Asia/Shanghai'), \
from_utc_timestamp('2016-08-31', 'Asia/Seoul');

2016-08-31T00:00:00.000+0800
2016-08-31T08:00:00.000+0800
2016-08-31T09:00:00.000+0800

to_utc_timestamp

与from_utc_timestamp()函数相对应,从语义上该函数表达是在时间减去某个时区相对于UTC偏移量后的时间。

函数 to_utc_timestamp(ts, string to_tz)
返回值类型 timestamp
描述 时间减去某个时区偏移量后的时间
参数 ts:UTC时间;Spark中可以是timestamp/date、时间字符串类型;Hive中可以是timestamp/date、tinyint/smallint/int/bigint、float/double、decimal。
to_tz:目标时区(从目标时区到UTC)

示例1:

1
2
3
4
5
6
7
8
9
10
11
sql> SET TIME ZONE 'UTC'
sql> SELECT \
to_utc_timestamp('1970-01-02 08:00:00.000+0800', 'Etc/UTC'), \
to_utc_timestamp('1970-01-02 08:00:00.000', 'Etc/UTC'), \
to_utc_timestamp('1970-01-02 08:00:00.000+0800', 'Asia/Shanghai'), \
to_utc_timestamp('1970-01-02 08:00:00.000', 'Asia/Shanghai');

1970-01-02T00:00:00.000+0000
1970-01-02T08:00:00.000+0000
1970-01-01T16:00:00.000+0000
1970-01-02T00:00:00.000+0000

示例2:

1
2
3
4
5
6
7
8
9
10
11
sql> SET TIME ZONE 'Asia/Shanghai'
sql> SELECT \
to_utc_timestamp('1970-01-02 08:00:00.000+0800', 'Etc/UTC'), \
to_utc_timestamp('1970-01-02 08:00:00.000', 'Etc/UTC'), \
to_utc_timestamp('1970-01-02 08:00:00.000+0800', 'Asia/Shanghai'), \
to_utc_timestamp('1970-01-02 08:00:00.000', 'Asia/Shanghai');

1970-01-02T08:00:00.000+0800
1970-01-02T08:00:00.000+0800
1970-01-02T00:00:00.000+0800
1970-01-02T00:00:00.000+0800

字符串转换时间

to_timestamp

函数 to_timestamp(string ts[, string fmt])
返回值类型 timestamp
描述 将时间字符串按指定格式解析为当前系统时区表示的时间
参数 ts:时间字符串。
fmt:格式化字符串,参考Datetime PatternsJava SimpleDateFormat。

注意:Hive不支持该函数。

示例1:入参时间字符串没有时区信息,则默认使用系统当前时区对其进行解析

1
2
3
sql> SET TIME ZONE 'UTC';
sql> SELECT to_timestamp('2022-05-15 19:00:00', 'yyyy-MM-dd HH:mm:ss');
2022-05-15T19:00:00.000+0000

示例2:入参时间字符串没有时区信息,则默认使用系统当前时区对其进行解析

1
2
3
sql> SET TIME ZONE 'Asia/Shanghai';
sql> SELECT to_timestamp('2022-05-15 19:00:00', 'yyyy-MM-dd HH:mm:ss');
2022-05-15T19:00:00.000+0800

示例3:入参时间字符串包含明确的时区信息

1
2
3
sql> SET TIME ZONE 'UTC';
sql> SELECT to_timestamp('2022-05-15T19:00:00.000+0800', "yyyy-MM-dd'T'HH:mm:ss.SSSZ");
2022-05-15T11:00:00.000+0000

示例4:入参时间字符串包含明确的时区信息

1
2
3
sql> SET TIME ZONE 'Asia/Shanghai';
sql> SELECT to_timestamp('2022-05-15T19:00:00.000+0800', "yyyy-MM-dd'T'HH:mm:ss.SSSZ");
2022-05-15T19:00:00.000+0800

to_date

函数 to_date(string date[, string fmt])
返回值类型 date
描述 将时间字符串按指定格式解析为当前系统时区表示的时间
参数 date:时间字符串。
fmt:格式化字符串,参考Datetime PatternsJava SimpleDateFormat。

示例参考to_timestamp()函数。

格式化函数

from_unixtime

函数 from_unixtime(bigint unixtime[, string to_fmt])
返回值类型 string
描述 将时间戳按当前系统时区表示的时间进行格式化
参数 unixtime:从Unix纪元(1970-01-01 00:00:00 UTC)计数的秒数;
fmt:格式化字符串,参考Datetime PatternsJava SimpleDateFormat

示例1:

1
2
3
sql> SET TIME ZONE 'UTC';
sql> SELECT from_unixtime(0, 'yyyy-MM-dd HH:mm:ss');
1970-01-01 00:00:00

示例2:

1
2
3
sql> SET TIME ZONE 'Asia/Shanghai';
sql> SELECT from_unixtime(0, 'yyyy-MM-dd HH:mm:ss');
1970-01-01 08:00:00

示例3:

1
2
3
sql> SET TIME ZONE 'Asia/Shanghai';
sql> SELECT from_unixtime(0, "yyyy-MM-dd'T'HH:mm:ss.SSSZ");
1970-01-01T08:00:00.000+0800

date_format

函数 date_format(ts, string to_fmt)
返回值类型 string
描述 将时间按指定格式进行格式化
参数 ts:date/timestamp、或时间字符串。
to_fmt:格式化字符串,参考Datetime PatternsJava SimpleDateFormat。

示例1:入参时间字符串包含明确的时区信息

1
2
3
sql> SET TIME ZONE 'UTC';
sql> SELECT date_format('2022-05-15T19:00:00.000+0800', 'yyyy-MM-dd HH:mm:ss');
2022-05-15 11:00:00

示例2:入参时间字符串没有时区信息,则入参时间字符串默认为系统当前时区

1
2
3
sql> SET TIME ZONE 'UTC';
sql> SELECT date_format('2022-05-15 19:00:00', "yyyy-MM-dd'T'HH:mm:ss.SSSZ");
2022-05-15T19:00:00.000+0000

算数运算函数

date_add

date_sub

datediff

last_day

next_day

add_months

months_between

trunc

获取函数

参考

  1. https://docs.microsoft.com/en-us/azure/databricks/sql/language-manual/parameters/timezone
  2. https://docs.microsoft.com/en-us/azure/databricks/sql/language-manual/sql-ref-syntax-aux-conf-mgmt-set-timezone
  3. https://spark.apache.org/docs/latest/sql-ref-functions-builtin.html#date-and-timestamp-functions
  4. https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DateFunctions
  5. https://spark.apache.org/docs/latest/api/sql/index.html