
JMSConnection serialization failure

How to fix a JMSConnection serialization failure

I'm currently building an application that reads messages (transactions in JSON) from a Kafka topic and forwards them to IBM MQ as they are produced. I'm having some trouble with serialization of the JMS classes and am a bit lost on how to solve it. My code is:

// Library imports needed by this snippet; project-specific classes
// (KafkaConfig, KafkaManager, MQConfig, MQService, ErrorReport,
// ErrorReportService, ZookeeperConn, printConfig) are omitted.
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.HasOffsetRanges
import org.slf4j.LoggerFactory

import javax.jms.TextMessage

import scala.util.Try

object dispatcherMqApp extends Serializable {

  private val logger = LoggerFactory.getLogger(this.getClass)
  val config = ConfigFactory.load()

  def inicialize(transactionType: String) = {

    val spark = new SparkConf()
      .setAppName("dispatcher MQ Categorization")
      .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      .set("spark.streaming.stopGracefullyOnShutDown","true")
    logger.debug(s"Loading configuration at ${printConfig(config).head} =>\n${printConfig(config)(1)}")

    val kafkaConfig = KafkaConfig.buildFromConfiguration(config,"dispatcher-mq")
    val streamCtx = new StreamingContext(spark,Seconds(kafkaConfig.streamingInterval))

    sys.ShutdownHookThread {
      logger.warn("Stopping the application ...")
      streamCtx.stop(stopSparkContext = true,stopGracefully = true)
      logger.warn("Application Finish with Success !!!")
    }

    val topic = config.getString(s"conf.dispatcher-mq.consumer-topic.$transactionType")
    logger.info(s"Topic: $topic")
    val zkdir = s"${kafkaConfig.zookeeperBaseDir}$transactionType-$topic"
    val kafkaManager = new KafkaManager(kafkaConfig)
    val stream = kafkaManager.createStreaming(streamCtx,kafkaConfig.offset,topic,zkdir)
    val kafkaSink = streamCtx.sparkContext.broadcast(kafkaManager.createProducer())
    val mqConfig = MQConfig(
      config.getString("conf.mq.name"),
      config.getString("conf.mq.host"),
      config.getInt("conf.mq.port"),
      config.getString("conf.mq.channel"),
      config.getString("conf.mq.queue-manager"),
      config.getInt("conf.mq.retries"),
      config.getString("conf.mq.app-name"),
      Try(config.getString("conf.mq.user")).toOption,
      Try(config.getString("conf.mq.password")).toOption,
      config.getString("conf.dispatcher-mq.send.category_file"))

    val queueConn = new MQService(mqConfig)

    (stream,queueConn,streamCtx,kafkaSink,zkdir)
  }

  def main(args: Array[String]): Unit = {
    val transactionType = args.head
    if (transactionType == "account" || transactionType == "credit") {
      val (messages, queueConn, sc, kafkaSink, zkdir) = inicialize(transactionType)
      val fieldsType = config.getString(s"conf.dispatcher-mq.send.fields.$transactionType")
      val source = config.getString("conf.dispatcher-mq.parameters.source")
      val mqVersion = config.getString(s"conf.dispatcher-mq.parameters.version.$transactionType")
      val topicError = config.getString("conf.kafka.topic_error")

      messages.foreachRDD(rdd => {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.map(_._2).filter(_.toUpperCase.contains("BYCATEGORIZER"))
          .foreach(message => {
            val msg:Option[TextMessage] = try {
              Some(queueConn.createOutputMq(message,fieldsType,source,mqVersion))
            } catch {
              case ex: Exception =>
                logger.error(s"[ERROR] input: [[$message]]\n$ex")
                val errorReport = ErrorReport("GENERAL","DISPATCHER-MQ",transactionType,ex.getMessage,None,Option(ex.toString))
                ErrorReportService.sendError(errorReport,topicError,kafkaSink.value)
                None
            }
            if(msg.nonEmpty) queueConn.submit(msg.get)
          })
        logger.info(s"Save Offset in  $zkdir...\n${offsetRanges.toList.to}")
        ZookeeperConn.saveOffsets(zkdir,offsetRanges)
      })


      sc.start()
      sc.awaitTermination()

    } else
      logger.error(s"${args.head} is not a valid argument. ( account or credit ) !!! ")
  }
}

I'm getting a serialization error on the JMSConnection, which is used under the hood by the createOutputMq method. The error is:

20/09/04 17:21:00 ERROR JobScheduler: Error running job streaming job 1599250860000 ms.0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2054)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:917)
    at br.com.HIDDEN.dispatcher.dispatcherMqApp$$anonfun$main$1.apply(dispatcherMqApp.scala:80)
    at br.com.HIDDEN.dispatcher.dispatcherMqApp$$anonfun$main$1.apply(dispatcherMqApp.scala:76)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:227)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:226)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.ibm.msg.client.jms.JmsConnection
Serialization stack:

    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more
20/09/04 17:21:00 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2054)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:917)
    at br.com.HIDDEN.dispatcher.dispatcherMqApp$$anonfun$main$1.apply(dispatcherMqApp.scala:80)
    at br.com.HIDDEN.dispatcher.dispatcherMqApp$$anonfun$main$1.apply(dispatcherMqApp.scala:76)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:227)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:226)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.ibm.msg.client.jms.JmsConnection
Serialization stack:

    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more

Does anyone know how to solve it? The lines shown in the error message (76 and 80) correspond to my messages.foreachRDD(rdd => { and .foreach(message => { respectively. Thanks in advance.
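For what it's worth, the usual remedy for this Task not serializable error in Spark Streaming is to stop capturing the driver-side connection object (queueConn, and the JmsConnection inside it) in the foreachRDD/foreach closure, and instead open the connection on the executors, for example once per partition. Below is a minimal sketch under these assumptions: mqConfig (a small, serializable configuration value) is made available to main instead of the live MQService, and MQService exposes the createOutputMq/submit methods shown above; the error-reporting branch is elided for brevity.

messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.map(_._2)
    .filter(_.toUpperCase.contains("BYCATEGORIZER"))
    .foreachPartition { partition =>
      // Built here, on the executor, so the non-serializable
      // JmsConnection never has to cross the wire; only the small
      // serializable mqConfig value is captured by the closure.
      val queueConn = new MQService(mqConfig)
      partition.foreach { message =>
        Try(queueConn.createOutputMq(message, fieldsType, source, mqVersion))
          .foreach(queueConn.submit)
      }
    }
  ZookeeperConn.saveOffsets(zkdir, offsetRanges)
}

An equivalent trick, matching the broadcast kafkaSink pattern already in the code, is to wrap MQService in a serializable holder whose connection field is a @transient lazy val, so the field is skipped during serialization and re-created on each executor at first use. A sketch with a hypothetical MQSink helper:

class MQSink(conf: MQConfig) extends Serializable {
  // Not serialized; lazily re-created on each executor on first access.
  @transient private lazy val service = new MQService(conf)
  def send(msg: TextMessage): Unit = service.submit(msg)
}

Either way, the principle is the same: only serializable configuration travels from the driver, and the JMS connection itself is created on the executor.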
