Dinky CDCSOURCE 整库同步踩坑记录(MySQL数据通过CDCSOURCE同步到Doris以及Kafka中)

Dinky CDCSOURCE 整库同步踩坑记录(MySQL数据通过CDCSOURCE同步到Doris以及Kafka中)插图

目前通过 FlinkCDC 进行会存在诸多问题,如需要定义大量的 DDL 和编写大量的 INSERT INTO,更为严重的是会占用大量的数据库连接,对 Mysql 和网络造成压力。

Dinky 定义了 CDCSOURCE 整库同步的语法,该语法和 CDAS 作用相似,可以直接自动构建一个整库入仓入湖的实时任务,并且对 source 进行了合并,不会产生额外的 Mysql 及网络压力,支持对任意 sink 的同步,如 kafka、doris、hudi、jdbc 等等

最近,我接到一个需求。原本其他同事采用 FlinkCDC 编写的脚本,运行一段时间之后,就会自动挂掉,反正各种问题,于是乎,领导叫我要用CDCSOURCE整库同步的方式进行重写,但是我从来没用过这个玩意,而且我只是一个Java程序员,不懂大数据的玩意,但是来活了只能开干。大概的需求是:通过binlog监控MySQL数据变更,然后通过CDCSOURCE监控指定的表数据,然后插入到Doris数据库,同时发送到Kafka的MQ消息中,后面再用Java开发Flink实时计算逻辑处理MQ里面的数据。本文主要是讲CDCSOURCE这个部分,不讲Java部分。

开始之前,必须吐槽Dinky官方的技术文档,按照官方文档DEMO执行的脚本,根本无法执行成功,巨坑巨坑巨坑……因为踩了很多坑,今天才终于把问题解决,所以发个文章记录下,以下是遇到我问题,以及解决方法:

1、按照Dinky官方CDCSOURCE Demo源码示例去写,一直无法把数据发送到kafka,最终发现是官方dinky代码有问题(0.7.4、1.0.0版本都有这个问题)

Dinky CDCSOURCE 整库同步踩坑记录(MySQL数据通过CDCSOURCE同步到Doris以及Kafka中)插图1

官方提供的demo脚本,查看详情 http://www.dlink.top/docs/0.7/data_integration_guide/cdcsource_statements

解决方法:

一开始,一直怀疑是账号密码有特殊字符需要转义,折腾了很久发现没用。最后才发现是没有配置useSSL导致,dinky 官方文档说支持设置 'jdbc.properties.useSSL' = 'false' (1.0版本才有这个key,0.7版本没有),即便是1.0,这个设置也是无效的,看了代码,发现这个设置只会在 fromSource 生效,在拼接JDBC连接的时候并不会生效导致我们一直无法连接成功,而且0.7版本报错界面没有有效错误提示(1.0版本报错日志是比较好的)。

建议大家直接去服务器目录查看完整报错日志,不然无法入手 /xxx/dlink-release-0.7.4/logs/dlink.log,这个dlink.log里面有完整的报错内容,根据报错提示,然后修改dinky官方源码,增加 'jdbc.properties.useSSL' 属性,问题解决~~

Dinky CDCSOURCE 整库同步踩坑记录(MySQL数据通过CDCSOURCE同步到Doris以及Kafka中)插图2

解决问题后,最终的脚本如上

CDCSOURCE整库同步到kafka的完整脚本如下:(不需要开启右侧面板的“全局变量”)

EXECUTE CDCSOURCE cdc_kafka_mul WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '10.0.10.200',
 'port' = '6606',
 'username' = 'user_test',
 'password' = 'D#abc123',
 'jdbc.properties.useSSL' = 'false',
 'jdbc.properties.tinyInt1isBit' = 'false',
 'checkpoint' = '5000',
 'scan.startup.mode' = 'initial',
 'parallelism' = '1',
 'database-name' = 'db_test',
 'table-name' = 'db_test\.user_type',
 'sink.connector' = 'datastream-kafka',
 'sink.brokers' = '10.0.10.100:9092,10.0.10.101:9092,10.0.10.102:9092',
 'sink.properties.transaction.timeout.ms' = '60000'
)

2、按照Dinky官方CDCSOURCE Demo源码示例去写,一直无法把数据发送到doris,Dinky这边没任何报错,但是运行的时候报错了

2024-01-31 13:01:13,478 ERROR com.dlink.cdc.AbstractSinkBuilder [] - SchameTable: db_test.user_test - Row: {"before":null,"after":{xxx} - Exception:
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Boolean

解决方法:

这个报错是在job-manager的日志里面,所以dinky那边看不到报错的,找了很久才发现。只需要如上图设置,配置 'jdbc.properties.tinyInt1isBit' = 'false' 即可解决问题。因为系统默认会自动把 tinyint 转成 Boolean 布尔类型,需要设置成 false。

CDCSOURCE整库同步到doris的完整脚本如下:(根据实际情况,决定是否开启右侧面板的“全局变量”,写脚本的时候,Dinky右侧面板“作业配置”——可以开启“全局变量”)

EXECUTE CDCSOURCE cdc_doris_mul WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '10.0.10.200',
 'port' = '6606',
 'username' = 'user_test',
 'password' = 'D#abc123',
 'jdbc.properties.useSSL' = 'false',
 'jdbc.properties.tinyInt1isBit' = 'false',
 'checkpoint' = '5000',
 'scan.startup.mode' = 'initial',
 'parallelism' = '1',
 'database-name' = 'db_test',
 'table-name' = 'db_test\.user_type',
 'sink.connector' = 'doris',
 'sink.fenodes' = '10.0.10.300:8030',
 'sink.username' = 'root',
 'sink.password' = '123456',
 'sink.doris.batch.size' = '1',
 'sink.sink.max-retries' = '1',
 'sink.sink.db' = 'bi_ods',
 'sink.table.prefix' = 'ods_',
 'sink.table.suffix' = '_test',
--  如果你不想使用随机数,每次重启任务后,可以把xxx改成不同的数字,这样就不用开启“全局变量”(使用随机数必须开启全局变量)
--  'sink.sink.label-prefix' = '#{schemaName}_#{tableName}_xxx',
 'sink.sink.label-prefix' = '${idUtil.simpleUUID()}',
--  如果你的是Dinky 0.7.4 并且还没修改源码,这里是 $ 符号,不是 # 号
--  'sink.table.identifier' = '${schemaName}.${tableName}',
 'sink.table.identifier' = '#{schemaName}.#{tableName}',
 'sink.sink.enable-delete' = 'true',
 'sink.sink.properties.format' ='json',
 'sink.sink.properties.read_json_by_line' ='true'
)

3、按照上面的操作,成功把数据写入到kafka、doris,以为问题就可以了?重启上面的cdc任务之后,发现一直报错“Load status is Label Aleady Exists and load job finished, change you label prefix or restore from latest savepoint!”

Dinky CDCSOURCE 整库同步踩坑记录(MySQL数据通过CDCSOURCE同步到Doris以及Kafka中)插图3

报错内容

解决方法:

需要每次手动修改 'sink.sink.label-prefix' = 'db_test_user_test_xxx' 的值才可以重新运行(xxx 需要每次都不一样,所以可以设置成不同的数字或者不一样的随机数)。

我们参考1.0.0版本,在0.7.4版本源码新增了2个获取随机ID的方法:${idUtil.randomUUID()}(字符串中间带横向)、${idUtil.simpleUUID()}(字符串中间不带横向),写脚本的时候,Dinky右侧面板“作业配置”必须勾选且开启“全局变量”才可以使用这两个函数。至于为什么要去修改源码?因为不改的话,只能每次手动修改,比如这次把xxx改成1,下次改成2,太累了……

Dinky CDCSOURCE 整库同步踩坑记录(MySQL数据通过CDCSOURCE同步到Doris以及Kafka中)插图4

官方 1.0.0 版本新增的随机函数,查看详情 http://www.dlink.top/docs/next/user_guide/register_center/global_var

Dinky CDCSOURCE 整库同步踩坑记录(MySQL数据通过CDCSOURCE同步到Doris以及Kafka中)插图5

使用内置变量/函数,需要开启全局变量

4、新增2个获取随机ID方法之后,因为开启了“全局变量”,但是这个操作会导致FlinkSQL的局部变量失效,报错提示“The fragment of sql schemaName does not exist.”,

Dinky CDCSOURCE 整库同步踩坑记录(MySQL数据通过CDCSOURCE同步到Doris以及Kafka中)插图6

The fragment of sql schemaName does not exist 报错提示截图

解决方法:

出现上面的报错,是因为开启“全局变量”之后,找不到找不到“${schemaName}.${tableName}”变量。最后发现1.0.0版本为了解决这个问题,把语法改成“#{schemaName}.#{tableName}”,因此再次改0.7.4版本代码兼容处理,具体修改逻辑自行参考1.0.0源码

5、前面都是单个sink,已经全部正常了,但是如果把Dinky CDCSOURCE改成同时写入2个sink,先插入到doris,再插入到kafka有发现了报错,提示“Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='datastream-kafka'”

Dinky CDCSOURCE 整库同步踩坑记录(MySQL数据通过CDCSOURCE同步到Doris以及Kafka中)插图7

官方DEMO是不对的,不要按照他这个去写(Dinky 0.7.4、1.0.0 都全都不行)

前面单独使用 sink.connector' = 'doris' 或者单独使用 'sink.connector' = 'datastream-kafka' 都是可以同步数据的,但是如果需要同时使用 doris、datastream-kafka 直接报错(官方demo也是同时用这两个,超级巨坑,用这两个是不行的)

解决方法:

通过查看 /xxx/dlink-release-0.7.4/logs/dlink.log 的日志文件,可以看到报错提示“Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='datastream-kafka'”,隐藏的比较深,大家要看仔细一点。在这段英文报错的后面一点,可以看到提示:

Available factory identifiers are:
blackhole
datagen
doris
filesystem
jdbc
kafka
mysql-cdc
print
upsert-kafka

由此可见,报错的原因是,当你的sink是数组,多个写入数据源的时候,不支持 datastream-kafka,所以,如果要同时使用2个sink的话,要改成 'sink[0].connector' = 'doris'、'sink[1].connector' = 'kafka'、'sink[1].value.format' = 'debezium-json'、'sink[1].properties.bootstrap.servers' = '10.0.10.100:9092,10.0.10.101:9092,10.0.10.102:9092' 才可以

CDCSOURCE整库同步到doris后,再同步到kafka的完整脚本如下:(必须开启右侧面板的“全局变量”):

EXECUTE CDCSOURCE cdc_doris_kafka_mul
WITH
  (
    'connector' = 'mysql-cdc',
    'hostname' = '10.0.10.200',
    'port' = '6606',
    'username' = 'user_test', -- 这里下划线不是特殊字符,不影响的
    'password' = 'D#abc123',  -- 这里 # 井号也不是特殊字符,不影响的
    'jdbc.properties.useSSL' = 'false',
    'jdbc.properties.tinyInt1isBit' = 'false',
    'jdbc.properties.serverTimezone' = 'Asia/Shanghai',
    'source.server-time-zone' = 'Asia/Shanghai',
    'checkpoint' = '5000',
    'scan.startup.mode' = 'initial',
    'parallelism' = '1',
    --  'database-name' = 'db_test',
    'table-name' = 'db_test\.user_type',

    'sink[0].connector' = 'doris',
    'sink[0].fenodes' = '10.0.10.300:8030',
    'sink[0].username' = 'root',
    'sink[0].password' = '123456',
    'sink[0].doris.batch.size' = '1',
    'sink[0].sink.max-retries' = '1',
    --  'sink[0].sink.batch.interval' = '60000',
    'sink[0].sink.db' = 'bi_ods',
    'sink[0].table.prefix' = 'ods_',
    --   'sink[0].table.suffix' = '_test',
    --  'sink[0].table.upper' = 'false',
    'sink[0].sink.label-prefix' = '${idUtil.simpleUUID()}',
    'sink[0].table.identifier' = '#{schemaName}.#{tableName}',
    'sink[0].sink.enable-delete' = 'true',

    'sink[1].connector' = 'kafka',
    --  'sink[1].topic' = 'db_test_user_type_test',
    'sink[1].topic' = '#{schemaName}_#{tableName}',
    'sink[1].value.format' = 'debezium-json',
    --  'sink[1].brokers' = '10.0.10.100:9092,10.0.10.101:9092,10.0.10.102:9092',
    'sink[1].properties.bootstrap.servers' = '10.0.10.100:9092,10.0.10.101:9092,10.0.10.102:9092',
    'sink[1].properties.transaction.timeout.ms' = '60000',
    --  'sink[1].properties.transactional.id' = '${idUtil.randomUUID()}',
  )

6、CDCSOURCE整库同步,发现JDBC无法设置时区,而且kafka mq获取的时间数据快了8小时

Dinky CDCSOURCE 整库同步踩坑记录(MySQL数据通过CDCSOURCE同步到Doris以及Kafka中)插图8

kafka mq消息里面的json内容,这个是修复后的正常时间

解决方法:

参考前面的增加useSSL解决方案,修改0.7.4源码增加 jdbc.properties.serverTimezone,其实加上之后之后jdbc是生效了,但是mq看到的时间数据依然是快了8个小时,最后把 datastream-kafka 改成 'sink.connector' = 'kafka' 之后才解决(也就是上面脚本所示)

7、测试环境kafka运行一段时间会异常,checkpoint相关报错(生产环境没遇到这个问题)

解决方法:改成上面的 'sink.connector' = 'kafka' 之后,这个问题也正常了(如果是 kafka 类型不要配置这个 'sink[1].properties.transactional.id' = '${idUtil.randomUUID()}',但如果是 datastream-kafka 类型,则需要配置)

8、汇总说明,上面提及的修改dinky 0.7.4 源码,其实 1.0.0 也有一些bug要自己去改源码的,主要修改源码的模块如下:

1)修复时间戳格式问题(dlink-client)
2)FlinkSQL支持useSSL配置(dlink-client)
3)全局变量增加随机数ID(dlink-executor)
4)#{schemaName}、#{tableName}、#{pkList} 参数改为跟1.0版本一样使用#{}占位符(dlink-common)
5)增加时区配置(dlink-client)

注意:上面这些.jar包,有些时候除了更新dinky目录之外,还要更新Kubernetes Session上面的lib目录,别漏了

All by flydoos 2024-02-02

版权声明:
小编:牛A与牛C之间
链接:https://www.wuleba.com/2529.html
来源:吾乐吧软件站
本站资源仅供个人学习交流,请于下载后 24 小时内删除,不允许用于商业用途,否则法律问题自行承担。

THE END
分享
二维码
打赏
< <上一篇
下一篇>>

下载说明:请不要相信网盘那边的广告,不要下载APP,普通下载就完事了!