Dinky CDCSOURCE 整库同步踩坑记录(MySQL数据通过CDCSOURCE同步到Doris以及Kafka中)
目前通过 FlinkCDC 进行会存在诸多问题,如需要定义大量的 DDL 和编写大量的 INSERT INTO,更为严重的是会占用大量的数据库连接,对 Mysql 和网络造成压力。
Dinky 定义了 CDCSOURCE 整库同步的语法,该语法和 CDAS 作用相似,可以直接自动构建一个整库入仓入湖的实时任务,并且对 source 进行了合并,不会产生额外的 Mysql 及网络压力,支持对任意 sink 的同步,如 kafka、doris、hudi、jdbc 等等
最近,我接到一个需求。原本其他同事采用 FlinkCDC 编写的脚本,运行一段时间之后,就会自动挂掉,反正各种问题,于是乎,领导叫我要用CDCSOURCE整库同步的方式进行重写,但是我从来没用过这个玩意,而且我只是一个Java程序员,不懂大数据的玩意,但是来活了只能开干。大概的需求是:通过binlog监控MySQL数据变更,然后通过CDCSOURCE监控指定的表数据,然后插入到Doris数据库,同时发送到Kafka的MQ消息中,后面再用Java开发Flink实时计算逻辑处理MQ里面的数据。本文主要是讲CDCSOURCE这个部分,不讲Java部分。
开始之前,必须吐槽Dinky官方的技术文档,按照官方文档DEMO执行的脚本,根本无法执行成功,巨坑巨坑巨坑……因为踩了很多坑,今天才终于把问题解决,所以发个文章记录下,以下是遇到我问题,以及解决方法:
1、按照Dinky官方CDCSOURCE Demo源码示例去写,一直无法把数据发送到kafka,最终发现是官方dinky代码有问题(0.7.4、1.0.0版本都有这个问题)
官方提供的demo脚本,查看详情 http://www.dlink.top/docs/0.7/data_integration_guide/cdcsource_statements
解决方法:
一开始,一直怀疑是账号密码有特殊字符需要转义,折腾了很久发现没用。最后才发现是没有配置useSSL导致,dinky 官方文档说支持设置 'jdbc.properties.useSSL' = 'false' (1.0版本才有这个key,0.7版本没有),即便是1.0,这个设置也是无效的,看了代码,发现这个设置只会在 fromSource 生效,在拼接JDBC连接的时候并不会生效导致我们一直无法连接成功,而且0.7版本报错界面没有有效错误提示(1.0版本报错日志是比较好的)。
建议大家直接去服务器目录查看完整报错日志,不然无法入手 /xxx/dlink-release-0.7.4/logs/dlink.log,这个dlink.log里面有完整的报错内容,根据报错提示,然后修改dinky官方源码,增加 'jdbc.properties.useSSL' 属性,问题解决~~
解决问题后,最终的脚本如上
CDCSOURCE整库同步到kafka的完整脚本如下:(不需要开启右侧面板的“全局变量”)
EXECUTE CDCSOURCE cdc_kafka_mul WITH (
'connector' = 'mysql-cdc',
'hostname' = '10.0.10.200',
'port' = '6606',
'username' = 'user_test',
'password' = 'D#abc123',
'jdbc.properties.useSSL' = 'false',
'jdbc.properties.tinyInt1isBit' = 'false',
'checkpoint' = '5000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'database-name' = 'db_test',
'table-name' = 'db_test\.user_type',
'sink.connector' = 'datastream-kafka',
'sink.brokers' = '10.0.10.100:9092,10.0.10.101:9092,10.0.10.102:9092',
'sink.properties.transaction.timeout.ms' = '60000'
)
2、按照Dinky官方CDCSOURCE Demo源码示例去写,一直无法把数据发送到doris,Dinky这边没任何报错,但是运行的时候报错了
2024-01-31 13:01:13,478 ERROR com.dlink.cdc.AbstractSinkBuilder [] - SchameTable: db_test.user_test - Row: {"before":null,"after":{xxx} - Exception:
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Boolean
解决方法:
这个报错是在job-manager的日志里面,所以dinky那边看不到报错的,找了很久才发现。只需要如上图设置,配置 'jdbc.properties.tinyInt1isBit' = 'false' 即可解决问题。因为系统默认会自动把 tinyint 转成 Boolean 布尔类型,需要设置成 false。
CDCSOURCE整库同步到doris的完整脚本如下:(根据实际情况,决定是否开启右侧面板的“全局变量”,写脚本的时候,Dinky右侧面板“作业配置”——可以开启“全局变量”)
EXECUTE CDCSOURCE cdc_doris_mul WITH (
'connector' = 'mysql-cdc',
'hostname' = '10.0.10.200',
'port' = '6606',
'username' = 'user_test',
'password' = 'D#abc123',
'jdbc.properties.useSSL' = 'false',
'jdbc.properties.tinyInt1isBit' = 'false',
'checkpoint' = '5000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'database-name' = 'db_test',
'table-name' = 'db_test\.user_type',
'sink.connector' = 'doris',
'sink.fenodes' = '10.0.10.300:8030',
'sink.username' = 'root',
'sink.password' = '123456',
'sink.doris.batch.size' = '1',
'sink.sink.max-retries' = '1',
'sink.sink.db' = 'bi_ods',
'sink.table.prefix' = 'ods_',
'sink.table.suffix' = '_test',
-- 如果你不想使用随机数,每次重启任务后,可以把xxx改成不同的数字,这样就不用开启“全局变量”(使用随机数必须开启全局变量)
-- 'sink.sink.label-prefix' = '#{schemaName}_#{tableName}_xxx',
'sink.sink.label-prefix' = '${idUtil.simpleUUID()}',
-- 如果你的是Dinky 0.7.4 并且还没修改源码,这里是 $ 符号,不是 # 号
-- 'sink.table.identifier' = '${schemaName}.${tableName}',
'sink.table.identifier' = '#{schemaName}.#{tableName}',
'sink.sink.enable-delete' = 'true',
'sink.sink.properties.format' ='json',
'sink.sink.properties.read_json_by_line' ='true'
)
3、按照上面的操作,成功把数据写入到kafka、doris,以为问题就可以了?重启上面的cdc任务之后,发现一直报错“Load status is Label Aleady Exists and load job finished, change you label prefix or restore from latest savepoint!”
报错内容
解决方法:
需要每次手动修改 'sink.sink.label-prefix' = 'db_test_user_test_xxx' 的值才可以重新运行(xxx 需要每次都不一样,所以可以设置成不同的数字或者不一样的随机数)。
我们参考1.0.0版本,在0.7.4版本源码新增了2个获取随机ID的方法:${idUtil.randomUUID()}(字符串中间带横向)、${idUtil.simpleUUID()}(字符串中间不带横向),写脚本的时候,Dinky右侧面板“作业配置”必须勾选且开启“全局变量”才可以使用这两个函数。至于为什么要去修改源码?因为不改的话,只能每次手动修改,比如这次把xxx改成1,下次改成2,太累了……
官方 1.0.0 版本新增的随机函数,查看详情 http://www.dlink.top/docs/next/user_guide/register_center/global_var
使用内置变量/函数,需要开启全局变量
4、新增2个获取随机ID方法之后,因为开启了“全局变量”,但是这个操作会导致FlinkSQL的局部变量失效,报错提示“The fragment of sql schemaName does not exist.”,
The fragment of sql schemaName does not exist 报错提示截图
解决方法:
出现上面的报错,是因为开启“全局变量”之后,找不到找不到“${schemaName}.${tableName}”变量。最后发现1.0.0版本为了解决这个问题,把语法改成“#{schemaName}.#{tableName}”,因此再次改0.7.4版本代码兼容处理,具体修改逻辑自行参考1.0.0源码
5、前面都是单个sink,已经全部正常了,但是如果把Dinky CDCSOURCE改成同时写入2个sink,先插入到doris,再插入到kafka有发现了报错,提示“Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='datastream-kafka'”
官方DEMO是不对的,不要按照他这个去写(Dinky 0.7.4、1.0.0 都全都不行)
前面单独使用 sink.connector' = 'doris' 或者单独使用 'sink.connector' = 'datastream-kafka' 都是可以同步数据的,但是如果需要同时使用 doris、datastream-kafka 直接报错(官方demo也是同时用这两个,超级巨坑,用这两个是不行的)
解决方法:
通过查看 /xxx/dlink-release-0.7.4/logs/dlink.log 的日志文件,可以看到报错提示“Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='datastream-kafka'”,隐藏的比较深,大家要看仔细一点。在这段英文报错的后面一点,可以看到提示:
Available factory identifiers are:
blackhole
datagen
doris
filesystem
jdbc
kafka
mysql-cdc
print
upsert-kafka
由此可见,报错的原因是,当你的sink是数组,多个写入数据源的时候,不支持 datastream-kafka,所以,如果要同时使用2个sink的话,要改成 'sink[0].connector' = 'doris'、'sink[1].connector' = 'kafka'、'sink[1].value.format' = 'debezium-json'、'sink[1].properties.bootstrap.servers' = '10.0.10.100:9092,10.0.10.101:9092,10.0.10.102:9092' 才可以
CDCSOURCE整库同步到doris后,再同步到kafka的完整脚本如下:(必须开启右侧面板的“全局变量”):
EXECUTE CDCSOURCE cdc_doris_kafka_mul
WITH
(
'connector' = 'mysql-cdc',
'hostname' = '10.0.10.200',
'port' = '6606',
'username' = 'user_test', -- 这里下划线不是特殊字符,不影响的
'password' = 'D#abc123', -- 这里 # 井号也不是特殊字符,不影响的
'jdbc.properties.useSSL' = 'false',
'jdbc.properties.tinyInt1isBit' = 'false',
'jdbc.properties.serverTimezone' = 'Asia/Shanghai',
'source.server-time-zone' = 'Asia/Shanghai',
'checkpoint' = '5000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
-- 'database-name' = 'db_test',
'table-name' = 'db_test\.user_type',
'sink[0].connector' = 'doris',
'sink[0].fenodes' = '10.0.10.300:8030',
'sink[0].username' = 'root',
'sink[0].password' = '123456',
'sink[0].doris.batch.size' = '1',
'sink[0].sink.max-retries' = '1',
-- 'sink[0].sink.batch.interval' = '60000',
'sink[0].sink.db' = 'bi_ods',
'sink[0].table.prefix' = 'ods_',
-- 'sink[0].table.suffix' = '_test',
-- 'sink[0].table.upper' = 'false',
'sink[0].sink.label-prefix' = '${idUtil.simpleUUID()}',
'sink[0].table.identifier' = '#{schemaName}.#{tableName}',
'sink[0].sink.enable-delete' = 'true',
'sink[1].connector' = 'kafka',
-- 'sink[1].topic' = 'db_test_user_type_test',
'sink[1].topic' = '#{schemaName}_#{tableName}',
'sink[1].value.format' = 'debezium-json',
-- 'sink[1].brokers' = '10.0.10.100:9092,10.0.10.101:9092,10.0.10.102:9092',
'sink[1].properties.bootstrap.servers' = '10.0.10.100:9092,10.0.10.101:9092,10.0.10.102:9092',
'sink[1].properties.transaction.timeout.ms' = '60000',
-- 'sink[1].properties.transactional.id' = '${idUtil.randomUUID()}',
)
6、CDCSOURCE整库同步,发现JDBC无法设置时区,而且kafka mq获取的时间数据快了8小时
kafka mq消息里面的json内容,这个是修复后的正常时间
解决方法:
参考前面的增加useSSL解决方案,修改0.7.4源码增加 jdbc.properties.serverTimezone,其实加上之后之后jdbc是生效了,但是mq看到的时间数据依然是快了8个小时,最后把 datastream-kafka 改成 'sink.connector' = 'kafka' 之后才解决(也就是上面脚本所示)
7、测试环境kafka运行一段时间会异常,checkpoint相关报错(生产环境没遇到这个问题)
解决方法:改成上面的 'sink.connector' = 'kafka' 之后,这个问题也正常了(如果是 kafka 类型不要配置这个 'sink[1].properties.transactional.id' = '${idUtil.randomUUID()}',但如果是 datastream-kafka 类型,则需要配置)
8、汇总说明,上面提及的修改dinky 0.7.4 源码,其实 1.0.0 也有一些bug要自己去改源码的,主要修改源码的模块如下:
1)修复时间戳格式问题(dlink-client)
2)FlinkSQL支持useSSL配置(dlink-client)
3)全局变量增加随机数ID(dlink-executor)
4)#{schemaName}、#{tableName}、#{pkList} 参数改为跟1.0版本一样使用#{}占位符(dlink-common)
5)增加时区配置(dlink-client)
注意:上面这些.jar包,有些时候除了更新dinky目录之外,还要更新Kubernetes Session上面的lib目录,别漏了
All by flydoos 2024-02-02
版权声明:
小编:牛A与牛C之间
链接:https://www.wuleba.com/2529.html
来源:吾乐吧软件站
本站资源仅供个人学习交流,请于下载后 24 小时内删除,不允许用于商业用途,否则法律问题自行承担。
下载说明:请不要相信网盘那边的广告,不要下载APP,普通下载就完事了!
共有 0 条评论