Syncing MySQL Data to Elasticsearch with canal
> First published on the WeChat official account: MarkerHub
>
> Original author: 吕一明
>
> Video walkthrough: https://www.bilibili.com/video/BV1Jq4y1w7Bc/

Hello everyone! It has been a long time since I wrote an original post, so today let's do an experiment together: syncing MySQL data to Elasticsearch with canal! As for the theory, go look it up yourselves; here we jump straight into the experiment!

This article uses Docker to install MySQL, canal, and Elasticsearch, and relies on canal reading the MySQL binlog to sync MySQL data into Elasticsearch.

Middleware versions used in the experiment:

* CentOS 8
* MySQL 5.7.36
* Elasticsearch 7.16.2
* canal.server: 1.1.5
* canal.adapter: 1.1.5
* Postman

0\. Install Docker
----------

Basic commands:

```
# Install Docker on CentOS 7
yum install docker

# Install Docker on CentOS 8
yum erase podman buildah
yum install -y yum-utils
yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
yum install docker-ce docker-ce-cli containerd.io

# Check that the installation succeeded
[root@localhost opt]# docker --version
Docker version 20.10.12, build e91ed57

# Start Docker
systemctl start docker

# Switch to a registry mirror
sudo vim /etc/docker/daemon.json
# with the following content:
{
  "registry-mirrors": ["https://m9r2r2uj.mirror.aliyuncs.com"]
}
# Save and exit, then restart Docker

# Restart
sudo service docker restart

# List images
docker images

# List running containers
docker ps
```

1\. Install MySQL
---------

```
docker pull mysql:5.7.36
docker run --name mysql5736 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=admin -d mysql:5.7.36
docker exec -it mysql5736 /bin/bash
apt-get update
apt-get install vim
cd /etc/mysql/mysql.conf.d
vim mysqld.cnf    # edit the MySQL configuration
```

Configuration:

```
[mysqld]
#binlog setting
log-bin=mysql-bin     # enable the binlog
binlog-format=ROW     # binlog format
server-id=1           # server ID used for MySQL replication; canal's slaveId must not be the same
```

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/9f2dac33e64dd05e2bbfb64eccafb207.png)

Save and exit, then restart MySQL with `service mysql restart`. This may kick you out of the Docker container; if so, remember to start the MySQL container again (`docker start mysql5736`).

```
mysql -uroot -p
show master status;   -- show the current binlog file
reset master;         -- delete all binlog files and start a fresh one
```

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/49cd2b4c0c335c775f6fd4c6464a1252.png)

Check whether the configuration took effect:

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/e19b032621adfd0460c8c052bd3967fb.png)

View the binlog file:

```
cd /var/lib/mysql                  # go to the binlog directory
mysqlbinlog -vv mysql-bin.000001   # view the ROW-format events in readable form
```

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/704f71748b4558075e3872cfa2851da3.png)
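The `[mysqld]` settings above are easy to get subtly wrong, so they can also be sanity-checked offline. The following is a minimal Java sketch, not part of MySQL or canal: the class `BinlogConfigCheck` and its methods are hypothetical helpers. It assumes a simple `my.cnf`-style text with `key=value` lines, keeps only the `[mysqld]` section, and checks the three things canal depends on: the binlog is enabled, the format is `ROW`, and `server-id` differs from the `slaveId` you will give canal later.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of MySQL or canal): checks that the [mysqld]
 * section of a my.cnf-style text has the binlog settings canal relies on.
 */
public class BinlogConfigCheck {

    /** Collects key/value pairs from the [mysqld] section only. */
    static Map<String, String> parseMysqldSection(String cnf) {
        Map<String, String> options = new HashMap<>();
        boolean inMysqld = false;
        for (String raw : cnf.split("\n")) {
            String line = raw;
            int hash = line.indexOf('#');              // '#' starts a comment anywhere on the line
            if (hash >= 0) {
                line = line.substring(0, hash);
            }
            line = line.trim();
            if (line.isEmpty() || line.startsWith(";")) {  // ';' comments only at line start
                continue;
            }
            if (line.startsWith("[")) {                // a new [group] header
                inMysqld = line.equals("[mysqld]");
                continue;
            }
            if (!inMysqld) {
                continue;
            }
            int eq = line.indexOf('=');
            // MySQL treats '-' and '_' in option names the same, so normalize to '-'
            String key = (eq < 0 ? line : line.substring(0, eq)).trim().replace('_', '-');
            String value = eq < 0 ? "" : line.substring(eq + 1).trim();
            options.put(key, value);
        }
        return options;
    }

    /** True if the binlog is on, the format is ROW, and server-id differs from canal's slaveId. */
    static boolean readyForCanal(Map<String, String> mysqld, long canalSlaveId) {
        String serverId = mysqld.get("server-id");
        return mysqld.containsKey("log-bin")
                && "ROW".equalsIgnoreCase(mysqld.get("binlog-format"))
                && serverId != null && !serverId.isEmpty()
                && Long.parseLong(serverId) != canalSlaveId;
    }

    public static void main(String[] args) {
        String cnf = String.join("\n",
                "[mysqld]",
                "log-bin=mysql-bin     # enable the binlog",
                "binlog-format=ROW     # binlog format",
                "server-id=1");
        Map<String, String> mysqld = parseMysqldSection(cnf);
        System.out.println(readyForCanal(mysqld, 10)); // true: slaveId 10 differs from server-id 1
        System.out.println(readyForCanal(mysqld, 1));  // false: slaveId clashes with server-id
    }
}
```

This only checks the text of the file; inside MySQL, `show variables like 'log_bin';` remains the authoritative way to confirm the binlog is actually on.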
Connect to the MySQL instance in Docker with a database client, create the `dailyhub` database, and then check the log again (`mysqlbinlog -vv mysql-bin.000001`). You should see something like this:

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/396eebe80f7814f73125a4260f43dc45.png)

At this point, MySQL is installed successfully.

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/0e5aeacc6ca2bf35bda807dd3b98aac1.png)

2\. Install Elasticsearch
------

```
docker pull elasticsearch:7.16.2
docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" --name='es7162' -d elasticsearch:7.16.2
```

Note: if the version you want cannot be pulled, check the available tags at <https://registry.hub.docker.com/_/elasticsearch?tab=tags&page=1&ordering=last_updated> and pull one of those. I originally ran this experiment with 7.15.2, but a few days later that version could no longer be pulled, so I switched to 7.16.2. A somewhat lower version works too.

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/5983cc6c5678b28114dee2a7fc42522b.png)

According to the dependency versions listed at https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-elasticsearch/2.6.2, both 7.15.2 and 7.16.2 can be used with Spring Boot 2.6.2.

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/e81aaf6e7687d70ee261241b06f83ed7.png)

Start ES with Docker:

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/de8708cd802991b9a22931fab52ed17a.png)

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/2ecdd7b32c8fda01dc89fd7422358980.png)

Next, we need to configure ES:

```
docker exec -it es7162 /bin/bash
cd config
vi elasticsearch.yml
```

Configuration file:

```
cluster.name: dailyhub-es
network.host: 0.0.0.0
node.name: node-1
http.port: 9200
http.cors.enabled: true
http.cors.allow-origin: "*"
node.master: true
node.data: true
```

Restart ES with `docker restart es7162`. Be very careful not to get the configuration wrong, otherwise ES will fail to start. If it does, `docker logs -f es7162` shows the cause, but in practice you may have to start over. Then access it from the server:

```
# query all ES mappings
GET http://119.45.25.164:9200/_mapping?pretty=true
```

Note: on a cloud server, you need to open the corresponding ports in the security group and the firewall, and for better security you should also set up a username and password for ES. I keep things simple here since this is just an experiment.

#### Install the Chinese analyzer (IK)

There are two ways to install the Chinese analyzer; if the plugin cannot be downloaded during online installation, you have to install it offline.

1. Online installation:

```
docker exec -it es7162 /bin/bash
./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.16.2/elasticsearch-analysis-ik-7.16.2.zip
```
![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/4e5066aded9268e5590c271b384e8b9f.png)

2. Offline installation: first open https://github.com/medcl/elasticsearch-analysis-ik/releases/tag/v7.16.2 and download the analyzer plugin, then:

```
# copy the plugin into the container
docker cp elasticsearch-analysis-ik-7.16.2.zip es7162:/usr/share/elasticsearch/plugins
docker exec -it es7162 /bin/bash
cd /usr/share/elasticsearch/plugins/
mkdir ik
unzip elasticsearch-analysis-ik-7.16.2.zip -d ik
rm -rf elasticsearch-analysis-ik-7.16.2.zip
```

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/519e3e47401889b832d66e8d26d0955b.png)

Restart ES and check the logs to see whether the IK analyzer loaded successfully!

```
docker restart es7162
docker logs es7162

# or
docker exec -it es7162 /bin/bash
bin/elasticsearch-plugin list
```

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/3f459adfe20b01f55af6c9711d2f57cc.png)

When you see `analysis-ik` in the output, the installation succeeded.

3\. Install canal-server
----------------

Pull the image and start it:

```
docker pull canal/canal-server:v1.1.5
docker run --name canal115 -p 11111:11111 --link mysql5736:mysql5736 -id canal/canal-server:v1.1.5
```

Edit the configuration:

```
docker exec -it canal115 /bin/bash
cd canal-server/conf/example/
vi instance.properties    # edit the configuration

# change 0 to 10; any value works as long as it differs from MySQL's server-id
canal.instance.mysql.slaveId=10
# set the MySQL address, account, and password; mysql5736 is the link alias of the MySQL container
canal.instance.master.address=mysql5736:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=admin
```

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/4e7bf10b735fd3efad6eaf80ef00f5c1.png)

Verify that the configuration works:

```
# first restart canal
docker restart canal115
docker exec -it canal115 /bin/bash
cd canal-server/logs/example/
tail -100f example.log    # view the log
```

If you see output like the screenshot below, canal has connected to the MySQL host, and from now on every data change in MySQL will be synced to canal.

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/bd1e42bb7665cb2fdd953dcabb169a73.png)

You can also check with a Java program whether canal is connected to MySQL. First add the canal client dependency:

```
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
```

* com.markerhub.SimpleCanalClientExample

```
package com.markerhub;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

/**
 * WeChat official account: MarkerHub
 *
 * Purpose: test whether canal is connected to MySQL.
 */
public class SimpleCanalClientExample {

    public static void main(String[] args) {
        // Create the connection: canal-server address, destination "example", empty username/password
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("119.45.25.164", 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");   // all tables in all databases
            connector.rollback();             // resume from the last acknowledged position
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // ignored in this test program
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // acknowledge the batch
                // connector.rollback(batchId); // on processing failure, roll back the batch
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            // skip transaction begin/end markers
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
```

When MySQL data is updated, the output looks like this:

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/b3d0fc1ac5bdcf7bcf4b343b6b6332e5.png)

Note: once canal-adapter is also connected to canal-server later on, this program no longer receives the data changes.

4\. Install canal-adapter
-----------------

Since canal-adapter currently has no official Docker image, we pull an unofficial one:

```
docker pull slpcat/canal-adapter:v1.1.5
docker run --name adapter115 -p 8081:8081 --link mysql5736:mysql5736 --link canal115:canal115 --link es7162:es7162 -d slpcat/canal-adapter:v1.1.5
```

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/560453a5cceb716d5c713175da6ec0bb.png)

Edit the configuration:

```
docker exec -it adapter115 /bin/bash
cd conf/
vi application.yml
```

Change the configuration as follows; settings you don't need, or that are commented out, can be deleted:

```
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: canal115:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://mysql5736:3306/dailyhub?useUnicode=true
      username: root
      password: admin
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: es7162:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: dailyhub-es
```

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/fef185f933b686f2ac5d171d96cb8838.png)
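canal-adapter only works once the services named in `application.yml` are up, and the article notes further on that the start order should be MySQL, ES, canal, then the adapter. As an optional check before (re)starting `adapter115`, here is a minimal Java sketch that tries a plain TCP connect to each dependency in that order and reports the first one that does not answer. `DependencyProbe` is a hypothetical helper, not part of canal. The host names are the Docker link aliases from the configuration above, so they only resolve inside the linked Docker network; from the host machine you would substitute `127.0.0.1` or the server IP. A successful connect only proves the port is open, not that the service has finished starting.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper: checks, in startup order, whether the services
 * canal-adapter depends on accept TCP connections.
 */
public class DependencyProbe {

    /** Tries a TCP connect; returns false on refusal, timeout, or unknown host. */
    static boolean reachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /**
     * Walks the "host:port" entries in insertion order and returns the name of
     * the first unreachable one, or null when all of them answer.
     */
    static String firstUnreachable(Map<String, String> hostPortByName, int timeoutMillis) {
        for (Map.Entry<String, String> dep : hostPortByName.entrySet()) {
            String[] hostPort = dep.getValue().split(":");
            if (!reachable(hostPort[0], Integer.parseInt(hostPort[1]), timeoutMillis)) {
                return dep.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the startup order: MySQL, ES, canal-server.
        Map<String, String> deps = new LinkedHashMap<>();
        deps.put("mysql", "mysql5736:3306");
        deps.put("elasticsearch", "es7162:9200");
        deps.put("canal-server", "canal115:11111");

        String missing = firstUnreachable(deps, 2000);
        System.out.println(missing == null
                ? "all dependencies reachable, safe to (re)start adapter115"
                : "not reachable yet: " + missing);
    }
}
```

Checking in order, rather than all at once, mirrors the startup sequence: the first name printed is the container to start (or fix) next.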
Next, edit the table-to-index mapping file:

```
docker exec -it adapter115 /bin/bash
cd conf/es7
cp -v mytest_user.yml dailyhub_collect.yml
# delete the other, unneeded files
rm -rf biz_order.yml customer.yml mytest_user.yml
vi dailyhub_collect.yml
```

Configuration file:

```
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: dailyhub_collect
  _id: _id
  _type: _doc
  upsert: true
#  pk: id
  sql: "
    SELECT
      c.id AS _id,
      c.user_id AS userId,
      c.title AS title,
      c.url AS url,
      c.note AS note,
      c.collected AS collected,
      c.created AS created,
      c.personal AS personal,
      u.username AS username,
      u.avatar AS userAvatar
    FROM
      m_collect c
    LEFT JOIN m_user u ON c.user_id = u.id
    "
#  objFields:
#    _labels: array:;
#   etlCondition: "where c.c_time>={}"
  commitBatch: 3000
```

Note that for time fields, the backend must use `LocalDateTime` or `LocalDate`; if you use `Date`, you have to set the format manually.

5\. End-to-end test
------

Now we can test directly. Prepare the test setup:

* create the tables and columns in the database;
* then create the index in Elasticsearch.

First create the `dailyhub` database, then the table structure:

```
SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for m_collect
-- ----------------------------
DROP TABLE IF EXISTS `m_collect`;
CREATE TABLE `m_collect` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `collected` date DEFAULT NULL,
  `created` datetime(6) DEFAULT NULL,
  `note` varchar(255) DEFAULT NULL,
  `personal` int(11) DEFAULT NULL,
  `title` varchar(255) DEFAULT NULL,
  `url` varchar(255) DEFAULT NULL,
  `user_id` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `FK6yx2mr7fgvv204y8jw5ubsn7h` (`user_id`),
  CONSTRAINT `FK6yx2mr7fgvv204y8jw5ubsn7h` FOREIGN KEY (`user_id`) REFERENCES `m_user` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4;

-- ----------------------------
-- Records of m_collect
-- ----------------------------

-- ----------------------------
-- Table structure for m_user
-- ----------------------------
DROP TABLE IF EXISTS `m_user`;
CREATE TABLE `m_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `avatar` varchar(255) DEFAULT NULL,
  `created` datetime(6) DEFAULT NULL,
  `lasted` datetime(6) DEFAULT NULL,
  `open_id` varchar(255) DEFAULT NULL,
  `statu` int(11) DEFAULT NULL,
  `username` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;

-- ----------------------------
-- Records of m_user
-- ----------------------------
INSERT INTO `m_user` VALUES ('1', 'https://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload/images/5a9f48118166308daba8b6da7e466aab.jpg', '2022-01-05 16:08:40.042000', '2022-01-06 13:07:45.153000', 'ozWZ-uAOY2iecT-byynO382u01zg', '0', '公众号:MarkerHub');
```

Next, use Postman to create the Elasticsearch index:

```
# create the index and add the field mappings
PUT http://119.45.25.164:9200/dailyhub_collect
{
    "mappings": {
        "properties": {
            "collected": {
                "type": "date",
                "format": "date_optional_time||epoch_millis"
            },
            "created": {
                "type": "date",
                "format": "date_optional_time||epoch_millis"
            },
            "note": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "personal": {
                "type": "integer"
            },
            "title": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "url": {
                "type": "text"
            },
            "userAvatar": {
                "type": "text"
            },
            "userId": {
                "type": "long"
            },
            "username": {
                "type": "keyword"
            }
        }
    }
}
```

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/1e86ac9039352412960fa1f91b81b8bc.png)

Other common operations:

```
# delete the index
DELETE http://119.45.25.164:9200/dailyhub_collect

# view all index mappings
GET http://119.45.25.164:9200/_mapping?pretty=true

# search documents
GET http://119.45.25.164:9200/dailyhub_collect/_search

# delete the document with ID 1
DELETE http://119.45.25.164:9200/dailyhub_collect/_doc/1
```

Then open the canal-adapter output log:

```
docker logs --tail 100 -f adapter115
```

Then add a new record to `m_collect` in MySQL; the log output looks like this:

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/19bba865fb6e92ba1576f652a4111e14.png)

Then search all documents, and the data is now in ES.

![](https://img-hello-world.oss-cn-beijing.aliyuncs.com/94332ff257ad048c574685ecbf574f2d.png)

If adapter115 keeps throwing the following exception, the startup order is wrong. The correct order is: MySQL, ES, canal, adapter.

```
2022-01-11 10:43:15.278 [Thread-2] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: Broken pipe Error sync but ACK!
```

That's it: the experiment succeeded, over! Follow the WeChat official account MarkerHub for more Java experiments! Video walkthrough: https://www.bilibili.com/video/BV1Jq4y1w7Bc/

## 6. Problems you may run into along the way

### 6.1 Fixing file editing problems inside a Docker container

> Problem description:
>> While setting up Elasticsearch in Docker, CORS had to be configured on the server side because of cross-origin requests. After opening elasticsearch.yml with vi and entering insert mode, the arrow keys produced the letters A, B, C, D, and the Backspace key did nothing.
>
> Resolution:
>> After some searching, I suspected a problem with the vim package in the container's Linux environment and decided to uninstall and reinstall vim.

Uninstall the original vim with `apt-get remove vim-common` and reinstall it with `apt-get install vim`. If this fails with `E: Package 'vim' has no installation candidate`, run `apt update` to refresh the package index first:

```
apt-get remove vim-common   # uninstall the original vim
apt update                  # refresh the package index
apt-get install vim         # reinstall vim
```

After reinstalling vim, editing the file in insert mode works normally, and the problem is solved.

Reposted from [https://mp.weixin.qq.com/s/vXLcC41CrWFlJUicyyqFLg](https://mp.weixin.qq.com/s/vXLcC41CrWFlJUicyyqFLg). In case of infringement, please contact us for removal.