# java

基于canal同步mysql的数据到es中

2022-06-14 16:49:48
19

首发公众号:MarkerHub

原创作者:吕一明

视频讲解:https://www.bilibili.com/video/BV1Jq4y1w7Bc/

hello,大家好呀,好久没写过原创了,今天带大家做个实验吧,基于canal同步mysql的数据到es中!

原理啥的,都给我百度去吧,这里直接搞实验!

本文使用docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中。

实验中间件版本说明:

  • centos 8

  • mysql 5.7.36

  • es 7.16.2

  • cannal.server: 1.1.5

  • canal.adapter: 1.1.5

  • postman

0、安装docker

基本命令:

#centos 7 安装 docker  
yum install docker  
  
#centos 8 安装docker  
yum erase podman buildah   
yum install -y yum-utils  
yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo  
yum install docker-ce docker-ce-cli containerd.io  
  
#检验安装是否成功  
[root@localhost opt]# docker --version  
Docker version 20.10.12, build e91ed57  
  
#启动  
systemctl start docker  
  
#换镜像源  
sudo vim /etc/docker/daemon.json  
内容如下:  
{  
 "registry-mirrors": ["https://m9r2r2uj.mirror.aliyuncs.com"]  
}  
保存退出,重启docker  
  
#重启  
sudo service docker restart  
  
#列出镜像  
docker images  
  
#查看运行进程  
docker ps  

1、安装mysql

docker pull mysql:5.7.36  
docker run --name mysql5736 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=admin -d mysql:5.7.36  
  
docker exec -it mysql5736 /bin/bash  
apt-get update  
apt-get install vim  
cd /etc/mysql/mysql.conf.d  
vim mysqld.cnf  // 修改mysql配置  

配置:

[mysqld]  
#binlog setting  
log-bin=mysql-bin  // 开启logbin  
binlog-format=ROW  // binlog日志格式  
server-id=1  // mysql主从备份serverId,canal中不能与此相同  

保存退出,重启mysql:service mysql restart

可能会退出docker镜像,注意重启启动docker的mysql。

mysql -uroot -p  
show master status  // binlog日志文件  
reset master; // 重启日志  

查看是否配置成功:

查看日志文件:

cd /var/lib/mysql  // 进入日志文件目录  
mysqlbinlog -vv mysql-bin.000001  // row格式查看日志  

使用数据库工具连接上docker中的mysql,然后创建dailyhub数据库,然后再查看日志(mysqlbinlog -vv mysql-bin.000001)可以看到截图如下:

到这里,mysql已经安装成功了。

2、安装es

docker pull elasticsearch:7.16.2  
docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" --name='es7162' -d elasticsearch:7.16.2  

注意如果拉取不出对应的版本,可以上https://registry.hub.docker.com/_/elasticsearch?tab=tags&page=1&ordering=last_updated,查看对应的版本再拉取。我之前是拉取7.15.2的实验的,后来过来几天发现这版本已经拉取不了了,就改成了7.16.2。或者换低一点的版本也可以。 

查看https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-elasticsearch/2.6.2,得到版本依赖关系,在springboot2.6.2版本下,7.15.2和7.16.2都可以用。

docker启动es:

然后我们需要配置一下es的信息:

docker exec -ites es7162 /bin/bash  
cd config  
vi elasticsearch.yml  

配置文件:

cluster.name: dailyhub-es  
network.host: 0.0.0.0  
  
node.name: node-1  
http.port: 9200  
http.cors.enabled: true  
http.cors.allow-origin: "*"  
node.master: true  
node.data: true  

docker restart es7162 重启es,注意千万别写错配置的信息,否则启动会失败,启动失败是后可以通过docker logs -f es7162查看原因,但也只能重新来了。然后服务器访问:

// 查询es所有mapping  
http://119.45.25.164:9200/_mapping?pretty=true  

注意如果是云服务器的话,要在安全组中配置对应的端口开放、还有防火墙啥的,然后安全些的话,还需要给es配合账号密码啥的。我这里为了实验就简单来了。

安装中文分词器

可以有两种方式安装中文分词器,如果在线安装的时候分词器插件下载不下来那就只能离线安装了。

1、在线安装中文分词器:

docker exec -ites es7162 /bin/bash  
  
./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.16.2/elasticsearch-analysis-ik-7.16.2.zip  

2、离线安装中文分词器:

首先打开这个链接:https://github.com/medcl/elasticsearch-analysis-ik/releases/tag/v7.16.2,把分词器插件下载下来,

# 把插件复制到容器内  
docker cp elasticsearch-analysis-ik-7.16.2.zip es7162:/usr/share/elasticsearch/plugins  
  
docker exec -it es7162 /bin/bash  
cd /usr/share/elasticsearch/plugins/  
mkdir ik  
unzip elasticsearch-analysis-ik-7.16.2.zip -d ik  
rm -rf elasticsearch-analysis-ik-7.16.2.zip  

重启es,查看日志是否加载ik分词器成功!

docker restart es7162  
docker logs es7162  
或者 
docker exec -it es7162 /bin/bash  
bin/elasticsearch-plugin list

当你看到日志中有输出analysis-ik,说明已经安装成功。

3、安装canal-server

拉取镜像并启动:

docker pull canal/canal-server:v1.1.5  
  
docker run --name canal115 -p 11111:11111  --link mysql5736:mysql5736 -id canal/canal-server:v1.1.5  

修改对应的配置:

docker exec -it canal115 /bin/bash  
cd canal-server/conf/example/  
vi instance.properties  // 修改配置  
  
# 把0改成10,只要不和mysql的id相同就行  
canal.instance.mysql.slaveId=10  
# 修改成mysql对应的账号密码,mysql5736就是mysql镜像的链接别名  
canal.instance.master.address=mysql5736:3306  
canal.instance.dbUsername=root  
canal.instance.dbPassword=admin  

验证配置是否成功:

#首先重启一下canal  
docker restart  canal115  
  
docker exec -it canal115 /bin/bash  
cd canal-server/logs/example/  
tail -100f example.log  // 查看日志  

截图如下,说明已经链接上了mysql主机,此时mysql中的数据变化,都会在canal中有同步。 

可以通过Java程序测试有没连接上mysql:

导入canal-client包

<!-- 为了测试canal-server是否连接mysql成功,1.1.5版本少包,所以用1.1.4版本 -->  
<dependency>  
   <groupId>com.alibaba.otter</groupId>  
   <artifactId>canal.client</artifactId>  
   <version>1.1.4</version>  
</dependency>  

  • com.markerhub.SimpleCanalClientExample
/**  
 * 公众号:MarkerHub  
 *  
 * 说明:用于测试canal是否已经连接上了mysql  
 */  
public class SimpleCanalClientExample {  
    public static void main(String args[]) {  
        // 创建链接  
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("119.45.25.164",  
                11111), "example", "", "");  
        int batchSize = 1000;  
        int emptyCount = 0;  
        try {  
            connector.connect();  
            connector.subscribe(".*\\..*");  
            connector.rollback();  
            int totalEmptyCount = 120;  
            while (emptyCount < totalEmptyCount) {  
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据  
                long batchId = message.getId();  
                int size = message.getEntries().size();  
                if (batchId == -1 || size == 0) {  
                    emptyCount++;  
                    System.out.println("empty count : " + emptyCount);  
                    try {  
                        Thread.sleep(1000);  
                    } catch (InterruptedException e) {  
                    }  
                } else {  
                    emptyCount = 0;  
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);  
                    printEntry(message.getEntries());  
                }  
                connector.ack(batchId); // 提交确认  
                // connector.rollback(batchId); // 处理失败, 回滚数据  
            }  
            System.out.println("empty too many times, exit");  
        } finally {  
            connector.disconnect();  
        }  
    }  
    private static void printEntry(List<Entry> entrys) {  
        for (Entry entry : entrys) {  
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {  
                continue;  
            }  
            RowChange rowChage = null;  
            try {  
                rowChage = RowChange.parseFrom(entry.getStoreValue());  
            } catch (Exception e) {  
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),  
                        e);  
            }  
            EventType eventType = rowChage.getEventType();  
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",  
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),  
                    eventType));  
            for (RowData rowData : rowChage.getRowDatasList()) {  
                if (eventType == EventType.DELETE) {  
                    printColumn(rowData.getBeforeColumnsList());  
                } else if (eventType == EventType.INSERT) {  
                    printColumn(rowData.getAfterColumnsList());  
                } else {  
                    System.out.println("-------&gt; before");  
                    printColumn(rowData.getBeforeColumnsList());  
                    System.out.println("-------&gt; after");  
                    printColumn(rowData.getAfterColumnsList());  
                }  
            }  
        }  
    }  
    private static void printColumn(List<Column> columns) {  
        for (Column column : columns) {  
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());  
        }  
    }  
}  

当mysql的数据更新时候效果如下: 

注意当后面canal-adapter也连接上canal-server后,程序就监听不到数据变化了。

4、安装canal-adapter

由于目前canal-adapter没有官方docker镜像,所以拉去一个非官方的

docker pull slpcat/canal-adapter:v1.1.5  
  
docker run --name adapter115 -p 8081:8081 --link mysql5736:mysql5736 --link canal115:canal115 --link es7162:es7162 -d slpcat/canal-adapter:v1.1.5  

修改配置:

docker exec -it adapter115 /bin/bash  
cd conf/  
vi application.yml  

配置修改如下,一些不需要的配置或者注释掉的配置可以删除掉:

server:  
  port: 8081  
spring:  
  jackson:  
    date-format: yyyy-MM-dd HH:mm:ss  
    time-zone: GMT+8  
    default-property-inclusion: non_null  
  
canal.conf:  
  mode: tcp #tcp kafka rocketMQ rabbitMQ  
  flatMessage: true  
  zookeeperHosts:  
  syncBatchSize: 1000  
  retries: 0  
  timeout:  
  accessKey:  
  secretKey:  
  consumerProperties:  
    # canal tcp consumer  
    canal.tcp.server.host: canal115:11111  
    canal.tcp.zookeeper.hosts:  
    canal.tcp.batch.size: 500  
    canal.tcp.username:  
    canal.tcp.password:  
  srcDataSources:  
    defaultDS:  
      url: jdbc:mysql://mysql5736:3306/dailyhub?useUnicode=true  
      username: root  
      password: admin  
  canalAdapters:  
  - instance: example # canal instance Name or mq topic name  
    groups:  
    - groupId: g1  
      outerAdapters:  
      - name: logger  
      - name: es7  
        hosts: es7162:9200 # 127.0.0.1:9200 for rest mode  
        properties:  
          mode: rest  
          # security.auth: test:123456 #  only used for rest mode  
          cluster.name: dailyhub-es  

接下来是修改表映射索引文件:

docker exec -it adapter115 /bin/bash  
cd conf/es7  
  
cp -v mytest_user.yml dailyhub_collect.yml  
# 删除其他多余的  
rm -rf biz_order.yml customer.yml mytest_user.yml  
vi dailyhub_collect.yml  

配置文件:

dataSourceKey: defaultDS  
destination: example  
groupId: g1  
esMapping:  
  _index: dailyhub_collect  
  _id: _id  
  _type: _doc  
  upsert: true  
#  pk: id  
  sql: "  
SELECT  
        c.id AS _id,  
        c.user_id AS userId,  
        c.title AS title,  
        c.url AS url,  
        c.note AS note,  
        c.collected AS collected,  
        c.created AS created,  
        c.personal AS personal,  
        u.username AS username,  
        u.avatar AS userAvatar  
FROM  
        m_collect c  
LEFT JOIN m_user u ON c.user_id = u.id  
  
"  
#  objFields:  
#    _labels: array:;  
#   etlCondition: "where c.c_time>={}"  
  commitBatch: 3000  

注意对于时间类型,在后端一定要使用LocalDateTime或者LocalDate类型,如果是Date类型,需要自己手动设置格式。

5、联合测试

然后就可以直接测试了,准备测试条件:

  • 在数据库中生成表和字段,

  • 然后elasticsearch中生成索引。先新建数据库dailyhub。然后数据表结构:

SET FOREIGN_KEY_CHECKS=0;  
  
-- ----------------------------  
-- Table structure for m_collect  
-- ----------------------------  
DROP TABLE IF EXISTS `m_collect`;  
CREATE TABLE `m_collect` (  
  `id` bigint(20) NOT NULL AUTO_INCREMENT,  
  `collected` date DEFAULT NULL,  
  `created` datetime(6) DEFAULT NULL,  
  `note` varchar(255) DEFAULT NULL,  
  `personal` int(11) DEFAULT NULL,  
  `title` varchar(255) DEFAULT NULL,  
  `url` varchar(255) DEFAULT NULL,  
  `user_id` bigint(20) DEFAULT NULL,  
  PRIMARY KEY (`id`),  
  KEY `FK6yx2mr7fgvv204y8jw5ubsn7h` (`user_id`),  
  CONSTRAINT `FK6yx2mr7fgvv204y8jw5ubsn7h` FOREIGN KEY (`user_id`) REFERENCES `m_user` (`id`)  
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4;  
  
-- ----------------------------  
-- Records of m_collect  
-- ----------------------------  
  
-- ----------------------------  
-- Table structure for m_user  
-- ----------------------------  
DROP TABLE IF EXISTS `m_user`;  
CREATE TABLE `m_user` (  
  `id` bigint(20) NOT NULL AUTO_INCREMENT,  
  `avatar` varchar(255) DEFAULT NULL,  
  `created` datetime(6) DEFAULT NULL,  
  `lasted` datetime(6) DEFAULT NULL,  
  `open_id` varchar(255) DEFAULT NULL,  
  `statu` int(11) DEFAULT NULL,  
  `username` varchar(255) DEFAULT NULL,  
  PRIMARY KEY (`id`)  
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;  
  
-- ----------------------------  
-- Records of m_user  
-- ----------------------------  
INSERT INTO `m_user` VALUES ('1', 'https://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload/images/5a9f48118166308daba8b6da7e466aab.jpg', '2022-01-05 16:08:40.042000', '2022-01-06 13:07:45.153000', 'ozWZ-uAOY2iecT-byynO382u01zg', '0', '公众号:MarkerHub');  
  

接下来借postman来新建elasticsearch的索引:

# 创建索引并添加映射字段  
PUT http://119.45.25.164:9200/dailyhub_collect  
  
{  
    "mappings": {  
        "properties": {  
            "collected": {  
                "type": "date",  
                "format": "date_optional_time||epoch_millis"  
            },  
            "created": {  
                "type": "date",  
                "format": "date_optional_time||epoch_millis"  
            },  
            "note": {  
                "type": "text",  
                "analyzer": "ik_max_word",  
                "search_analyzer": "ik_smart"  
            },  
            "personal": {  
                "type": "integer"  
            },  
            "title": {  
                "type": "text",  
                "analyzer": "ik_max_word",  
                "search_analyzer": "ik_smart"  
            },  
            "url": {  
                "type": "text"  
            },  
            "userAvatar": {  
                "type": "text"  
            },  
            "userId": {  
                "type": "long"  
            },  
            "username": {  
                "type": "keyword"  
            }  
        }  
    }  
}  

其他常用操作:

# 删除索引  
PUT  http://119.45.25.164:9200/dailyhub_collect  
  
# 查看素有索引映射  
GET  http://119.45.25.164:9200/_mapping?pretty=true  
  
# 搜索文档  
GET http://119.45.25.164:9200/dailyhub_collect/_search  
  
# 删除ID为1的文档  
DELETE  http://119.45.25.164:9200/dailyhub_collect/_doc/1  

然后我们打开canal-adapter的输入日志:

docker logs --tail 100  -f adapter115  

然后我们在mysql的m_collect中新添加一条记录,可以看到日志输出如下: 

然后搜索全部文档,发现es中有数据啦。

如果看到adaptar115一直出现这种异常,说明启动顺序不对,启动顺序应该是:mysql、es、canal、adapar

2022-01-11 10:43:15.278 [Thread-2] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: Broken pipe Error sync but ACK!  
  

到这里,实验成功,over,关注公众号:MarkerHub,带你做更多Java实验!视频讲解:https://www.bilibili.com/video/BV1Jq4y1w7Bc/

6. 途中可能出现的问题

6.1 解决docker容器中编辑文件操作问题

问题描述:

在docker中搭建elasticsearch时因为跨域问题需要在服务端对CORS进行配置,使用vi命令打开elasticsearch.yml文件后进入编辑模式,出现方向键变为ABCD、退格键无反应等问题。

解决过程:

查资料猜测是docker容器中linux环境下的vim模块有问题,欲卸载vim重装解决。


使用命令apt-get install vim重新安装,

这时出现报错E: Package 'vim' has no installation candidate,

使用命令apt update更新apt后解决,

重装vim后打开文档进入编辑模式发现可以正常编辑文件,问题解决。

本文转自 https://mp.weixin.qq.com/s/vXLcC41CrWFlJUicyyqFLg ,如有侵权,请联系删除。

最后编辑于 2024-10-31 14:05:41