admin 管理员组

文章数量: 1184232

一、下载


cd /home/
tar -xvzf dlink-release-0.7.3.tar.gz
mv dlink-release-0.7.3 dlink
cd dlink

环境
mysql数据库部署在172服务器上
Dinky部署在172服务器上,需要部署在hadoop集群中(172、166、168)

二、部署

1、数据库初始化

Dinky 采用 mysql 作为后端的存储库,mysql 支持 5.7+。这里假设你已经安装了 mysql 。首先需要创建 Dinky 的后端数据库,这里以配置文件中默认库创建。

在 Dinky 根目录 sql 文件夹下分别放置了 dinky.sql 、 upgrade/${version}_schema/mysql/ddl 和 dml。如果第一次部署,可以直接将 sql/dinky.sql 文件在 dinky 数据库下执行。(如果之前已经部署,那 upgrade 目录下 存放了各版本的升级 sql ,根据版本号按需执行即可)
在10.9.70.172服务器上执行

cd /home/dlink/sql/
mysql -h127.0.0.1 -uroot -p

创建数据库并授权给dinky用户

#创建数据库
mysql>
create database dinky;#创建用户并允许远程登录
mysql>
create user 'dinky'@'%' IDENTIFIED WITH mysql_native_password by 'Stwc.2wsx';#授权
mysql>
grant ALL PRIVILEGES ON dinky.* to 'dinky'@'%';
mysql>
flush privileges;

mysql -h 10.9.70.172  -udinky -p
mysql>
use dinky;
mysql> source /home/dlink/sql/dinky.sql

2、配置文件

创建好数据库后,修改 Dinky 连接 mysql 的配置文件。

#切换目录
cd /home/dlink/config/
vim application.yml

3、加载依赖

Dinky 需要具备自身的 Flink 环境,该 Flink 环境的实现需要用户自己在 Dinky 根目录下 plugins/flink${FLINK_VERSION} 文件夹并上传相关的 Flink 依赖
注意事项 :::danger
Dinky 当前版本的 yarn 的 perjob 与 application 执行模式依赖 flink-shade-hadoop ,如果你的 Hadoop 版本为 2+ 或 3+,需要额外添加 flink-shade-hadoop-uber-3 包,请手动删除该包内部的 javax.servlet 等冲突内容。 当然如果你的 Hadoop 为 3+ 也可以自行编译对于版本的 dinky-client-hadoop.jar 以替代 uber 包,
:::
用解压工具打开uber的 flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar 包,找到/javax目录,删除servlet文件夹
将flink/lib下所有依赖包放在/home/dlink/plugins/flink1.17/目录下,注意flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar替换

4、启动 Dinky

cd /home/dlink/
#启动
sh auto.sh start#停止
sh auto.sh stop
#重启
sh auto.sh restart
#查看状态
sh auto.sh status

当前启动的Flink版本为1.14

版本适配
启动指令的第二个参数则是版本选择,支持 【1.13, 1.14, 1.15, 1.16】,参考指令如下:

#启动
sh auto.sh start 1.17

5、访问


默认密码:admin/admin
修改密码:Stwc.2wsx

三、配置

1、系统配置

登陆Dlink后,在配置中心配置Flink

在dlink安装目录jar文件夹下的文件 dlink-app-1.17-0.7.3-jar-with-dependencies.jar 上传到 hdfs

hdfs:///dlink/jar/dlink-app-1.17-0.7.3-jar-with-dependencies.jar

参数配置说明:

  • 提交 FlinkSQL 的 Jar文件路径: 此参数是为了引入 Dinky 中提交 Application 模式 的 jar包文件,
    • 服务器部署方式: 需要上传到相应的HDFS路径,jar包在 Dinky 解压根目录下的jar文件夹下;eg: hdfs:///dlink/jar/dlink-app-${dlink-version}-jar-with-dependencies.jar
    • 本地调试方式: 需要本地 install后 将其设置为eg: i d e a w o r k d i r / d l i n k / d l i n k − a p p / t a r g e t / d l i n k − a p p − idea_work_dir/dlink/dlink-app/target/dlink-app- i d e a w or k d i r / d l ink / d l ink a pp / t a r g e t / d l ink a pp {dlink-version}-jar-with-dependencies.jar
  • 提交 FlinkSQL 的 Jar 的主类入参: 默认为空,不需要修改,配合提交FlinkSQL的Jar文件路径使用;
  • 提交 FlinkSQL 的 Jar 的主类: 默认 com.dlink.app.MainApp,不需要修改,配合提交FlinkSQL的Jar文件路径使用;
  • 使用 RestAPI: 默认开启,开启后 FlinkSQL 停止等操作通过此参数进行;
  • FlinkSQL 语句分隔符: 默认是分号,即";"。多个语句间可以用分号隔开; 此项支持自定义 eg: ;\r\n

2、Flink集群配置

1)、Standalone注册集群实例

注册中心 > 集群管理 > 集群实例管理 > 新建

参数配置说明:

  • 名称: 自定义(必填)
  • 别名: 自定义,默认同名称
  • 类型: 支持 Standalone,Yarn Session 和 Kubernetes Session 三种类型的集群提交任务,其他类型的集群只能查看作业信息
  • JobManager HA地址: JobManager 的 RestAPI 地址,当 HA 部署时,将可能出现的多个 RestAPI 地址均写入,且采用英文逗号隔开
  • 注释: 自定义

2)、Flink On Yarn注册集群实例、配置管理




将Flink/lib中的jar上传到hdfs

参数配置说明:

  • 类型: 支持 Flink on Yarn 和 Flink on Kubernetes
  • hadoop 配置
    • 配置文件路径: hadoop 配置文件路径,指定配置文件路径(末尾无/),需要包含以下文件:core-site.xml,hdfs-site.xml,yarn-site.xml
    • 自定义配置(高优先级,目前不生效,请跳过)
      • ha.zookeeper.quorum: zookeeper 访问地址
      • 其他配置: hadoop 的其他参数配置(默认不填写)
  • Flink 配置
    • lib 路径: 指定 lib 的 hdfs 路径(末尾无/),需要包含 Flink 运行时的依赖
    • 配置文件路径: 指定 flink-conf.yaml 的具体路径(末尾无/),必填
    • 自定义配置(高优先级): Flink参数配置
  • 基本配置
    • 标识: 唯一英文标识(必填)
    • 名称: 自定义,默认同标识
    • 注释: 自定义
    • 是否启用: 默认禁用,需要开启

3)、Yarn Per-job注册集群实例


4)、Yarn Application注册集群实例

四、作业开发

1、emp的Demo

1)、新建作业

CREATE TABLE employees_source (
    database_name STRING METADATA VIRTUAL,
    table_name STRING METADATA VIRTUAL,
    emp_no int NOT NULL,
    birth_date date,
    first_name STRING,
    last_name STRING,
    gender STRING,
    hire_date date,
    PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '10.9.70.172','port' = '3306','username' = 'root','password' = 'Stwc.2wsx','database-name' = 'emp_[0-9]+','table-name' = 'employees_[0-9]+');
CREATE TABLE cdc_doris_sink (
	emp_no       int,
	birth_date   STRING,
	first_name   STRING,
	last_name    STRING,
	gender       STRING,
	hire_date    STRING,
	database_name STRING,
	table_name    STRING
) 
WITH ('connector' = 'doris','fenodes' = '10.9.70.87:8030','table.identifier' = 'demo.all_employees_info','username' = 'root','password' = 'Stwc.2wsx','sink.properties.two_phase_commit'='true','sink.label-prefix'='doris_demo_emp_006');
insert into cdc_doris_sink (emp_no,birth_date,first_name,last_name,gender,hire_date,database_name,table_name)select emp_no,cast(birth_date as string) as birth_date ,first_name,last_name,gender,cast(hire_date as string) as hire_date ,database_name,table_name from employees_source;

2)、检查语句


3)、作业配置


4)、执行配置

2、整库备份

Flink 版本区分

:::danger

Flink 版本区分

:::
:::danger
目前 dlink-client-1.14 内的整库同步能力最多且主要维护,如果要使用其他 flink 版本的整库同步,如果 SQLSink 不满足需求,需要DataStreamSink 支持,请手动仿照 dlink-client-1.14 扩展相应代码实现,很简单。
:::
下载源码


修改项目根目录的POM

<plugin><groupId>com.diffplug.spotless</groupId><artifactId>spotless-maven-plugin</artifactId><version>${spotless.version}</version><configuration><!-- optional: limit format enforcement to just the files changed by this feature branch --><ratchetFrom>HEAD</ratchetFrom><java><eclipse><file>style/spotless_dlink_formatter.xml</file></eclipse><removeUnusedImports/><importOrder><order>\#,com.dlink,org.apache,java,javassist,javax,org,com,</order></importOrder><licenseHeader><file>style/license_header</file></licenseHeader><replaceRegex><name>Remove wildcard imports</name><searchRegex>import\s+[^\*\s]+\*;(\r\n|\r|\n)</searchRegex><replacement>$1</replacement></replaceRegex></java><pom><sortPom><encoding>UTF-8</encoding><nrOfIndentSpace>4</nrOfIndentSpace><keepBlankLines>true</keepBlankLines><indentBlankLines>false</indentBlankLines><indentSchemaLocation>true</indentSchemaLocation><spaceBeforeCloseEmptyElement>true</spaceBeforeCloseEmptyElement><sortModules>false</sortModules><sortExecutions>false</sortExecutions><predefinedSortOrder>custom_1</predefinedSortOrder><expandEmptyElements>false</expandEmptyElements><sortProperties>false</sortProperties></sortPom><replace><name>Leading blank line</name><search>project</search><replacement>project</replacement></replace></pom><markdown><includes><include>**/*.md</include></includes><flexmark/></markdown></configuration><executions><execution><!--                        <goals>--><!--                            <goal>check</goal>--><!--                        </goals>--><phase>compile</phase></execution></executions></plugin>

将下列代码注释

<goals><goal>check</goal></goals>

只将cdc的代码与1.14的合并代码


mvn install
打包好的dlink-client-1.17-0.7.3.jar放入Flink\lib下
替换目录:::warning
172、166、168
/opt/software/flink/flink-1.17.0/lib/
/home/dlink/plugins/flink1.17/dinky/
hdfs:///flink/lib
:::
重启dlink

cd /home/dlink/
#启动sh auto.sh start

1)、依赖上传

由于 CDCSOURCE 是 Dinky 封装的新功能,Apache Flink 源码不包含,非 Application 模式提交需要在远程 Flink 集群所使用的依赖里添加一下依赖:

将下面 Dinky根目录下 整库同步依赖包放置 $FLINK_HOME/lib下

lib/dlink-client-base- v e r s i o n . j a r l i b / d l i n k − c o m m o n − {version}.jar lib/dlink-common- v ers i o n . ja r l ib / d l ink co mm o n {version}.jar
plugins/flink- f l i n k − v e r s i o n / d l i n k − c l i e n t − {flink-version}/dlink-client- f l ink v ers i o n / d l ink c l i e n t {version}.jar



将这三个包放到3台服务器Flink的lib下

别忘了将包放到hdfs中

2)、Application 模式提交

目前已经支持 application ,需提前准备好相关jar包,或者和 add jar语法并用。以 mysqlcdc-2.3.0 和 flink-1.14 为例,需要以下 jar

  • flink-shaded-guava-18.0-13.0.jar
  • HikariCP-4.0.3.jar
  • druid-1.2.8.jar
  • dlink-metadata-mysql-0.7.2.jar
  • dlink-metadata-base-0.7.2.jar
  • jackson-datatype-jsr310-2.13.4.jar
  • flink-sql-connector-mysql-cdc-2.3.0.jar
  • dlink-client-1.14-0.7.2.jar
  • cdcsource_example.png
    :::danger
    注意:
    一个 FlinkSQL 任务只能写一个 CDCSOURCE,CDCSOURCE 前可写 set、add jar 和 ddl 语句。
    配置项中的英文逗号前不能加空格,需要紧随右单引号。
    :::

3)、数据源管理

注册中心 > 数据源管理,点击 新建 即可。

名称: 输入英文唯一标识
别名: 自定义,默认同名称
分组类型: 包括来源、数仓、应用、备份、其他
url: 数据库连接地址,如 jdbc:mysql://127.0.0.1:3306/dlink
用户名: 连接数据库的用户名
密码: 连接数据库的密码
Flink 连接配置: 避免私密信息泄露,同时作为全局变量复用连接配置,在FlinkSQL中可使用 名称 ∗ ∗ 来加载连接配置,如 ∗ ∗ {名称}** 来加载连接配置,如 ** 名称 来加载连接配置,如 {ods} 。说明:名称指的是英文唯一标识,即如图所示的名称。注意需要开启全局变量(原片段机制)
Flink 连接模板: Flink 连接模板作用是为生成 FlinkSQL DDL 而扩展的功能。其中 s c h e m a N a m e ∗ ∗ 动态获取数据库, ∗ ∗ {schemaName}** 动态获取数据库,** sc h e ma N am e 动态获取数据库, {tableName} 动态获取表名称。更多参数请参考
注释: 自定义
是否启用: 默认禁用,需要开启


4)、整库同步到Doris

doris建表时开启light_schema_change
如:

CREATETABLE`employees_1`(`emp_no`intNOTNULL,`birth_date`varchar(50),`first_name`varchar(50))UNIQUEKEY(`emp_no`)DISTRIBUTEDBYHASH(`emp_no`) BUCKETS 1
PROPERTIES ("replication_allocation"="tag.location.default: 1","light_schema_change"="true");

Doris 的 Flink 连接器参数随版本变化较大,以下为 Doris 1.2.0 版本的参数配置。
每次提交作业都需要手动修改 ‘sink.sink.label-prefix’ = ‘ KaTeX parse error: Expected group after '_' at position 13: {schemaName}_̲ {tableName}_1’ 的值,比如改变尾部的数值。

EXECUTE CDCSOURCE demo_doris WITH('connector'='mysql-cdc','hostname'='10.9.70.96','port'='3306','username'='root','password'='Stwc.2wsx','checkpoint'='10000','scan.startup.mode'='initial','parallelism'='1','database-name'='production_management','table-name'='production_management\..*','sink.connector'='doris','sink.fenodes'='10.9.70.87:8030','sink.username'='root','sink.password'='Stwc.2wsx','sink.doris.batch.size'='1000','sink.sink.max-retries'='1','sink.sink.batch.interval'='60000','sink.sink.db'='production_management','sink.sink.properties.format'='json','sink.sink.properties.read_json_by_line'='true','sink.table.identifier'='${schemaName}.${tableName}','sink.sink.label-prefix'='${schemaName}_${tableName}_1');

参数说明’sink.sink.label-prefix’ = ‘ KaTeX parse error: Expected group after '_' at position 13: {schemaName}_̲ {tableName}_1’
不写也可以,会自动添加随机值

字段模式演变

自动同步列新增和删除列,库表名需要与源库相同。

EXECUTE CDCSOURCE psc_doris_schema_evolution WITH('connector'='mysql-cdc','hostname'='10.9.70.96','port'='3306','username'='root','password'='Stwc.2wsx','checkpoint'='10000','scan.startup.mode'='initial','parallelism'='1','database-name'='production_management','table-name'='production_management\..*','sink.connector'='datastream-doris-schema-evolution','sink.fenodes'='10.9.70.87:8030','sink.username'='root','sink.password'='Stwc.2wsx','sink.doris.batch.size'='1000','sink.sink.max-retries'='1','sink.sink.batch.interval'='60000','sink.sink.db'='production_management','sink.table.prefix'='ODS_''sink.table.identifier'='${schemaName}.${tableName}');

[参考]

1、DLink部署文档

本文标签: 路径 系统 支持