1. Introduction
To state the conclusion up front: this problem was caused by a bug in Spark 2.1.0.cloudera1 (other CDH builds have not been verified; vanilla Spark 2.1 does not have the problem), combined with the Kerberos credential expiring at exactly the moment a checkpoint was being written.
2. Troubleshooting Process
Checking the HDFS output for May 29 showed that no data had been written after 15:35.
Looking at the Driver side of the job, the log read as follows:
21/05/31 11:33:20 INFO JobScheduler: Added jobs for time 1622432000000 ms
21/05/31 11:33:20 INFO JobGenerator: Checkpointing graph for time 1622432000000 ms
21/05/31 11:33:20 INFO DStreamGraph: Updating checkpoint data for time 1622432000000 ms
21/05/31 11:33:20 INFO DStreamGraph: Updated checkpoint data for time 1622432000000 ms
21/05/31 11:33:20 INFO CheckpointWriter: Submitted checkpoint of time 1622432000000 ms to writer queue
21/05/31 11:33:20 INFO CheckpointWriter: Saving checkpoint for time 1622432000000 ms to file 'hdfs://nzj-cluster-gdyd/zkx_deploy/xdrhub_jiake/checkpoint/cyw/https/CY_HTTPS_JIAKE_GZ_XDR/all/checkpoint-1622432000000'
21/05/31 11:33:20 WARN UserGroupInformation: PriviledgedActionException as:zkx (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for zkx: HDFS_DELEGATION_TOKEN owner=zkx, renewer=yarn, realUser=oozie/service2.nzj.gdyd@NZJ.GDYD, issueDate=1619681808569, maxDate=1622273808569, sequenceNumber=370742481, masterKeyId=803) can't be found in cache
21/05/31 11:33:20 WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for zkx: HDFS_DELEGATION_TOKEN owner=zkx, renewer=yarn, realUser=oozie/service2.nzj.gdyd@NZJ.GDYD, issueDate=1619681808569, maxDate=1622273808569, sequenceNumber=370742481, masterKeyId=803) can't be found in cache
21/05/31 11:33:20 WARN UserGroupInformation: PriviledgedActionException as:zkx (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for zkx: HDFS_DELEGATION_TOKEN owner=zkx, renewer=yarn, realUser=oozie/service2.nzj.gdyd@NZJ.GDYD, issueDate=1619681808569, maxDate=1622273808569, sequenceNumber=370742481, masterKeyId=803) can't be found in cache
21/05/31 11:33:20 WARN CheckpointWriter: Error in attempt 1 of writing checkpoint to 'hdfs://nzj-cluster-gdyd/zkx_deploy/xdrhub_jiake/checkpoint/cyw/https/CY_HTTPS_JIAKE_GZ_XDR/all/checkpoint-1622432000000'
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for zkx: HDFS_DELEGATION_TOKEN owner=zkx, renewer=yarn, realUser=oozie/service2.nzj.gdyd@NZJ.GDYD, issueDate=1619681808569, maxDate=1622273808569, sequenceNumber=370742481, masterKeyId=803) can't be found in cache
at org.apache.hadoop.ipc.Client.call(Client.java:1504)
at org.apache.hadoop.ipc.Client.call(Client.java:1441)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
at com.sun.proxy.$Proxy10.delete(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:535)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:260)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
at com.sun.proxy.$Proxy11.delete(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:2062)
at org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:684)
at org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:680)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:680)
at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
21/05/31 11:33:20 INFO CheckpointWriter: Saving checkpoint for time 1622432000000 ms to file 'hdfs://nzj-cluster-gdyd/zkx_deploy/xdrhub_jiake/checkpoint/cyw/https/CY_HTTPS_JIAKE_GZ_XDR/all/checkpoint-1622432000000'
Exception in thread "pool-20-thread-1582" java.lang.NullPointerException
at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
As the log shows, the Driver did not die; it just kept throwing exceptions. The key line is org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for zkx: HDFS_DELEGATION_TOKEN owner=zkx, renewer=yarn, realUser=oozie/service2.nzj.gdyd@NZJ.GDYD, issueDate=1619681808569, maxDate=1622273808569, sequenceNumber=370742481, masterKeyId=803) can't be found in cache. Its maxDate=1622273808569 corresponds to 2021-05-29 15:36:48, which falls exactly inside the first batch after the data stopped. Immediately afterwards a NullPointerException was thrown:
Exception in thread "pool-20-thread-1582" java.lang.NullPointerException
at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)
...
The surrounding log also shows that this NullPointerException was thrown from a thread running in a thread pool. Combined with the thread name pool-20-thread-1582, we can infer that Spark did not handle the exception at all after each NPE: it simply let the thread die and created a new one, which is why the thread number grew so large.
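For reference, the epoch-millisecond values in the log can be double-checked with a few lines of code; a minimal sketch (assuming the cluster runs on China Standard Time, i.e. Asia/Shanghai):

import java.time.{Instant, ZoneId}

object TokenTimeCheck {
  def main(args: Array[String]): Unit = {
    val zone = ZoneId.of("Asia/Shanghai") // assumed cluster time zone
    // maxDate from the InvalidToken message -> 2021-05-29T15:36:48.569+08:00
    println(Instant.ofEpochMilli(1622273808569L).atZone(zone))
    // batch time from the log -> 2021-05-31T11:33:20+08:00
    println(Instant.ofEpochMilli(1622432000000L).atZone(zone))
  }
}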
Following the hint in the NullPointerException, here is the Spark source at Checkpoint.scala:233 (version 2.1.0.cloudera1):
  def run() {
    if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
      latestCheckpointTime = checkpointTime
    }
    if (fs == null) {
      fs = new Path(checkpointDir).getFileSystem(hadoopConf)
    }
    var attempts = 0
    val startTime = System.currentTimeMillis()
    val tempFile = new Path(checkpointDir, "temp")
    // We will do checkpoint when generating a batch and completing a batch. When the processing
    // time of a batch is greater than the batch interval, checkpointing for completing an old
    // batch may run after checkpointing of a new batch. If this happens, checkpoint of an old
    // batch actually has the latest information, so we want to recovery from it. Therefore, we
    // also use the latest checkpoint time as the file name, so that we can recover from the
    // latest checkpoint file.
    //
    // Note: there is only one thread writing the checkpoint files, so we don't need to worry
    // about thread-safety.
    val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime)
    val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, latestCheckpointTime)
    while (attempts < MAX_ATTEMPTS && !stopped) {
      attempts += 1
      try {
        logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'")
        // Write checkpoint to temp file
        fs.delete(tempFile, true) // just in case it exists
        val fos = fs.create(tempFile)
        Utils.tryWithSafeFinally {
          fos.write(bytes)
        } {
          fos.close()
        }
        // If the checkpoint file exists, back it up
        // If the backup exists as well, just delete it, otherwise rename will fail
        if (fs.exists(checkpointFile)) {
          fs.delete(backupFile, true) // just in case it exists
          if (!fs.rename(checkpointFile, backupFile)) {
            logWarning(s"Could not rename $checkpointFile to $backupFile")
          }
        }
        // Rename temp file to the final checkpoint file
        if (!fs.rename(tempFile, checkpointFile)) {
          logWarning(s"Could not rename $tempFile to $checkpointFile")
        }
        // Delete old checkpoint files
        val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs))
        if (allCheckpointFiles.size > 10) {
          allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach { file =>
            logInfo(s"Deleting $file")
            fs.delete(file, true)
          }
        }
        // All done, print success
        val finishTime = System.currentTimeMillis()
        logInfo(s"Checkpoint for time $checkpointTime saved to file '$checkpointFile'" +
          s", took ${bytes.length} bytes and ${finishTime - startTime} ms")
        jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater)
        return
      } catch {
        case ioe: IOException =>
          val msg = s"Error in attempt $attempts of writing checkpoint to '$checkpointFile'"
          logWarning(msg, ioe)
          fs = null
      }
    }
    logWarning(s"Could not write checkpoint for time $checkpointTime to file '$checkpointFile'")
  }
}
As we can see, the body of run is essentially a retry loop with a bounded number of attempts. Together with the log line Error in attempt 1 of writing checkpoint to..., this shows that after the first attempt catches the exception, fs is set to null and the loop enters its second iteration. The NullPointerException is thrown exactly at fs.delete(tempFile, true) // just in case it exists, and because run only catches IOException, the NPE escapes, the thread dies on the spot, and Spark never becomes aware that anything went wrong.
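The failure mode can be reproduced outside Spark with a minimal sketch of the same pattern (FakeFs is a hypothetical stand-in for the Hadoop FileSystem, not the real API): the first attempt fails with an IOException and nulls out fs, the second attempt dereferences the null handle, and the resulting NullPointerException matches no catch clause.

import java.io.IOException

object CheckpointRetrySketch {
  // Hypothetical stand-in for org.apache.hadoop.fs.FileSystem that always fails,
  // like a client whose delegation token has expired.
  class FakeFs {
    def delete(path: String): Boolean = throw new IOException("token can't be found in cache")
  }

  def main(args: Array[String]): Unit = {
    var fs: FakeFs = new FakeFs
    var attempts = 0
    while (attempts < 4) {
      attempts += 1
      try {
        // Attempt 1: throws IOException. Attempt 2: fs is null, so this line throws a
        // NullPointerException that the catch below does not handle -- the thread dies,
        // just like the checkpoint-writer pool thread at Checkpoint.scala:233.
        fs.delete("temp")
        return
      } catch {
        case ioe: IOException =>
          println(s"Error in attempt $attempts of writing checkpoint: ${ioe.getMessage}")
          fs = null // mirrors the cloudera1 catch block: fs is reset but never re-created inside the loop
      }
    }
  }
}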
Looking at the same spot in the vanilla Spark 2.1 source, we find that the null case of fs is handled inside the retry loop:
logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'")
if (fs == null) {
  fs = new Path(checkpointDir).getFileSystem(hadoopConf)
}
In other words, at least from vanilla Spark 2.1 onwards, this problem does not exist.
3. Root Cause
From the logs, the exception above occurs when the Kerberos credential (concretely, the HDFS delegation token shown in the log) expires while the Spark Driver happens to be writing a checkpoint.
The credential is valid for exactly 30 days, so the expiry moment is fixed by when the Spark job was launched; whenever that moment happens to land on a checkpoint write, the problem recurs. The probability of such a collision is small and essentially random.
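The 30-day figure can be read straight off the issueDate and maxDate in the token message; a quick check:

object TokenLifetimeCheck {
  def main(args: Array[String]): Unit = {
    val issueDate = 1619681808569L // from the log
    val maxDate = 1622273808569L   // from the log
    val days = (maxDate - issueDate) / (24L * 60 * 60 * 1000)
    println(s"delegation token max lifetime = $days days") // prints 30
  }
}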
Because the batch interval was recently shortened to improve resource utilization, the probability of hitting this problem has increased. It is still small, but the shorter the processing interval, the more likely the collision becomes.
Upgrading to a newer Spark version is worth considering as a way to fix this problem once and for all.
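After an upgrade, one quick sanity check (a sketch, not part of the original job) is to print the version string of the Spark build the driver is actually running on:

import org.apache.spark.SPARK_VERSION

object SparkVersionCheck {
  def main(args: Array[String]): Unit = {
    // SPARK_VERSION is the version string of the Spark build on the driver classpath
    println(s"Running on Spark $SPARK_VERSION")
  }
}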