1. RDD 的定义
RDD ,是 Spark 对一组数据集合的抽象描述,用于表示一个弹性的、可容错的分布式数据集合。RDD 可以从纵向与横向划分出四个属性:
- 纵向:Dependency、Compute
- 横向:Partitioner、Partitions
Spark 基于 Dependency 构建血缘关系,并生成一张有向无环图,有向无环图可以根据 Shuffle 划分为多个 Stage,每个 Stage 都可以转化多个 Compute 任务,并分发到各个 Partition 上,Partitioner 则决定了数据应该去往哪个 Partition。
2. 源码解读
2.1 RDD 的构造
RDD 的构造函数如下:
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
// ......
/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
this(oneParent.context, List(new OneToOneDependency(oneParent)))
// ......
/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*
* The partitions in this array must satisfy the following property:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
*/
protected def getPartitions: Array[Partition]
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getDependencies: Seq[Dependency[_]] = deps
/**
* Optionally overridden by subclasses to specify placement preferences.
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None
// ......
}
各构造参数的含义如下:
_sc
: SparkContext,存储了 Spark 的上下文信息deps
: 表示 RDD 依赖链,RDD 正是依赖 Dependency 序列构建血缘,并进行回溯的;数组类型意味着一个 RDD 可能有多条依赖链,如 ZippedPartitionsRDD2
同时还有一些重要的内部变量及方法:
- compute: 计算逻辑,由子类具体实现
- getPartitions:获取分区数组,仅会被调用一次(如触发 action 时,此处暂不详细展开)
- getDependencies: 获取依赖的父 RDD,同样进会被调用一次
- partitioner,定义了数据 Shuffle 时的规则,需要由子类具体去实现
可以看到,RDD 里的变量以及方法也对应了 Dependency、Compute、Partitions、Partitioner 。
RDD 使用泛型来表示任意数据类型,这使得 RDD 十分灵活,允许在内部存储任意结构的数据。
RDD 有许多子类实现,这里暂且以构造初始数据的 org.apache.spark.rdd.ParallelCollectionRDD
(复杂点的还有 HadoopRDD 等) 为例,有如下代码:
val rdd1 = spark.sparkContext.makeRDD(Seq("a", "b", "c", "d"))
其中 makeRDD(...)
方法会创建一个 ParallelCollectionRDD:
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
// 传入数据序列以及指定分片
parallelize(seq, numSlices)
}
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
// 创建并返回一个 ParallelCollectionRDD
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
其中 withScope
主要适用于记录一些 RDD 构造过程中的上下文信息(如调用的方法名,状态变化等),这里并非重点,不做详细赘述。
查看 ParallelCollectionRDD 的构造函数如下:
private[spark] class ParallelCollectionRDD[T: ClassTag](
sc: SparkContext,
@transient private val data: Seq[T],
numSlices: Int,
locationPrefs: Map[Int, Seq[String]])
extends RDD[T](sc, Nil) {
// 存储了 partition 信息,每个 partition 代表一个分片的数据
override def getPartitions: Array[Partition] = {
val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
// 具体计算逻辑,可以接收一个具体的 parition 并封装为一个 Iterator
override def compute(s: Partition, context: TaskContext): Iterator[T] = {
new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
}
// 存储了数据本地性的相关信息,不同 RDD 有不同的实现,HadoopRDD 则是存储在 split 中
override def getPreferredLocations(s: Partition): Seq[String] = {
locationPrefs.getOrElse(s.index, Nil)
}
}
可以看到,ParallelCollectionRDD 会将接收到的 SparkContext 传递给 RDD 接口,同时将一个 Nil
对象传递给 RDD 接口记录到 deps
中,这是由于当前 RDD 属于最起始的 RDD,并没有依赖的父 RDD。至此,一个 ParallelCollectionRDD 的构造便结束了。
2.2 Transformation
Transformation 算子,可以从一个 RDD 生成另一个 RDD,构造计算逻辑,但不触发真正的计算。
对前面定义的 rdd1
执行 Transformation 操作,代码如下:
val rdd2 = rdd1.flatMap(line => line.split("\n"))
.map((_, 1))
以下分别为 flatMap
以及 map
方法的源码:
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
}
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}
可以看到两个方法都是构造一个 org.apache.spark.rdd.MapPartitionsRDD
,RDD 调用这些高阶函数,并将自身 this
以及接收到的方法 f
作为 MapPartitionsRDD 构造参数。
MapPartitionsRDD 的构造函数如下:
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false,
isFromBarrier: Boolean = false,
isOrderSensitive: Boolean = false)
extends RDD[U](prev) {
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
override def getPartitions: Array[Partition] = firstParent[T].partitions
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
// ......
}
可以看到,RDD 将自身传入 MapPartitionsRDD 后,MapPartitionsRDD 又通过 RDD[U](prev)
将其传递给 RDD 接口,由前面对 RDD 源码的解析可知,这个 poev
会被构造为一个 Dependency 记录起来:
def this(@transient oneParent: RDD[_]) =
this(oneParent.context, List(new OneToOneDependency(oneParent)))
至此,RDD 之间的 Dependency 构造大致是理清了:一个 RDD 生成新的 RDD,新的 RDD 会将把父 RDD 构造为 Dependency 保存起来。
2.3 Action
Action 会真正触发 RDD 的计算逻辑,执行前面指定的 Transformation 算子。
对 rdd2
执行 Action 算子,代码如下:
rdd2.foreach(println)
查看 foreach
代码:
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}
可以看到,RDD 通过 foreach
调用 sc.runJob()
方法,将自身传以及一个函数传递进去,并且还会通过 rdd.partitions
访问 RDD 的 partition 信息。
从这里,会开始借助前面构造的 Dependency 信息,一层一层的追溯父 RDD 的 partition 信息,其实现如下:
// rdd.partitions
final def partitions: Array[Partition] = {
checkpointRDD.map(_.partitions).getOrElse {
if (partitions_ == null) {
stateLock.synchronized {
if (partitions_ == null) {
// 调用 RDD 的 getPartitions
partitions_ = getPartitions
partitions_.zipWithIndex.foreach { case (partition, index) =>
require(partition.index == index,
s"partitions($index).partition == ${partition.index}, but it should equal $index")
}
}
}
}
partitions_
}
}
此处的 rdd.partitions
内部会调用其 getPartitions
方法,根据之前的源码解析,可以知道这是 RDD 的一个抽象方法,在 MapPartitionsRDD 中的实现及调用如下:
// org.apache.spark.rdd.MapPartitionsRDD#getPartitions
override def getPartitions: Array[Partition] = firstParent[T].partitions
// org.apache.spark.rdd.RDD#firstParent
protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
dependencies.head.rdd.asInstanceOf[RDD[U]]
}
// org.apache.spark.rdd.RDD#dependencies
final def dependencies: Seq[Dependency[_]] = {
checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
if (dependencies_ == null) {
stateLock.synchronized {
if (dependencies_ == null) {
// 获取 denpendency 序列
dependencies_ = getDependencies
}
}
}
dependencies_
}
}
// org.apache.spark.rdd.RDD#getDependencies
protected def getDependencies: Seq[Dependency[_]] = deps
可以看到,MapPartitionsRDD 通过调用链 org.apache.spark.rdd.RDD#firstParent
-> org.apache.spark.rdd.RDD#dependencies
-> org.apache.spark.rdd.RDD#getDependencies
,从 Dependency 获取到父 RDD,接着父 RDD 也会调用 partitions
方法,再次重复上述步骤直到(当前 Stage)最顶级的 RDD。
按照例子中 RDD 的构建顺序,最终会追溯到 ParallelCollectionRDD
,其 getPartitions
实现如下:
override def getPartitions: Array[Partition] = {
val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
可以看到,这里构造了 ParallelCollectionRDD 的 Parititon 数组,partition 在最初构造的时候就已经决定了,之后逐级回到之前的 org.apache.spark.SparkContext#runJob
方法中,继续往下走:
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int]): Array[U] = {
val cleanedFunc = clean(func)
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int]): Array[U] = {
val results = new Array[U](partitions.size)
runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
results
}
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
// DAGScheduler 负责构造 taskset 并结合 taskScheduler、SchedulerBackend 分发任务
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
经过多个函数调用后,会走到 DAGScheduler 那里真正的提交任务。调度模块的源码不属于此次解读范围,暂不详细展开,这部分主要工作是构造 task 的相关信息,最终将 task 以 RPC 的方式推送到各个 Executor 端。
与 RDD 获取 partitions 信息一样,RDD 的 Compute 同样是借助 Dependency 嵌套执行(Volcano 模型) 的。Executor 在接收到调度器发来的 task 时,会对其进行处理并封装为 Runnable 放入线程池执行:
// org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.LaunchTask$#unapply
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
// 反序列化 task
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
taskResources(taskDesc.taskId) = taskDesc.resources
// 启动一个 task
executor.launchTask(this, taskDesc)
}
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
// 将 task 封装为实现了 Runnable 接口的 TaskRunner
val tr = new TaskRunner(context, taskDescription)
runningTasks.put(taskDescription.taskId, tr)
// 放入线程池执行
threadPool.execute(tr)
}
org.apache.spark.executor.Executor.TaskRunner
的 run 方法定义如下:
override def run(): Unit = {
// ......
val res = task.run(
taskAttemptId = taskId,
attemptNumber = taskDescription.attemptNumber,
metricsSystem = env.metricsSystem,
resources = taskDescription.resources)
// ......
}
最终调用会走到 org.apache.spark.scheduler.ResultTask#runTask
里:
override def runTask(context: TaskContext): U = {
// Deserialize the RDD and the func using the broadcast variables.
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTimeNs = System.nanoTime()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
func(context, rdd.iterator(partition, context))
}
方法最后一行 rdd.iterator
又会触发父 RDD 的溯源:
// org.apache.spark.rdd.RDD#iterator
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
// 并未进行持久化,走下面的逻辑
computeOrReadCheckpoint(split, context)
}
}
// org.apache.spark.rdd.RDD#computeOrReadCheckpoint
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
if (isCheckpointedAndMaterialized) {
firstParent[T].iterator(split, context)
} else {
// 调用 RDD 的 compute 方法
compute(split, context)
}
}
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false,
isFromBarrier: Boolean = false,
isOrderSensitive: Boolean = false)
extends RDD[U](prev) {
// ......
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
// ......
}
可以看到最终又回到各个 RDD 自行实现的 copmute 函数并通过 firstParent
追溯父 RDD 直到(当前 Stage)最顶级的 RDD。
可以看到,compute
方法是将一个方法作用在一个 Iterator 上,并返回一个新的 Iterator,层层嵌套调用执行计算的。
在 Iterator 上嵌套计算完成后,Executor 返回结果信息给 Driver。至此,RDD 的整个计算流程结束。
3. 扩展
在 org.apache.spark.SparkContext#hadoopFile
看到这样一行注释:
// This is a hack to enforce loading hdfs-site.xml.
// See SPARK-11227 for details.
FileSystem.getLocal(hadoopConfiguration)
大意是会强制加载 hdfs-site.xml 配置文件的一个操作。但为什么需要这样做?
Congiguration 在新建时,默认会从加载本地配置文件。在 Spark 中,Congiguration 实例化完成后需要经过序列化-反序列传输到各个 Executor 上,这样一来,每个 Executor 反序列化新建 Configuration 时又需要从磁盘加载一次配置文件。
为了节省这个开销(详见 SPARK-8135),Spark 1.5 修改了 SerializableWritable (实际使用的是 SerializableConfiguration,但逻辑基本一样)的反序列化代码,在反序列化生成 Configuration 时不会再重新加载本地配置文件:
class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
def value: T = t
override def toString: String = t.toString
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
out.defaultWriteObject()
new ObjectWritable(t).write(out)
}
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
val ow = new ObjectWritable()
// 设置 false 不再从磁盘上读取配置文件
ow.setConf(new Configuration(false))
ow.readFields(in)
t = ow.get().asInstanceOf[T]
}
}
但这样存在一个问题,如果在传输到 Executor 之前,Configuration 没有先添加好完整信息的话,会导致 Executor 端的 Configuration 并不完整导致异常的发生。
因此此处执行了 FileSystem.getLocal(hadoopConfiguration)
,先在 Driver 端强制读取本地配置文件构造完整的 Configuration,再通过广播变量发送到 Executor 端。执行前后 Configuration 内容变化如下: