admin 管理员组文章数量: 1184232
canal
一、应用场景
在前面Echarts - 实现图表显示中,我们使用了服务调用(统计表中的信息通过调用用户模块服务来获取)获取统计信息,这样耦合度高,效率相对较低,目前有另一种方法,通过实时同步数据库表的方法实现。例如每天统计登录注册人数,我们只需要把用户表同步到统计库中,实现本地统计就可以了,这样效率更高,耦合度更低,canal就是一个数据库同步工具。也可以将mysql数据库中的数据同步到中间件。
canal是阿里巴巴旗下的一款开源项目,纯java开发。基于数据库增量日志解析,提供增量数据订阅与消费,目前主要支持MySQL。
二、Canal环境搭建
1、canal的原理是基于MySQL binlog技术,所以需要开启MySQL的binlog写入功能
show VARIABLES like 'log_bin';
2、如果log_bin没有开启显示OFF,则需要在MySQL的配置文件中添加配置信息后进行MySQL的重启。
log-bin=mysql-bin #binlog文件名
binlog_format=ROW #选择row模式
server_id=1 #mysql实例id,不能和canal的slaveId重复
3、安装canal数据同步工具
下载canal工具
下载后解压到安装的目录中
通过以下路径找到并修改配置文件
需要修改这三个地方,分别是本地mysql端口号,用户名与密码以及canal同步的正则表达式。
#需要改成自己的数据库信息
canal.instance.master.address=192.168.0.145:3306
#需要改成自己的数据库用户名与密码
canal.instance.dbUsername=root
canal.instance.dbPassword=root
#需要改成同步的数据库表规则,例如只是同步一下表
#canal.instance.filter.regex=.*..*
#指定某个库的某个表
canal.instance.filter.regex=canal_test.members
4、启动canal数据同步工具
需提前在虚拟机安装jdk
在文件所在目录下bin目录中有startup.sh启动
三、客户端代码编写
1、创建canal模块
2、引入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- mysql --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>commons-dbutils</groupId><artifactId>commons-dbutils</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId></dependency></dependencies>
3、创建配置文件
# 端口号
server:port: 10000
spring:application:# 应用名name: canal-clientprofiles:active: dev# 数据库连接datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/leader?serverTimezone=GMT%2B8username: rootpassword: root
4、创建canal客户端类,在启动类执行
@Component
public class CanalClient {//sql队列private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();@Resourceprivate DataSource dataSource;/*** canal入库方法*/public void run() {/*** hostname: 虚拟机ip地址* port:canal固定端口号11111* destination:查找虚拟机canal配置文件example* username 与 password 连接数据库*/CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.0.145",11111), "example", "root", "root");int batchSize = 1000;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();try {while (true) {//尝试从master那边拉去数据batchSize条记录,有多少取多少Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {Thread.sleep(1000);} else {dataHandle(message.getEntries());}connector.ack(batchId);//当队列里面堆积的sql大于一定数值的时候就模拟执行if (SQL_QUEUE.size() >= 1) {executeQueueSql();}}} catch (InterruptedException e) {e.printStackTrace();} catch (InvalidProtocolBufferException e) {e.printStackTrace();}} finally {connector.disconnect();}}/*** 模拟执行队列里面的sql语句*/public void executeQueueSql() {int size = SQL_QUEUE.size();for (int i = 0; i < size; i++) {String sql = SQL_QUEUE.poll();System.out.println("[sql]----> " + sql);this.execute(sql.toString());}}/*** 数据处理** @param entrys*/private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {for (Entry entry : entrys) {if (EntryType.ROWDATA == entry.getEntryType()) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());EventType eventType = rowChange.getEventType();if (eventType == EventType.DELETE) {saveDeleteSql(entry);} else if (eventType == EventType.UPDATE) {saveUpdateSql(entry);} else if (eventType == EventType.INSERT) {saveInsertSql(entry);}}}}/*** 保存更新语句** @param entry*/private void saveUpdateSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> newColumnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");for (int i = 0; i < newColumnList.size(); i++) {sql.append(" " + newColumnList.get(i).getName()+ " = '" + newColumnList.get(i).getValue() + "'");if (i != newColumnList.size() - 1) {sql.append(",");}}sql.append(" where ");List<Column> oldColumnList = rowData.getBeforeColumnsList();for (Column column : oldColumnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存删除语句** @param entry*/private void saveDeleteSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> columnList = rowData.getBeforeColumnsList();StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");for (Column column : columnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存插入语句** @param entry*/private void saveInsertSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> columnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");for (int i = 0; i < columnList.size(); i++) {sql.append(columnList.get(i).getName());if (i != columnList.size() - 1) {sql.append(",");}}sql.append(") VALUES (");for (int i = 0; i < columnList.size(); i++) {sql.append("'" + columnList.get(i).getValue() + "'");if (i != columnList.size() - 1) {sql.append(",");}}sql.append(")");SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 入库** @param sql*/public void execute(String sql) {Connection con = null;try {if(null == sql) return;con = dataSource.getConnection();QueryRunner qr = new QueryRunner();int row = qr.execute(con, sql);System.out.println("update: "+ row);} catch (SQLException e) {e.printStackTrace();} finally {DbUtils.closeQuietly(con);}}
}
@SpringBootApplication
public class CanalApplication implements CommandLineRunner {@Resourceprivate CanalClient canalClient;public static void main(String[] args) {SpringApplication.run(CanalApplication.class, args);}@Overridepublic void run(String... args) throws Exception {canalClient.run();}
}
四、最终测试
启动canal,启动CanalApplication
通过向虚拟机中的表插入一行数据后,查看本地MySQL是否同步更新此数据。
虚拟机数据库中有此信息
本机数据库中也有此信息
测试成功。
以上
本文标签: canal
版权声明:本文标题:canal 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.roclinux.cn/b/1686858835a42621.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论