mysql 数据同步 elasticsearch 方案 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
请不要在回答技术问题时复制粘贴 AI 生成的内容
jiobanma
V2EX    程序员

mysql 数据同步 elasticsearch 方案

  •  
  •   jiobanma
    banmajio 2023-03-08 09:24:18 +08:00 7778 次点击
    这是一个创建于 949 天前的主题,其中的信息可能已经有所发展或是发生改变。

    目前项目想把 mysql 中的数据同步到 elasticsearch 中,接触了几种方案,想咨询一下大家工作中都使用什么方案。

    1. mysql 中的全量数据使用 logstash 导入 es ,然后增量的数据通过 canal 监听 binlog 转发到消息队列,然后数据服务监听详细队列,维护 es 中的数据。 [优点] :代码侵入小,轻量级使用。 [缺点] :a. logstash 导入大量数据效率低,每次任务最多只能同步 100000 条,并且最少要 1 分钟执行一次。b. logstash 先全量,canal 再接着增量,两个时间点可能会出现数据一致性问题。c. 貌似 canal 的坑不少,并且 github 好像也不怎么维护了。

    2. flink cdc 同步 mysql 到 elasticsearch 。看了一下 demo 的演示,感觉挺强大的,完全不用写代码,基本上通过 flink sql 就可以搞定。但是尝试按照官方的教程在本地模拟一下,不知道是不是目前还不支持 es8 ,导致我的数据刚开始可以同步到 es ,但是之后的 CUD 操作都无法同步到 es ,flink 的 webui 也有错误信息。在官方的 github 上咨询了此事,目前还没有回复 https://github.com/ververica/flink-cdc-connectors/discussions/1968 。 [缺点] :网上的资料较少。

    大家工作中都使用什么方案呢

    68 条回复    2024-04-28 18:43:43 +08:00
    voidmnwzp
        1
    voidmnwzp  
       2023-03-08 09:29:41 +08:00
    不要求实时性就根据 updatetime 每天跑个定时任务刷一次,要求强一致性就用 flink cdc
    GunsRose
        2
    GunsRose  
       2023-03-08 09:31:34 +08:00
    肯定是 flink-cdc 好用,不过这玩意搞起来也有很多的坑
    jiobanma
        3
    jiobanma  
    OP
       2023-03-08 09:34:58 +08:00
    @GunsRose 能举个例子吗大佬
    jiobanma
        4
    jiobanma  
    OP
       2023-03-08 09:35:19 +08:00
    @voidmnwzp 还是要求强一致性的
    pubby
        5
    pubby  
       2023-03-08 09:36:19 +08:00 via iPhone
    现在用的方案是
    记录 binlog 位置后先全量导入
    然后从那个位置开始 canal 同步数据
    577322753
        7
    577322753  
       2023-03-08 09:39:54 +08:00
    如果是云服务上的 db 的话,es 索引与表是一对一的话可以直接用 DTS 数据同步来做,表与 es 索引不是一对一的话,可以用 DTS 数据订阅,将 binlog 投递到 kafka 中,自己订阅 kafka 消息解析 binlog ,写到 es 中。
    我们目前是用 DTS 数据订阅作为生产端 + 基于 canal Client 改造的客户端实现的 es 同步
    577322753
        8
    577322753  
       2023-03-08 09:41:36 +08:00
    有些 DTS 工具也是支持多表写入到一个索引里的,京东云上的数据同步就是这样的,跟他们沟通过,底层也是用的 flink 那一套
    j1132888093
        9
    j1132888093  
       2023-03-08 09:42:51 +08:00
    我们公司操作数据库的时候同步操作 es........
    likeme
        10
    likeme  
       2023-03-08 09:44:25 +08:00
    @j1132888093 我也是这么做的,难道不行?(自我怀疑...)
    binge921
        11
    binge921  
       2023-03-08 09:47:18 +08:00
    别用 canal 了 我上次用 不小心把生产的 cpu 占用干满了 哭死
    jiobanma
        12
    jiobanma  
    OP
       2023-03-08 09:49:53 +08:00
    @j1132888093
    @likeme 这样是可以的,但是涉及到一个历史数据的导入,还是比较麻烦的,当然可以写一个洗数据的接口,但是效率肯定很低。然后要在业务代码里在同步操作 es 还得梳理旧的代码和接口,太费人了。
    gitxuzan
        13
    gitxuzan  
       2023-03-08 09:51:27 +08:00
    cancal 提供了第三方中间件语言,对接到自己语言里面,监听解析 binlog ,然后实时批量同步
    hhjswf
        14
    hhjswf  
       2023-03-08 09:54:37 +08:00
    @likeme 入侵性太强了
    godleon
        15
    godleon  
       2023-03-08 09:59:54 +08:00
    bboss
    matrix1010
        16
    matrix1010  
       2023-03-08 10:14:28 +08:00
    可以参考 elastic 官方的这篇文章,简单来说就是通过时间戳增量批量刷新: https://www.elastic.co/cn/blog/how-to-keep-elasticsearch-synchronized-with-a-relational-database-using-logstash 。 但其实可以完全不用 Logstash, 自己写个 cron 脚本按相同的逻辑更新
    vagusss
        17
    vagusss  
       2023-03-08 10:14:33 +08:00
    maxwell 好像坑比 canal 少一点
    Morriaty
        18
    Morriaty  
       2023-03-08 10:17:54 +08:00
    你是 online service 还只是内部搜索需求?如果是 online service ,同步工具倒是其次,这玩意最恶心的是,你要设计好一个机制,就是每次由于字段变更、分词词库更新等问题需要重新全量索引( full index )时,你的实时索引( realtime index )不能停,因为线上还在服务,用户要一直能看到最新的。可以看看 https://engineering.carsguide.com.au/elasticsearch-zero-downtime-reindexing-e3a53000f0ac
    SoulSleep
        19
    SoulSleep  
       2023-03-08 10:20:32 +08:00
    我们用的 canal ,你说的这俩方案我们曾经都试过....
    logstash...慢,不灵活
    flink cdc 还不错,不需要写代码...但是...有上手难度(对我们团队来说

    canal ,我们现在线上运行了几十个实例,有 1-1 ,N-1 ,多种场景,性能,没太多资源,大概做到了 5w/s 的速度,完全满足我们的需要
    我们的链路:
    1-1:mysql--canal--adapter--es

    n-1: mysql--canal--处理程序--es
    terranboy
        20
    terranboy  
       2023-03-08 10:20:38 +08:00
    现在的 ORM 的 EVENT 都很完善的 在业务里面同步了
    SoulSleep
        21
    SoulSleep  
       2023-03-08 10:22:45 +08:00   1
    @Morriaty #18 对于你这个补充一下
    我们在 es 里只存搜索条件+数据库主键,会有一个单独的服务用搜索条件得到各个表的数据库主键,然后在程序里聚合,查询效果对比:
    1.关系型数据库,多表 join ,几亿数据,大概要 5~10 秒
    2.现有方案,es+单表聚合,几亿数据,大概 0.x~2 秒
    weofuh
        22
    weofuh  
       2023-03-08 10:30:57 +08:00
    可不可以历史数据用 datax ,增量数据用 canal 呢?
    Aresxue
        23
    Aresxue  
       2023-03-08 10:33:40 +08:00
    flink 没玩过,但是 canal 绝对是个坑,一个是 cpu 老是莫名其妙跑满,还有对主从模式很多地方都没有支持非常坑。
    barbery
        24
    barbery  
       2023-03-08 10:41:11 +08:00
    我们也是用 canal 同步,暂时没发现有什么坑,可能是我们的数据量不大
    jiobanma
        25
    jiobanma  
    OP
       2023-03-08 10:45:42 +08:00
    @SoulSleep 感谢大佬
    Seulgi
        26
    Seulgi  
       2023-03-08 10:51:25 +08:00
    dts 贼贵啊。
    hotcool100
        27
    hotcool100  
       2023-03-08 10:55:42 +08:00
    flink cdc 太吃内存和资源了,用 canal 好多了
    changdy
        28
    changdy  
       2023-03-08 10:57:40 +08:00
    2333 看来只有我一个人用的是 debezium 吗?
    CRUD
        29
    CRUD  
       2023-03-08 11:03:03 +08:00
    我们是用 canal ,mysql -> canal -> rabbitmq -> 处理服务 -> es
    jiobanma
        30
    jiobanma  
    OP
       2023-03-08 11:04:10 +08:00
    @SoulSleep 看您的描述,整个链路都是用 canal 实现的,那历史数据方便怎么同步到 es 呢?我了解的是 canal 对全量数据的同步支持不是很好
    brader
        31
    brader  
       2023-03-08 11:22:48 +08:00
    可以了解下这个项目 https://github.com/brokercap/Bifrost
    用 go 写的,也很节省服务器资源
    potatowish
        32
    potatowish  
       2023-03-08 11:27:01 +08:00   1
    debezium 支持全量和增量同步,flink cdc 也是基于它
    FawkesV
        33
    FawkesV  
       2023-03-08 11:28:05 +08:00
    学习了
    Desdemor
        34
    Desdemor  
       2023-03-08 11:31:14 +08:00
    canal -> kafka -> 处理服务 -> es
    awalkingman
        35
    awalkingman  
       2023-03-08 12:25:39 +08:00
    @j1132888093
    @likeme
    那要是写 mysql 成功了,写 es 失败了,,,数据的一致性咋个维护
    v2e0xAdmin2
        36
    v2e0xAdmin2  
       2023-03-08 13:44:26 +08:00
    @jiobanma https://github.com/colosobo/drc

    我们公司用的这个项目,可以全量同步,也可以增量同步。写到 kafka 或者 mq 里,然后 flink 消费写到 es 里。
    v2e0xAdmin2
        37
    v2e0xAdmin2  
       2023-03-08 13:47:15 +08:00
    @SoulSleep canal 运维太麻烦了。
    QunLeLZ
        38
    QunLeLZ  
       2023-03-08 14:03:01 +08:00
    我们现在使用的是从 pg 同步数据到 es ,中间还要做一定逻辑处理,还是比较方便的,直接写 SQL 就行了。不过我们用的是阿里的 Flink 服务,自建 Flink 的话会稍微麻烦点,但我理解应该问题也不会太大。
    wmwgijol28
        39
    wmwgijol28  
       2023-03-08 14:03:41 +08:00
    CloudCanal 社区版
    wxw752
        40
    wxw752  
       2023-03-08 14:04:19 +08:00
    flink-cdc
    j1132888093
        41
    j1132888093  
       2023-03-08 14:05:50 +08:00
    @newskillsget 先写 mysql ,再写 es ,失败就回滚 mysql 。如果是写完 es 再抛出异常的话,就要手动回滚 es 了,但是我们公司这种场景目前很少,还能 cover 住
    potatowish
        42
    potatowish  
       2023-03-08 14:14:14 +08:00
    @changdy 第二个
    fengpan567
        43
    fengpan567  
       2023-03-08 14:17:38 +08:00
    canal 监听 binlog ,kafka 消费写入 es
    noparking188
        44
    noparking188  
       2023-03-08 14:34:37 +08:00
    CDC 到队列,自己写程序或者 lambda 写 ES ,可行吗,是不是更简单一点?
    我之前的公司是双写+重试,错了人肉纠正
    BQsummer
        45
    BQsummer  
       2023-03-08 14:48:36 +08:00
    前段时间调研过 doris 的数据同步:
    canal 文档一坨屎, 很多参数得从源代码里找, 数据量大的话会有坑
    flink cdc 开发最省事, 但是集群运维费事
    debezium 中文文档为 0, flink cdc 最早的同步方式是通过它, 后来自己实现了一套
    CloudCanal 是 starrocks 文档里推荐的, 国产, 还不开源, 不敢用
    其实上云是最省力的方式
    SoulSleep
        46
    SoulSleep  
       2023-03-08 14:51:09 +08:00
    @jiobanma #30 历史数据 canal 支持一次性迁移,类似 etl
    @v2e0xAdmin2 #37 麻烦在哪儿呢?我们用 canal-admin 管理配置,指标暂时没输出到 grafana...监控主要关注业务异常,主要是稳定,基本上没出过问题
    dabai0806
        47
    dabai0806  
       2023-03-08 15:00:47 +08:00
    logstash 其实是可以配置秒级同步的(这也是我碰巧试出来的 文档和网上文章都没看到有人提过)

    线上几万条数据, 我配置的是 10 秒同步一次 就是同步的时候 cpu 会上飙百分之 30 左右

    配置参数
    schedule => "*/10 * * * * *"
    dabai0806
        48
    dabai0806  
       2023-03-08 15:02:51 +08:00
    logstash 维护方便些同步数据的自由度也高 写条查询 sql 就完事了, canal 就是个半成品, 难用的要死
    lueluev
        49
    lueluev  
       2023-03-08 15:09:08 +08:00
    好熟悉的问题 flume 还有人用吗
    fridaycatye
        50
    fridaycatye  
       2023-03-08 15:36:24 +08:00
    datax 可用,基于查询做同步,灵活
    fishtocat
        51
    fishtocat  
       2023-03-08 17:56:52 +08:00
    学习了,我们现在是基于 canal 自己实现的 golang 客户端消费更新信息
    F281M6Dh8DXpD1g2
        52
    F281M6Dh8DXpD1g2  
       2023-03-08 17:57:50 +08:00
    flink-cdc 开 ckpt 完事
    ufo5260987423
        53
    ufo5260987423  
       2023-03-08 18:00:16 +08:00
    我不知道您为啥说 logstash 效率低,不论是全量更新还是增量更新,效率都还不错。我处理过的专利数据大概几百个 G ,都能在可接受时间内完成任务。

    如果需要实时更新,那么在事物里面同步操作 es 就好了。
    可能我了解的还不够多,请您指正。
    ffkjjj
        54
    ffkjjj  
       2023-03-08 18:02:55 +08:00
    debezium
    awalkingman
        55
    awalkingman  
       2023-03-08 18:08:47 +08:00
    @j1132888093 人工补偿兜底,很真实了。巧妙的人工介入能极大地降低系统复杂度,谓之四两拨千斤。
    v2e0xAdmin2
        56
    v2e0xAdmin2  
       2023-03-08 18:11:39 +08:00
    @SoulSleep 加机器麻烦
    zhangxudong
        57
    zhangxudong  
       2023-03-08 18:34:21 +08:00
    可以试试阿里云 DTS 原班人马搞的 cloudcanal
    HunterPan
        58
    HunterPan  
       2023-03-08 19:13:18 +08:00
    datax
    vgbhfive
        59
    vgbhfive  
       2023-03-08 19:33:23 +08:00
    datax 可以考虑下
    GopherDaily
        60
    GopherDaily  
       2023-03-08 19:53:58 +08:00
    kafka connect, 如果你们已经有 kafka 的话,
    debezium + es sink 吧
    tairan2006
        61
    tairan2006  
       2023-03-08 19:58:01 +08:00
    我记得 datax 是专门搞这个的,不过我也没用过
    leeton
        62
    leeton  
       2023-03-08 22:04:24 +08:00
    我们就是用 canal 监听 binlog 发 kafka 消息一条一条刷增量数据
    ldh756034624
        63
    ldh756034624  
       2023-03-09 10:04:22 +08:00
    cirton
        64
    cirton  
       2023-03-09 10:05:24 +08:00
    ogg 可以实时同步到 es 。之前用 ogg 同步 mysql 和 oracle ,效率非常够用,效果也很好,但是出了问题运维是个麻烦事儿。
    刚看到你的 es 版本是 8 的,那就用不了了。
    Morriaty
        65
    Morriaty  
       2023-03-09 11:06:50 +08:00
    @SoulSleep 我们说的应该不是一个问题。举个例子,

    es 里有个 title 字段,类似「高启强大规模犯罪集团终落网」,分词有问题,分成了「高启 / 强大 / 规模 / 犯罪 / 集团 / 终 / 落网」,于是我们更新了人名库,因为可能有很多历史 title 都包含这个词,需要把整个索引 reindex 一遍。

    这个过程基于历史数据量不同,耗时不同,可能需要几个小时的时间。但你线上正在提供 online search service 不能停,不能等这几个小时的 reindex 。

    所以就需要两个并行的 full indexing 和 realtime indexing ,以及两个链路的切换逻辑
    SoulSleep
        66
    SoulSleep  
       2023-03-09 14:51:07 +08:00
    @Morriaty #64 不错,我们的场景还没有这么复杂,只是加速分库的 mysql 查询效率,这么精细的需求对我们现在的场景来说,就直接刷 db 的 binlog 重新同步去做了,按照最大的效率,估计要 2 个小时左右....确实无论怎么做代价都很大....
    kakawa
        67
    kakawa  
       2024-04-28 10:44:51 +08:00
    @Morriaty 所以就需要两个并行的 full indexing 和 realtime indexing ,以及两个链路的切换逻辑
    不是搜索部门的 但是现在实实在在的遇到了这个问题 请问这个有什么最佳实践吗 求解
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     5435 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 37ms UTC 01:23 PVG 09:23 LAX 18:23 JFK 21:23
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86