# 技术杂谈

使用maxwell实时同步mysql数据到消息队列(rabbitMQ)

2023-02-06 16:25:50
17

maxwell简介

maxwell是一个由Java编写的守护进程,可以实时读取mysql binlog并将行更新以JSON格式写入Kafka,Kinesis,RabbitMQ,Google Cloud Pub / Sub或Redis(Pub / Sub或LPUSH)。(以上内容摘自maxwell官网)。可以想象,有了mysql增量数据流,使用场景就很多了,比如:实时同步数据到缓存,同步数据到ElasticSearch,数据迁移等等。与canal(ali)相比,更加轻量

maxwell还提供以下功能:

  • 使用SELECT * FROM table 的方式做全量数据初始化
  • 支持主库发生failover后,自动恢复binlog位置
  • 对数据进行分区,解决数据倾斜的问题
  • 伪装成mysql从库,接收binlog

maxwell官网:http://maxwells-daemon.io/

maxwell源码:https://github.com/zendesk/maxwell

maxwell使用

mysql配置

需要mysql开启binlog,而binlog默认是关闭的,需要开启,并且为了保证_同步数据的一致性_,使用的日志格式为_row-based replication(RBR)_,新建或修改my.conf开启binlog。

$ vim /etc/my.cnf

添加内容

[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #随机指定一个不能和其他集群中机器重名的字符串,如果只有一台机器,那就可以随便指定了

重启mysql, 查询是否已开启bin

show variables like '%log_bin%'

配置jdk环境(略)

maxwell 依赖java sdk,所以需要先配置JDK环境。

下载maxwell:https://github.com/zendesk/maxwell/releases/download/v1.17.1/maxwell-1.17.1.tar.gz

解压后

修改config.properties.example为config.properties

到目前为止,前期准备已完成

实战项目:配置maxwell发送消息到rabbitMQ, 监听MQ队列,处理消息

mysql db如下:

修改maxwell的config.properties

# tl;dr config 生产环境配置为info级别
log_level=DEBUG

producer=rabbitmq


# mysql login info, mysql用户必须拥有读取binlog权限和新建库表的权限
host=127.0.0.1
user=xiehd
password=xiehd2018

output_nulls=true
# options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello
jdbc_options=autoReconnet=true

#需要同步的数据库,表,及不包含的字段
#filter=exclude: *.*, include: foo.*, include: bar.baz
filter=exclude: *.*,include: xhd.*

#replica_server_id 和 client_id 唯一标示,用于集群部署
replica_server_id=64
client_id=maxwell_dev

metrics_type=http
metrics_slf4j_interval=60
http_port=8111
http_diagnostic=true # default false


#rabbitmq
rabbitmq_host=192.168.50.184
rabbitmq_port=5672
rabbitmq_user=maxwell
rabbitmq_pass=maxwell@2018
rabbitmq_virtual_host=/
rabbitmq_exchange=maxwell
rabbitmq_exchange_type=topic
rabbitmq_exchange_durable=false
rabbitmq_exchange_autodelete=false
rabbitmq_routing_key_template=%db%.%table%
rabbitmq_message_persistent=false
rabbitmq_declare_exchange=true

启动maxwell

./bin/maxwell

启动成功

此时会自动生成maxwell库,该库记录了maxwell同步的状态,最后一次同步的id等等信息,在主库失败或同步异常后,只要maxwell库存在,下次同步会根据最后一次同步的id。如果没有生成maxwell库或报错,可能config.properties中配置的mysql用户权限不够

此时修改mysql中的SYS_ROLE表数据,maxwell控制台会打印相应的json格式的日志

rabbitMQ控制台中会自动创建名称为maxwell的exchange(该exchange为config.properties中配置的)

新建Queue并绑定exchange,并设置routingkey为%db%.%table%(该routingkey为config.properties中配置)

修改表SYS_USER中数据,此时修改的数据会被发送到rabbitMQ

 一般情况下,一张表对应一个queue, 也可以多张表共用一个queue, 根据实际吞吐量灵活使用。

MQ消费端

新建一个MaxwellData

public class MaxwellData implements Serializable {
    private String type;
    private String database;
    private String table;
    private Map<String, Object> data;
    private Map<String, Object> old;

    public MaxwellData() {
    }

    public String getType() {
        return this.type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getDatabase() {
        return this.database;
    }

    public void setDatabase(String database) {
        this.database = database;
    }

    public String getTable() {
        return this.table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public Map<String, Object> getData() {
        return this.data;
    }

    public void setData(Map<String, Object> data) {
        this.data = data;
    }

    public Map<String, Object> getOld() {
        return this.old;
    }

    public void setOld(Map<String, Object> old) {
        this.old = old;
    }

解析maxwell数据

    @RabbitHandler
    @RabbitListener(queues = "SYS_USER")
    public void process(byte[] data) {
        String s = new String(data);
        MaxwellData maxwellData = decodeMsg(s);
        logger.info("maxwellData:" + maxwellData);
        //to do
    }
    private MaxwellData decodeMsg(String msg) {
        if (msg == null) {
            return null;
        }
        return JSON.parseObject(msg, MaxwellData.class);
    }

当然也可以同步到Redis, kafka, rocketMQ(rocketmq需要自行实现producer)等

全量同步

使用maxwell-bootstrap命令

./bin/maxwell-bootstrap --database xhd --table xhd-sso --host 127.0.0.1 --user xiehd --password xiehd2018 --client_id maxwell_dev

同步xhd.xhd-sso表的所有数据,并指定client_id示maxwell_dev的maxwell执行同步

上一个命令先开着,然后再启动client_id=maxwell_dev的maxwell

./bin/maxwell --client_id maxwell_dev

等待执行完成即可

本文转自 https://blog.csdn.net/xiehd313/article/details/81289150 ,如有侵权,请联系删除。

最后编辑于 2024-10-31 14:05:47