记一次SparkStreaming作业由于Kerberos过期导致卡死

Posted on Tue, Jun 1, 2021 Spark Kerberos

1. 前言

先说下结论,这个问题是由于 Spark 2.1.0.cloudera1 版本的 bug(其他 cdh 相关版本未验证,原生 2.1 并没有这个问题),再加上 Kerberos 凭据过期时间点正好与 checkpoint 的时间点重合导致的。

2. 排查过程

查看 5 月 29 号当天 HDFS 数据,发现在 15:35 分后就没有数据了:

查看作业 Driver 端,日志如下:

 21/05/31 11:33:20 INFO JobScheduler: Added jobs for time 1622432000000 ms
 21/05/31 11:33:20 INFO JobGenerator: Checkpointing graph for time 1622432000000 ms
 21/05/31 11:33:20 INFO DStreamGraph: Updating checkpoint data for time 1622432000000 ms
 21/05/31 11:33:20 INFO DStreamGraph: Updated checkpoint data for time 1622432000000 ms
 21/05/31 11:33:20 INFO CheckpointWriter: Submitted checkpoint of time 1622432000000 ms to writer queue
 21/05/31 11:33:20 INFO CheckpointWriter: Saving checkpoint for time 1622432000000 ms to file 'hdfs://nzj-cluster-gdyd/zkx_deploy/xdrhub_jiake/checkpoint/cyw/https/CY_HTTPS_JIAKE_GZ_XDR/all/checkpoint-1622432000000'
 21/05/31 11:33:20 WARN UserGroupInformation: PriviledgedActionException as:zkx (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for zkx: HDFS_DELEGATION_TOKEN owner=zkx, renewer=yarn, realUser=oozie/service2.nzj.gdyd@NZJ.GDYD, issueDate=1619681808569, maxDate=1622273808569, sequenceNumber=370742481, masterKeyId=803) can't be found in cache
 21/05/31 11:33:20 WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for zkx: HDFS_DELEGATION_TOKEN owner=zkx, renewer=yarn, realUser=oozie/service2.nzj.gdyd@NZJ.GDYD, issueDate=1619681808569, maxDate=1622273808569, sequenceNumber=370742481, masterKeyId=803) can't be found in cache
 21/05/31 11:33:20 WARN UserGroupInformation: PriviledgedActionException as:zkx (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for zkx: HDFS_DELEGATION_TOKEN owner=zkx, renewer=yarn, realUser=oozie/service2.nzj.gdyd@NZJ.GDYD, issueDate=1619681808569, maxDate=1622273808569, sequenceNumber=370742481, masterKeyId=803) can't be found in cache
 21/05/31 11:33:20 WARN CheckpointWriter: Error in attempt 1 of writing checkpoint to 'hdfs://nzj-cluster-gdyd/zkx_deploy/xdrhub_jiake/checkpoint/cyw/https/CY_HTTPS_JIAKE_GZ_XDR/all/checkpoint-1622432000000'
 org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for zkx: HDFS_DELEGATION_TOKEN owner=zkx, renewer=yarn, realUser=oozie/service2.nzj.gdyd@NZJ.GDYD, issueDate=1619681808569, maxDate=1622273808569, sequenceNumber=370742481, masterKeyId=803) can't be found in cache
     at org.apache.hadoop.ipc.Client.call(Client.java:1504)
     at org.apache.hadoop.ipc.Client.call(Client.java:1441)
     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
     at com.sun.proxy.$Proxy10.delete(Unknown Source)
     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:535)
     at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:498)
     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:260)
     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
     at com.sun.proxy.$Proxy11.delete(Unknown Source)
     at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:2062)
     at org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:684)
     at org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:680)
     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
     at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:680)
     at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
 21/05/31 11:33:20 INFO CheckpointWriter: Saving checkpoint for time 1622432000000 ms to file 'hdfs://nzj-cluster-gdyd/zkx_deploy/xdrhub_jiake/checkpoint/cyw/https/CY_HTTPS_JIAKE_GZ_XDR/all/checkpoint-1622432000000'
 Exception in thread "pool-20-thread-1582" java.lang.NullPointerException
     at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)

可以看到,Driver 并未挂掉,而是持续抛出异常,重点关注 org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for zkx: HDFS_DELEGATION_TOKEN owner=zkx, renewer=yarn, realUser=oozie/service2.nzj.gdyd@NZJ.GDYD, issueDate=1619681808569, maxDate=1622273808569, sequenceNumber=370742481, masterKeyId=803) can't be found 这句,其中 maxDate=1622273808569 对应时间点为 2021-05-29 15:36:48,正好在数据中断后的第一个 batch 里,接着后面又抛出了一个空异常:

 Exception in thread "pool-20-thread-1582" java.lang.NullPointerException
     at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)
 ...

从空异常的相关日志也可以看到,该空异常是从在一个线程池里执行的,再结合 pool-20-thread-1582 的线程 ID 可以推导出,在每次抛出空异常后,Spark 并没有对该异常做任何处理,而是简单的让线程挂掉并新建一个线程,所以才能看到这么大的线程 ID。

根据空异常的提示,查看 Checkpoint.scala:233 处的 Spark 源码(版本 2.1.0.cloudera1):

     def run() {
       if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
         latestCheckpointTime = checkpointTime
       }
       if (fs == null) {
         fs = new Path(checkpointDir).getFileSystem(hadoopConf)
       }
       var attempts = 0
       val startTime = System.currentTimeMillis()
       val tempFile = new Path(checkpointDir, "temp")
       // We will do checkpoint when generating a batch and completing a batch. When the processing
       // time of a batch is greater than the batch interval, checkpointing for completing an old
       // batch may run after checkpointing of a new batch. If this happens, checkpoint of an old
       // batch actually has the latest information, so we want to recovery from it. Therefore, we
       // also use the latest checkpoint time as the file name, so that we can recover from the
       // latest checkpoint file.
       //
       // Note: there is only one thread writing the checkpoint files, so we don't need to worry
       // about thread-safety.
       val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime)
       val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, latestCheckpointTime)
 
       while (attempts < MAX_ATTEMPTS && !stopped) {
         attempts += 1
         try {
           logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'")
 
           // Write checkpoint to temp file
           fs.delete(tempFile, true) // just in case it exists
           val fos = fs.create(tempFile)
           Utils.tryWithSafeFinally {
             fos.write(bytes)
           } {
             fos.close()
           }
 
           // If the checkpoint file exists, back it up
           // If the backup exists as well, just delete it, otherwise rename will fail
           if (fs.exists(checkpointFile)) {
             fs.delete(backupFile, true) // just in case it exists
             if (!fs.rename(checkpointFile, backupFile)) {
               logWarning(s"Could not rename $checkpointFile to $backupFile")
             }
           }
 
           // Rename temp file to the final checkpoint file
           if (!fs.rename(tempFile, checkpointFile)) {
             logWarning(s"Could not rename $tempFile to $checkpointFile")
           }
 
           // Delete old checkpoint files
           val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs))
           if (allCheckpointFiles.size > 10) {
             allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach { file =>
               logInfo(s"Deleting $file")
               fs.delete(file, true)
             }
           }
 
           // All done, print success
           val finishTime = System.currentTimeMillis()
           logInfo(s"Checkpoint for time $checkpointTime saved to file '$checkpointFile'" +
             s", took ${bytes.length} bytes and ${finishTime - startTime} ms")
           jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater)
           return
         } catch {
           case ioe: IOException =>
             val msg = s"Error in attempt $attempts of writing checkpoint to '$checkpointFile'"
             logWarning(msg, ioe)
             fs = null
         }
       }
       logWarning(s"Could not write checkpoint for time $checkpointTime to file '$checkpointFile'")
     }
   }

可以看到,run 方法中主要是一个有限次数的循环,结合日志 Error in attempt 1 of writing checkpoint to... 可以看到第一次循环捕获异常后会把 fs 设置为 null,进入第二轮循环,而空异常正好是在 fs.delete(tempFile, true) // just in case it exists 处抛出的,而 run 方法中没有对空异常进行捕获,导致线程直接挂掉。同时 Spark 也无法感知到任何异常的发生。

查看原生 Spark 2.1 相同处的源码,发现 fs 有对 null 进行了处理:

           logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'")
           if (fs == null) {
             fs = new Path(checkpointDir).getFileSystem(hadoopConf)
           }

也就是说,起码从 Spark 2.1 开始,不存在该问题。

3. 出现问题的原因

从日志来看,当 Kerberos 过期,而 Spark Driver 端又正好在写 checkpoint 的时候,就会出现以上异常。

Kerberos 的有效间隔为 30 天整,一旦我们启动 Spark 作业的时间点正好撞上 checkpoint 时间点,就会重复出现这个问题,但概率较小,具有随机性。

由于近期为了提高资源利用率缩短了 batch 间隔,所以以上问题发生的概率增加了,虽然概率仍然很小,但随着处理间隔越短,发生该问题的概率越高。

可以考虑通过升级 Spark 版本的方式来彻底解决该问题。