1、测试代码
package kafka.comsumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import utils.PropUtil
/**
 * Spark Streaming consumer that keeps its Kafka offsets in a Spark checkpoint directory.
 *
 * The checkpoint stores the offsets of every subscribed topic (two topics here), so a restarted
 * job resumes from the checkpointed position instead of relying on offsets committed to Kafka.
 * Because the whole DStream graph is restored from the checkpoint, any change to this code
 * requires clearing the checkpoint directory before restarting.
 *
 * NOTE(review): the original comment claims each record is read exactly once. Recovery from a
 * checkpoint can re-run the last batches, so downstream output should be idempotent — confirm
 * the intended delivery guarantee.
 *
 * @author yanghongbo
 * @date 2019/7/25 10:03
 */
object CheckpointOffset {
  // Settings read from config.properties on the classpath.
  val prop: PropUtil = new PropUtil("config.properties")
  val oracleUrl: String = prop.getProp("ORACLE_URL")
  val oracleUser: String = prop.getProp("ORACLE_USER")
  val oraclePassword: String = prop.getProp("ORACLE_PASSWORD")
  // NOTE(review): the pasted source read "KAFKA_broKERS", which looks like case garbling of
  // this key; confirm it matches config.properties.
  val brokers: String = prop.getProp("KAFKA_BROKERS")
  // val checkpointDir = prop.getProp("checkpointDir")
  val checkpointDir: String = "./CheckpointOffset"
  // Used both as the Spark application name and as the Kafka consumer group id.
  val groupName: String = this.getClass.getName

  /**
   * Builds a brand-new StreamingContext together with its Kafka DStream pipeline.
   *
   * `main` passes this to `StreamingContext.getOrCreate`, which calls it only when no
   * checkpoint is found in `checkpointDir`; otherwise the context is restored from the
   * checkpoint. All stream setup therefore has to live inside this method.
   *
   * @return a configured StreamingContext that has not been started yet
   */
  def functionToCreateContext(): StreamingContext = {
    // Reuse the current SparkSession or create one (local mode, 3 threads).
    val spark = SparkSession.builder().appName(groupName).master("local[3]").getOrCreate()
    // val spark = SparkSession.builder().appName("SparkToOracleStatus").getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    // 3-second micro-batches; stream metadata is checkpointed to checkpointDir.
    val ssc = new StreamingContext(sc, Seconds(3))
    ssc.checkpoint(checkpointDir)

    // Topics to subscribe to.
    val topics = Array("testTopic2", "DC_HISTORY_STATUS_T")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupName,
      // Where to start when there is no stored offset to resume from.
      "auto.offset.reset" -> "latest",
      // Offsets are tracked by the checkpoint; nothing is committed back to Kafka.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams))

    kafkaStream.foreachRDD { kafkaRDD =>
      // Debug output of this batch's offset ranges (optional). The HasOffsetRanges cast only
      // works on the RDD handed over directly by the Kafka stream, before any transformation.
      val offsetRanges: Array[OffsetRange] = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(println)

      // Extract the message payloads.
      val messages: RDD[String] = kafkaRDD.map(_.value())
      // Business logic placeholder: print every message on the executors.
      messages.foreachPartition(_.foreach(println))
    }
    ssc
  }

  /**
   * Entry point: restores the StreamingContext from `checkpointDir` when a checkpoint exists,
   * otherwise builds one with [[functionToCreateContext]], then runs until terminated.
   */
  def main(args: Array[String]): Unit = {
    val context = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
    context.start()
    context.awaitTermination()
  }
}
2、utils
package utils
import java.io.InputStream
import java.util.Properties
/**
 * Reads key/value settings from a `.properties` resource on the classpath.
 *
 * The resource is loaded once, on the first [[getProp]] call, and its stream is closed
 * afterwards.
 *
 * @param file classpath resource name. A name without a leading `/` (e.g. `config.properties`)
 *             is resolved from the classpath root, not relative to this class's package.
 */
class PropUtil(val file: String) {
  /** Backing properties; filled from [[file]] on the first [[getProp]] call. */
  var prop: Properties = new Properties()

  /** Absolute classpath path of [[file]], so lookup does not depend on this class's package. */
  private def resourcePath: String = if (file.startsWith("/")) file else "/" + file

  /** Loads [[file]] into [[prop]] on first access and closes the stream afterwards. */
  private lazy val loaded: Properties = {
    val in: InputStream = this.getClass.getResourceAsStream(resourcePath)
    if (in == null) {
      // Properties.load(null) would only fail with an uninformative NullPointerException.
      throw new java.io.FileNotFoundException(s"classpath resource not found: $resourcePath")
    }
    try prop.load(in) finally in.close()
    prop
  }

  /**
   * Looks up one setting.
   *
   * @param key property name
   * @return the configured value, or `null` when the key is absent
   * @throws java.io.FileNotFoundException if [[file]] is not on the classpath
   */
  def getProp(key: String): String = loaded.getProperty(key)
}
3、pom
<properties>
    <!-- Versions referenced as ${...} by the dependency declarations of this POM. -->
    <!-- Scala runtime; its binary version must match the _2.11 suffix of the Spark/Kafka artifactIds. -->
    <scala.version>2.11.8</scala.version>
    <!-- Referenced by the org.apache.spark dependencies. -->
    <spark.version>2.2.0</spark.version>
    <!-- Referenced by the org.apache.hadoop dependencies. -->
    <hadoop.version>3.0.0</hadoop.version>
    <!-- Oracle JDBC driver version (com.github.noraui:ojdbc7). -->
    <ojdbc7>12.1.0.2</ojdbc7>
    <!-- Declared but not referenced by any dependency in this POM fragment. -->
    <hbase.version>2.0.0</hbase.version>
</properties>
<dependencies>
    <!-- Kafka core (Scala 2.11 build). Every com.fasterxml.jackson.core artifact it pulls in is
         excluded; jackson-core is declared explicitly below instead. -->
    <!-- NOTE(review): spark-streaming-kafka-0-10 already brings in its own Kafka client, and the
         Spark Kafka integration guide advises against adding org.apache.kafka artifacts manually
         because mismatched versions can conflict. Keep this only if other code needs it, and
         confirm the versions are compatible. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>1.0.0</version>
        <exclusions>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Explicitly pinned Jackson core; presumably chosen to agree with Spark's Jackson - confirm. -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.6.6</version>
    </dependency>
    <!-- Scala standard library -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Pin the hadoop-client API version -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- Spark SQL -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hive support, required for Spark to integrate with Hive -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark Streaming / Kafka 0.10+ integration; must track the Spark version exactly -->
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- NOTE(review): compile scope puts JUnit into the runtime and shaded classpath; use
         <scope>test</scope> unless main code depends on it. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <!-- Oracle JDBC driver -->
    <dependency>
        <groupId>com.github.noraui</groupId>
        <artifactId>ojdbc7</artifactId>
        <version>${ojdbc7}</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <!-- Compiles main and test Scala sources. -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <!-- Builds an uber jar at package time. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <!-- Drop signature files of signed dependencies; copied into the
                                     uber jar they fail verification ("Invalid signature file
                                     digest"). Jar entry paths are case-sensitive, so the
                                     directory must be spelled META-INF. -->
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- NOTE(review): empty, so the jar has no usable Main-Class; pass
                                     the class via spark-submit (dash-dash class) or fill this in. -->
                                <mainClass></mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。