public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  if (!records.hasNext()) {
    partitionLengths = new long[numPartitions];
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
    return;
  }
  final SerializerInstance serInstance = serializer.newInstance();
  final long openStartTime = System.nanoTime();
  partitionWriters = new DiskBlockObjectWriter[numPartitions];
  partitionWriterSegments = new FileSegment[numPartitions];
  for (int i = 0; i < numPartitions; i++) {
    // Create a temporary shuffle block and its backing file for each partition
    final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
      blockManager.diskBlockManager().createTempShuffleBlock();
    final File file = tempShuffleBlockIdPlusFile._2();
    final BlockId blockId = tempShuffleBlockIdPlusFile._1();
    // Create one disk writer per partition
    partitionWriters[i] =
      blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
  }
  // Creating the file to write to and creating a disk writer both involve interacting with
  // the disk, and can take a long time in aggregate when we open many files, so should be
  // included in the shuffle write time.
  writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

  while (records.hasNext()) {
    final Product2<K, V> record = records.next();
    final K key = record._1();
    // Write the record to the partition file selected by its key
    partitionWriters[partitioner.getPartition(key)].write(key, record._2());
  }

  // All records written: commit and close each partition writer
  for (int i = 0; i < numPartitions; i++) {
    final DiskBlockObjectWriter writer = partitionWriters[i];
    partitionWriterSegments[i] = writer.commitAndGet();
    writer.close();
  }
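The lookup partitionWriters[partitioner.getPartition(key)] above is just an index computed from the key's hash. A minimal sketch of such a hash-based partitioner, assuming Spark's usual non-negative-modulo convention (illustrative, not the HashPartitioner source):

// Illustrative sketch: map a key to a partition index in [0, numPartitions).
class SimpleHashPartitioner(numPartitions: Int) {
  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ =>
      val mod = key.hashCode % numPartitions
      if (mod < 0) mod + numPartitions else mod  // keep the result non-negative
  }
}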
/**
 * Concatenate all of the per-partition files into a single combined output file.
 *
 * @return an array whose entries are the byte lengths of each partition's data
 */
private long[] writePartitionedFile(File outputFile) throws IOException {
  // Track location of the partition starts in the output file
  final long[] lengths = new long[numPartitions];
  if (partitionWriters == null) {
    // We were passed an empty iterator
    return lengths;
  }

  final FileOutputStream out = new FileOutputStream(outputFile, true);
  final long writeStartTime = System.nanoTime();
  boolean threwException = true;
  try {
    for (int i = 0; i < numPartitions; i++) {
      // Copy each partition's file into the final output file, in partition-ID order
      final File file = partitionWriterSegments[i].file();
      if (file.exists()) {
        final FileInputStream in = new FileInputStream(file);
        boolean copyThrewException = true;
        try {
          lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
          copyThrewException = false;
        } finally {
          Closeables.close(in, copyThrewException);
        }
        if (!file.delete()) {
          logger.error("Unable to delete file for partition {}", i);
        }
      }
    }
    threwException = false;
  } finally {
    Closeables.close(out, threwException);
    writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
  }
  partitionWriters = null;
  return lengths;
}

// Write all records of the current map task
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  // Keep track of success so we know if we encountered an exception
  // We do this rather than a standard try/catch/re-throw to handle
  // generic throwables.
  boolean success = false;
  try {
    while (records.hasNext()) {
      insertRecordIntoSorter(records.next());
    }
    closeAndWriteOutput();
    success = true;
  } finally {
    ...
  }
}
public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
    throws IOException {

  // for tests
  assert(inMemSorter != null);
  if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
    logger.info("Spilling data because number of spilledRecords crossed the threshold " +
      numElementsForSpillThreshold);
    spill();
  }

  growPointerArrayIfNecessary();
  final int uaoSize = UnsafeAlignedOffset.getUaoSize();
  // Need 4 or 8 bytes to store the record length.
  final int required = length + uaoSize;
  acquireNewPageIfNecessary(required);

/**
 * Sort and spill the current records in response to memory pressure.
 */
@Override
public long spill(long size, MemoryConsumer trigger) throws IOException {
  if (trigger != this || inMemSorter == null || inMemSorter.numRecords() == 0) {
    return 0L;
  }

  logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
    Thread.currentThread().getId(),
    Utils.bytesToString(getMemoryUsage()),
    spills.size(),
    spills.size() > 1 ? " times" : " time");

  writeSortedFile(false);
  final long spillSize = freeMemory();
  inMemSorter.reset();
  // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
  // records. Otherwise, if the task is over allocated memory, then without freeing the memory
  // pages, we might not be able to get memory for the pointer array.
  taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
  return spillSize;
}

  if (isLastFile) {
    // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
    writeMetricsToUse = writeMetrics;
  } else {
    // We're spilling, so bytes written should be counted towards spill rather than write.
    // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
    // them towards shuffle bytes written.
    writeMetricsToUse = new ShuffleWriteMetrics();
  }
  // Writing small amounts of data to DiskBlockObjectWriter is very inefficient,
  // so a byte array is used here as an intermediate buffer
  final byte[] writeBuffer = new byte[diskWriteBufferSize];
  // Because this output will be read during shuffle, its compression codec must be controlled by
  // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
  // createTempShuffleBlock here; see SPARK-3426 for more details.
  final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
    blockManager.diskBlockManager().createTempShuffleBlock();
  final File file = spilledFileInfo._2();
  final TempShuffleBlockId blockId = spilledFileInfo._1();
  final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);
  int currentPartition = -1;
  final int uaoSize = UnsafeAlignedOffset.getUaoSize();
  while (sortedRecords.hasNext()) {
    sortedRecords.loadNext();
    final int partition = sortedRecords.packedRecordPointer.getPartitionId();
    assert (partition >= currentPartition);
    if (partition != currentPartition) {
      // Switch to the new partition
      if (currentPartition != -1) {
        final FileSegment fileSegment = writer.commitAndGet();
        // Each partition corresponds to one file segment
        spillInfo.partitionLengths[currentPartition] = fileSegment.length();
      }
      currentPartition = partition;
    }

    final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
    final Object recordPage = taskMemoryManager.getPage(recordPointer);
    final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
    int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
    long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
    while (dataRemaining > 0) {
      final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
      // Copy data from the record's memory page into the array buffer
      Platform.copyMemory(
        recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
      // Write the buffered data out
      writer.write(writeBuffer, 0, toTransfer);
      recordReadPosition += toTransfer;
      dataRemaining -= toTransfer;
    }
    writer.recordWritten();
  }
  final FileSegment committedSegment = writer.commitAndGet();
  writer.close();
  // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
  // then the file might be empty. Note that it might be better to avoid calling
  // writeSortedFile() in that case.
  if (currentPartition != -1) {
    spillInfo.partitionLengths[currentPartition] = committedSegment.length();
    spills.add(spillInfo);
  }

  if (!isLastFile) {  // i.e. this is a spill file
    // The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
    // are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
    // relies on its `recordWritten()` method being called in order to trigger periodic updates to
    // `shuffleBytesWritten`. If we were to remove the `recordWritten()` call and increment that
    // counter at a higher-level, then the in-progress metrics for records written and bytes
    // written would get out of sync.
    //
    // When writing the last file, we pass `writeMetrics` directly to the DiskBlockObjectWriter;
    // in all other cases, we pass in a dummy write metrics to capture metrics, then copy those
    // metrics to the true write metrics here. The reason for performing this copying is so that
    // we can avoid reporting spilled bytes as shuffle write bytes.
    //
    // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
    // Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
    // This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
    writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
    taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
  }
}
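The inner copy loop in writeSortedFile moves each record from its memory page to disk in chunks of at most diskWriteBufferSize bytes. The same chunking pattern, sketched over plain byte arrays with System.arraycopy in place of Platform.copyMemory (hypothetical helper, illustrative only):

import java.io.OutputStream

object ChunkedCopySketch {
  // Copy `length` bytes starting at `offset` from `page` to `out`,
  // going through a fixed-size reusable buffer.
  def copy(page: Array[Byte], offset: Int, length: Int,
           writeBuffer: Array[Byte], out: OutputStream): Unit = {
    var remaining = length
    var position = offset
    while (remaining > 0) {
      val toTransfer = math.min(writeBuffer.length, remaining)
      System.arraycopy(page, position, writeBuffer, 0, toTransfer)
      out.write(writeBuffer, 0, toTransfer)
      position += toTransfer
      remaining -= toTransfer
    }
  }
}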
public void insertRecord(long recordPointer, int partitionId) {
  if (!hasSpaceForAnotherRecord()) {
    throw new IllegalStateException("There is no space for new record");
  }
  array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
  pos++;
}
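PackedRecordPointer.packPointer folds the partition ID and the record's address into a single long, so the in-memory sort can order records by partition without dereferencing them. A hedged sketch of that packing idea, assuming a 24-bit partition ID in the upper bits over a 40-bit address in the lower bits (the helper below is illustrative, not the real class):

// Illustrative: pack a partition ID (upper 24 bits) and a 40-bit address (lower bits)
// into one long, mirroring the idea behind PackedRecordPointer.
object PackedPointerSketch {
  private val AddressMask = (1L << 40) - 1

  def pack(address: Long, partitionId: Int): Long =
    (partitionId.toLong << 40) | (address & AddressMask)

  def partitionId(packed: Long): Int = (packed >>> 40).toInt

  def address(packed: Long): Long = packed & AddressMask
}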
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
try {
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
  // Write the index file and commit the data file
  shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
  if (tmp.exists() && !tmp.delete()) {
    logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
  }
}
}
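writeIndexFileAndCommit records where each partition starts, so a reducer can locate partition i's bytes as the range [offsets(i), offsets(i + 1)) in the data file. Assuming the index is essentially a running sum of the partition lengths, the computation looks like this sketch (not the resolver's actual code):

// Illustrative: convert per-partition byte lengths into cumulative offsets.
// Partition i then spans [offsets(i), offsets(i + 1)) in the combined data file.
object IndexOffsetsSketch {
  def toOffsets(partitionLengths: Array[Long]): Array[Long] = {
    val offsets = new Array[Long](partitionLengths.length + 1)
    var i = 0
    while (i < partitionLengths.length) {
      offsets(i + 1) = offsets(i) + partitionLengths(i)
      i += 1
    }
    offsets
  }
}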
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined
  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update) // update or insert into the map
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]) // insert into the buffer
      maybeSpillCollection(usingMap = false)
    }
  }
}
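The map-side combine above can be pictured with an ordinary mutable map: the first time a key is seen, createCombiner builds the combiner; every later occurrence is folded in with mergeValue. A simplified sketch using a plain sum aggregator instead of the AppendOnlyMap (illustrative types and names):

import scala.collection.mutable

object CombineSketch {
  // createCombiner: v => v ; mergeValue: (c, v) => c + v  (a simple sum aggregator)
  def combine(records: Iterator[(String, Int)]): mutable.Map[String, Int] = {
    val combiners = mutable.Map.empty[String, Int]
    records.foreach { case (k, v) =>
      combiners.get(k) match {
        case Some(c) => combiners(k) = c + v   // mergeValue
        case None    => combiners(k) = v       // createCombiner
      }
    }
    combiners
  }
}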
private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
    : SpilledFile = {
  // Because these files may be read during shuffle, their compression must be controlled by
  // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
  // createTempShuffleBlock here; see SPARK-3426 for more context.
  val (blockId, file) = diskBlockManager.createTempShuffleBlock()

  // These variables are reset after each flush
  var objectsWritten: Long = 0
  val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
  val writer: DiskBlockObjectWriter =
    blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)

  // List of batch sizes (bytes) in the order they are written to disk
  val batchSizes = new ArrayBuffer[Long]

  // How many elements we have in each partition
  val elementsPerPartition = new Array[Long](numPartitions)

  // Flush the disk writer's contents to disk, and update relevant variables.
  // The writer is committed at the end of this process.
  def flush(): Unit = {
    val segment = writer.commitAndGet()
    batchSizes += segment.length
    _diskBytesSpilled += segment.length
    objectsWritten = 0
  }

  var success = false
  try {
    while (inMemoryIterator.hasNext) {
      val partitionId = inMemoryIterator.nextPartition()
      require(partitionId >= 0 && partitionId < numPartitions,
        s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
      inMemoryIterator.writeNext(writer)
      elementsPerPartition(partitionId) += 1
      objectsWritten += 1

      if (objectsWritten == serializerBatchSize) {
        flush()
      }
    }
    if (objectsWritten > 0) {
      flush()
    } else {
      writer.revertPartialWritesAndClose()
    }
    success = true
  } finally {
    if (success) {
      writer.close()
    } else {
      // This code path only happens if an exception was thrown above before we set success;
      // close our stuff and let the exception be thrown further
      writer.revertPartialWritesAndClose()
      if (file.exists()) {
        if (!file.delete()) {
          logWarning(s"Error deleting ${file}")
        }
      }
    }
  }
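The flush-every-serializerBatchSize pattern above reduces to: write records, count them, and record the batch's byte size whenever the count reaches the batch size, plus one final flush for the trailing partial batch. A standalone sketch with hypothetical types, writing into an in-memory stream instead of a DiskBlockObjectWriter:

import java.io.ByteArrayOutputStream
import scala.collection.mutable.ArrayBuffer

object BatchedWriteSketch {
  // Returns the byte length of every batch, in write order.
  def writeInBatches(records: Iterator[Array[Byte]], batchSize: Int,
                     out: ByteArrayOutputStream): Seq[Long] = {
    val batchSizes = ArrayBuffer.empty[Long]
    var bytesInBatch = 0L
    var objectsWritten = 0
    for (record <- records) {
      out.write(record)
      bytesInBatch += record.length
      objectsWritten += 1
      if (objectsWritten == batchSize) {   // flush a full batch
        batchSizes += bytesInBatch
        bytesInBatch = 0L
        objectsWritten = 0
      }
    }
    if (objectsWritten > 0) {              // flush the trailing partial batch
      batchSizes += bytesInBatch
    }
    batchSizes.toSeq
  }
}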
  // Track location of each range in the output file
  val lengths = new Array[Long](numPartitions)
  val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
    context.taskMetrics().shuffleWriteMetrics)
  if (spills.isEmpty) {
    // Case where we only have in-memory data and no spilled data
    val collection = if (aggregator.isDefined) map else buffer
    val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
    while (it.hasNext) {
      val partitionId = it.nextPartition()
      while (it.hasNext && it.nextPartition() == partitionId) {
        it.writeNext(writer)
      }
      val segment = writer.commitAndGet()
      lengths(partitionId) = segment.length
    }
  } else {
    // There is spilled data, so the spilled data and the in-memory data must be merge-sorted.
    // We must perform merge-sort; get an iterator by partition and write everything directly.
    for ((id, elements) <- this.partitionedIterator) {
      if (elements.hasNext) {
        for (elem <- elements) {
          // Write one partition's records to the file stream in sorted order
          writer.write(elem._1, elem._2)
        }
        val segment = writer.commitAndGet() // one commit per partition
        lengths(id) = segment.length
      }
    }
  }
/**
 * Return an iterator over all the data written to this object, grouped by partition and
 * aggregated by the requested aggregator. For each partition we then have an iterator over its
 * contents, and these are expected to be accessed in order (you can't "skip ahead" to one
 * partition without reading the previous one). Guaranteed to return a key-value pair for each
 * partition, in order of partition ID.
 *
 * For now, we just merge all the spilled files in one pass, but this can be modified to
 * support hierarchical merging.
 * Exposed for testing.
 */
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
  val usingMap = aggregator.isDefined
  val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
  if (spills.isEmpty) {
    // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
    // we don't even need to sort by anything other than partition ID
    if (!ordering.isDefined) {
      // The user hasn't requested sorted keys, so only sort by partition ID, not key
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(None)))
    } else {
      // We do need to sort by both partition ID and key
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
    }
  } else {
    // Merge spilled and in-memory data
    merge(spills, destructiveIterator(
      collection.partitionedDestructiveSortedIterator(comparator)))
  }
}
/**
 * Merge-sort a sequence of (K, C) iterators using a given comparator for the keys.
 */
private def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K])
    : Iterator[Product2[K, C]] = {
  val bufferedIters = iterators.filter(_.hasNext).map(_.buffered)
  type Iter = BufferedIterator[Product2[K, C]]
  val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
    // Order the iterators by the key at their head. PriorityQueue dequeues the largest element,
    // so the arguments are reversed to obtain ascending key order (a min-heap on the head key).
    override def compare(x: Iter, y: Iter): Int = comparator.compare(y.head._1, x.head._1)
  })
  heap.enqueue(bufferedIters: _*) // Will contain only the iterators with hasNext = true
  new Iterator[Product2[K, C]] {
    override def hasNext: Boolean = !heap.isEmpty

    override def next(): Product2[K, C] = {
      if (!hasNext) {
        throw new NoSuchElementException
      }
      val firstBuf = heap.dequeue()   // dequeue the iterator whose head record has the smallest key
      val firstPair = firstBuf.next() // consume that buffered head element
      if (firstBuf.hasNext) {
        // If the dequeued iterator still has records, put it back into the heap
        heap.enqueue(firstBuf)
      }
      firstPair
    }
  }
}
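Stripped of Spark types, the same k-way merge works on any sorted sources: wrap each in a BufferedIterator, keep them in a PriorityQueue ordered by head element (arguments reversed so the smallest head is dequeued first), then repeatedly dequeue, emit the head, and re-enqueue the source if it still has elements. A self-contained sketch, not Spark code:

import scala.collection.mutable

object KWayMergeSketch {
  // Merge already-sorted sources into one sorted iterator.
  def merge[T](sources: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] = {
    type Iter = BufferedIterator[T]
    val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
      // Reverse the comparison so the iterator with the smallest head is dequeued first
      override def compare(x: Iter, y: Iter): Int = ord.compare(y.head, x.head)
    })
    heap.enqueue(sources.filter(_.hasNext).map(_.buffered): _*)
    new Iterator[T] {
      override def hasNext: Boolean = heap.nonEmpty
      override def next(): T = {
        val smallest = heap.dequeue()                 // iterator whose head is smallest
        val head = smallest.next()                    // consume that head
        if (smallest.hasNext) heap.enqueue(smallest)  // put it back if not exhausted
        head
      }
    }
  }
}

// Usage: KWayMergeSketch.merge(Seq(Iterator(1, 4, 7), Iterator(2, 5), Iterator(3, 6)))
// yields 1, 2, 3, 4, 5, 6, 7.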
def spill(): Boolean = SPILL_LOCK.synchronized {
  if (hasSpilled) {
    false
  } else {
    val inMemoryIterator = new WritablePartitionedIterator {
      private[this] var cur = if (upstream.hasNext) upstream.next() else null

      def writeNext(writer: DiskBlockObjectWriter): Unit = {
        writer.write(cur._1._2, cur._2)
        cur = if (upstream.hasNext) upstream.next() else null
      }

      def hasNext(): Boolean = cur != null

      def nextPartition(): Int = cur._1._1
    }
    logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
      s" it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
    val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
    forceSpillFiles += spillFile
    val spillReader = new SpillReader(spillFile)
    nextUpstream = (0 until numPartitions).iterator.flatMap { p =>
      val iterator = spillReader.readNextPartition()
      iterator.map(cur => ((p, cur._1), cur._2))
    }
    hasSpilled = true
    true
  }
}

def readNext(): ((Int, K), C) = SPILL_LOCK.synchronized {
  if (nextUpstream != null) {
    upstream = nextUpstream
    nextUpstream = null
  }
  if (upstream.hasNext) {
    upstream.next()
  } else {
    null
  }
}

override def hasNext(): Boolean = cur != null

override def next(): ((Int, K), C) = {
  val r = cur
  cur = readNext()
  r
}
}
/** Construct a stream that only reads from the next batch */
def nextBatchStream(): DeserializationStream = {
  // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
  // we're still in a valid batch.
  if (batchId < batchOffsets.length - 1) {
    if (deserializeStream != null) {
      deserializeStream.close()
      fileStream.close()
      deserializeStream = null
      fileStream = null
    }
    assert(end >= start, "start = " + start + ", end = " + end +
      ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
    // Read at most (end - start) bytes of the current batch from the file stream
    val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))

    val wrappedStream = serializerManager.wrapStream(spill.blockId, bufferedStream)
    // Return a deserialization stream over the current batch's data
    serInstance.deserializeStream(wrappedStream)
  } else {
    // No more batches left
    cleanup()
    null
  }
}

// Each time ExternalSorter.mergeSort() dequeues, it first calls next() to obtain the cached
// nextItem, then calls hasNext(), which reads the following element and stores it in nextItem.
override def hasNext: Boolean = {
  if (nextItem == null) {
    nextItem = readNextItem()
    if (nextItem == null) {
      return false
    }
  }
  assert(lastPartitionId >= myPartition)
  // Check that we're still in the right partition; note that readNextItem will have returned
  // null at EOF above so we would've returned false there
  lastPartitionId == myPartition
}

/**
 * Read the next (K, C) pair from the deserialization stream; when the current batch has been
 * fully consumed, this triggers reading of the next batch.
 */
private def readNextItem(): (K, C) = {
  if (finished || deserializeStream == null) {
    return null
  }
  val k = deserializeStream.readKey().asInstanceOf[K]
  val c = deserializeStream.readValue().asInstanceOf[C]
  lastPartitionId = partitionId
  // Start reading the next batch if we're done with this one
  indexInBatch += 1
  if (indexInBatch == serializerBatchSize) {
    indexInBatch = 0
    deserializeStream = nextBatchStream()
  }
  // Update the partition location of the element we're reading
  indexInPartition += 1
  skipToNextPartition()
  // If we've finished reading the last partition, remember that we're done
  if (partitionId == numPartitions) {
    finished = true
    if (deserializeStream != null) {
      deserializeStream.close()
    }
  }
  (k, c)
}

/** Iterate through the data and write out the elements instead of returning them. Records are
 * returned in order of their partition ID and then the given comparator.
 * This may destroy the underlying collection.
 *
 * The keyComparator here is the one defined in ExternalSorter.
 */
def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]])
    : WritablePartitionedIterator = {
  val it = partitionedDestructiveSortedIterator(keyComparator)
  new WritablePartitionedIterator {
    private[this] var cur = if (it.hasNext) it.next() else null
    def writeNext(writer: DiskBlockObjectWriter): Unit = {
      writer.write(cur._1._2, cur._2)
      cur = if (it.hasNext) it.next() else null
    }

private[spark] object WritablePartitionedPairCollection {
  /**
   * A comparator for (Int, K) pairs that orders them by only their partition ID.
   */
  def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] {
    override def compare(a: (Int, K), b: (Int, K)): Int = {
      a._1 - b._1
    }
  }
  /**
   * A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering.
   */
  def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] = {
    new Comparator[(Int, K)] {
      override def compare(a: (Int, K), b: (Int, K)): Int = {
        val partitionDiff = a._1 - b._1
        if (partitionDiff != 0) {
          // Partition ID is the primary sort key when ordering in-memory records
          partitionDiff
        } else {
          // Within the same partition, records are ordered by key
          keyComparator.compare(a._2, b._2)
        }
      }
    }
  }
}
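To see the effect of partitionKeyComparator, sorting a handful of (partitionId, key) pairs groups them by partition first and orders keys inside each partition. A small illustrative example using Scala's built-in tuple ordering rather than the comparator itself:

object PartitionKeyOrderingDemo {
  def main(args: Array[String]): Unit = {
    // (partitionId, key) pairs in arbitrary order
    val records = Array((1, "b"), (0, "z"), (1, "a"), (0, "c"))
    // Primary sort key: partition ID; secondary: the key itself
    val sorted = records.sortBy { case (partition, key) => (partition, key) }
    // Result: (0,"c"), (0,"z"), (1,"a"), (1,"b")
    sorted.foreach(println)
  }
}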
/** Iterate through the data in a given order. For this class this is not really destructive. */
override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)] = {
  // When sorting the in-memory records, partition ID is the primary criterion; within the same
  // partition, records are ordered by key. If the shuffle requires neither sorting nor
  // aggregation, the records are ordered by partition ID only.
  val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
  // Sorter uses TimSort internally
  new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)
  iterator
}