admin 管理员组

文章数量: 1184232


2024年3月5日发(作者:软件格式工厂下载)

SymmetricDS数据同步工具

安装使用说明书

1. 背景

随着大数据产品功能的丰富,以及用户对于大数据的时效性的需求,需要一款能够实现跨数据库,以及同一种数据库跨版本的数据同步工具,同时要能够支撑数据库到Kafka的数据推送。

目前大数据技术组研发了一款yhbi的同步工具,支持从oracle到oracle的同步,以及oracle到Kafka的同步,但是受限于不能支持oracle 12c版本数据的同步。

OGG也可以实现oracle到oracle,以及oracle到其他目标端数据库或者消息队列的同步,但OGG的费用、维护成本较高。

故采用了一款名为SymmetricDS的开源数据库同步工具。

2. 简介

SymmetricDS就像其名称一样,是为了实现数据源的“对称性“,也就是数据同步。

SymmetricDS是用于数据库和文件同步的开源软件,支持多主复制、过滤同步和转换。它使用web(HTTP传输)和数据库技术(触发器)将更改数据复制为预定的或接近实时的操作,并且它还包含了完整数据负载的初始负载特性。该软件的设计目的是针对大量节点,跨低带宽连接工作,并经得起网络中断。

2.1. 概述

SymmetricDS的同步原理如下图所示。

节点负责使用HTTP将来自数据库或文件系统的数据与网络中的其他节点进行同步。节点被分配给作为一个单元配置在一起的节点组之一。节点组与组链接链接在一起,以定义推或拉通信。pull使一个节点与其他节点连接并请求正在等待的更改,而push使一个节点在需要发送更改时与其他节点连接。

每个节点都通过使用连接URL、用户名和密码的Java数据库连接(JDBC)驱动程序连接到数据库。虽然节点可以跨广泛的区域网络进行分隔,但是为了获得最佳性能,节点所连接的数据库应该位于局域网络附近。节点使用其数据库连接创建表作为配置设置和运行时操作的数据模型。用户填充配置表以定义同步,运行时表捕获更改并跟踪活动。要同步的表可以位于连接可访问的任何目录和模式中,而要同步的文件可以位于本地服务器可访问的任何目录中。

在启动时,SymmetricDS查找节点属性文件,并为找到的每个文件启动一个节点,这允许多个节点在同一个实例中运行并共享资源。节点的属性文件包含其外部ID、节点组、注册服务器URL和数据库连接信息。外部ID是用于从其他节点识别它的节点的名称。一个节点被配置为存储主配置的注册服务器。当第一次启动节点时,它使用发送外部ID和节点组的注册进程与注册服务器联系。作为响应,节点接收其配置和节点密码,这些密码必须在与其他节点同步时作为身份验证发送。

2.2. 架构

节点中的每个子系统负责部分数据移动,并通过配置进行控制。数据流经系统的步骤如下:

1. 捕获到源数据库的运行时表中

2. 发送到目标节点和成批组的路由

3. 提取并转换为输出批处理所需的行、列和值

4. 将发出的批处理发送到目标节点

5. 在目标节点接收到的批处理

6. 转换为进入批处理所需的行、列和值

7. 加载数据并向源节点返回确认信息

架构图如下图所示:

2.3. 特点

SymmetricDS提供了丰富的特性集,具有灵活的配置,可用于在具有多个系统的混合环境中进行大规模部署。

2.3.1. 数据同步

更改关系数据库的数据捕获和文件系统的文件同步可以是周期性的,也可以是接近实时的,有一个初始负载特性可以完全填充节点。

2.3.2. 集中管理

从一个可以调查和解决冲突和错误的集中位置配置、监视和故障排除同步。

2.3.3. 自动恢复

数据传送是持久的和低维护,承受停机时间和自动从网络故障恢复。

2.3.4. 安全和高效

通信使用为低带宽网络设计的数据协议,并通过HTTPS传输加密传输。

2.3.5. 转换

在多个点上操作数据,以过滤、子集、转换、合并和丰富数据。

2.3.6. 冲突管理

通过配置自动和手动解决的规则来加强双向同步的一致性。

2.3.7. 可扩展

可以配置脚本和Java代码来处理事件、转换数据和创建自定义行为。

2.3.8. 部署选项

软件可以作为独立的自包含服务器安装,部署到web应用服务器,或者嵌入到应用程序中。

3. 安装

SymmetricDS官方提供了3种安装方法。

 独立安装

SymmetricDS安装和运行作为一个独立的进程使用内置Jetty web服务器。这是安装实例最简单也是最推荐的方法。

 Web Archive (WAR)

将SymmetricDS Web Archive (WAR)文件部署到单独安装、维护和运行的现有Web应用程序容器中。

 嵌入式

SymmetricDS嵌入在现有的应用程序中。在这个选项中,编写了一个定制的包装程序,它调用SymmetricDS API来同步数据。

在本文档中,只对前两种方法做介绍,并且是在同一台服务器上部署corp节点(主)和store节点(从),实现一主一从的数据同步。

3.1. 安装前准备

3.1.1. 下载

下载地址:/projects/symmetricds/files/

下载后的文件是一个zip压缩文件。

3.1.2. 上传

把文件上传到服务器。

3.2. 独立安装

3.2.1. 主节点

# unzip

# mv symmetric-server-3.9.13 symmetric-corp

3.2.1.1. 配置文件

# cd symmetric-corp

sample目录下默认放置了一些配置文件,需要复制到engines目录下。

# cp samples/ties engines/

# vi engines/ties

需要修改

源库连接信息

rd

同步的主节点地址

3.2.1.2. 端口配置

# vi conf/ties

3.2.2. 从节点

# unzip

# mv symmetric-server-3.9.13 symmetric-corp

3.2.2.1. 配置文件

# cd symmetric-corp

# cp samples/ties engines/

# vi engines/ties

需要修改

目标库连接信息

rd

从节点的注册地址,与主节点的保持一致。

3.2.2.2. 端口配置

# vi conf/ties

3.2.3. 初始化主节点

导入item相关表

./bin/dbimport --engine corp-000 --format XML --alter-case

samples/create_

导入系统表

./bin/symadmin --engine corp-000 create-sym-tables

导入item相关表的数据

./bin/dbimport --engine corp-000 samples/insert_

3.2.4. 启动

在主节点目录

# ./bin/sym

等待主节点启动完毕。

在从节点目录

# ./bin/sym

Store-001节点启动后,需要等待一段时间,主节点corp-00会把系统配置数据发送到从节点store-001上。当日志信息稳定后,查询store-001的目标数据库的item表,如果有数据表示同步已完成。

3.2.5. 测试同步

修改主节点corp-000源库的item表数据,查询从节点store-001目标数据库里的item表是否有变化,如果数据一样,表示同步成功。

3.3. Web Archive

使用命令进行打包。

# bin/symadmin -p engine/ties create-war /some/path/to/

注意事项

 ties文件是指的ties或者ties。打包完成后,该文件在WEB-INF/classes下面,并且名称变为了ties。

 打包后可发布在Tomcat, Jetty, or JBoss,不支持直接发布在weblogic,需要配置。

 一主一从需要打包两个war包。

 注意修改ties里面的url地址。

4. 使用

4.1. 配置同步表

接下来是配置我们需要同步的表。首先要明确我们需要同步哪些表,需要列出一个同步表的清单。假设我们需要同步AC01表。

4.1.1. 创建同步表

在源库中找到AC01表的建表sql,在目标库中执行该sql创建同步表。

4.1.2. SYM_CHANNEL

SYM_CHANNEL表保存的是同步数据时的通道(管道)。在源库中执行以下sql。

INSERT INTO SYM_CHANNEL

(channel_id,

data_event_action,

create_time,

last_update_by,

last_update_time)

VALUES

('ac01', NULL, SYSTIMESTAMP, 'admin', SYSTIMESTAMP);

4.1.3. SYM_TRIGGER

SYM_TRIGGER保存的是要同步的表的触发器。在源库中执行以下sql。

insert into SYM_TRIGGER

(trigger_id,

source_table_name,

channel_id,

last_update_time,

create_time)

values

('ac01', 'ac01', 'ac01', systimestamp, systimestamp);

4.1.4. SYM_TRIGGER_ROUTER

该表是保存的触发器路由信息,表示从哪个节点发送到哪个节点,此处应该是从主节点到从节点。在源库中执行以下sql。

insert into SYM_TRIGGER_ROUTER

(trigger_id,

router_id,

initial_load_order,

create_time,

last_update_time)

values

('ac01', 'corp_2_store', 1, systimestamp, systimestamp);

4.1.5. 初始化负载

当配置好了后,需要把源库中表的数据初始化同步到目标库的同步表中。在主节点的命令窗口执行以下命令。

# ./bin/symadmin --engine corp-000 reload-node 001

NOTE:

初始化负载之前一定要先创建好通道、触发器以及触发器路由。

4.1.5.1. 取消初始化负载

修改SYM_TRIGGER_ROUTER表的INITIAL_LOAD_ORDER字段值为负数,则下次再执行初始化命令时,将不会再初始化该表数据。

4.1.5.2. 自动建目标表

自动创建表容易出现异常,暂不建议使用。

在SYM_TABLE_RELOAD_REQUEST表中插入一条记录,create_table设置为1,表示在执行初始化命令时,先在目标数据库中创建表。

insert into SYM_TABLE_RELOAD_REQUEST

(target_node_id,

source_node_id,

trigger_id,

router_id,

create_time,

create_table,

last_update_time)

values

('corp-000',

'store-001',

'ac01',

'corp_2_store',

current_timestamp,

1,

current_timestamp);

4.2. Kafka

SymmetricDS不仅可以实现数据库导数据库的同步,还可以实现数据库到kafka的同步推送。采用扩展的形式实现数据的同步推送。

在源库中执行以下脚本添加扩展代码,即可完成数据推送。

declare

java_text sym_ION_TEXT%type;

begin

java_text := 'import er.*;

import ;

import a;

import ntext;

import entType;

import aseWriterFilter;

import ;

import Factory;

import p;

import ;

/**

* kafka写入过滤器

*

* @author 詹祥

* @created 2019-09-20 16:09

* @modified on 2019-02-21 18:00 添加update的处理,包含oldData

newData

**/

public class KafkaWriterFilter implements IDatabaseWriterFilter {

protected final String KAKFA_TEXT_CACHE = "KAKFA_TEXT_CACHE" +

de();

private final Logger log = ger(getClass());

public boolean beforeWrite(DataContext context, Table table, CsvData

data) {

//如果是symmetricDS的系统表,则不做处理

if (e().toUpperCase().startsWith("SYM_")) {

return true;

}else {

//("Processing table " + table + " for Kafka");

String[] oldData = null;

String[] rowData = sedData(_DATA);

if (aEventType() == ) {

rowData = sedData(_DATA);

}else if (aEventType() == ){

oldData=sedData(_DATA);

}

StringBuffer kafkaText = new StringBuffer();

if ((KAKFA_TEXT_CACHE) != null) {

kafkaText = (StringBuffer) (KAKFA_TEXT_CACHE);

}

//是否使用json格式传输数据

boolean useJson = true;

char t = ''"'';

Character s = new Character(t);

String quote = ng();

if (useJson) {

//UPDATE 需要原有数据,存在两个数据头 newData

oldData

if(aEventType().toString().equals("UPDATE")){

("{"+quote+"tablename"+quote+":

"+quote).append(e() +

quote+",").append(quote+"eventType"+quote+":"+quote+ aEventType()

+ quote+",").append(quote+"newData"+quote+": { ");

for (int i = 0; i < umnNames().length; i++) {

(quote +

umnNames()[i] + quote+": "+quote + rowData[i]);

if (i + 1 < umnNames().length) {

(quote+",");

}

}

(quote+"},"+quote+"oldData"+quote+":{");

for (int i = 0; i < umnNames().length; i++) {

(quote +

umnNames()[i] + quote+": "+quote + oldData[i]);

if (i + 1 < umnNames().length) {

(quote+",");

}

}

(quote+" } }");

//INSERT和DELETE直接用data的数据头

}else{

("{"+quote+"tablename"+quote+":

"+quote).append(e() +

quote+",").append(quote+"eventType"+quote+":"+quote+ aEventType()

+ quote+",").append(quote+"data"+quote+": { ");

for (int i = 0; i < umnNames().length; i++) {

(quote +

umnNames()[i] + quote+": "+quote + rowData[i]);

if (i + 1 < umnNames().length) {

(quote+",");

}

}

(quote+" } }");

}

}else {

("TABLE").append(",").append(e()).append(",").append("EVENT").append(",").append(aEventType()).append(",");

for (int i = 0; i < umnNames().length; i++) {

(umnNames()[i]).append(",").append(rowData[i]);

if (i + 1 < umnNames().length) {

(",");

}

}

}

sendKafkaMessage(ng());

// (KAKFA_TEXT_CACHE, kafkaText);

}

return true;

}

public void afterWrite(DataContext context, Table table, CsvData data) {

}

public boolean handlesMissingTable(DataContext context, Table table) {

return true;

}

public void earlyCommit(DataContext context) {

}

public void batchComplete(DataContext context) {

}

public void batchCommitted(DataContext context) {

}

public void batchRolledback(DataContext context) {

}

public void sendKafkaMessage(String kafkaText) {

Map configs = new HashMap();

// 公司环境

//(RAP_SERVERS_CONFIG,

"192.168.26.220:6667,192.168.26.221:6667,192.168.26.222:6667");

// 四川省就业局环境

(RAP_SERVERS_CONFIG,

"10.160.7.131:6667,10.160.7.132:6667,10.160.7.133:6667,10.160.7.134:6667,10.160.7.135:6667");

(_SERIALIZER_CLASS_CONFIG,

"Serializer");

(_SERIALIZER_CLASS_CONFIG,

"Serializer");

(_ID_CONFIG, "symmetricds-producer");

String topicName = "SCJY";

KafkaProducer producer = new KafkaProducer

String>(configs);

ProducerRecord producerRecord = new

ProducerRecord(topicName, "symmetricDS", kafkaText);

(producerRecord);

//("Data to be sent to Kafka-" + kafkaText);

();

}

}';

insert into SYM_EXTENSION (EXTENSION_ID, EXTENSION_TYPE,

INTERFACE_NAME, NODE_GROUP_ID, ENABLED, EXTENSION_ORDER,

EXTENSION_TEXT, CREATE_TIME, LAST_UPDATE_BY, LAST_UPDATE_TIME)

values

('KafkaDataWriter','java','aseWriterFilter','store',1,1,java_text,systimestamp,'admin',systimestamp);

commit;

end;

需要修改上述的两处红色部分。

192.168.26.220:6667,192.168.26.221:6667,192.168.26.222:6667:bootstrap-servers地址

“SCJY”:topic名称

5. 注意事项

5.1. 启动顺序

必须严格按照文档的部署执行顺序。

5.2. 自动创建表

自动创建表有时会出现异常,创建表时报错。

解决方法:

1、SYM_TABLE_RELOAD_REQUEST表中不要增加记录。

2、如果已经增加了记录,先删除,查看symmetric从节点的日志,找到出错的batch_id,根据batch_id把主节点的sym_outgoing_batch表的

status和error_flag字段更新成‘OK’和‘0’。同理修改从节点的sym_incoming_batch表。

执行SQL如下:

update sym_outgoing_batch set status = 'OK', error_flag =

'0' where batch_id = '3919';

update sym_incoming_batch set status = 'OK', error_flag =

'0' where batch_id = '3919';

5.3. CLOB字段同步

工具只支持CLOB字段的字节数在4000以内的,超过4000会造成触发器报错,数据初始化加载报错。中文字符1个汉字需要3个字节。

解决方法:

1、触发器报错,可修改数据库触发器代码解决。

2、数据初始化加载报错,需要修改symmetric源代码。

3、如果CLOB字段不是计算的必要字段,可以去掉源表的CLOB字段。


本文标签: 节点 数据 数据库 配置