maxwell简介 maxwell是一个由Java编写的守护进程,可以实时读取mysql binlog并将行更新以JSON格式写入Kafka,Kinesis,RabbitMQ,Google Cloud Pub / Sub或Redis(Pub / Sub或LPUSH)。(以上内容摘自maxwell官网)。可以想象,有了mysql增量数据流,使用场景就很多了,比如:实时同步数据到缓存,同步数据到ElasticSearch,数据迁移等等。与canal(ali)相比,更加轻量 maxwell还提供以下功能: * 使用`SELECT * FROM table` 的方式做全量数据初始化 * 支持主库发生failover后,自动恢复binlog位置 * 对数据进行分区,解决数据倾斜的问题 * 伪装成mysql从库,接收binlog maxwell官网:[http://maxwells-daemon.io/](http://maxwells-daemon.io/) maxwell源码:[https://github.com/zendesk/maxwell](https://github.com/zendesk/maxwell) maxwell使用 mysql配置 需要mysql开启binlog,而binlog默认是关闭的,需要开启,并且为了保证_同步数据的一致性_,使用的日志格式为_row-based replication(RBR)_,新建或修改`my.conf`开启binlog。 ```java $ vim /etc/my.cnf ``` 添加内容 ```java [mysqld] log-bin=mysql-bin #添加这一行就ok binlog-format=ROW #选择row模式 server_id=1 #随机指定一个不能和其他集群中机器重名的字符串,如果只有一台机器,那就可以随便指定了 ``` 重启mysql, 查询是否已开启bin ```java show variables like '%log_bin%' ```  配置jdk环境(略) maxwell 依赖java sdk,所以需要先配置JDK环境。 下载maxwell:[https://github.com/zendesk/maxwell/releases/download/v1.17.1/maxwell-1.17.1.tar.gz](https://github.com/zendesk/maxwell/releases/download/v1.17.1/maxwell-1.17.1.tar.gz) 解压后  修改config.properties.example为config.properties 到目前为止,前期准备已完成 实战项目:配置maxwell发送消息到rabbitMQ, 监听MQ队列,处理消息 mysql db如下:  修改maxwell的config.properties ```java # tl;dr config 生产环境配置为info级别 log_level=DEBUG producer=rabbitmq # mysql login info, mysql用户必须拥有读取binlog权限和新建库表的权限 host=127.0.0.1 user=xiehd password=xiehd2018 output_nulls=true # options to pass into the jdbc connection, given as opt=val&opt2=val2 #jdbc_options=opt1=100&opt2=hello jdbc_options=autoReconnet=true #需要同步的数据库,表,及不包含的字段 #filter=exclude: *.*, include: foo.*, include: bar.baz filter=exclude: *.*,include: xhd.* #replica_server_id 和 client_id 唯一标示,用于集群部署 replica_server_id=64 client_id=maxwell_dev metrics_type=http metrics_slf4j_interval=60 http_port=8111 http_diagnostic=true # default false #rabbitmq rabbitmq_host=192.168.50.184 rabbitmq_port=5672 rabbitmq_user=maxwell rabbitmq_pass=maxwell@2018 rabbitmq_virtual_host=/ rabbitmq_exchange=maxwell rabbitmq_exchange_type=topic rabbitmq_exchange_durable=false rabbitmq_exchange_autodelete=false rabbitmq_routing_key_template=%db%.%table% rabbitmq_message_persistent=false rabbitmq_declare_exchange=true ``` 启动maxwell ```java ./bin/maxwell ``` 启动成功  此时会自动生成maxwell库,该库记录了maxwell同步的状态,最后一次同步的id等等信息,在主库失败或同步异常后,只要maxwell库存在,下次同步会根据最后一次同步的id。如果没有生成maxwell库或报错,可能config.properties中配置的mysql用户权限不够  此时修改mysql中的SYS\_ROLE表数据,maxwell控制台会打印相应的json格式的日志  rabbitMQ控制台中会自动创建名称为maxwell的exchange(该exchange为config.properties中配置的)  新建Queue并绑定exchange,并设置routingkey为%db%.%table%(该routingkey为config.properties中配置)  修改表SYS\_USER中数据,此时修改的数据会被发送到rabbitMQ  一般情况下,一张表对应一个queue, 也可以多张表共用一个queue, 根据实际吞吐量灵活使用。 MQ消费端 新建一个MaxwellData ```java public class MaxwellData implements Serializable { private String type; private String database; private String table; private Map data; private Map old; public MaxwellData() { } public String getType() { return this.type; } public void setType(String type) { this.type = type; } public String getDatabase() { return this.database; } public void setDatabase(String database) { this.database = database; } public String getTable() { return this.table; } public void setTable(String table) { this.table = table; } public Map getData() { return this.data; } public void setData(Map data) { this.data = data; } public Map getOld() { return this.old; } public void setOld(Map old) { this.old = old; } ``` 解析maxwell数据 ```java @RabbitHandler @RabbitListener(queues = "SYS_USER") public void process(byte[] data) { String s = new String(data); MaxwellData maxwellData = decodeMsg(s); logger.info("maxwellData:" + maxwellData); //to do } private MaxwellData decodeMsg(String msg) { if (msg == null) { return null; } return JSON.parseObject(msg, MaxwellData.class); } ``` 当然也可以同步到Redis, kafka, rocketMQ(rocketmq需要自行实现producer)等 全量同步 使用maxwell-bootstrap命令 ./bin/maxwell-bootstrap --database xhd --table xhd-sso --host 127.0.0.1 --user xiehd --password xiehd2018 --client\_id maxwell\_dev 同步xhd.xhd-sso表的所有数据,并指定client\_id示maxwell\_dev的maxwell执行同步 上一个命令先开着,然后再启动client\_id=maxwell\_dev的maxwell ./bin/maxwell --client\_id maxwell\_dev 等待执行完成即可 本文转自 [https://blog.csdn.net/xiehd313/article/details/81289150](https://blog.csdn.net/xiehd313/article/details/81289150),如有侵权,请联系删除。
评论