Springboot2.3.x整合Canal的方法(Canal,springboot,开发技术)

时间:2024-05-06 20:44:17 作者 : 石家庄SEO 分类 : 开发技术
  • TAG :

    一、故事背景

    最近工作中遇到了一个数据同步的问题

    我们这边系统的一个子业务需要依赖另一个系统的数据,当另一个系统数据变更时,我们这边的数据库要对数据进行同步…

    那么我自己想到的同步方式呢就两种:

    1、MQ订阅,另一个系统数据变更后将变更数据方式到MQ 我们这边订阅接受

    2、数据库的触发器

    但是呢,两者都被组长paas了!

    1、MQ呢,会造成代码侵入,但是另一个系统暂时不会做任何代码更改…

    2、数据库的触发器会直接跟生产数据库强关联,会抢占资源,甚至有可能造成生产数据库的不稳定…

    对此很是苦恼…

    于是啊,只能借由强大的google、百度,看看能不能解决我这个问题!一番搜索,有学习了一个很有趣的东西

    二、什么是Canal

    canal:阿里开源mysql binlog 数据组件

    早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。ps. 目前内部使用的同步,已经支持mysql5.x和oracle部分版本的日志解析

    Springboot2.3.x整合Canal的方法

    canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

    工作原理

    • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议

    • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

    • canal 解析 binary log 对象(原始为 byte 流)

    canal呢,实际是就是运用了Mysql的主从复制原理…

    MySQL主从复制实现

    Springboot2.3.x整合Canal的方法

    复制遵循三步过程:

    • 主服务器将更改记录到binlog中(这些记录称为binlog事件,可以通过来查看show binary events

    • 从服务器将主服务器的二进制日志事件复制到其中继日志。

    • 中继日志中的从服务器重做事件随后将更新其旧数据。

    如何运作

    Springboot2.3.x整合Canal的方法

    原理很简单:

    • Canal模拟MySQL从站的交互协议,伪装成MySQL从站,然后将转储协议发送到MySQL主服务器。

    • MySQL Master接收到转储请求,并开始将二进制日志推送到slave(即运河)。

    • 运河将二进制日志对象解析为其自己的数据类型(最初为字节流)

    通过官网的介绍,让我们了解到,canal实际上就是伪装为了一个从库,我们只需要订阅到数据变更的主库,那么canal就会以从库的身份读取到其主库的binlog日志!我们拿到canal解析好的binlog日志信息,就等于拿到了变更的数据啦!

    三、Canal安装

    (1)事前准备

    (1)数据库开启binlog

    使用canal呢,有一个前提条件,即被订阅的数据库需要开启binlog

    如何查看是否开启binlog呢?

    登录服务器上数据库或在可视化工具中 执行查询语句: 如果出现 log_bin ON 表示已开启Binlog

    showvariableslike'log_bin';

    Springboot2.3.x整合Canal的方法

    如果服务器上的数据库为自己安装的,则找到配置文件my.conf 添加以下内容,如果买的云实例,则询问厂商开启即可

    Springboot2.3.x整合Canal的方法

    在my.conf文件中的 [mysqld] 下添加以下三行内容

    log-bin=mysql-bin#开启binlogbinlog-format=ROW#选择ROW模式读行server_id=1#配置MySQLreplaction需要定义,不要和canal的slaveId重复
    (2)数据库新建账号,开启MySQL slav权限

    canaltest:作为slave 角色的账户 Canal123…:为密码

    CREATEUSERcanaltestIDENTIFIEDBY'Canal123..';GRANTSELECT,REPLICATIONSLAVE,REPLICATIONCLIENTON*.*TO'canaltest'@'%';GRANTALLPRIVILEGESON*.*TO'canaltest'@'%';FLUSHPRIVILEGES;

    Springboot2.3.x整合Canal的方法

    连接测试

    Springboot2.3.x整合Canal的方法

    那么到这里,准备工作就好了!

    可能呢,有的小伙伴有点懵,你这是在干啥?那么咱们就来理那么一理! 敲黑板了哈!

    Springboot2.3.x整合Canal的方法

    1、事前准备,是针对于订阅数据库的(即主库)

    2、实际步骤也就两步 1:更改配置,开启binlog 2:设置新账号,赋予slave权限,供canal读取Binlog桥梁使用

    3、以上操作与canal本身没啥关系,仅仅是使用canal的前提条件罢辽…

    (2)Canal Admin 安装

    canal admin 是 一个可视化的 canal web管理运维工程,脱离以往服务器运维,面向web…

    canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作

    canal-admin的限定依赖:

    • MySQL,用于存储配置和节点等相关数据

    • canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)

    • 需要JRE 环境 (安装JDK)

    下载

    wgethttps://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

    解压

    mkdir/usr/local/canal-admintarzxvfcanal.admin-1.1.4.tar.gz-C/usr/local/canal-admin

    进入canal-admin目录下查看

    cd/usr/local/canal-admin

    修改配置

    vimconf/application.yml

    里边的配置 按照自己的实际情况更改…

    server:port:8089spring:jackson:date-format:yyyy-MM-ddHH:mm:sstime-zone:GMT+8#这里是配置canal-admin所依赖的数据库,,,存放web管理中设置的配置等,,,spring.datasource:address:127.0.0.1:3306database:canal_managerusername:root password:123456driver-class-name:com.mysql.jdbc.Driverurl:jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=falsehikari:maximum-pool-size:30minimum-idle:1#连接所用的账户密码canal:adminUser:adminadminPasswd:leitest

    导入canaladmin 所需要的数据库文件

    这里需要注意了,要和 application.yml中的数据库名对应,你可以选择命令导入,也可以Navicat 可视化拖sql文件导入…一切…看你喜欢.

    我这个玩canal的服务器呢,是新安装的,mysql直接用docker安装即可,具体可查看我的博客:

    Docker在CentOS7下不能下载镜像timeout的解决办法(图解)

    CentOS 7安装Docker

    需要注意的是,使用docker 安装的mysql 是无法直接使用 mysql -uroot -p 命令的哦,需要先将脚本复制到容器中,docker不熟练或觉得麻烦的同鞋,请直接使用Navicat可视化工具…

    导入canal-admin服务所必需的sql文件

    如果是服务器软件软件安装的mysql 则直接执行以下命令即可

    mysql-uroot-p#.........#导入初始化SQL>sourceconf/canal_manager.sql

    Springboot2.3.x整合Canal的方法

    启动

    直接执行启动脚本即可

    cdbin./startup.sh

    Springboot2.3.x整合Canal的方法

    默认账户密码:

    admin:123456

    Springboot2.3.x整合Canal的方法

    (3)Canal Server 安装

    canal-server 才是canal的核心我们前边所讲的canal的功能,实际上讲述的就是canal-server的功能…admin 仅仅只是一个web管理而已,不要搞混主次关系…

    下载

    wgethttps://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

    解压

    mkdir/usr/local/canal-servertarzxvfcanal.deployer-1.1.4.tar.gz-C/usr/local/canal-server

    启动,并连接到canal-admin web端

    首先,我们需要修改配置文件

    cd/usr/local/canal-servervim/conf/canal_local.properties

    Springboot2.3.x整合Canal的方法

    Springboot2.3.x整合Canal的方法

    注意了,密码如何加密!!!

    要记得,前边 canal-admin 的 aplication.yml 中设置了账户密码为 admin:leitest

    #连接所用的账户密码canal:adminUser:adminadminPasswd:leitest

    所以,我们这里需要对明文 leitest 加密并替换即可

    使用数据库函数 PASSWORD 加密即可

    SELECT PASSWORD(‘要加密的明文’),然后去掉前边的* 号就行

    Springboot2.3.x整合Canal的方法

    启动并连接到admin

    shbin/startup.shlocal

    查看端口看是否有 11110 、11111、11112

    netstat -untlp 看了一下,发现没有,说明server 没有启动成功

    Springboot2.3.x整合Canal的方法

    看下日志

    vimlogs/canal/canal.log

    Springboot2.3.x整合Canal的方法

    解决办法:

    1、canal-admin 先停止后从起

    2、canal server 先以之前的形式运行,不输入后边 local 命令

    3、关闭canal server

    4、再以canal server 连接 admin 形式启动

    Springboot2.3.x整合Canal的方法

    admin页面上新建server

    Springboot2.3.x整合Canal的方法

    修改配置,注释 (instance连接信息,我们还是以前边设置的 admin:leitest 为准,所有这里需要注释掉,如果不注释,那么我们代码中连接则需要使用此账号以及密码)

    Springboot2.3.x整合Canal的方法

    接下来咱们创建instance

    如何理解server 和instance 呢,我认为,可以把它当做 java 中的 class 和 bean 即 类和对象

    server 为类 instance 为其具体的实例对象 ,可创建多个不同的实例…

    而我们这边监听到主库变化的呢,则是根据业务,对不同的实例即(instance )做不同配置即可…

    Springboot2.3.x整合Canal的方法

    Springboot2.3.x整合Canal的方法

    Springboot2.3.x整合Canal的方法

    Springboot2.3.x整合Canal的方法

    根据自己情况进行过滤数据

    canal.instance.filter.regexmysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\) 常见例子:1. 所有表:.* or .\… 2. canal schema下所有表: canal\…* 3. canal下的以canal打头的表:canal\.canal.* 4. canal schema下的一张表:canal\.test15. 多个规则组合使用:canal\…*,mysql.test1,mysql.test2 (逗号分隔)canal.instance.filter.druid.ddl是否使用druid处理所有的ddl解析来获取库和表名truecanal.instance.filter.query.dcl是否忽略dcl语句falsecanal.instance.filter.query.dml是否忽略dml语句 (mysql5.6之后,在row模式下每条DML语句也会记录SQL到binlog中,可参考MySQL文档)falsecanal.instance.filter.query.ddl是否忽略ddl语句false

    更多设置请见官网:https://github.com/alibaba/canal/wiki/AdminGuide

    如此一来,一个简单的canal环境就搭建好了,接下来,咱们开始测试吧!

    (4)springboot demo示例

    引入canal所需依赖

    <dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency>

    配置

    canal:#instance实例所在iphost:192.168.96.129#tcp通信端口port:11111#账号canal-adminapplication.yml设置的username:admin#密码password:leitest#实例名称instance:test

    代码

    packagecom.leilei;importcom.alibaba.otter.canal.client.CanalConnector;importcom.alibaba.otter.canal.client.CanalConnectors;importcom.alibaba.otter.canal.protocol.CanalEntry;importcom.alibaba.otter.canal.protocol.Message;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.ApplicationArguments;importorg.springframework.boot.ApplicationRunner;importorg.springframework.stereotype.Component;importjava.net.InetSocketAddress;importjava.util.List;/***@authorlei*@version1.0*@date2020/9/2722:23*@desc读取binlog日志*/@ComponentpublicclassReadBinLogServiceimplementsApplicationRunner{@Value("${canal.host}")privateStringhost;@Value("${canal.port}")privateintport;@Value("${canal.username}")privateStringusername;@Value("${canal.password}")privateStringpassword;@Value("${canal.instance}")privateStringinstance;@Overridepublicvoidrun(ApplicationArgumentsargs)throwsException{CanalConnectorconn=getConn();while(true){conn.connect();//订阅实例中所有的数据库和表conn.subscribe(".*\\..*");//回滚到未进行ack的地方conn.rollback();//获取数据每次获取一百条改变数据Messagemessage=conn.getWithoutAck(100);longid=message.getId();intsize=message.getEntries().size();if(id!=-1&&size>0){//数据解析analysis(message.getEntries());}else{Thread.sleep(1000);}//确认消息conn.ack(message.getId());//关闭连接conn.disconnect();}}/***数据解析*/privatevoidanalysis(List<CanalEntry.Entry>entries){for(CanalEntry.Entryentry:entries){//只解析mysql事务的操作,其他的不解析if(entry.getEntryType()==CanalEntry.EntryType.TRANSACTIONBEGIN){continue;if(entry.getEntryType()==CanalEntry.EntryType.TRANSACTIONEND){//解析binlogCanalEntry.RowChangerowChange=null;try{rowChange=CanalEntry.RowChange.parseFrom(entry.getStoreValue());}catch(Exceptione){thrownewRuntimeException("解析出现异常data:"+entry.toString(),e);if(rowChange!=null){//获取操作类型CanalEntry.EventTypeeventType=rowChange.getEventType();//获取当前操作所属的数据库StringdbName=entry.getHeader().getSchemaName();//获取当前操作所属的表StringtableName=entry.getHeader().getTableName();//事务提交时间longtimestamp=entry.getHeader().getExecuteTime();for(CanalEntry.RowDatarowData:rowChange.getRowDatasList()){dataDetails(rowData.getBeforeColumnsList(),rowData.getAfterColumnsList(),dbName,tableName,eventType,timestamp);System.out.println("-------------------------------------------------------------");}*解析具体一条Binlog消息的数据**@paramdbName当前操作所属数据库名称*@paramtableName当前操作所属表名称*@parameventType当前操作类型(新增、修改、删除)privatestaticvoiddataDetails(List<CanalEntry.Column>beforeColumns,List<CanalEntry.Column>afterColumns,StringdbName,StringtableName,CanalEntry.EventTypeeventType,longtimestamp){System.out.println("数据库:"+dbName);System.out.println("表名:"+tableName);System.out.println("操作类型:"+eventType);if(CanalEntry.EventType.INSERT.equals(eventType)){System.out.println("新增数据:");printColumn(afterColumns);}elseif(CanalEntry.EventType.DELETE.equals(eventType)){System.out.println("删除数据:");printColumn(beforeColumns);}else{System.out.println("更新数据:更新前数据--");System.out.println("更新数据:更新后数据--");System.out.println("操作时间:"+timestamp);privatestaticvoidprintColumn(List<CanalEntry.Column>columns){for(CanalEntry.Columncolumn:columns){System.out.println(column.getName()+":"+column.getValue()+"update="+column.getUpdated());*获取连接publicCanalConnectorgetConn(){returnCanalConnectors.newSingleConnector(newInetSocketAddress(host,port),instance,username,password);}

    测试查看

    数据库修改数据库时

    Springboot2.3.x整合Canal的方法

    数据新增数据时

    Springboot2.3.x整合Canal的方法

    删除数据(把我们才添加的小明删掉)

    Springboot2.3.x整合Canal的方法

    当我们操作监控的数据库DM L操作的时候呢,会被canal监听到&hellip;我们呢,通过canal监听,拿到修改的库,修改的表,修改的字段,便可以根据自己业务进行数据处理了!

    哎,这个时候啊,可能有小伙伴就要问了,那么,我能不能直接获取其操作的sql语句呢?

    目前,我是自己解析其列来手动拼接的sql语句实现了

    话不多说,先上效果:

    canal 监听到主库sql变化----> update students set id = '2', age = '999', name = '小三', city = '11', date = '2020-09-27 17:41:44', birth = '2020-09-27 18:00:48' where id=2
    canal 监听到主库sql变化----> delete from students where id=6
    canal 监听到主库sql变化----> insert into students (id,age,name,city,date,birth) VALUES ('89','98','测试新增','深圳','2020-09-27 22:46:53','')
    canal 监听到主库sql变化----> update students set id = '89', age = '98', name = '测试新增', city = '深圳', date = '2020-09-27 22:46:53', birth = '2020-09-27 22:46:56' where id=89

    Springboot2.3.x整合Canal的方法

    实际上呢,我们也就是拿到其执行前列数据变化 执行后列数据变化,自己拼接了一个sql罢了&hellip;附上代码

    packagecom.leilei;importcom.alibaba.otter.canal.client.CanalConnector;importcom.alibaba.otter.canal.client.CanalConnectors;importcom.alibaba.otter.canal.protocol.CanalEntry.*;importcom.alibaba.otter.canal.protocol.Message;importcom.alibaba.otter.canal.protocol.exception.CanalClientException;importcom.google.protobuf.InvalidProtocolBufferException;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.ApplicationArguments;importorg.springframework.boot.ApplicationRunner;importorg.springframework.stereotype.Component;importjava.net.InetSocketAddress;importjava.util.List;importjava.util.Queue;importjava.util.concurrent.ConcurrentLinkedQueue;/***@authorlei*@version1.0*@date2020/9/2722:33*@desc读取binlog日志*/@ComponentpublicclassReadBinLogToSqlimplementsApplicationRunner{//读取的binlogsql队列缓存一边Push一边pollprivateQueue<String>canalQueue=newConcurrentLinkedQueue<>();@Value("${canal.host}")privateStringhost;@Value("${canal.port}")privateintport;@Value("${canal.username}")privateStringusername;@Value("${canal.password}")privateStringpassword;@Value("${canal.instance}")privateStringinstance;@Overridepublicvoidrun(ApplicationArgumentsargs)throwsException{CanalConnectorconn=getConn();while(true){try{conn.connect();//订阅实例中所有的数据库和表conn.subscribe(".*\\..*");//回滚到未进行ack的地方conn.rollback();//获取数据每次获取一百条改变数据Messagemessage=conn.getWithoutAck(100);longid=message.getId();intsize=message.getEntries().size();if(id!=-1&&size>0){//数据解析analysis(message.getEntries());}else{Thread.sleep(1000);}//确认消息conn.ack(message.getId());}catch(CanalClientException|InvalidProtocolBufferException|InterruptedExceptione){e.printStackTrace();}finally{//关闭连接conn.disconnect();}}}privatevoidanalysis(List<Entry>entries)throwsInvalidProtocolBufferException{for(Entryentry:entries){if(EntryType.ROWDATA==entry.getEntryType()){RowChangerowChange=RowChange.parseFrom(entry.getStoreValue());EventTypeeventType=rowChange.getEventType();if(eventType==EventType.DELETE){saveDeleteSql(entry);}elseif(eventType==EventType.UPDATE){saveUpdateSql(entry);}elseif(eventType==EventType.INSERT){saveInsertSql(entry);}}}}/***保存更新语句**@paramentry*/privatevoidsaveUpdateSql(Entryentry){try{RowChangerowChange=RowChange.parseFrom(entry.getStoreValue());List<RowData>dataList=rowChange.getRowDatasList();for(RowDatarowData:dataList){List<Column>afterColumnsList=rowData.getAfterColumnsList();StringBuffersql=newStringBuffer("update"+entry.getHeader().getTableName()+"set");for(inti=0;i<afterColumnsList.size();i++){sql.append("").append(afterColumnsList.get(i).getName()).append("='").append(afterColumnsList.get(i).getValue()).append("'");if(i!=afterColumnsList.size()-1){sql.append(",");}}sql.append("where");List<Column>oldColumnList=rowData.getBeforeColumnsList();for(Columncolumn:oldColumnList){if(column.getIsKey()){sql.append(column.getName()).append("=").append(column.getValue());break;}}canalQueue.add(sql.toString());}}catch(InvalidProtocolBufferExceptione){e.printStackTrace();}}/***保存删除语句**@paramentry*/privatevoidsaveDeleteSql(Entryentry){try{RowChangerowChange=RowChange.parseFrom(entry.getStoreValue());List<RowData>rowDatasList=rowChange.getRowDatasList();for(RowDatarowData:rowDatasList){List<Column>columnList=rowData.getBeforeColumnsList();StringBuffersql=newStringBuffer("deletefrom"+entry.getHeader().getTableName()+"where");for(Columncolumn:columnList){if(column.getIsKey()){sql.append(column.getName()).append("=").append(column.getValue());break;}}canalQueue.add(sql.toString());}}catch(InvalidProtocolBufferExceptione){e.printStackTrace();}}/***保存插入语句**@paramentry*/privatevoidsaveInsertSql(Entryentry){try{RowChangerowChange=RowChange.parseFrom(entry.getStoreValue());List<RowData>datasList=rowChange.getRowDatasList();for(RowDatarowData:datasList){List<Column>columnList=rowData.getAfterColumnsList();StringBuffersql=newStringBuffer("insertinto"+entry.getHeader().getTableName()+"(");for(inti=0;i<columnList.size();i++){sql.append(columnList.get(i).getName());if(i!=columnList.size()-1){sql.append(",");}}sql.append(")VALUES(");for(inti=0;i<columnList.size();i++){sql.append("'"+columnList.get(i).getValue()+"'");if(i!=columnList.size()-1){sql.append(",");}}sql.append(")");canalQueue.add(sql.toString());}}catch(InvalidProtocolBufferExceptione){e.printStackTrace();}}/***获取连接*/publicCanalConnectorgetConn(){returnCanalConnectors.newSingleConnector(newInetSocketAddress(host,port),instance,username,password);}/***模拟消费canal转换的sql语句*/publicvoidexecuteQueueSql(){intsize=canalQueue.size();for(inti=0;i<size;i++){Stringsql=canalQueue.poll();System.out.println("canal监听到主库sql变化---->"+sql);}}}
     </div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
    本文:Springboot2.3.x整合Canal的方法的详细内容,希望对您有所帮助,信息来源于网络。
    上一篇:怎么使用Python进行数独求解下一篇:

    6 人围观 / 0 条评论 ↓快速评论↓

    (必须)

    (必须,保密)

    阿狸1 阿狸2 阿狸3 阿狸4 阿狸5 阿狸6 阿狸7 阿狸8 阿狸9 阿狸10 阿狸11 阿狸12 阿狸13 阿狸14 阿狸15 阿狸16 阿狸17 阿狸18