How Spark Runs Tasks

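The article "How Spark Submits Jobs" on this site describes how a job is submitted to the cluster. This article covers how the tasks are then run.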

Task Execution

Executor

// Accepts a task launch request from the ExecutorBackend
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}
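
The pattern used by launchTask (register the runner first, then hand it to a thread pool) can be tried out in isolation. The following is only a minimal sketch under stated assumptions, not Spark's Executor: MiniExecutor, MiniTaskRunner, shutdownAndWait and the pool size are hypothetical names introduced for illustration.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Hypothetical stand-in for TaskRunner: a Runnable that removes itself
// from the running-task registry once its body has finished.
final class MiniTaskRunner(
    val taskId: Long,
    body: () => Unit,
    registry: ConcurrentHashMap[Long, MiniTaskRunner]) extends Runnable {
  override def run(): Unit =
    try body() finally registry.remove(taskId)
}

// Hypothetical stand-in for the Executor: one registry plus one thread pool.
final class MiniExecutor(threads: Int) {
  val runningTasks = new ConcurrentHashMap[Long, MiniTaskRunner]()
  private val threadPool = Executors.newFixedThreadPool(threads)

  def launchTask(taskId: Long)(body: => Unit): Unit = {
    val tr = new MiniTaskRunner(taskId, () => body, runningTasks)
    runningTasks.put(taskId, tr) // register first so the task can be looked up by id
    threadPool.execute(tr)       // then run it asynchronously
  }

  // Stops accepting tasks and waits for the submitted ones to finish.
  def shutdownAndWait(): Unit = {
    threadPool.shutdown()
    threadPool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

Registering before executing means that a lookup by task id can find the runner as soon as launchTask returns, even if the pool has not started it yet.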

TaskRunner

DriverEndpoint

override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
          // Ignoring the update since we don't know about the executor.
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }
}
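
The bookkeeping in this handler is small: when a task reaches a terminal state, the cores it held go back to its executor and that executor is offered to the scheduler again. A minimal sketch of just that accounting follows. CoreAccounting, ExecutorData, onStatusUpdate and the value of CpusPerTask are hypothetical and chosen for illustration.

```scala
import scala.collection.mutable

object CoreAccounting {
  val CpusPerTask = 1 // assumed value for the sketch

  final class ExecutorData(var freeCores: Int)

  val executorDataMap = mutable.HashMap[String, ExecutorData]()
  val offersMade = mutable.ArrayBuffer[String]() // executors that were re-offered
  val ignored = mutable.ArrayBuffer[String]()    // updates from unknown executors

  def makeOffers(executorId: String): Unit = { offersMade += executorId; () }

  // Only terminal updates change the core accounting.
  def onStatusUpdate(executorId: String, finished: Boolean): Unit =
    if (finished) {
      executorDataMap.get(executorId) match {
        case Some(info) =>
          info.freeCores += CpusPerTask
          makeOffers(executorId)
        case None =>
          ignored += executorId
      }
    }
}
```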

ShuffleStatus

MapOutputTracker

An abstract class and the parent of MapOutputTrackerMaster and MapOutputTrackerWorker. It records where the map outputs of each stage are located.

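/**
 * This driver-side counter is incremented every time a map output is lost.
 * The value is sent to the executors as part of each task, and an executor compares it
 * with the largest epoch it has received so far. If the new epoch is larger, the executor
 * clears its locally cached map output statuses and fetches them again from the driver.
 */
protected var epoch: Long = 0
protected val epochLock = new AnyRef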

MapOutputTrackerMaster

/**
 * Stores the shuffleStatuses on the driver.
 */
val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala

MapOutputTrackerWorker

This class is the executor-side client that fetches map output information from the MapOutputTrackerMaster on the driver.

// The key is the shuffleId
val mapStatuses: Map[Int, Array[MapStatus]] =
  new ConcurrentHashMap[Int, Array[MapStatus]]().asScala

/**
 * Every task on an executor calls this method with the latest epoch that the driver
 * had when the task was created.
 */
def updateEpoch(newEpoch: Long): Unit = {
  epochLock.synchronized {
    if (newEpoch > epoch) {
      logInfo("Updating epoch to " + newEpoch + " and clearing cache")
      epoch = newEpoch
      mapStatuses.clear()
    }
  }
}
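
The epoch mechanism amounts to a versioned cache: cached entries survive as long as the incoming epoch is not newer than the one already seen. The sketch below shows that behaviour on its own. EpochCache and its put/get methods are hypothetical, and the cached value is a plain String rather than an array of MapStatus.

```scala
import scala.collection.mutable

// Hypothetical executor-side cache keyed by shuffleId.
final class EpochCache {
  private val epochLock = new AnyRef
  private var epoch: Long = 0L
  private val cache = mutable.HashMap[Int, String]()

  def put(shuffleId: Int, statuses: String): Unit =
    epochLock.synchronized { cache(shuffleId) = statuses }

  def get(shuffleId: Int): Option[String] =
    epochLock.synchronized { cache.get(shuffleId) }

  // Clears the cache only when the incoming epoch is strictly newer.
  def updateEpoch(newEpoch: Long): Unit = epochLock.synchronized {
    if (newEpoch > epoch) {
      epoch = newEpoch
      cache.clear()
    }
  }
}
```

A task carrying an old or equal epoch leaves the cache untouched, so only a real loss of map output on the driver side forces a refetch.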

TaskSchedulerImpl

def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  var reason: Option[ExecutorLossReason] = None
  synchronized {
    try {
      Option(taskIdToTaskSetManager.get(tid)) match {
        case Some(taskSet) =>
          if (state == TaskState.LOST) {
            // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
            // where each executor corresponds to a single task, so mark the executor as failed.
            val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
              "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
            if (executorIdToRunningTaskIds.contains(execId)) {
              reason = Some(
                SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
              removeExecutor(execId, reason.get)
              failedExecutor = Some(execId)
            }
          }
          if (TaskState.isFinished(state)) {
            cleanupTaskState(tid)
            taskSet.removeRunningTask(tid)
            if (state == TaskState.FINISHED) {
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
          }
        case None =>
          logError(
            ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
              "likely the result of receiving duplicate task finished status updates) or its " +
              "executor has been marked as failed.")
              .format(state, tid))
      }
    } catch {
      case e: Exception => logError("Exception in statusUpdate", e)
    }
  }
  // Update the DAGScheduler without holding a lock on this, since that can deadlock
  if (failedExecutor.isDefined) {
    assert(reason.isDefined)
    dagScheduler.executorLost(failedExecutor.get, reason.get)
    backend.reviveOffers()
  }
}
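
Stripped of locking and executor-loss handling, statusUpdate is a dispatch on the task state: non-terminal states are ignored, FINISHED goes to the success queue, and the remaining terminal states go to the failure queue. A minimal sketch of that decision follows. StatusDispatch and its State and Action types are hypothetical and only model the states that appear in the listing above.

```scala
object StatusDispatch {
  sealed trait State
  case object Running extends State
  case object Finished extends State
  case object Failed extends State
  case object Killed extends State
  case object Lost extends State

  sealed trait Action
  case object NoOp extends Action
  case object EnqueueSuccess extends Action
  case object EnqueueFailure extends Action

  // The terminal states handled by the listing above.
  def isFinished(s: State): Boolean = s match {
    case Finished | Failed | Killed | Lost => true
    case Running => false
  }

  def dispatch(s: State): Action =
    if (!isFinished(s)) NoOp
    else if (s == Finished) EnqueueSuccess
    else EnqueueFailure
}
```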

/**
 * Marks the task has completed in all TaskSetManagers for the given stage.
 *
 * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
 * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
 * do not also submit those same tasks. That also means that a task completion from an earlier
 * attempt can lead to the entire stage getting marked as successful.
 */
private[scheduler] def markPartitionCompletedInAllTaskSets(
    stageId: Int,
    partitionId: Int,
    taskInfo: TaskInfo) = {
  taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
    tsm.markPartitionCompleted(partitionId, taskInfo)
  }
}
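
The lookup structure behind this method is a two-level map from stage id to stage attempt id to manager. The following sketch shows how a completed partition is broadcast to every attempt of one stage. AttemptRegistry, AttemptManager, register and markInAll are hypothetical names, and the manager only records partition ids.

```scala
import scala.collection.mutable

object AttemptRegistry {
  final class AttemptManager(val stageAttemptId: Int) {
    val completed = mutable.Set[Int]()
    def markPartitionCompleted(partitionId: Int): Unit = { completed += partitionId; () }
  }

  // stageId -> (stageAttemptId -> manager)
  val byStageAndAttempt = mutable.HashMap[Int, mutable.HashMap[Int, AttemptManager]]()

  def register(stageId: Int, m: AttemptManager): Unit =
    byStageAndAttempt
      .getOrElseUpdate(stageId, mutable.HashMap[Int, AttemptManager]())
      .update(m.stageAttemptId, m)

  // Notifies every attempt of the stage; an unknown stage is simply a no-op.
  def markInAll(stageId: Int, partitionId: Int): Unit =
    byStageAndAttempt.get(stageId).foreach { attempts =>
      attempts.values.foreach(_.markPartitionCompleted(partitionId))
    }
}
```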

TaskResultGetter

TaskResultGetter internally runs a thread pool that processes the tasks submitted through enqueueSuccessfulTask() and enqueueFailedTask().


def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    serializedData: ByteBuffer): Unit = {
  getTaskResultExecutor.execute(new Runnable {
    override def run(): Unit = Utils.logUncaughtExceptions {
      try {
        val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
          case directResult: DirectTaskResult[_] =>
            if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
              return
            }
            // deserialize "value" without holding any lock so that it won't block other threads.
            // We should call it here, so that when it's called again in
            // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
            directResult.value(taskResultSerializer.get())
            (directResult, serializedData.limit())
          case IndirectTaskResult(blockId, size) =>
            // For an IndirectTaskResult, the result has to be fetched from the remote side
            if (!taskSetManager.canFetchMoreResults(size)) {
              // dropped by executor if size is larger than maxResultSize
              sparkEnv.blockManager.master.removeBlock(blockId)
              return
            }
            logDebug("Fetching indirect task result for TID %s".format(tid))
            scheduler.handleTaskGettingResult(taskSetManager, tid)
            val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
            if (!serializedTaskResult.isDefined) {
              /* We won't be able to get the task result if the machine that ran the task failed
               * between when the task ended and when we tried to fetch the result, or if the
               * block manager had to flush the result. */
              scheduler.handleFailedTask(
                taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
              return
            }
            val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
              serializedTaskResult.get.toByteBuffer)
            // force deserialization of referenced value
            deserializedResult.value(taskResultSerializer.get())
            // The block has been fetched, so ask the master to remove it
            sparkEnv.blockManager.master.removeBlock(blockId)
            (deserializedResult, size)
        }

        // Set the task result size in the accumulator updates received from the executors.
        // We need to do this here on the driver because if we did this on the executors then
        // we would have to serialize the result again after updating the size.
        result.accumUpdates = result.accumUpdates.map { a =>
          if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
            val acc = a.asInstanceOf[LongAccumulator]
            assert(acc.sum == 0L, "task result size should not have been set on the executors")
            acc.setValue(size.toLong)
            acc
          } else {
            a
          }
        }
        // Hand the deserialized result over to the scheduler
        scheduler.handleSuccessfulTask(taskSetManager, tid, result)
      } catch {
        case cnf: ClassNotFoundException =>
          val loader = Thread.currentThread.getContextClassLoader
          taskSetManager.abort("ClassNotFound with classloader: " + loader)
        // Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
        case NonFatal(ex) =>
          logError("Exception while getting task result", ex)
          taskSetManager.abort("Exception while getting task result: %s".format(ex))
      }
    }
  })
}
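
The two branches differ in where the payload lives: a direct result carries its value, while an indirect result only names a block that must be fetched and then removed. The sketch below models that distinction with an in-memory store. ResultFetch, Budget, BlockStore and resolve are hypothetical, and the running-total size check in Budget is an assumption about what canFetchMoreResults does, since its body is not shown in this article.

```scala
object ResultFetch {
  sealed trait TaskResult
  final case class Direct(value: String) extends TaskResult
  final case class Indirect(blockId: String, size: Long) extends TaskResult

  // Assumed behaviour: keep a running total and refuse once it exceeds the limit.
  final class Budget(maxResultSize: Long) {
    private var total = 0L
    def canFetchMore(size: Long): Boolean = {
      total += size
      total <= maxResultSize
    }
  }

  // Hypothetical in-memory stand-in for the remote block store.
  final class BlockStore(initial: Map[String, String]) {
    private var blocks = initial
    def getRemote(id: String): Option[String] = blocks.get(id)
    def remove(id: String): Unit = { blocks -= id }
    def contains(id: String): Boolean = blocks.contains(id)
  }

  // Some(value) when the result is accepted, None when it is dropped or lost.
  def resolve(result: TaskResult, store: BlockStore, budget: Budget): Option[String] =
    result match {
      case Direct(v) =>
        if (budget.canFetchMore(v.length.toLong)) Some(v) else None
      case Indirect(id, size) =>
        if (!budget.canFetchMore(size)) {
          store.remove(id) // too large: drop the block without fetching it
          None
        } else {
          val fetched = store.getRemote(id)      // None if the block is gone
          fetched.foreach(_ => store.remove(id)) // remove the block after a successful fetch
          fetched
        }
    }
}
```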

TaskSetManager

def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  val index = info.index
  // Check if any other attempt succeeded before this and this attempt has not been handled
  if (successful(index) && killedByOtherAttempt.contains(tid)) {
    // Undo the effect on calculatedTasks and totalResultSize made earlier when
    // checking if can fetch more results
    calculatedTasks -= 1
    val resultSizeAcc = result.accumUpdates.find(a =>
      a.name == Some(InternalAccumulator.RESULT_SIZE))
    if (resultSizeAcc.isDefined) {
      totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value
    }

    // Handle this task as a killed task
    handleFailedTask(tid, TaskState.KILLED,
      TaskKilled("Finish but did not commit due to another attempt succeeded"))
    return
  }

  info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
  if (speculationEnabled) {
    successfulTaskDurations.insert(info.duration)
  }
  removeRunningTask(tid)

  // Kill any other attempts for the same task (since those are unnecessary now that one
  // attempt completed successfully).
  for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
    logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
      s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
      s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
    killedByOtherAttempt += attemptInfo.taskId
    sched.backend.killTask(
      attemptInfo.taskId,
      attemptInfo.executorId,
      interruptThread = true,
      reason = "another attempt succeeded")
  }
  if (!successful(index)) {
    tasksSuccessful += 1
    logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
      s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
      s" ($tasksSuccessful/$numTasks)")
    // Mark successful and stop if all the tasks have succeeded.
    successful(index) = true
    if (tasksSuccessful == numTasks) {
      isZombie = true
    }
  } else {
    logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
      " because task " + index + " has already completed successfully")
  }
  // There may be multiple tasksets for this stage -- we let all of them know that the partition
  // was completed. This may result in some of the tasksets getting completed.
  sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
  // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
  // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
  // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
  // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
  // Note: "result.value()" only deserializes the value when it's called at the first time, so
  // here "result.value()" just returns the value and won't block other threads.
  sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
  maybeFinishTaskSet()
}
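
The interplay between successful and killedByOtherAttempt is easiest to see with two attempts racing for the same task index: the first to report success wins, and the loser is treated as killed even if it also ran to completion. The sketch below reproduces only that rule. AttemptTracker, start and onSuccess are hypothetical, and the returned strings are labels invented for the example.

```scala
import scala.collection.mutable

final class AttemptTracker(numTasks: Int) {
  private val successful = Array.fill(numTasks)(false)
  private val running = mutable.HashMap[Long, Int]() // tid -> task index
  private val killedByOtherAttempt = mutable.HashSet[Long]()
  var tasksSuccessful = 0

  def start(tid: Long, index: Int): Unit = { running(tid) = index }

  // Returns "success", "killed" (lost the race) or "duplicate".
  def onSuccess(tid: Long, index: Int): String =
    if (successful(index) && killedByOtherAttempt.contains(tid)) {
      "killed"
    } else {
      running -= tid
      // Mark every other running attempt of the same task as killed by this one.
      val others = running.collect { case (otherTid, i) if i == index => otherTid }.toList
      killedByOtherAttempt ++= others
      if (!successful(index)) {
        successful(index) = true
        tasksSuccessful += 1
        "success"
      } else {
        "duplicate"
      }
    }
}
```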

/**
 * Marks the given partition as completed in this TaskSetManager, typically because
 * another attempt of the same stage has finished it.
 */
private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
  partitionToIndex.get(partitionId).foreach { index =>
    if (!successful(index)) {
      if (speculationEnabled && !isZombie) {
        successfulTaskDurations.insert(taskInfo.duration)
      }
      tasksSuccessful += 1
      successful(index) = true
      if (tasksSuccessful == numTasks) {
        isZombie = true
      }
      maybeFinishTaskSet()
    }
  }
}

private def maybeFinishTaskSet() {
  if (isZombie && runningTasks == 0) {
    sched.taskSetFinished(this)
    if (tasksSuccessful == numTasks) {
      blacklistTracker.foreach(_.updateBlacklistForSuccessfulTaskSet(
        taskSet.stageId,
        taskSet.stageAttemptId,
        taskSetBlacklistHelperOpt.get.execToFailures))
    }
  }
}
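
Two conditions must both hold before a task set is reported as finished: it is a zombie (nothing more will be launched) and no task is still running. The sketch below isolates that rule. MiniTaskSet, taskEnded and the finished flag are hypothetical, and the flag stands in for the call to sched.taskSetFinished.

```scala
final class MiniTaskSet(numTasks: Int) {
  private val successful = Array.fill(numTasks)(false)
  private var tasksSuccessful = 0
  var runningTasks = 0
  var isZombie = false
  var finished = false

  // Counts each task index at most once.
  def markPartitionCompleted(index: Int): Unit =
    if (!successful(index)) {
      successful(index) = true
      tasksSuccessful += 1
      if (tasksSuccessful == numTasks) isZombie = true
      maybeFinish()
    }

  def taskEnded(): Unit = {
    runningTasks -= 1
    maybeFinish()
  }

  private def maybeFinish(): Unit =
    if (isZombie && runningTasks == 0) finished = true
}
```

In the usage below, every partition is already complete while one speculative task is still running, so the set only finishes once that last task has ended.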

DAGScheduler

private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
  val task = event.task
  val stageId = task.stageId

  outputCommitCoordinator.taskCompleted(
    stageId,
    task.stageAttemptId,
    task.partitionId,
    event.taskInfo.attemptNumber, // this is a task attempt number
    event.reason)

  if (!stageIdToStage.contains(task.stageId)) {
    // The stage may have already finished when we get this event -- eg. maybe it was a
    // speculative task. It is important that we send the TaskEnd event in any case, so listeners
    // are properly notified and can choose to handle it. For instance, some listeners are
    // doing their own accounting and if they don't get the task end event they think
    // tasks are still running when they really aren't.
    postTaskEnd(event)

    // Skip all the actions if the stage has been cancelled.
    return
  }

  val stage = stageIdToStage(task.stageId)

  // Make sure the task's accumulators are updated before any other processing happens, so that
  // we can post a task end event before any jobs or stages are updated. The accumulators are
  // only updated in certain cases.
  event.reason match {
    case Success =>
      task match {
        case rt: ResultTask[_, _] =>
          val resultStage = stage.asInstanceOf[ResultStage]
          resultStage.activeJob match {
            case Some(job) =>
              // Only update the accumulator once for each result task.
              if (!job.finished(rt.outputId)) {
                updateAccumulators(event)
              }
            case None => // Ignore update if task's job has finished.
          }
        case _ =>
          updateAccumulators(event)
      }
    case _: ExceptionFailure | _: TaskKilled => updateAccumulators(event)
    case _ =>
  }
  postTaskEnd(event)

  event.reason match {
    case Success =>
      task match {
        case rt: ResultTask[_, _] =>
          // Cast to ResultStage here because it's part of the ResultTask
          // TODO Refactor this out to a function that accepts a ResultStage
          val resultStage = stage.asInstanceOf[ResultStage]
          resultStage.activeJob match {
            case Some(job) =>
              if (!job.finished(rt.outputId)) {
                job.finished(rt.outputId) = true
                job.numFinished += 1
                // If the whole job has finished, remove it
                if (job.numFinished == job.numPartitions) {
                  markStageAsFinished(resultStage)
                  cleanupStateForJobAndIndependentStages(job)
                  listenerBus.post(
                    SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                }

                // taskSucceeded runs some user code that might throw an exception. Make sure
                // we are resilient against that.
                try {
                  job.listener.taskSucceeded(rt.outputId, event.result)
                } catch {
                  case e: Exception =>
                    // TODO: Perhaps we want to mark the resultStage as failed?
                    job.listener.jobFailed(new SparkDriverExecutionException(e))
                }
              }
            case None =>
              logInfo("Ignoring result from " + rt + " because its job has finished")
          }

        case smt: ShuffleMapTask =>
          val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
          shuffleStage.pendingPartitions -= task.partitionId
          val status = event.result.asInstanceOf[MapStatus]
          val execId = status.location.executorId
          logDebug("ShuffleMapTask finished on " + execId)
          if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
            logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
          } else {
            // The epoch of the task is acceptable (i.e., the task was launched after the most
            // recent failure we're aware of for the executor), so mark the task's output as
            // available.
            mapOutputTracker.registerMapOutput(
              shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
          }

          if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
            markStageAsFinished(shuffleStage)
            logInfo("looking for newly runnable stages")
            logInfo("running: " + runningStages)
            logInfo("waiting: " + waitingStages)
            logInfo("failed: " + failedStages)

            // This call to increment the epoch may not be strictly necessary, but it is retained
            // for now in order to minimize the changes in behavior from an earlier version of the
            // code. This existing behavior of always incrementing the epoch following any
            // successful shuffle map stage completion may have benefits by causing unneeded
            // cached map outputs to be cleaned up earlier on executors. In the future we can
            // consider removing this call, but this will require some extra investigation.
            // See https://github.com/apache/spark/pull/17955/files#r117385673 for more details.
            mapOutputTracker.incrementEpoch()

            clearCacheLocs()

            if (!shuffleStage.isAvailable) {
              // Some tasks had failed; let's resubmit this shuffleStage.
              // TODO: Lower-level scheduler should also deal with this
              logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
                ") because some of its tasks had failed: " +
                shuffleStage.findMissingPartitions().mkString(", "))
              submitStage(shuffleStage)
            } else {
              markMapStageJobsAsFinished(shuffleStage)
              submitWaitingChildStages(shuffleStage)
            }
          }
      }

    case FetchFailed(bmAddress, shuffleId, mapId, _, failureMessage) =>
      ...

    case failure: TaskFailedReason if task.isBarrier =>
      ...

    case Resubmitted =>
      handleResubmittedFailure(task, stage)

    case _: TaskCommitDenied =>
      // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits

    case _: ExceptionFailure | _: TaskKilled =>
      // Nothing left to do, already handled above for accumulator updates.

    case TaskResultLost =>
      // Do nothing here; the TaskScheduler handles these failures and resubmits the task.

    case _: ExecutorLostFailure | UnknownReason =>
      // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
      // will abort the job.
  }
}
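
For a ShuffleMapTask, the handler keeps two separate pieces of state: the set of pending partitions, and the outputs that were actually registered. A completion from an executor that failed at or after the task's epoch clears the pending entry but registers nothing, which is why a stage can have no pending partitions and still not be available. The sketch below shows that case. MapStageTracking, ShuffleStage, outputs and onShuffleMapTaskSuccess are hypothetical simplifications.

```scala
import scala.collection.mutable

object MapStageTracking {
  final class ShuffleStage(numPartitions: Int) {
    val pendingPartitions = mutable.HashSet[Int](0 until numPartitions: _*)
    val outputs = mutable.HashMap[Int, String]() // partitionId -> executor holding the output
    def isAvailable: Boolean = outputs.size == numPartitions
  }

  val failedEpoch = mutable.HashMap[String, Long]() // executorId -> epoch at which it was lost

  // Returns true when the stage has no pending partitions left.
  def onShuffleMapTaskSuccess(
      stage: ShuffleStage,
      partitionId: Int,
      execId: String,
      taskEpoch: Long): Boolean = {
    stage.pendingPartitions -= partitionId
    // A completion is suspect if the executor failed at or after the task's epoch.
    val bogus = failedEpoch.get(execId).exists(taskEpoch <= _)
    if (!bogus) stage.outputs(partitionId) = execId
    stage.pendingPartitions.isEmpty
  }
}
```

In the listing above, this is the situation in which the stage is resubmitted instead of its child stages being submitted.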

OutputCommitCoordinator

private[scheduler] def taskCompleted(
    stage: Int,
    stageAttempt: Int,
    partition: Int,
    attemptNumber: Int,
    reason: TaskEndReason): Unit = synchronized {
  val stageState = stageStates.getOrElse(stage, {
    logDebug(s"Ignoring task completion for completed stage")
    return
  })
  reason match {
    case Success =>
      // The task output has been committed successfully
    case _: TaskCommitDenied =>
      logInfo(s"Task was denied committing, stage: $stage.$stageAttempt, " +
        s"partition: $partition, attempt: $attemptNumber")
    case _ =>
      // Mark the attempt as failed to blacklist from future commit protocol
      val taskId = TaskIdentifier(stageAttempt, attemptNumber)
      stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
      if (stageState.authorizedCommitters(partition) == taskId) {
        logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
          s"partition=$partition) failed; clearing lock")
        stageState.authorizedCommitters(partition) = null
      }
  }
}
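
The failure branch above only makes sense together with the request side of the commit protocol, which this article does not list. The sketch below pairs the two under an assumption: the first attempt that asks and has not failed becomes the authorized committer of a partition, and a failure of that attempt releases the lock. MiniCommitCoordinator, canCommit and taskFailed are hypothetical, and attempts are identified by a plain Int instead of a TaskIdentifier.

```scala
import scala.collection.mutable

final class MiniCommitCoordinator(numPartitions: Int) {
  // None means no attempt currently holds the commit lock for that partition.
  private val authorized = Array.fill[Option[Int]](numPartitions)(None)
  private val failures = mutable.HashMap[Int, mutable.Set[Int]]()

  // Assumed request side: failed attempts are refused, otherwise first asker wins.
  def canCommit(partition: Int, attempt: Int): Boolean = synchronized {
    if (failures.get(partition).exists(_.contains(attempt))) {
      false
    } else {
      authorized(partition) match {
        case None =>
          authorized(partition) = Some(attempt)
          true
        case Some(holder) =>
          holder == attempt
      }
    }
  }

  // Mirrors the failure branch: remember the failure and clear the lock if held.
  def taskFailed(partition: Int, attempt: Int): Unit = synchronized {
    failures.getOrElseUpdate(partition, mutable.Set[Int]()) += attempt
    if (authorized(partition).contains(attempt)) authorized(partition) = None
  }
}
```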

Spark Tasks

ShuffleMapTask

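Registering a ShuffleHandle

A ShuffleDependency registers a ShuffleHandle through SortShuffleManager. Which ShuffleHandle is returned depends on the following rules:

  • If no map-side combine is needed and the number of downstream reduce partitions is less than or equal to the value of spark.shuffle.sort.bypassMergeThreshold, a BypassMergeSortShuffleHandle is returned;
  • Otherwise, if a) the serializer supports relocation of serialized objects, b) no map-side combine is needed, and c) the number of downstream reduce partitions does not exceed $2^{24}$, a SerializedShuffleHandle is returned;
  • In all other cases, a BaseShuffleHandle is returned.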

override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}
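
The selection rules can be written as one pure function over the three properties they depend on. The following is a sketch, not the code of SortShuffleManager: HandleSelection, Dep, select and the HandleKind objects are hypothetical, and the limit of $2^{24}$ partitions is treated as inclusive here, which is an assumption of the sketch.

```scala
object HandleSelection {
  sealed trait HandleKind
  case object BypassMergeSort extends HandleKind
  case object Serialized extends HandleKind
  case object Base extends HandleKind

  val MaxSerializedPartitions: Int = 1 << 24 // assumed inclusive upper bound

  // The three properties of a shuffle dependency that the rules look at.
  final case class Dep(
      mapSideCombine: Boolean,
      numPartitions: Int,
      serializerSupportsRelocation: Boolean)

  def select(dep: Dep, bypassMergeThreshold: Int): HandleKind =
    if (!dep.mapSideCombine && dep.numPartitions <= bypassMergeThreshold) {
      BypassMergeSort
    } else if (dep.serializerSupportsRelocation && !dep.mapSideCombine &&
        dep.numPartitions <= MaxSerializedPartitions) {
      Serialized
    } else {
      Base
    }
}
```

The order of the checks matters: a small shuffle without map-side combine takes the bypass path even if its serializer would also qualify for the serialized path.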

ShuffleWriter

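The article "Spark shuffle writer" describes the internals of each ShuffleWriter in detail.

For the different ShuffleHandle implementations, SortShuffleManager simply pattern-matches on the handle type and returns the corresponding ShuffleWriter.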

override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  numMapsForShuffle.putIfAbsent(
    handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassMergeSortHandle,
        mapId,
        context,
        env.conf)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
  }
}
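
One detail of this match is worth spelling out: the listing casts every handle to BaseShuffleHandle, so the two specialised handles are subtypes of it and their cases have to come before the general one. The sketch below shows the same ordering with stand-in classes. WriterSelection, BaseHandle, SerializedHandle, BypassHandle and writerFor are hypothetical, and the writers are represented by their names only.

```scala
object WriterSelection {
  class BaseHandle(val shuffleId: Int)
  class SerializedHandle(shuffleId: Int) extends BaseHandle(shuffleId)
  class BypassHandle(shuffleId: Int) extends BaseHandle(shuffleId)

  // Specific subtypes first; the base type acts as the fallback case.
  def writerFor(handle: BaseHandle): String = handle match {
    case _: SerializedHandle => "UnsafeShuffleWriter"
    case _: BypassHandle => "BypassMergeSortShuffleWriter"
    case _: BaseHandle => "SortShuffleWriter"
  }
}
```

If the BaseHandle case were listed first, it would match all three kinds of handle and the other two cases would never be reached.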

ResultTask

Notes

This article is based on Spark 2.4.3.