jtLiBrain

任何伟大的事都不会一蹴而就,三分智慧,七分韧性

0%

OLTP

OLTP(On-Line Transaction Processing),即联机事务处理

产品:Mysql、Oracle、Redis、Hbase

OLAP

OLAP(On-Line Analytical Processing),联机分析处理

OLAP系统按照其存储器的数据存储格式可以分为:关系OLAP(RelationalOLAP,简称ROLAP)、多维OLAP(MultidimensionalOLAP,简称MOLAP)和混合型OLAP(HybridOLAP,简称HOLAP)三种类型。

  • ROLAP:ROLAP将分析用的多维数据存储在关系数据库中并根据应用的需要有选择的定义一批实视图作为表也存储在关系数据库中。不必要将每一个SQL查询都作为实视图保存,只定义那些应用频率比较高、计算工作量比较大的查询作为实视图。对每个针对OLAP服务器的查询,优先利用已经计算好的实视图来生成查询结果以提高查询效率。同时用作ROLAP存储器的RDBMS也针对OLAP作相应的优化,比如并行存储、并行查询、并行数据管理、基于成本的查询优化、位图索引、SQL的OLAP扩展(cube,rollup)等等。
  • MOLAP:MOLAP将OLAP分析所用到的多维数据物理上存储为多维数组的形式,形成“立方体”的结构。维的属性值被映射成多维数组的下标值或下标的范围,而总结数据作为多维数组的值存储在数组的单元中。由于MOLAP采用了新的存储结构,从物理层实现起,因此又称为物理OLAP(PhysicalOLAP);而ROLAP主要通过一些软件工具或中间软件实现,物理层仍采用关系数据库的存储结构,因此称为虚拟OLAP(VirtualOLAP)。
  • HOLAP:由于MOLAP和ROLAP有着各自的优点和缺点(如下表所示),且它们的结构迥然不同,这给分析人员设计OLAP结构提出了难题。为此一个新的OLAP结构——混合型OLAP(HOLAP)被提出,它能把MOLAP和ROLAP两种结构的优点结合起来。迄今为止,对HOLAP还没有一个正式的定义。但很明显,HOLAP结构不应该是MOLAP与ROLAP结构的简单组合,而是这两种结构技术优点的有机结合,能满足用户各种复杂的分析请求。
分类 介绍 产品 优点 缺点
MOLAP 以多维数组为存储模型的OLAP。

特点:数据预计算,然后把预计算结果“立方体”存到多维数组里。
Kylin
Druid
cube包含所有维度的聚合结果,所以查询速度非常快。

相对关系型数据库,计算结果数据的磁盘空间占用更小,扩展性强,适用于维度数量多的模型
对于维度多的模型预计算慢,空间占用大。update cube的时间跟计算维度(group)相关,随着维度增加计算时间大幅增加,此外预计算还会造成数据库占用急剧膨胀。需要提前设计维度模型,查询分析的内容仅限于这些指定维度,增加维度需要重新计算。
ROLAP 基于关系模型存放数据,一般要求事实表和维度表按一定关系设计,它不需要预计算,使用标准SQL查询不同维度数据。 Elasticsearch
Hive
Spark SQL
Flink SQL
Presto
Impala
GreenPlum
ClickHouse
Doris
更适合处理非聚合的数据,例如文本描述

基于row数据更容易做权限管理
因为是即时计算,查询响应时间一般比预计算的MOLAP长
HOLAP MOLAP和ROLAP类型的混合运用
细节的数据以ROLAP的形式存放,更加方便灵活,而高度聚合的数据以MOLAP的形式展现
更适合于高效的分析处理。公司使用HOLAP的目的是根据不同场景来利用不同OLAP的特性。

OLTP和OLAP对比

OLTP OLAP
特征 功能 基本查询 分析决策
场景 简单事务 复杂查询
用户 初级的 决策者/高级的
用户数 上千个 上百万个
架构 面向应用 面向主题
数据 当前的,二维的 历史的,多维的
存取 百千条 上百万条
数据量 MB ~ GB GB、TD、PB、EB
产品 离线 Mysql、Oracle Hive、Presto
实时 Redis、Hbase Druid、Kylin

HTAP

产品:TiDB

参考

  1. OLTP、OLAP与HTAP中文OLTP, OLAP and HTAP英文
  2. https://thutmose.blog.csdn.net/article/details/108863376?spm=1001.2101.3001.6650.15&utm_medium=distribute.pc_relevant.none-task-blog-2~default~BlogCommendFromBaidu~Rate-15.pc_relevant_default&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~BlogCommendFromBaidu~Rate-15.pc_relevant_default&utm_relevant_index=20
  3. 大数据OLAP组件
  4. 大数据OLAP系统(1)大数据OLAP系统(2)
  5. https://zhuanlan.zhihu.com/p/266402829
  6. https://slidetodoc.com/data-warehouse-is-403-chapter-5-olap-dr/
  7. https://www.stitchdata.com/resources/oltp-vs-olap/
  8. https://www.jamesserra.com/archive/2016/12/what-is-htap/
  9. https://pingcap.medium.com/delivering-real-time-analytics-and-true-htap-by-combining-columnstore-and-rowstore-1e006d3c3ef5
  10. https://www.oreilly.com/library/view/building-real-time-data/9781491975879/ch01.html
  11. https://www.ibm.com/cloud/blog/olap-vs-oltp
  12. https://www.geeksforgeeks.org/difference-between-olap-and-oltp-in-dbms/
  13. https://www.educba.com/olap-tools/

BlockStoreShuffleReader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
override def read(): Iterator[Product2[K, C]] = {
val wrappedStreams = new ShuffleBlockFetcherIterator(
context,
blockManager.shuffleClient,
blockManager,
mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
serializerManager.wrapStream,
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))

val serializerInstance = dep.serializer.newInstance()

// Create a key/value iterator for each stream
val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) =>
// Note: the asKeyValueIterator below wraps a key/value iterator inside of a
// NextIterator. The NextIterator makes sure that close() is called on the
// underlying InputStream when all records have been read.
serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
}

// Update the context task metrics for each record read.
val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
recordIter.map { record =>
readMetrics.incRecordsRead(1)
record
},
context.taskMetrics().mergeShuffleReadMetrics())

// An interruptible iterator must be used here in order to support task cancellation
val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)

val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
// We are reading values that are already combined
val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
} else {
// We don't know the value type, but also don't care -- the dependency *should*
// have made sure its compatible w/ this aggregator, which will convert the value
// type to the combined type C
val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
}
} else {
interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
}

// Sort the output if there is a sort ordering defined.
val resultIter = dep.keyOrdering match {
case Some(keyOrd: Ordering[K]) =>
// Create an ExternalSorter to sort the data.
val sorter =
new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
sorter.insertAll(aggregatedIter)
context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
// Use completion callback to stop sorter if task was finished/cancelled.
context.addTaskCompletionListener[Unit](_ => {
sorter.stop()
})
CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
case None =>
aggregatedIter
}

resultIter match {
case _: InterruptibleIterator[Product2[K, C]] => resultIter
case _ =>
// Use another interruptible iterator here to support task cancellation as aggregator
// or(and) sorter may have consumed previous interruptible iterator.
new InterruptibleIterator[Product2[K, C]](context, resultIter)
}
}

ShuffleBlockFetcherIterator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
// Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
// smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
// nodes, rather than blocking on reading output from one node.
val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize
+ ", maxBlocksInFlightPerAddress: " + maxBlocksInFlightPerAddress)

// Split local and remote blocks. Remote blocks are further split into FetchRequests of size
// at most maxBytesInFlight in order to limit the amount of data in flight.
val remoteRequests = new ArrayBuffer[FetchRequest]

for ((address, blockInfos) <- blocksByAddress) {
if (address.executorId == blockManager.blockManagerId.executorId) {
blockInfos.find(_._2 <= 0) match {
case Some((blockId, size)) if size < 0 =>
throw new BlockException(blockId, "Negative block size " + size)
case Some((blockId, size)) if size == 0 =>
throw new BlockException(blockId, "Zero-sized blocks should be excluded.")
case None => // do nothing.
}
localBlocks ++= blockInfos.map(_._1)
numBlocksToFetch += localBlocks.size
} else {
val iterator = blockInfos.iterator
var curRequestSize = 0L
var curBlocks = new ArrayBuffer[(BlockId, Long)]
while (iterator.hasNext) {
val (blockId, size) = iterator.next()
if (size < 0) {
throw new BlockException(blockId, "Negative block size " + size)
} else if (size == 0) {
throw new BlockException(blockId, "Zero-sized blocks should be excluded.")
} else {
curBlocks += ((blockId, size))
remoteBlocks += blockId
numBlocksToFetch += 1
curRequestSize += size
}
if (curRequestSize >= targetRequestSize ||
curBlocks.size >= maxBlocksInFlightPerAddress) {
// Add this FetchRequest
remoteRequests += new FetchRequest(address, curBlocks)
logDebug(s"Creating fetch request of $curRequestSize at $address "
+ s"with ${curBlocks.size} blocks")
curBlocks = new ArrayBuffer[(BlockId, Long)]
curRequestSize = 0
}
}
// Add in the final request
if (curBlocks.nonEmpty) {
remoteRequests += new FetchRequest(address, curBlocks)
}
}
}
logInfo(s"Getting $numBlocksToFetch non-empty blocks including ${localBlocks.size}" +
s" local blocks and ${remoteBlocks.size} remote blocks")
remoteRequests
}

NOTEs

本文以Spark 2.4.3为基础。