/**
 * Shuffle write metrics for this operator, created lazily from `sparkContext` via
 * `SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics`.
 * Passed to `ShuffleExchangeExec.prepareShuffleDependency` when `shuffleDependency` is built.
 */
private lazy val writeMetrics =
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
/**
 * Shuffle read metrics for this operator, created lazily from `sparkContext` via
 * `SQLShuffleReadMetricsReporter.createShuffleReadMetrics`.
 * Handed to the `ShuffledRowRDD` constructed in `doExecute`.
 */
private[sql] lazy val readMetrics =
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
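这两组 metrics 分别用在两个地方:writeMetrics 传给 shuffleDependency(上游 stage 的 shuffle write),readMetrics 传给 ShuffledRowRDD(下游 stage 的 shuffle read),承接的是前后两个 stage 的 metrics。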
/**
 * The [[ShuffleDependency]] for this exchange, built lazily on first access.
 *
 * It is produced by `ShuffleExchangeExec.prepareShuffleDependency` from `inputRDD`,
 * the child's output attributes, `outputPartitioning`, `serializer` and `writeMetrics`.
 * As part of initialization the "numPartitions" metric is set to the partition count of
 * the dependency's partitioner and posted as a driver-side metric update, using the
 * execution id read from the `SQLExecution.EXECUTION_ID_KEY` local property.
 */
@transient
lazy val shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow] = {
  val dependency = ShuffleExchangeExec.prepareShuffleDependency(
    inputRDD, child.output, outputPartitioning, serializer, writeMetrics)

  // Record how many partitions the partitioner produces.
  metrics("numPartitions").set(dependency.partitioner.numPartitions)

  // Publish the metric from the driver under the current SQL execution id.
  val sqlExecutionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
  SQLMetrics.postDriverMetricUpdates(
    sparkContext,
    sqlExecutionId,
    List(metrics("numPartitions")))

  dependency
}
/**
 * Returns the shuffled RDD for this operator.
 *
 * The `ShuffledRowRDD` is created from `shuffleDependency` and `readMetrics` on the first
 * call and stored in `cachedShuffleRDD`; subsequent calls return that same instance, so a
 * plan referenced by multiple parents shares one RDD.
 */
protected override def doExecute(): RDD[InternalRow] = {
  if (cachedShuffleRDD != null) {
    // Already built by an earlier call: reuse it.
    cachedShuffleRDD
  } else {
    val shuffled = new ShuffledRowRDD(shuffleDependency, readMetrics)
    cachedShuffleRDD = shuffled
    shuffled
  }
}

一般情况下,两个 metrics 的数值相同:write 发生在前(上游 stage),read 发生在后(下游 stage)。
如果下游的 shuffle read task 尚未完成或者失败,就会出现 read 比 write 少的情况。