微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark+Kafka构建实时分析

*Spark+Kafka构建实时分析*

*Dashboard 项目*

*一:实验环境准备*

*预备知识*

Linux系统命令使用、了解如何安装Python库、安装kafka。

*训练技能*

熟悉Linux基本操作、Pycharm的安装、Spark安装,Kafka安装,PyCharm安装。

*任务清单*

\1. Spark安装(略)

\2. Kafka安装

\3. Python安装(略)

\4. Python依赖库

\5. PyCharm安装(略)

一、系统和软件的安装

一、项目环境搭建。

(一)、spark搭建

我之前已搭建完成,在终端打开如下

pyspark

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2Ui4UJVc-1620658414735)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps1.jpg)]

(二)数据转移到Ubuntu

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LuKTadav-1620658414736)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps3.jpg)]

(三)、kafka环境搭建

把下载的kafka安装包解压到自己的目标文件夹下面,然后在如下操作:

`****切换到kafka的目录下****

****bin/zookeeper-server-start.sh /home/thc/spark/kafka_2.11-2.4.0/config/zookeeper.properties****`

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PI6w29jX-1620658414739)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps4.jpg)]

*****启动kafka*

重新打卡一个新的终端,切换到kafka的目录下

***\*bin/kafka-server-start.sh config/server.properties\****

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Cr6G1kRT-1620658414740)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps5.jpg)]

*以单节点的配置创建了一个叫dblab的topic.可以用list列出所有创建的topics,来查看刚才创建的主题是否存在。********也是重新打开终端。*

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9AtQJA5g-1620658414742)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps6.jpg)]

在结果中查看到dblab这个topic存在

bin/kafka-topics.sh --list --zookeeper localhost:2181

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-k6ObQGkY-1620658414742)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps7.jpg)]

用producer生产点数据

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RMQGcwWf-1620658414743)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps8.jpg)]

使用consumer来接收数据,重新打开新终端接受数据

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dblab --from-beginning

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bDedMI09-1620658414744)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps9.jpg)]

到这里,数据可以完全接收,表示卡夫卡已经搭建成功了。

二、数据处理和Python操作Kafka

1、先安装Python操作Kafka的代码库,之前的pycharm已经搭建好了,所以就直接用了,可以在项目的终端使用pip安装。

pip install kafka-python

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5bh9VHMT-1620658414745)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps10.jpg)]

2、接着可以写如下Python代码文件名为producer.py

import csv

import time

from kafka import KafkaProducer

\# 实例化一个KafkaProducer示例,用于向Kafka投递消息

producer = KafkaProducer(bootstrap_servers='localhost:9092')

\# 打开数据文件

csvfile = open("./user_log.csv","r",encoding="utf-8")

\# 生成一个可用于读取csv文件的reader

reader = csv.reader(csvfile)

for line in reader:

gender = line[9] # 性别在每行日志代码的第9个元素

if gender == 'gender':

continue # 去除第一行表头

time.sleep(0.1) # 每隔0.1秒发送一行数据

\# 发送数据,topic为'sex'

producer.send('sex',line[9].encode('utf8'))

上述代码很简单,首先是先实例化一个Kafka生产者。然后读取用户日志文件,每次读取一行,接着每隔0.1秒发送给Kafka,这样1秒发送10条购物日志。这里发送给Kafka的topic为’sex’

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LtMnq2RE-1620658414745)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps11.jpg)]

3、Python操作Kafka

我们可以写一个KafkaConsumer测试数据是否投递成功,代码如下,文件名为consumer.py

from kafka import KafkaConsumer

consumer = KafkaConsumer(‘sex’)

for msg in consumer:

print((msg.value).decode(‘utf8’))

在开启上述KafkaProducer和KafkaConsumer之前,需要先开启Kafka,

\1. cd /home/thc/spark/kafka_2.11-2.4.0

\2. bin/zookeeper-server-start.sh config/zookeeper.properties &

\3. bin/kafka-server-start.sh config/server.properties

在Kafka开启之后,即可开启KafkaProducer和KafkaConsumer。开启方法如下:
可以在pycharm终端当中直接运行程序,也可以直接在pycharm当中运行。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-P9cH1PQn-1620658414746)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps12.jpg)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-16MPQfAa-1620658414747)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps13.jpg)]

运行上面这条命令以后,这时,你会看到屏幕上会输出一行又一行的数字,类似下面的样子:

2

1

1

1

2

0

2

三:Spark Streaming实时处理数据

1、把spark-streaming-kafka-0-8_2.11-2.1.0.jar这个jar包复制到spark的jar包下面

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-635tinaL-1620658414747)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps14.jpg)]

2、然后在spark/jars目录下新建kafka目录,把 /kafka/libs下所有函数库复制到/spark/jars/kafka目录下

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-idlBrjJJ-1620658414748)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps15.jpg)]

3、然后,修改 Spark 配置文件,把 Kafka 相关 jar 包的路径信息增加到 spark-env.sh命令如下

\1. cd spark/conf

\2. vim spark-env.sh

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8Le9iVKE-1620658414749)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps16.jpg)]

执行上述步骤之后,Spark开发Kafka环境即已配置好,下面介绍如何编码实现。

4、建立pySpark项目

新建项目目录:

然后在目录下创建一个kafka_test.py文件:代码如下

`from* kafka *import* KafkaProducer****from*** pyspark.streaming *import* StreamingContext****from**** pyspark.streaming.kafka *import* KafkaUtils****from**** pyspark *import* SparkConf, SparkContext****import**** json****import**** sys`

`***\*def\**** KafkaWordCount(zkQuorum, group, topics, numThreads):` `spark_conf = SparkConf().setAppName(***\*"KafkaWordCount"\****)` `sc = SparkContext(conf=spark_conf)` `sc.setLogLevel(***\*"ERROR"\****)` `ssc = StreamingContext(sc, 1)` `ssc.checkpoint(***\*"."\****)` `**#** **这里表示把检查点文件写入分布式文件系统****HDFS****,所以要启动****HadooP**** ** **# ssc.checkpoint(".")**** ** topicAry = topics.split(***\*","\****) **#** **将****topic****转换为****hashmaP****形式,而****python****中字典就是一种****hashmaP**** ** topicMap = {} ***\*for\**** topic ***\*in\**** topicAry:` `topicMap[topic] = numThreads` `lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(***\*lambda\**** x : x[1])` `words = lines.flatMap(***\*lambda\**** x : x.split(***\*" "\****))` `wordcount = words.map(***\*lambda\**** x : (x, 1)).reduceByKeyAndWindow((***\*lambda\**** x,y : x+y), (***\*lambda\**** x,y : x-y), 1, 1, 1)` `wordcount.foreachRDD(***\*lambda\**** x : sendmsg(x))` `ssc.start()` `ssc.awaitTermination()`
`**#** **格式转化,将****[["1", 3], ["0", 4], ["2", 3]]****变为****[{'1': 3}, {'0': 4}, {'2': 3}]****,这样就不用修改第四个教程的代码了**** *****\*def\**** Get_dic(rdd_list):` `res = []` `***\*for\**** elm ***\*in\**** rdd_list:` `tmp = {elm[0]: elm[1]}` `res.append(tmp)` `***\*return\**** json.dumps(res)`
``
***\*def\**** sendmsg(rdd):
***\*if\**** rdd.count != 0:
msg = Get_dic(rdd.collect())
`# 实例化一个KafkaProducer示例,用于向Kafka投递消息**
** producer = KafkaProducer(bootstrap_servers=****‘localhost:9092’****)
producer.send(****“result”****, msg.encode(****‘utf8’**))
# 很重要,不然不会更新

** producer.flush()

*if* name == *‘main’*:# 输入的四个参数分别代表着** # 1.zkQuorumzookeeper地址
** # 2.group为消费者所在的组
** # 3.topics该消费者所消费的topics

** # 4.numThreads开启消费topic线程的个数
** *if* (len(sys.argv) < 5):print(
*"Usage: KafkaWordCount "*
**)exit(1)zkQuorum = sys.argv[1]group = sys.argv[2]topics = sys.argv[3]numThreads = int(sys.argv[4])print(group, topics)KafkaWordCount(zkQuorum, group, topics, numThreads)`

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YhfQyiRJ-1620658414750)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps17.jpg)]

如果代码当中出现这样子的报错,那么我们可以在/home/thc/spark/spark-2.3.3-bin-hadoop2.7/python/pyspark/streaming

找到kafka.py 复制到python里面去

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0ijzP5nc-1620658414751)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps18.jpg)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QFdoPFZK-1620658414752)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps19.jpg)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qy7H9Cnq-1620658414752)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps20.jpg)]

上述代码注释已经也很清楚了,下面在简要说明下:
\1. 首先按每秒的频率读取Kafka消息;
\2. 然后对每秒的数据执行wordcount算法,统计出0的个数,1的个数,2的个数;
\3. 最后将上述结果封装成json发送给Kafka。

另外,需要注意,上面代码中有一行如下代码

ssc.checkpoint(".")

这行代码表示把检查点文件写入分布式文件系统HDFS,所以一定要事先启动Hadoop。如果没有启动Hadoop,则后面运行时会出现“拒绝连接”的错误提示。如果你还没有启动Hadoop,则可以现在在Ubuntu终端中,使用如下Shell命令启动Hadoop:

\1. cd /home/spark/hadoop #这是hadoop的安装目录

\2. ./sbin/start-dfs.sh

另外,如果不想把检查点写入HDFS,而是直接把检查点写入本地磁盘文件(这样就不用启动Hadoop),则可以对ssc.checkpoint()方法中的文件路径进行指定,比如下面这个例子:

ssc.checkpoint(“file:///home/spark/spark/mycode/kafka/checkpoint”)

5、运行项目

编写好程序之后,接下来编写运行脚本,在/home/spark/spark/mycode/kafka目录下新建startup.sh文件,输入如下内容

/home/spark/spark/bin/spark-submit /home/spark/spark/mycode/kafka/kafka_test.py 127.0.0.1:2181 1 sex 1

其中最后四个为输入参数,含义如下
\1. 127.0.0.1:2181为Zookeeper地址
\2. 1 为consumer group标签
\3. sex为消费者接收的topic
\4. 1 为消费者线程数

最后在/home/thc/PycharmProjects/spark_test/Srraming/目录下,运行如下命令即可执行刚编写好的Spark Streaming程序

sh startup.sh

也可以直接把上面的步骤在相应的shell当中去执行

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SwagIsdx-1620658414753)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps21.jpg)]

程序运行成功之后,下面通过步骤二的KafkaProducer和KafkaConsumer来检测程序。

下面开启之前编写的KafkaProducer投递消息,然后将KafkaConsumer中接收的topic改为result,验证是否能接收topic为result的消息,更改之后的KafkaConsumer为

from kafka import KafkaConsumer

consumer = KafkaConsumer(‘result’)

for msg in consumer:

​ print((msg.value).decode(‘utf8’))

在同时开启Spark Streaming项目,KafkaProducer以及KafkaConsumer之后,可以在KafkaConsumer运行窗口看到如下输出
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EidCIejZ-1620658414754)(file:///C:\Users\xyyth\AppData\Local\Temp\ksohtml2128\wps22.jpg)]

到此为止,Spark Streaming程序编写完成,下篇文章将分析如何处理得到的最终结果。需要数据和源码的朋友可以私信我,刚开始写,做的不好的地方请多多指教,向大家学习。

from kafka import KafkaConsumer

consumer = KafkaConsumer(‘result’)

for msg in consumer:

​ print((msg.value).decode(‘utf8’))

在同时开启Spark Streaming项目,KafkaProducer以及KafkaConsumer之后,可以在KafkaConsumer运行窗口看到如下输出
[外链图片转存中…(img-EidCIejZ-1620658414754)]

到此为止,Spark Streaming程序编写完成,下篇文章将分析如何处理得到的最终结果。需要数据和源码的朋友可以私信我,刚开始写,做的不好的地方请多多指教,向大家学习。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐