如何解决Akk / Alpakka:将压缩数据写入HDFS时出错
我有一个简单的akka流媒体应用程序(使用alpakka hdfs连接器),可将数据写入HDFS。 持久存储纯数据(json是主要格式)时,一切都很好,但是当我切换为以压缩格式(Gzip)写入数据时,我会遇到下一个错误。
Exception in thread "main" java.lang.IllegalArgumentException: requirement Failed: Compressor cannot be null
at scala.Predef$.require(Predef.scala:340)
at akka.stream.alpakka.hdfs.impl.writer.CompressedDataWriter.<init>(CompressedDataWriter.scala:32)
at akka.stream.alpakka.hdfs.impl.writer.CompressedDataWriter$.apply(CompressedDataWriter.scala:73)
at akka.stream.alpakka.hdfs.scaladsl.HdfsFlow$.compressedWithPassthrough(HdfsFlow.scala:112)
at akka.stream.alpakka.hdfs.scaladsl.HdfsFlow$.compressed(HdfsFlow.scala:82)
at HdfsPersistance$.main(HdfsPersistance.scala:59)
at HdfsPersistance.main(HdfsPersistance.scala)
此特定错误仅在尝试使用Gzip时出现,在尝试使用DefaultCodec时没有此错误。 这是到目前为止我正在使用的代码片段:
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.alpakka.hdfs._
import akka.stream.alpakka.hdfs.scaladsl.HdfsFlow
import akka.stream.scaladsl.{Sink,Source}
import akka.util.ByteString
import org.apache.hadoop.io.compress.GzipCodec
object StackOverflow {
def main(args: Array[String]): Unit = {
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
implicit val system = ActorSystem("QuickStart")
val conf = new Configuration()
conf.set("fs.defaultFS","hdfs://192.168.0.8:9000/")
val pathGenerator = FilePathGenerator( (rotationCount: Long,timestamp: Long) => s"/data/$rotationCount-$timestamp")
val settings =
HdfsWritingSettings()
.withOverwrite(true)
.withNewLine(true)
.withLineseparator(System.getProperty("line.separator"))
.withPathGenerator(pathGenerator)
val fs: FileSystem = FileSystem.get(conf)
val elements = (0 to 100000).map(s => s"$s\n")
val source = Source(elements)
val codec = new GzipCodec()
codec.setConf(fs.getConf)
val result = source
.map { json =>
HdfsWriteMessage(ByteString(json))
}
.via(
HdfsFlow.compressed(
fs,SyncStrategy.count(100),RotationStrategy.count(500),codec,settings
)
)
.runWith(Sink.ignore)
implicit val ec = system.dispatcher
result.onComplete {
case _ => system.terminate()
}
}
}
所以,我的问题是,我应该尝试配置客户端应用程序吗?解决此错误?还是特别需要在HDFS端启用此功能?我确实尝试在HDFS上启用压缩,但是看起来没有成功。 还是我想我可能会缺少某种依赖性? FY-> val AkkaVersion =“ 2.6.8”
谢谢
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。