Spark3 - RDD 源码解读

Posted on Sat, Aug 21, 2021 Spark3.0.3 源码 Scala

1. RDD 的定义

RDD ,是 Spark 对一组数据集合的抽象描述,用于表示一个弹性的、可容错的分布式数据集合。RDD 可以从纵向与横向划分出四个属性:

Spark 基于 Dependency 构建血缘关系,并生成一张有向无环图,有向无环图可以根据 Shuffle 划分为多个 Stage,每个 Stage 都可以转化多个 Compute 任务,并分发到各个 Partition 上,Partitioner 则决定了数据应该去往哪个 Partition。

2. 源码解读

2.1 RDD 的构造

RDD 的构造函数如下:

 abstract class RDD[T: ClassTag](
     @transient private var _sc: SparkContext,
     @transient private var deps: Seq[Dependency[_]]
   ) extends Serializable with Logging {
   // ......
   /** Construct an RDD with just a one-to-one dependency on one parent */
   def this(@transient oneParent: RDD[_]) =
     this(oneParent.context, List(new OneToOneDependency(oneParent)))
   // ......
   /**
    * :: DeveloperApi ::
    * Implemented by subclasses to compute a given partition.
    */
   @DeveloperApi
   def compute(split: Partition, context: TaskContext): Iterator[T]
 
   /**
    * Implemented by subclasses to return the set of partitions in this RDD. This method will only
    * be called once, so it is safe to implement a time-consuming computation in it.
    *
    * The partitions in this array must satisfy the following property:
    *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
    */
   protected def getPartitions: Array[Partition]
 
   /**
    * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
    * be called once, so it is safe to implement a time-consuming computation in it.
    */
   protected def getDependencies: Seq[Dependency[_]] = deps
 
   /**
    * Optionally overridden by subclasses to specify placement preferences.
    */
   protected def getPreferredLocations(split: Partition): Seq[String] = Nil
 
   /** Optionally overridden by subclasses to specify how they are partitioned. */
   @transient val partitioner: Option[Partitioner] = None
   // ......
 }

各构造参数的含义如下:

同时还有一些重要的内部变量及方法:

可以看到,RDD 里的变量以及方法也对应了 Dependency、Compute、Partitions、Partitioner 。

RDD 使用泛型来表示任意数据类型,这使得 RDD 十分灵活,允许在内部存储任意结构的数据。

RDD 有许多子类实现,这里暂且以构造初始数据的 org.apache.spark.rdd.ParallelCollectionRDD(复杂点的还有 HadoopRDD 等) 为例,有如下代码:

 val rdd1 = spark.sparkContext.makeRDD(Seq("a", "b", "c", "d"))

其中 makeRDD(...) 方法会创建一个 ParallelCollectionRDD:

   def makeRDD[T: ClassTag](
       seq: Seq[T],
       numSlices: Int = defaultParallelism): RDD[T] = withScope {
     // 传入数据序列以及指定分片
     parallelize(seq, numSlices)
   }
 
   def parallelize[T: ClassTag](
       seq: Seq[T],
       numSlices: Int = defaultParallelism): RDD[T] = withScope {
     assertNotStopped()
     // 创建并返回一个 ParallelCollectionRDD
     new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
   }

其中 withScope 主要适用于记录一些 RDD 构造过程中的上下文信息(如调用的方法名,状态变化等),这里并非重点,不做详细赘述。

查看 ParallelCollectionRDD 的构造函数如下:

 private[spark] class ParallelCollectionRDD[T: ClassTag](
     sc: SparkContext,
     @transient private val data: Seq[T],
     numSlices: Int,
     locationPrefs: Map[Int, Seq[String]])
     extends RDD[T](sc, Nil) {

   // 存储了 partition 信息,每个 partition 代表一个分片的数据
   override def getPartitions: Array[Partition] = {
     val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
     slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
   }
 
   // 具体计算逻辑,可以接收一个具体的 parition 并封装为一个 Iterator
   override def compute(s: Partition, context: TaskContext): Iterator[T] = {
     new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
   }
 
   // 存储了数据本地性的相关信息,不同 RDD 有不同的实现,HadoopRDD 则是存储在 split 中
   override def getPreferredLocations(s: Partition): Seq[String] = {
     locationPrefs.getOrElse(s.index, Nil)
   }
 }

可以看到,ParallelCollectionRDD 会将接收到的 SparkContext 传递给 RDD 接口,同时将一个 Nil 对象传递给 RDD 接口记录到 deps 中,这是由于当前 RDD 属于最起始的 RDD,并没有依赖的父 RDD。至此,一个 ParallelCollectionRDD 的构造便结束了。

2.2 Transformation

Transformation 算子,可以从一个 RDD 生成另一个 RDD,构造计算逻辑,但不触发真正的计算。

对前面定义的 rdd1 执行 Transformation 操作,代码如下:

 val rdd2 = rdd1.flatMap(line => line.split("\n"))
     .map((_, 1))

以下分别为 flatMap 以及 map 方法的源码:

   def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
     val cleanF = sc.clean(f)
     new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
   }

   def map[U: ClassTag](f: T => U): RDD[U] = withScope {
     val cleanF = sc.clean(f)
     new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
   }

可以看到两个方法都是构造一个 org.apache.spark.rdd.MapPartitionsRDD,RDD 调用这些高阶函数,并将自身 this 以及接收到的方法 f 作为 MapPartitionsRDD 构造参数。

MapPartitionsRDD 的构造函数如下:

 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
     preservesPartitioning: Boolean = false,
     isFromBarrier: Boolean = false,
     isOrderSensitive: Boolean = false)
   extends RDD[U](prev) {
 
   override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
 
   override def compute(split: Partition, context: TaskContext): Iterator[U] =
     f(context, split.index, firstParent[T].iterator(split, context))
   // ......
 }

可以看到,RDD 将自身传入 MapPartitionsRDD 后,MapPartitionsRDD 又通过 RDD[U](prev) 将其传递给 RDD 接口,由前面对 RDD 源码的解析可知,这个 poev 会被构造为一个 Dependency 记录起来:

   def this(@transient oneParent: RDD[_]) =
     this(oneParent.context, List(new OneToOneDependency(oneParent)))

至此,RDD 之间的 Dependency 构造大致是理清了:一个 RDD 生成新的 RDD,新的 RDD 会将把父 RDD 构造为 Dependency 保存起来。

2.3 Action

Action 会真正触发 RDD 的计算逻辑,执行前面指定的 Transformation 算子。

rdd2 执行 Action 算子,代码如下:

 rdd2.foreach(println)

查看 foreach 代码:

   def foreach(f: T => Unit): Unit = withScope {
     val cleanF = sc.clean(f)
     sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
   }
 
   def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
     runJob(rdd, func, 0 until rdd.partitions.length)
   }

可以看到,RDD 通过 foreach 调用 sc.runJob() 方法,将自身传以及一个函数传递进去,并且还会通过 rdd.partitions 访问 RDD 的 partition 信息。

从这里,会开始借助前面构造的 Dependency 信息,一层一层的追溯父 RDD 的 partition 信息,其实现如下

   // rdd.partitions
   final def partitions: Array[Partition] = {
     checkpointRDD.map(_.partitions).getOrElse {
       if (partitions_ == null) {
         stateLock.synchronized {
           if (partitions_ == null) {
             // 调用 RDD 的 getPartitions
             partitions_ = getPartitions
             partitions_.zipWithIndex.foreach { case (partition, index) =>
               require(partition.index == index,
                 s"partitions($index).partition == ${partition.index}, but it should equal $index")
             }
           }
         }
       }
       partitions_
     }
   }

此处的 rdd.partitions 内部会调用其 getPartitions 方法,根据之前的源码解析,可以知道这是 RDD 的一个抽象方法,在 MapPartitionsRDD 中的实现及调用如下:

   // org.apache.spark.rdd.MapPartitionsRDD#getPartitions
   override def getPartitions: Array[Partition] = firstParent[T].partitions
 
   // org.apache.spark.rdd.RDD#firstParent
   protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
     dependencies.head.rdd.asInstanceOf[RDD[U]]
   }

   // org.apache.spark.rdd.RDD#dependencies
   final def dependencies: Seq[Dependency[_]] = {
     checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
       if (dependencies_ == null) {
         stateLock.synchronized {
           if (dependencies_ == null) {
             // 获取 denpendency 序列
             dependencies_ = getDependencies
           }
         }
       }
       dependencies_
     }
   }
 
   // org.apache.spark.rdd.RDD#getDependencies
   protected def getDependencies: Seq[Dependency[_]] = deps

可以看到,MapPartitionsRDD 通过调用链 org.apache.spark.rdd.RDD#firstParent -> org.apache.spark.rdd.RDD#dependencies -> org.apache.spark.rdd.RDD#getDependencies,从 Dependency 获取到父 RDD,接着父 RDD 也会调用 partitions 方法,再次重复上述步骤直到(当前 Stage)最顶级的 RDD。

按照例子中 RDD 的构建顺序,最终会追溯到 ParallelCollectionRDD,其 getPartitions 实现如下:

   override def getPartitions: Array[Partition] = {
     val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
     slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
   }

可以看到,这里构造了 ParallelCollectionRDD 的 Parititon 数组,partition 在最初构造的时候就已经决定了,之后逐级回到之前的 org.apache.spark.SparkContext#runJob 方法中,继续往下走:

   def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
     runJob(rdd, func, 0 until rdd.partitions.length)
   }

   def runJob[T, U: ClassTag](
       rdd: RDD[T],
       func: Iterator[T] => U,
       partitions: Seq[Int]): Array[U] = {
     val cleanedFunc = clean(func)
     runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
   }

   def runJob[T, U: ClassTag](
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       partitions: Seq[Int]): Array[U] = {
     val results = new Array[U](partitions.size)
     runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
     results
   }
 
   def runJob[T, U: ClassTag](
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       partitions: Seq[Int],
       resultHandler: (Int, U) => Unit): Unit = {
     if (stopped.get()) {
       throw new IllegalStateException("SparkContext has been shutdown")
     }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
     logInfo("Starting job: " + callSite.shortForm)
     if (conf.getBoolean("spark.logLineage", false)) {
       logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
     }
     // DAGScheduler 负责构造 taskset 并结合 taskScheduler、SchedulerBackend 分发任务
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
     progressBar.foreach(_.finishAll())
     rdd.doCheckpoint()
   }

经过多个函数调用后,会走到 DAGScheduler 那里真正的提交任务。调度模块的源码不属于此次解读范围,暂不详细展开,这部分主要工作是构造 task 的相关信息,最终将 task 以 RPC 的方式推送到各个 Executor 端。

与 RDD 获取 partitions 信息一样,RDD 的 Compute 同样是借助 Dependency 嵌套执行(Volcano 模型) 的。Executor 在接收到调度器发来的 task 时,会对其进行处理并封装为 Runnable 放入线程池执行:

 // org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.LaunchTask$#unapply
 case LaunchTask(data) =>
     if (executor == null) {
         exitExecutor(1, "Received LaunchTask command but executor was null")
     } else {
         // 反序列化 task
         val taskDesc = TaskDescription.decode(data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
         taskResources(taskDesc.taskId) = taskDesc.resources
         // 启动一个 task
         executor.launchTask(this, taskDesc)
     }
 
 
 def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
     // 将 task 封装为实现了 Runnable 接口的 TaskRunner
     val tr = new TaskRunner(context, taskDescription)
     runningTasks.put(taskDescription.taskId, tr)
     // 放入线程池执行
     threadPool.execute(tr)
 }

org.apache.spark.executor.Executor.TaskRunner 的 run 方法定义如下:

     override def run(): Unit = {
     // ......
         val res = task.run(
             taskAttemptId = taskId,
             attemptNumber = taskDescription.attemptNumber,
             metricsSystem = env.metricsSystem,
             resources = taskDescription.resources)
     // ......
     }

最终调用会走到 org.apache.spark.scheduler.ResultTask#runTask 里:

   override def runTask(context: TaskContext): U = {
     // Deserialize the RDD and the func using the broadcast variables.
     val threadMXBean = ManagementFactory.getThreadMXBean
     val deserializeStartTimeNs = System.nanoTime()
     val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
       threadMXBean.getCurrentThreadCpuTime
     } else 0L
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
     _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
     _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
       threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
     } else 0L
 
     func(context, rdd.iterator(partition, context))
   }

方法最后一行 rdd.iterator 又会触发父 RDD 的溯源:

   // org.apache.spark.rdd.RDD#iterator
   final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
     if (storageLevel != StorageLevel.NONE) {
       getOrCompute(split, context)
     } else {
       // 并未进行持久化,走下面的逻辑
       computeOrReadCheckpoint(split, context)
     }
   }
 
   // org.apache.spark.rdd.RDD#computeOrReadCheckpoint
   private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
   {
     if (isCheckpointedAndMaterialized) {
       firstParent[T].iterator(split, context)
     } else {
       // 调用 RDD 的 compute 方法
       compute(split, context)
     }
   }
 
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
     preservesPartitioning: Boolean = false,
     isFromBarrier: Boolean = false,
     isOrderSensitive: Boolean = false)
   extends RDD[U](prev) {
   // ......
   override def compute(split: Partition, context: TaskContext): Iterator[U] =
     f(context, split.index, firstParent[T].iterator(split, context))
   // ......
 }

可以看到最终又回到各个 RDD 自行实现的 copmute 函数并通过 firstParent 追溯父 RDD 直到(当前 Stage)最顶级的 RDD。

可以看到,compute 方法是将一个方法作用在一个 Iterator 上,并返回一个新的 Iterator,层层嵌套调用执行计算的。

在 Iterator 上嵌套计算完成后,Executor 返回结果信息给 Driver。至此,RDD 的整个计算流程结束。

3. 扩展

org.apache.spark.SparkContext#hadoopFile 看到这样一行注释:

 // This is a hack to enforce loading hdfs-site.xml.
 // See SPARK-11227 for details.
 FileSystem.getLocal(hadoopConfiguration)

大意是会强制加载 hdfs-site.xml 配置文件的一个操作。但为什么需要这样做?

Congiguration 在新建时,默认会从加载本地配置文件。在 Spark 中,Congiguration 实例化完成后需要经过序列化-反序列传输到各个 Executor 上,这样一来,每个 Executor 反序列化新建 Configuration 时又需要从磁盘加载一次配置文件。

为了节省这个开销(详见 SPARK-8135),Spark 1.5 修改了 SerializableWritable (实际使用的是 SerializableConfiguration,但逻辑基本一样)的反序列化代码,在反序列化生成 Configuration 时不会再重新加载本地配置文件:

 class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
 
   def value: T = t
 
   override def toString: String = t.toString
 
   private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     out.defaultWriteObject()
     new ObjectWritable(t).write(out)
   }
 
   private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     val ow = new ObjectWritable()
     // 设置 false 不再从磁盘上读取配置文件
     ow.setConf(new Configuration(false))
     ow.readFields(in)
     t = ow.get().asInstanceOf[T]
   }
 }

但这样存在一个问题,如果在传输到 Executor 之前,Configuration 没有先添加好完整信息的话,会导致 Executor 端的 Configuration 并不完整导致异常的发生。

因此此处执行了 FileSystem.getLocal(hadoopConfiguration) ,先在 Driver 端强制读取本地配置文件构造完整的 Configuration,再通过广播变量发送到 Executor 端。执行前后 Configuration 内容变化如下:

详细的官方 issue 详见:[SPARK-11227][CORE] UnknownHostException can be thrown when NameNode HA is enabled.