How to fix Spark checkpointing with Azure Blob Storage
I am trying to use Azure Storage as the checkpoint location in my Spark Structured Streaming application. I have seen a few articles on reading from and writing to Azure storage, but I have not seen anyone explain using Azure storage as a checkpoint location. Below is my simple application, which reads from one Kafka topic and writes back to another, with a checkpoint location added.
SparkConf conf = new SparkConf().setMaster("local[*]");
conf.set("fs.azure.account.key.<storage-name>.blob.core.windows.net", "<storage-key>");
conf.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
SparkSession spark = SparkSession.builder().appName("app-name").config(conf).getOrCreate();

Dataset<Row> df = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "input")
    .load();

StreamingQuery ds = df
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "output")
    .option("checkpointLocation", "wasbs://<container-name>@<storage-account-name>.blob.core.windows.net/<directory-name>")
    .start();

ds.awaitTermination();
The Azure connection details are correct. When I run the application, I can see that a file (metadata) is created at the specified Azure storage location, but the application crashes after a few seconds. Here is the exception:
Exception in thread "main" java.lang.IllegalArgumentException: Self-suppression not permitted
at java.lang.Throwable.addSuppressed(Throwable.java:1043)
at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:818)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:339)
at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:298)
at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:85)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.apply(StreamExecution.scala:124)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.apply(StreamExecution.scala:122)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:122)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:49)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:258)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:299)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:296)
at com.test.Test.main(Test.java:73)
Caused by: java.io.IOException: Stream is already closed.
at com.microsoft.azure.storage.blob.BlobOutputStreamInternal.close(BlobOutputStreamInternal.java:332)
at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:818)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
at com.fasterxml.jackson.core.json.WriterBasedJsonGenerator.close(WriterBasedJsonGenerator.java:883)
at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3561)
at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:2909)
at org.json4s.jackson.Serialization$.write(Serialization.scala:27)
at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:78)
... 9 more
Let me know if any configuration is needed to enable Azure storage as a checkpoint location, or whether a version conflict could be causing this issue.
Spark: 2.3.0
hadoop-azure : 2.7
azure-storage : 8.0
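For reference, the versions above correspond to Maven coordinates roughly like the following (the groupId/artifactId pairs are the standard ones for these two libraries; the exact patch versions shown are assumptions, since only major.minor versions are listed above):

```xml
<!-- Hadoop's WASB/WASBS connector (provides NativeAzureFileSystem) -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-azure</artifactId>
    <version>2.7.0</version> <!-- "hadoop-azure : 2.7" above; patch version assumed -->
</dependency>
<!-- Azure Storage SDK (BlobOutputStreamInternal in the stack trace comes from here) -->
<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-storage</artifactId>
    <version>8.0.0</version> <!-- "azure-storage : 8.0" above; patch version assumed -->
</dependency>
```

Note that hadoop-azure 2.7 was built against a much older azure-storage release (2.x), so comparing the azure-storage version declared in hadoop-azure 2.7's own POM (e.g. via `mvn dependency:tree`) against the 8.0 version pulled in here would be a reasonable first step when suspecting a version conflict.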