微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

解决:IDEA中import kafka.serializer.StringDecoder导入报红的问题

解决:IDEA中import kafka.serializer.StringDecoder导入报红的问题

kafka的版本是2.1.1,spark-streaming-kafka-0-8_2.11

在SparkStreaming整合Kafka时,采用direct方法。在手动导入kafka.serializer.StringDecoder时,一直报红。

解决方法一:手动在代码头部添加

import _root_.kafka.serializer.StringDecoder

解决方法二:暂时删除之前创建的java->spark.kafka文件夹,因为会有冲突,删除后未报错或者将spark.kafka改个名字也可以。。。

代码如下:

package spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import _root_.kafka.serializer.StringDecoder

/**
  * Spark Streaming对接Kafka的方式
  */
object KafkaDirectWordCount {

  def main(args: Array[String]): Unit = {

    if(args.length != 2) {
      System.err.println("Usage: KafkaDirectWordCount <brokers> <topics>")
      System.exit(1)
    }

    val Array(brokers, topics) = args

    val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount")
      .setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String,String]("Metadata.broker.list"-> brokers)

    // Todo... Spark Streaming如何对接Kafka
    val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](
    ssc,kafkaParams,topicsSet
    )

    // Todo... 自己去测试为什么要取第二个
    messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐