如何解决JMSConnection序列化失败
当前,我正在构建一个应用程序:它从Kafka Topic读取消息(JSON格式的交易数据),并将其发送到IBM MQ。我在JMS类的序列化上遇到了一些麻烦,不知道该如何解决。我的代码是:
object dispatcherMqApp extends Serializable {
private val logger = LoggerFactory.getLogger(this.getClass)
val config = ConfigFactory.load()
def inicialize(transactionType: String) = {
  // Assemble the Spark configuration for the dispatcher job.
  val sparkConf = new SparkConf()
    .setAppName("dispatcher MQ Categorization")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.streaming.stopGracefullyOnShutDown", "true")
  logger.debug(s"Loading configuration at ${printConfig(config).head} =>\n${printConfig(config)(1)}")

  val kafkaConfig = KafkaConfig.buildFromConfiguration(config, "dispatcher-mq")
  val streamingContext = new StreamingContext(sparkConf, Seconds(kafkaConfig.streamingInterval))

  // Register a JVM shutdown hook so the streaming context is stopped gracefully on termination.
  sys.ShutdownHookThread {
    logger.warn("Stopping the application ...")
    streamingContext.stop(stopSparkContext = true, stopGracefully = true)
    logger.warn("Application Finish with Success !!!")
  }

  // Resolve the input topic for this transaction type and the Zookeeper offset directory.
  val topic = config.getString(s"conf.dispatcher-mq.consumer-topic.$transactionType")
  logger.info(s"Topic: $topic")
  val zkdir = s"${kafkaConfig.zookeeperBaseDir}$transactionType-$topic"

  val kafkaManager = new KafkaManager(kafkaConfig)
  val stream = kafkaManager.createStreaming(streamingContext, kafkaConfig.offset, topic, zkdir)
  // Broadcast the Kafka producer so executors share one sink instance.
  val kafkaSink = streamingContext.sparkContext.broadcast(kafkaManager.createProducer())

  // MQ connection settings; user/password are optional in the configuration.
  val mqConfig = MQConfig(
    config.getString("conf.mq.name"),
    config.getString("conf.mq.host"),
    config.getInt("conf.mq.port"),
    config.getString("conf.mq.channel"),
    config.getString("conf.mq.queue-manager"),
    config.getInt("conf.mq.retries"),
    config.getString("conf.mq.app-name"),
    Try(config.getString("conf.mq.user")).toOption,
    Try(config.getString("conf.mq.password")).toOption,
    config.getString("conf.dispatcher-mq.send.category_file")
  )
  // NOTE(review): MQService is created on the driver here; it holds a JMS connection,
  // which is what later fails to serialize when captured in an executor closure.
  val queueConn = new MQService(mqConfig)

  (stream, queueConn, streamingContext, kafkaSink, zkdir)
}
def main(args: Array[String]): Unit = {
  val transactionType = args.head
  // FIX: use short-circuit || instead of bitwise | on booleans.
  if (transactionType == "account" || transactionType == "credit") {
    // FIX: inicialize returns a 5-tuple (stream, queueConn, streamCtx, kafkaSink, zkdir);
    // the original 3-way destructuring did not compile and left queueConn/kafkaSink unbound.
    val (messages, queueConn, sc, kafkaSink, zkdir) = inicialize(transactionType)
    val fieldsType = config.getString(s"conf.dispatcher-mq.send.fields.$transactionType")
    val source = config.getString("conf.dispatcher-mq.parameters.source")
    val mqVersion = config.getString(s"conf.dispatcher-mq.parameters.version.$transactionType")
    val topicError = config.getString("conf.kafka.topic_error")

    messages.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // NOTE(review): queueConn (MQService) is captured by the foreach closure below and
      // shipped to executors; com.ibm.msg.client.jms.JmsConnection is not serializable,
      // which is the reported "Task not serializable" failure. The durable fix is to create
      // the MQ connection on the executor side (e.g. inside rdd.foreachPartition, one
      // connection per partition) or to hold it in a @transient lazy val — confirm with
      // the Spark Streaming "Design Patterns for using foreachRDD" guidance.
      rdd.map(_._2)
        .filter(_.toUpperCase.contains("BYCATEGORIZER")) // FIX: was `toupperCase` (typo; did not compile)
        .foreach { message =>
          val msg: Option[TextMessage] =
            try Some(queueConn.createOutputMq(message, fieldsType, source, mqVersion))
            catch {
              // FIX: catch only non-fatal errors so OOM/interrupts still propagate.
              case NonFatal(ex) =>
                logger.error(s"[ERROR] input: [[$message]]\n$ex")
                // FIX: component tag was case-mangled ("disPATCHER-MQ") in the original paste.
                val errorReport = ErrorReport("GENERAL", "DISPATCHER-MQ", transactionType.toString, ex.getMessage, None, Option(ex.toString))
                ErrorReportService.sendError(errorReport, topicError, kafkaSink.value)
                None
            }
          // Submit only when a message was successfully built (no unsafe .get).
          msg.foreach(queueConn.submit)
        }
      // FIX: `offsetRanges.toList.to` was an incomplete expression; log a readable listing.
      logger.info(s"Save Offset in $zkdir...\n${offsetRanges.mkString(", ")}")
      ZookeeperConn.saveOffsets(zkdir, offsetRanges)
    }
    sc.start()
    sc.awaitTermination()
  } else
    logger.error(s"${args.head} is not a valid argument. ( account or credit ) !!! ")
}
我在序列化JMSConnection时出错,该连接是在createOutputMq方法内部创建的(对调用方不可见)。错误是:
20/09/04 17:21:00 ERROR JobScheduler: Error running job streaming job 1599250860000 ms.0
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2054)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:917)
at br.com.HIDDEN.dispatcher.dispatcherMqApp$$anonfun$main$1.apply(dispatcherMqApp.scala:80)
at br.com.HIDDEN.dispatcher.dispatcherMqApp$$anonfun$main$1.apply(dispatcherMqApp.scala:76)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:227)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:226)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.ibm.msg.client.jms.JmsConnection
Serialization stack:
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 30 more
20/09/04 17:21:00 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2054)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:917)
at br.com.HIDDEN.dispatcher.dispatcherMqApp$$anonfun$main$1.apply(dispatcherMqApp.scala:80)
at br.com.HIDDEN.dispatcher.dispatcherMqApp$$anonfun$main$1.apply(dispatcherMqApp.scala:76)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:227)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:226)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.ibm.msg.client.jms.JmsConnection
Serialization stack:
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 30 more
有人知道如何解决它吗?错误信息中提到的第76行和第80行,分别对应我代码中的 messages.foreachRDD(rdd => { 和 .foreach(message => { 这两行。
预先感谢
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。