微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

大数据项目之电商数仓、Maxwell使用、 Maxwell启停脚本、增量数据同步、历史数据全量同步、采集通道Maxwell配置、通道测试

7. 业务数据采集模块

7.2 采集工具

7.2.4 Maxwell使用

7.2.4.1 启动Kafka集群

  若Maxwell发送数据的目的地为Kafka集群,则需要先确保Kafka集群为启动状态。

7.2.4.2 Maxwell启停

7.2.4.2.1 启动Maxwell
@H_404_112@[summer@hadoop102 maxwell-1.29.2]$ bin/maxwell --config config.properties --daemon

在这里插入图片描述

7.2.4.2.2 停止Maxwell
@H_404_112@[summer@hadoop102 ~]$ ps -ef | grep maxwell | grep -v grep | grep maxwell | awk '{print $2}' | xargs kill -9
7.2.4.2.3 Maxwell启停脚本
7.2.4.2.3.1 创建并编辑Maxwell启停脚本
@H_404_112@[summer@hadoop102 bin]$ vim mxw.sh

在这里插入图片描述

7.2.4.2.3.2 脚本内容如下
@H_404_112@#!/bin/bash MAXWELL_HOME=/opt/module/maxwell-1.29.2 status_maxwell(){ result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l` return $result } start_maxwell(){ status_maxwell if [[ $? -lt 1 ]]; then echo "启动Maxwell" $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon else echo "Maxwell正在运行" fi } stop_maxwell(){ status_maxwell if [[ $? -gt 0 ]]; then echo "停止Maxwell" ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9 else echo "Maxwell未在运行" fi } case $1 in start ) start_maxwell ;; stop ) stop_maxwell ;; restart ) stop_maxwell start_maxwell ;; esac

在这里插入图片描述

在这里插入图片描述

7.2.4.3 增量数据同步

7.2.4.3.1 启动Kafka消费者
@H_404_112@[summer@hadoop103 kafka-3.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell
7.2.4.3.2 模拟生成数据
@H_404_112@[summer@hadoop102 bin]$ java -jar gmall2020-mock-db-2021-11-14.jar
7.2.4.3.3 观察Kafka消费者

在这里插入图片描述

7.2.4.4 历史数据全量同步

  上一节,我们已经实现了使用Maxwell实时增量同步MysqL变更数据的功能。但有时只有增量数据是不够的,我们可能需要使用到MysqL数据库中从历史至今的一个完整的数据集。这就需要我们在进行增量同步之前,先进行一次历史数据的全量同步。这样就能保证得到一个完整的数据集。

7.2.4.4.1 Maxwell-bootstrap
@H_404_112@[summer@hadoop102 maxwell-1.29.2]$ /opt/module/maxwell-1.29.2/bin/maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell-1.29.2/config.properties

在这里插入图片描述

7.2.4.4.2 boostrap数据格式

采用bootstrap方式同步的输出数据格式如下:

@H_404_112@{ "database": "gmall", "table": "user_info", "type": "bootstrap-start", "ts": 1667014630, "data": {} } { "database": "gmall", "type": "bootstrap-insert", "data": { "id":194, "login_name":"nwuckp5", "nick_name":"环雪", "passwd":null, "name":"鲍环雪", "phone_num":"13878128474", "email":"nwuckp5@126.com", "head_img":null, "user_level":"2", "birthday":"2002-12-14", "gender":"F", "create_time":"2020-06-14 10:31:20", "operate_time":null, "status":null } } { "database": "gmall", "data": { "id":195, "login_name":"eu3kk9va08", "nick_name":"文辉", "passwd":null, "name":"齐文辉", "phone_num":"13771612693", "email":"eu3kk9va08@sina.com", "head_img":null, "user_level":"1", "birthday":"1984-06-14", "gender":null, "create_time":"2020-06-14 10:31:20", "operate_time":null, "status":null } } { "database": "gmall", "type": "bootstrap-complete", "data": {} }

在这里插入图片描述

注意事项:
1)第一条type为bootstrap-start和最后一条type为bootstrap-complete的数据,是bootstrap开始和结束的标志,不包含数据,中间的type为bootstrap-insert的数据才包含数据。
2)一次bootstrap输出的所有记录的ts都相同,为bootstrap开始的时间。

7.3 采集通道Maxwell配置

7.3.1 修改Maxwell配置文件config.properties

@H_404_112@[summer@hadoop102 maxwell-1.29.2]$ vim config.properties

在这里插入图片描述

7.3.2 配置参数如下

在这里插入图片描述

@H_404_112@log_level=info producer=kafka kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092 #kafka topic配置 kafka_topic=topic_db # MysqL login info host=hadoop102 user=maxwell password=maxwell jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

7.3.3 重新启动Maxwell

@H_404_112@[summer@hadoop102 maxwell-1.29.2]$ mxw.sh restart

在这里插入图片描述

7.3.4 通道测试

7.3.4.1 启动Zookeeper以及Kafka集群

在这里插入图片描述

7.3.4.2 启动一个Kafka Console Consumer,消费topic_db数据

@H_404_112@[summer@hadoop103 kafka-3.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_db

7.3.4.3 生成模拟数据

@H_404_112@[summer@hadoop102 bin]$ cd /opt/module/db_log/ [summer@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar

7.3.4.4 观察Kafka消费者是否能消费到数据

在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐