SparkSql 重写 FileCommitter 解决多任务并发写同目录问题

Posted on Mon, Nov 29, 2021 Spark

SparkSql 并发写同目录问题

近期在将 RDD 转 Dataframe 写入 HDFS 时,发现当有多个 Spark 作业同时写入数据到同一目录下,就会产生 _temporary 目录的并发安全问题。

Spark 写入数据到 HDFS 时,为了保证数据的一致性,会先将数据写入 _temporary 目录,再执行 commitTask -> commitJob 进行两次 Rename 操作,将 _temporary 目录下的文件全部输出到最终目标下,再删除 _temporary 目录:

在这种机制下,当多个作业同时写相同目录,一旦某个作业先完成,其他作业在 _temporary 下的数据就会被被错误地删除。

在网上搜索一番后,并没有发现 SparkSql 修改 _temporary 的方案,基本都是 RDD 调用 saveAsHadoopFile API 保存数据时,直接传递一个 conf 进去,但 SparkSql 并没有提供类似的 API,因此无法按照相同方式解决。

查看 SQL 执行计划,发现 SparkSql 使用了 InsertIntoHadoopFsRelationCommand 执行数据写入操作,其中有个 committer 的变量构造如下:

 val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = java.util.UUID.randomUUID().toString,
       outputPath = outputPath.toString,
       dynamicPartitionOverwrite = dynamicPartitionOverwrite)

方法中会通过 sparkSession.sessionState.conf.fileCommitProtocolClass 决定使用哪个类构造 FileCommitProtocol

 val FILE_COMMIT_PROTOCOL_CLASS =
     buildConf("spark.sql.sources.commitProtocolClass")
       .version("2.1.1")
       .internal()
       .stringConf
       .createWithDefault(
         "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")

跟踪 SQLHadoopMapReduceCommitProtocol 代码,可以看到内部还会构造一个 committer,这个 committer 才是真正决定数据写入路径的类:

     val configuration = context.getConfiguration
     val clazz =
       configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
 
     if (clazz != null) {
       logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
       if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
         val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
         committer = ctor.newInstance(new Path(path), context)
       } else {
         val ctor = clazz.getDeclaredConstructor()
         committer = ctor.newInstance()
       }
     }

可以看到,committer 是通过反射进行构造的,并且只有当 committer 为 FileOutputCommitter 的子类时,才会将上下文信息传递进去,否则使用一个无参构造器构造 committer

由于我们需要修改 committer 的路径行为,因此无参构造器的方式先否决掉。因此只能考虑通过 context 传递自定义配置,重写 FileOutputCommitter 处理传进去的自定义配置,修改各个作业的 _temporary 路径。

其中 context 是基于 SparkConf 构建的 Hadoop 上下文,Spark 默认会把 spark.hadoop.xxx 的配置参数,注入到 Hadoop 的上下文中。因此在构造 SparkConf 的时候,就需要将自定义的外部配置传递进去:

 val conf = new SparkConf()
     // 设置自定义的 OutputCommitter
     .set("spark.sql.sources.outputCommitterClass", classOf[SelfFileOutputCommitterV2].getName)
     // 设置为 _xxx_temporary 路径
     .set(SelfFileOutputCommitterV2.CUSTOM_PENDING_DIR_NAME, "xxx")
     // 设置写入成功的标志文件名为 _xxx_SUCCESS
     .set(SelfFileOutputCommitterV2.CUSTOM_SUCCEEDED_FILE_NAME , "xxx")
     // 设置不清理临时目录,作为测试验证使用
     .set(SelfFileOutputCommitterV2.FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED, "true")
NOTE:一旦 SparkConf 在各个 executor 构造为 SparkContext 后,无法再进行修改,因此需要在最开始就传递进去。

自定义 Committer 详见 hadoop/org/apache/hadoop/mapreduce/lib/output/SelfFileOutputCommitterV2.java · zxk/code_repo - 码云 - 开源中国 (gitee.com)

关于对象存储

同时还了解到,当 Spark 写 Amazon S3 对象存储时,传统的 Hadoop OutputCommitter 的 Rename 机制已经不适用了。Reame 操作对于 S3 来说是一个很昂贵的操作,需要拆解为 List、Copy、Delete 等多个操作。

S3 为上传的文件提供了最终一致性,首先为该次上传申请一个 Upload id,接着将文件切割一块块上传,期间文件对外部不可见。待到全部上传完成后,再向 S3 发送一个完成信号,S3 会将文件进行合并,之后才对外可见;或者向 S3 发送一个失败信号,S3 删除文件。

因此出现了基于 S3 Multipart Upload 机制的 Committer(Amazon S3 提供),流程如下:

Spark 写 HDFS 的一致性思考

再次回顾下 Spark 写入文件的两个阶段:

虽然网上各种资料都说是为了保证数据的一致性,避免出现重复数据,但在 commitJob 阶段,这个动作仍是一个个文件 rename 到目标目录下的,这个操作并不是原子性的。

在源码中追溯到 Spark 的 taskAttempt 主要是与当前时间戳相关的。当写入方式为 Append 时,作业失败后重算,此时时间戳已发生变化,作业并无法识别到之前的数据,导致重复的数据写入。

参考