目前项目想把 mysql 中的数据同步到 elasticsearch 中,接触了几种方案,想咨询一下大家工作中都使用什么方案。
mysql 中的全量数据使用 logstash 导入 es ,然后增量的数据通过 canal 监听 binlog 转发到消息队列,然后数据服务监听详细队列,维护 es 中的数据。 [优点] :代码侵入小,轻量级使用。 [缺点] :a. logstash 导入大量数据效率低,每次任务最多只能同步 100000 条,并且最少要 1 分钟执行一次。b. logstash 先全量,canal 再接着增量,两个时间点可能会出现数据一致性问题。c. 貌似 canal 的坑不少,并且 github 好像也不怎么维护了。
flink cdc 同步 mysql 到 elasticsearch 。看了一下 demo 的演示,感觉挺强大的,完全不用写代码,基本上通过 flink sql 就可以搞定。但是尝试按照官方的教程在本地模拟一下,不知道是不是目前还不支持 es8 ,导致我的数据刚开始可以同步到 es ,但是之后的 CUD 操作都无法同步到 es ,flink 的 webui 也有错误信息。在官方的 github 上咨询了此事,目前还没有回复 https://github.com/ververica/flink-cdc-connectors/discussions/1968 。 [缺点] :网上的资料较少。
大家工作中都使用什么方案呢
![]() | 1 voidmnwzp 2023-03-08 09:29:41 +08:00 不要求实时性就根据 updatetime 每天跑个定时任务刷一次,要求强一致性就用 flink cdc |
2 GunsRose 2023-03-08 09:31:34 +08:00 肯定是 flink-cdc 好用,不过这玩意搞起来也有很多的坑 |
![]() | 5 pubby 2023-03-08 09:36:19 +08:00 via iPhone 现在用的方案是 记录 binlog 位置后先全量导入 然后从那个位置开始 canal 同步数据 |
![]() | 7 577322753 2023-03-08 09:39:54 +08:00 如果是云服务上的 db 的话,es 索引与表是一对一的话可以直接用 DTS 数据同步来做,表与 es 索引不是一对一的话,可以用 DTS 数据订阅,将 binlog 投递到 kafka 中,自己订阅 kafka 消息解析 binlog ,写到 es 中。 我们目前是用 DTS 数据订阅作为生产端 + 基于 canal Client 改造的客户端实现的 es 同步 |
![]() | 8 577322753 2023-03-08 09:41:36 +08:00 有些 DTS 工具也是支持多表写入到一个索引里的,京东云上的数据同步就是这样的,跟他们沟通过,底层也是用的 flink 那一套 |
![]() | 9 j1132888093 2023-03-08 09:42:51 +08:00 我们公司操作数据库的时候同步操作 es........ |
![]() | 10 likeme 2023-03-08 09:44:25 +08:00 @j1132888093 我也是这么做的,难道不行?(自我怀疑...) |
![]() | 11 binge921 2023-03-08 09:47:18 +08:00 别用 canal 了 我上次用 不小心把生产的 cpu 占用干满了 哭死 |
![]() | 12 jiobanma OP @j1132888093 @likeme 这样是可以的,但是涉及到一个历史数据的导入,还是比较麻烦的,当然可以写一个洗数据的接口,但是效率肯定很低。然后要在业务代码里在同步操作 es 还得梳理旧的代码和接口,太费人了。 |
![]() | 13 gitxuzan 2023-03-08 09:51:27 +08:00 cancal 提供了第三方中间件语言,对接到自己语言里面,监听解析 binlog ,然后实时批量同步 |
15 godleon 2023-03-08 09:59:54 +08:00 bboss |
![]() | 16 matrix1010 2023-03-08 10:14:28 +08:00 可以参考 elastic 官方的这篇文章,简单来说就是通过时间戳增量批量刷新: https://www.elastic.co/cn/blog/how-to-keep-elasticsearch-synchronized-with-a-relational-database-using-logstash 。 但其实可以完全不用 Logstash, 自己写个 cron 脚本按相同的逻辑更新 |
![]() | 17 vagusss 2023-03-08 10:14:33 +08:00 maxwell 好像坑比 canal 少一点 |
![]() | 18 Morriaty 2023-03-08 10:17:54 +08:00 你是 online service 还只是内部搜索需求?如果是 online service ,同步工具倒是其次,这玩意最恶心的是,你要设计好一个机制,就是每次由于字段变更、分词词库更新等问题需要重新全量索引( full index )时,你的实时索引( realtime index )不能停,因为线上还在服务,用户要一直能看到最新的。可以看看 https://engineering.carsguide.com.au/elasticsearch-zero-downtime-reindexing-e3a53000f0ac |
![]() | 19 SoulSleep 2023-03-08 10:20:32 +08:00 我们用的 canal ,你说的这俩方案我们曾经都试过.... logstash...慢,不灵活 flink cdc 还不错,不需要写代码...但是...有上手难度(对我们团队来说 canal ,我们现在线上运行了几十个实例,有 1-1 ,N-1 ,多种场景,性能,没太多资源,大概做到了 5w/s 的速度,完全满足我们的需要 我们的链路: 1-1:mysql--canal--adapter--es n-1: mysql--canal--处理程序--es |
20 terranboy 2023-03-08 10:20:38 +08:00 现在的 ORM 的 EVENT 都很完善的 在业务里面同步了 |
![]() | 21 SoulSleep 2023-03-08 10:22:45 +08:00 ![]() @Morriaty #18 对于你这个补充一下 我们在 es 里只存搜索条件+数据库主键,会有一个单独的服务用搜索条件得到各个表的数据库主键,然后在程序里聚合,查询效果对比: 1.关系型数据库,多表 join ,几亿数据,大概要 5~10 秒 2.现有方案,es+单表聚合,几亿数据,大概 0.x~2 秒 |
22 weofuh 2023-03-08 10:30:57 +08:00 可不可以历史数据用 datax ,增量数据用 canal 呢? |
![]() | 23 Aresxue 2023-03-08 10:33:40 +08:00 flink 没玩过,但是 canal 绝对是个坑,一个是 cpu 老是莫名其妙跑满,还有对主从模式很多地方都没有支持非常坑。 |
![]() | 24 barbery 2023-03-08 10:41:11 +08:00 我们也是用 canal 同步,暂时没发现有什么坑,可能是我们的数据量不大 |
26 Seulgi 2023-03-08 10:51:25 +08:00 dts 贼贵啊。 |
27 hotcool100 2023-03-08 10:55:42 +08:00 flink cdc 太吃内存和资源了,用 canal 好多了 |
![]() | 28 changdy 2023-03-08 10:57:40 +08:00 2333 看来只有我一个人用的是 debezium 吗? |
![]() | 29 CRUD 2023-03-08 11:03:03 +08:00 我们是用 canal ,mysql -> canal -> rabbitmq -> 处理服务 -> es |
![]() | 30 jiobanma OP @SoulSleep 看您的描述,整个链路都是用 canal 实现的,那历史数据方便怎么同步到 es 呢?我了解的是 canal 对全量数据的同步支持不是很好 |
31 brader 2023-03-08 11:22:48 +08:00 可以了解下这个项目 https://github.com/brokercap/Bifrost 用 go 写的,也很节省服务器资源 |
![]() | 32 potatowish 2023-03-08 11:27:01 +08:00 ![]() debezium 支持全量和增量同步,flink cdc 也是基于它 |
![]() | 33 FawkesV 2023-03-08 11:28:05 +08:00 学习了 |
![]() | 34 Desdemor 2023-03-08 11:31:14 +08:00 canal -> kafka -> 处理服务 -> es |
![]() | 35 awalkingman 2023-03-08 12:25:39 +08:00 |
![]() | 36 v2e0xAdmin2 2023-03-08 13:44:26 +08:00 @jiobanma https://github.com/colosobo/drc 我们公司用的这个项目,可以全量同步,也可以增量同步。写到 kafka 或者 mq 里,然后 flink 消费写到 es 里。 |
![]() | 37 v2e0xAdmin2 2023-03-08 13:47:15 +08:00 @SoulSleep canal 运维太麻烦了。 |
![]() | 38 QunLeLZ 2023-03-08 14:03:01 +08:00 我们现在使用的是从 pg 同步数据到 es ,中间还要做一定逻辑处理,还是比较方便的,直接写 SQL 就行了。不过我们用的是阿里的 Flink 服务,自建 Flink 的话会稍微麻烦点,但我理解应该问题也不会太大。 |
![]() | 39 wmwgijol28 2023-03-08 14:03:41 +08:00 CloudCanal 社区版 |
![]() | 40 wxw752 2023-03-08 14:04:19 +08:00 flink-cdc |
![]() | 41 j1132888093 2023-03-08 14:05:50 +08:00 @newskillsget 先写 mysql ,再写 es ,失败就回滚 mysql 。如果是写完 es 再抛出异常的话,就要手动回滚 es 了,但是我们公司这种场景目前很少,还能 cover 住 |
![]() | 42 potatowish 2023-03-08 14:14:14 +08:00 @changdy 第二个 |
43 fengpan567 2023-03-08 14:17:38 +08:00 canal 监听 binlog ,kafka 消费写入 es |
![]() | 44 noparking188 2023-03-08 14:34:37 +08:00 CDC 到队列,自己写程序或者 lambda 写 ES ,可行吗,是不是更简单一点? 我之前的公司是双写+重试,错了人肉纠正 |
![]() | 45 BQsummer 2023-03-08 14:48:36 +08:00 前段时间调研过 doris 的数据同步: canal 文档一坨屎, 很多参数得从源代码里找, 数据量大的话会有坑 flink cdc 开发最省事, 但是集群运维费事 debezium 中文文档为 0, flink cdc 最早的同步方式是通过它, 后来自己实现了一套 CloudCanal 是 starrocks 文档里推荐的, 国产, 还不开源, 不敢用 其实上云是最省力的方式 |
![]() | 46 SoulSleep 2023-03-08 14:51:09 +08:00 @jiobanma #30 历史数据 canal 支持一次性迁移,类似 etl @v2e0xAdmin2 #37 麻烦在哪儿呢?我们用 canal-admin 管理配置,指标暂时没输出到 grafana...监控主要关注业务异常,主要是稳定,基本上没出过问题 |
![]() | 47 dabai0806 2023-03-08 15:00:47 +08:00 logstash 其实是可以配置秒级同步的(这也是我碰巧试出来的 文档和网上文章都没看到有人提过) 线上几万条数据, 我配置的是 10 秒同步一次 就是同步的时候 cpu 会上飙百分之 30 左右 配置参数 schedule => "*/10 * * * * *" |
![]() | 48 dabai0806 2023-03-08 15:02:51 +08:00 logstash 维护方便些同步数据的自由度也高 写条查询 sql 就完事了, canal 就是个半成品, 难用的要死 |
![]() | 49 lueluev 2023-03-08 15:09:08 +08:00 好熟悉的问题 flume 还有人用吗 |
![]() | 50 fridaycatye 2023-03-08 15:36:24 +08:00 datax 可用,基于查询做同步,灵活 |
![]() | 51 fishtocat 2023-03-08 17:56:52 +08:00 学习了,我们现在是基于 canal 自己实现的 golang 客户端消费更新信息 |
![]() | 52 F281M6Dh8DXpD1g2 2023-03-08 17:57:50 +08:00 flink-cdc 开 ckpt 完事 |
53 ufo5260987423 2023-03-08 18:00:16 +08:00 我不知道您为啥说 logstash 效率低,不论是全量更新还是增量更新,效率都还不错。我处理过的专利数据大概几百个 G ,都能在可接受时间内完成任务。 如果需要实时更新,那么在事物里面同步操作 es 就好了。 可能我了解的还不够多,请您指正。 |
54 ffkjjj 2023-03-08 18:02:55 +08:00 debezium |
![]() | 55 awalkingman 2023-03-08 18:08:47 +08:00 @j1132888093 人工补偿兜底,很真实了。巧妙的人工介入能极大地降低系统复杂度,谓之四两拨千斤。 |
![]() | 56 v2e0xAdmin2 2023-03-08 18:11:39 +08:00 @SoulSleep 加机器麻烦 |
![]() | 57 zhangxudong 2023-03-08 18:34:21 +08:00 可以试试阿里云 DTS 原班人马搞的 cloudcanal |
![]() | 58 HunterPan 2023-03-08 19:13:18 +08:00 datax |
59 vgbhfive 2023-03-08 19:33:23 +08:00 datax 可以考虑下 |
60 GopherDaily 2023-03-08 19:53:58 +08:00 kafka connect, 如果你们已经有 kafka 的话, debezium + es sink 吧 |
61 tairan2006 2023-03-08 19:58:01 +08:00 我记得 datax 是专门搞这个的,不过我也没用过 |
![]() | 62 leeton 2023-03-08 22:04:24 +08:00 我们就是用 canal 监听 binlog 发 kafka 消息一条一条刷增量数据 |
63 ldh756034624 2023-03-09 10:04:22 +08:00 @newskillsget HaH |
![]() | 64 cirton 2023-03-09 10:05:24 +08:00 ogg 可以实时同步到 es 。之前用 ogg 同步 mysql 和 oracle ,效率非常够用,效果也很好,但是出了问题运维是个麻烦事儿。 刚看到你的 es 版本是 8 的,那就用不了了。 |
![]() | 65 Morriaty 2023-03-09 11:06:50 +08:00 @SoulSleep 我们说的应该不是一个问题。举个例子, es 里有个 title 字段,类似「高启强大规模犯罪集团终落网」,分词有问题,分成了「高启 / 强大 / 规模 / 犯罪 / 集团 / 终 / 落网」,于是我们更新了人名库,因为可能有很多历史 title 都包含这个词,需要把整个索引 reindex 一遍。 这个过程基于历史数据量不同,耗时不同,可能需要几个小时的时间。但你线上正在提供 online search service 不能停,不能等这几个小时的 reindex 。 所以就需要两个并行的 full indexing 和 realtime indexing ,以及两个链路的切换逻辑 |
![]() | 66 SoulSleep 2023-03-09 14:51:07 +08:00 @Morriaty #64 不错,我们的场景还没有这么复杂,只是加速分库的 mysql 查询效率,这么精细的需求对我们现在的场景来说,就直接刷 db 的 binlog 重新同步去做了,按照最大的效率,估计要 2 个小时左右....确实无论怎么做代价都很大.... |
67 kakawa 2024-04-28 10:44:51 +08:00 @Morriaty 所以就需要两个并行的 full indexing 和 realtime indexing ,以及两个链路的切换逻辑 不是搜索部门的 但是现在实实在在的遇到了这个问题 请问这个有什么最佳实践吗 求解 |
![]() | 68 Morriaty 2024-04-28 18:43:43 +08:00 ![]() |