微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

mac环境canal+mysql+kafka的安装及使用

为了实现 实时同步数据,在mac环境搭建了canal,mysql,kafka的一套流程

使用canal加MysqL加kafka的方式传递数据

MysqL 数据源头

canal模仿slave冲MysqL取数据。。是一个管道

kafka 将canal获取的数据放入kafka 。然后消费(程序获取kafka的队列。消费数据)

  1. MysqL安装

    这个就不详述了。。不要安装最新版的MysqL。本人亲测8.0版本和canal不一定能很好的兼容

    所以。。安装了MysqL5.7

    安装命令(这样安装 安装的是8.0版本)

    brew install MysqL

   2.java安装

  不管是canal还是kafka都是需要java环境的。。

   也不能用最新版的java,推荐使用java8也就是jdk8

   最好是从官网oracle下载

安装方法

https://blog.csdn.net/oumuv/article/details/84064169

3.canal安装

在装calal之前 确保MysqL 及 java8 环境安装好了

下载地址: https://github.com/alibaba/canal/releases

下载 canal.deployer-1.1.4.tar.gz

在家目录下面新建一个canal目录.直接解压就行

官方文件已经编译好了,不需再编译

需要添加账户(canal@%及canal@local)

在canal/bin目录下有几个脚本文件,startup.sh 启动服务用的,stop.sh 停止服务用的

在canal/logs目录下放的是日志文件

在canal/conf目录下放的是配置文件

实例配置文件

canal/conf/example/instance.properties

实例的配置文件。。决定了 你连接哪个实例的数据库 可以精确到表

instance.properties中比较重要的参数

#实例

canal.instance.master.address = 127.0.0.1:3306

#db

canal.instance.dbUsername = canal

canal.instance.dbPassword = canal

canal.instance.defaultDatabaseName =test

canal.instance.connectionCharset = UTF-8

#table

# table regex

canal.instance.filter.regex = test.ttt

#mq topic

canal.mq.topic=test12345

上面这样配置 就代表了。。通过canal连接127.0.0.1中的test库的ttt表,放到一个叫做test12345的topic里面

验证canal是否和MysqL连接 只需要在MysqL的进程里面查看是否有一个复制连接(因为canal就是模仿了一个slave)

mac环境canal+mysql+kafka的安装及使用

还有一个全局的文件

表示了需要连接的kafka zookeeper等

canal/conf/canal.properties

列举canal连接kafka的重要参数

canal.id= 1#如果有多个canal 这个值和集群中的canal不能冲突

canal.ip=172.17.61.113#canal的ip

canal.port= 11111

canal.zkServers=172.17.61.113:2181#zookeeper的ip:port

canal.serverMode = kafka

canal.destinations = example

#mq

canal.mq.servers = 172.17.61.113:9092#kafka的ip:port

上面有些关于kafka的参数 是 需要kafka安装好之后才能陪得(因为已经装好了 所以先列举出来)

4.kafka安装

kafka安装时需要zookeeper的。。

但是一般zookeeper会集成在kafka里面 所以不需要特别单独安装(也可以单独安装)

mac环境安装命令如下

brew install kafka

一直等安装完成就行

运行kafka是需要依赖于zookeeper的,所以安装kafka的时候也会包含zookeeper。

kafka的安装目录:/usr/local/Cellar/kafka/2.0.0/bin

kafka的配置文件目录:/usr/local/Cellar/kafka/2.0.0/bin

kafka服务的配置文件:/usr/local/etc/kafka/server.properties

zookeeper配置文件: /usr/local/etc/kafka/zookeeper.properties 

# server.properties中的重要配置 需要修改的参数
  1. listeners=PLAINTEXT://172.17.61.113:9092

  2. advertised.listeners=PLAINTEXT://172.17.61.113:9092

zookeeper 配置文件我在安装的时候 没有做任何修改

然后 通过上面配置canal.properties 就连接上kafka

kafka的基本命令(可以通过find / -name zookeeper-server-start 查找具体的位置)

首先,启动zookeeper:

zookeeper-server-start /usr/ local /etc/kafka/zookeeper.properties

然后,启动kafka

kafka- server -start /usr/local/etc/kafka/ server .properties

创建一个“使用单个分区”、“只有一个副本”、名为“test”的主题的topic


kafka-topics 
--create 
--zookeeper 
localhost
:2181 
--replication-factor 1 
--partitions 1 
--topic 
test
(创建topic,这个topic。我们在canal中已经有了配置)
使用如下命令创建topic
kafka-topics --create --zookeeper localhost:2181/kafka --replication-factor 1
--partitions 1 --topic    topic1 查看创建的topic,运行list topic命令:
kafka-topics --list --zookeeper 172.17.61.113:2181 生产消息
kafka-console-producer --broker-list 172.17.61.113:9092 --topic test
(消息数据由canal自动发送。所以这里的命令我们只有明白就行)

消费消息(这个地方就可以最后来查看MysqL+canal+kafka是否已经联动起来了)
kafka-console-consumer --bootstrap-server 172.17.61.113:9092 --topic test12345 --from-beginning

我在MysqL里面的test库的ttt表里加了一条记录
然后反应在kafka就是如下结果

{"data":[{"id":"13","var":"ded"}],"database":"test","es":1575448571000,"id":8,"isDdl":false,"MysqLType":{"id":"int(11)","var":"varchar(5)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"var":12},"table":"ttt","ts":1575448571758,"type":"INSERT"}

这就表示整个MysqL+canal+kafka已经成功了

接下来只要等着程序那边去消费这个队列信息就好了

##############

有可能的报错

服务端:com.alibaba.otter.canal.parse.exception.CanalParseException: can't find start position for example
是由于你改了配置文件,导致Meta.dat 中保存的位点信息和数据库的位点信息不一致;导致canal抓取不到数据库的动作;
解决方法删除Meta.dat删除,再重启canal,问题解决

详见

https://www.cnblogs.com/shaozhiqi/p/11534658.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐