
Integrating Spark Streaming with Spark SQL

In a program that combines Spark Streaming with Spark SQL, the order in which the core Spark objects are created can make the job fail at startup: if the SparkSession builder creates its own SparkContext before the StreamingContext is built, the later attempt to construct another SparkContext is rejected, because only one SparkContext may run per JVM (unless spark.driver.allowMultipleContexts is enabled).
Creating the objects in the order SparkConf, SparkContext, StreamingContext, SparkSession avoids the problem.
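
For contrast, here is a minimal sketch of the ordering that typically triggers the failure (the app name and local master are placeholders for the illustration only; the imports are the same as in the full snippet below):

    // Sketch of the problematic order -- do NOT do this in the real job
    val demoConf = new SparkConf().setAppName("OrderDemo").setMaster("local[2]")
    // 1) the SparkSession builder creates the first SparkContext
    val spark = SparkSession.builder().config(demoConf).getOrCreate()
    // 2) this constructor tries to create a second SparkContext from the same conf
    //    and throws a SparkException complaining that only one SparkContext
    //    may be running in this JVM
    val ssc = new StreamingContext(demoConf, Seconds(60))

The recommended sequence, used in the rest of this post, is shown next.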

// imports needed by this excerpt (Spark 2.4, spark-streaming-kafka-0-10)
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

    // TODO 1: create the SparkConf, SparkContext and StreamingContext (ssc)
    val conf = new SparkConf().setAppName("BoxLogStreamingDeal").setMaster("yarn")
      .set("spark.default.parallelism", "500")
      // maximum number of records pulled per second from each Kafka partition
      .set("spark.streaming.kafka.maxRatePerPartition", "500")
      // use Kryo serialization
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // recommended: enable RDD compression (snappy codec)
      .set("spark.io.compression.codec", "snappy")
//      .set("spark.driver.allowMultipleContexts","true")

    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc, Seconds(60))
    //    ssc.checkpoint("/jar/sparkstreaming/BoxLog/checkpoint")

    val spark = SparkSession.builder()
      .config(conf)
      .config("hive.metastore.uris", "thrift://××××:7004")
      .enableHiveSupport()
      .getOrCreate()

    // TODO 2: prepare the Kafka parameters
    val (brokerList, BoxLogTopic, groupId) =
      (ConfigrautionUtils.brokerList, ConfigrautionUtils.BoxLogTopic, ConfigrautionUtils.groupId)

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokerList,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> "false",
      "group.id" -> groupId)

    // TODO 3: consume the Kafka topic and obtain the input DStream
    val kafkaStream: DStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](List(BoxLogTopic), kafkaParams)
    )
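
The excerpt stops once the direct stream has been created; the place where the SparkSession actually comes into play is the per-batch processing. A minimal sketch, assuming the Kafka record values are raw log lines and using a hypothetical Hive table name box_log:

    // TODO 4: process each micro-batch with Spark SQL (field and table names are hypothetical)
    import spark.implicits._

    kafkaStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // pull the Kafka record values out and turn the batch into a DataFrame
        val batchDF = rdd.map(_.value()).toDF("line")
        batchDF.createOrReplaceTempView("box_log_batch")
        // any Spark SQL can run here; as an example, append the batch to a Hive table
        spark.sql("SELECT line FROM box_log_batch")
          .write
          .mode("append")
          .saveAsTable("box_log") // hypothetical target table
      }
    }

    // TODO 5: start the streaming application and block until it stops
    ssc.start()
    ssc.awaitTermination()

Writing with saveAsTable is only one possibility; the same DataFrame could just as well be joined against Hive tables or written to Kudu, which the kudu-spark2 dependency in the pom.xml below hints at.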

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.yzf</groupId>
    <artifactId>BoxLog</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <spark.version>2.4.3</spark.version>
        <scala.version>2.11</scala.version>
        <mysql.version>5.1.48</mysql.version>
        <redis.version>3.0.0</redis.version>
        <kudu.version>1.10.0</kudu.version>
        <zk.version>0.10</zk.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.4</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-pool2</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.16</version>
            <scope>provided</scope>
        </dependency>

        <!-- Spark Core and Spark SQL dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Hive dependency added above -->

        <!--spark-streaming-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <!--spark-streaming-kafka-plugin-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>


        <!-- Kudu-Spark dependency -->
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-spark2_2.11</artifactId>
            <version>${kudu.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Redis (Jedis) dependency -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${redis.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-pool2</artifactId>
                </exclusion>
            </exclusions>
            <version>1.3.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.3.0</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <!-- Maven assembly plugin: builds the jar and bundles third-party dependencies into it -->
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <!-- replace this with the class that contains the jar's main method -->
                            <mainClass>com.yzf.streaming.Boxlog.BoxLogStreamingDeal</mainClass>
                        </manifest>
                        <manifestEntries>
                            <Class-Path>.</Class-Path>
                        </manifestEntries>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- run the jar merge during the package phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
