如何实现分布式消息同步? - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
请不要在回答技术问题时复制粘贴 AI 生成的内容
molvqingtai
V2EX    程序员

如何实现分布式消息同步?

  •  
  •   molvqingtai
    molvqingtai 348 天前 3038 次点击
    这是一个创建于 348 天前的主题,其中的信息可能已经有所发展或是发生改变。

    前段时间写了一个浏览器扩展,详见: t/1076581?p=1#reply25

    有一个痛点,因为所有消息都储存在本地,导致无法接收离线消息,例如:

    现在有 UserA 、UserB 、UserC 、UserD 4 个用户,A 、B 、C 在线,D 离线

    时间点 1:UserA 、UserB 、UserC 三个用户聊天,产生 3 条信息 “message-1-A, message-1-B, message-1-C”

    时间点 2:UserC 离线退出聊天,UserA 与 UserB 继续聊天,产生 2 条信息:“message-2-A, message-2-B”

    时间点 3:UserC 、UserD 上线加入聊天,那么此时,时间点 1 和时间点 2 的聊天信息需要同步给 UserC 和 User D

    UserA 、UserB 拥有所有信息记录 5 条, 无需同步

    UserC 本地记录 3 条,需要同步时间点 2 ,同步 2 条

    UserD 本地无记录,需要同步时间点 1+2 同步 5 条

    如上,我了解一些分布式同步的解决方案,etcd 、raft 等,奈何太菜看得一脸懵逼,上手太复杂 目前想到的解决方案就是,A 、B 、C 、D 广播自己所有的消息记录,然后各自接收,通过消息的时间戳 Diff, 然后同步时根据 Diff 出的差值,追加或插入。

    1. 性能问题:是否可以避免广播所有的消息记录,能否做到只广播其他用户需要同步的记录?
    2. 同步信息过多:因为插件的消息时储存在浏览器 IndexDB 中,一方面是有大小限制,感觉没有必要同步所有的历史消息,比如可以设置一个只同步 7 天 30 天..的聊天记录,或许有更好的方案?
    3. 消息数据结构:我现在只是简单的使用一个带时间戳的 List 来存储消息,要实现上面的功能是否有更好的存储方案,比如链表等?
    第 1 条附言    347 天前
    解答纯 P2P 不能同步的疑惑,其实目前已经实现了在线用户数量显示

    大概原理:

    A 、B 、C 、D ,4 个用户,D 离线,当 D 上线时,会给所有在线的 Peer 节点发送 join 事件,然后 A 、B 、C 接收到某个 D 的 join 事件后,将自己的 userId 发送给 D ,就是 D 就能知道 ABC 在线,同时 ABC 也知道 D 在线,离开同理,以上逻辑只适用于本地自定义 userId 不变的情况下

    每加入一个用户会和所有的用户交换信息,这样是虽然实现了在线数量的同步,同理,如果这种方式用在消息记录同步,虽然可行,网络成本太高了,毕竟在线数量只需要交换 userId ,如果全量交换消息记录数据量太大了
    38 条回复    2024-11-03 07:40:31 +08:00
    cpstar
        1
    cpstar  
       347 天前   1
    我就想知道,如果没有远端服务器,如果 ABCD 全都离线状态,那么其中一者上线后,从哪获取消息?比如,CD 提前离线,而 AB 聊的火热但之后离线,CD 再上线时必然无法获取 AB 的消息,又聊的火热,再等 AB 其中一者上线时,CD 才能获取到 AB 最后聊天的内容,然后 CD 怎么插数据?
    于是必然需要远端存储,那么就与单纯本地存储相违背了。那么,没了,南山无敌说号称“不存”,么?说根本,OP 就在讨论一个 IM 的设计原则。单纯的本地存储只能解决点对点消息,而且一旦一点离线,无法发送,这么说当年抠抠差不多这个逻辑。一旦开群聊,那势必。。。
    iintothewind
        2
    iintothewind  
       347 天前
    部署一个 mq 服务, 支持每个用户创建可存储 mq 消费的 offset 的状态的 session,
    然后每个用户根据 id 创建 session 就好了.
    可选方案 mqtt, redis-mq 都可以, 没必要上 kafka.
    单用户多个客户端消息同步, 这个说真的比较难办, web 的话你没有权限拿到每个客户端的唯一 id, 很难区分, 即便能拿到, 你还要根据客户端不同, 从消息总线里面 replay+filter, 重新同步, 比较麻烦.

    带时间戳的 list, 你为啥不用 timer series 数据库, redis 本身就是了, 何必自己设计?
    qping
        3
    qping  
       347 天前 via Android
    暂时想到两个方案
    1 要么选举出一个中心节点,如果中心节点下线,那要重新选举。

    2 所有客户端保留的都是最近 n 天的聊天记录,也就是说它们存储的历史消息都是一样的,有其他人上线时,就需要就近从任意一人那里获得历史消息

    方案一,实现复杂,如果中心节点下线那所有人都会卡顿


    方案二,如果有人改动了消息,比如在聊天时清空了本地的消息,正好有人上线和他同步,那就同步不到消息了
    qping
        4
    qping  
       347 天前 via Android
    @iintothewind 他要的是分布式消息同步,只有客户端没有服务端
    iintothewind
        5
    iintothewind  
       347 天前
    @qping 没有中心节点, 那消息有没有送达, 送达的顺序都没办法保证啊, 因为没有一个一致的版本, 难道要自己做分布式服务, 搞仲裁啊? nb, 我倒想看看, 这咋搞?
    mayli
        6
    mayli  
       347 天前
    常见的分布式问题,CAP ,节点不够就是会丢消息脑裂
    niubee1
        7
    niubee1  
       347 天前
    纯粹的 P2P 消息,是无法做到多端同步的,能够实现多端同步的,必然不是纯粹的 P2P 。
    molvqingtai
        8
    molvqingtai  
    OP
       347 天前
    @iintothewind #5 送达顺序,默认以消息发送事件为准,假设消息时及时的,延迟 0
    molvqingtai
        9
    molvqingtai  
    OP
       347 天前
    @molvqingtai 发送时间
    molvqingtai
        10
    molvqingtai  
    OP
       347 天前
    @cpstar 如果没人任何用户在线,就没有同步呀
    luckyrayyy
        11
    luckyrayyy  
       347 天前
    同步这个我感觉还好,你服务器得存了所有消息吧。每个群的消息 ID 都是递增的吧,本地登陆的时候对比一下本地消息最大值和服务端消息最大值,拉中间缺失的部分就行了?
    luckyrayyy
        12
    luckyrayyy  
       347 天前
    哦没服务端,告辞。那没人在线的时候就没有数据可以同步呀
    iintothewind
        13
    iintothewind  
       347 天前
    @molvqingtai #8 你说的是 payload 里面的时间戳吧, 我说的是实际送达每个客户端的消息的顺序和时间.
    q958951326
        14
    q958951326  
       347 天前
    外行来说个思路,我怎么感觉这个消息同步,有点类似于动态路由协议中的路由信息的交换?
    qping
        15
    qping  
       347 天前
    @molvqingtai 客户端时间是不可靠得,我把本机时间调到一年后咋整
    DsuineGP
        16
    DsuineGP  
       347 天前   2
    TL;DR 用 CRDT 算法

    原因:
    - 常用的 raft 算法无法容忍脑裂, 即 A-B / C-D 上线然后还要实现日志同步, 即使要解决脑裂也要用非常扭曲的方法;
    - CTDT 是 p2p base 的分布式一致性算法, 且你的业务场景比较简单, 只需要需要处理 retain+insert 的场景, 不涉及到 remove;
    - https://github.com/vlcn-io/cr-sqlite 这个用 crdt 做 sqllite 代理的项目应该比较适合你的业务场景
    zsxzy
        17
    zsxzy  
       347 天前
    类似区块链的共识算法 , 解决双花问题
    SilentRhythm
        18
    SilentRhythm  
       347 天前
    外行插个嘴不知道能否给 op 一些思路:
    1. 把广播推送消息内容改成广播新消息通知,如“user1 在 XXX 时间戳发送了一条新消息”,在线的用户收到通知去 user1 拉取,拉取时带上本地最新消息时间戳作为 offset 。
    2. 不按日期存,按最大消息条数存,比如最后 1024 条;
    molvqingtai
        19
    molvqingtai  
    OP
       347 天前
    @qping #15 这个好解决,从一些公开的时间服务获取时间
    codegenerator
        20
    codegenerator  
       347 天前
    无服务的话只能采用类似 gossip 协议可以实现
    但是只能最终一致,勉强能达到需求
    jimmy2024
        21
    jimmy2024  
       347 天前
    又来一个套免费方案的
    xichuhanguguan
        22
    xichuhanguguan  
       347 天前
    我有个疑问,像 p2p 用的 DHT 网络。他在加入网络的时候是需要一个知道一个在网络中的节点,向他去获取网络中其他节点信息。新用户怎么加入这个网络,邀请制吗?
    molvqingtai
        23
    molvqingtai  
    OP
       347 天前
    @jimmy2024 我的应用本来就是开源免费的,单纯的技术交流,何来免费套方案
    molvqingtai
        24
    molvqingtai  
    OP
       347 天前
    @xichuhanguguan 同一个域名就是节点,一个域名一个聊天室
    Dynesshely
        25
    Dynesshely  
       347 天前
    不引入服务器的话这个问题无解
    提前下线的用户,在无人在线的情况下登录,无法获取既有的聊天数据
    KagurazakaNyaa
        26
    KagurazakaNyaa  
       347 天前
    @xichuhanguguan 要么通过 tracker ,要么有 boostrap 节点
    molvqingtai
        27
    molvqingtai  
    OP
       347 天前
    @Dynesshely 没人在线的情况下,获取聊天记录也没意义
    seedhk
        28
    seedhk  
       347 天前
    想到一个点:
    没有服务器的情况下,如果新节点上线,如何保证他获取到的消息是未经过篡改的?
    molvqingtai
        29
    molvqingtai  
    OP
       347 天前
    @seedhk 不能保证,能做到的只能验证消息的数据格式,不符合格式就过滤掉
    molvqingtai
        30
    molvqingtai  
    OP
       347 天前
    @seedhk 我一直没有实现文件传输功能,这也是考虑的点,怕大家电脑中毒了
    Dynesshely
        31
    Dynesshely  
       347 天前
    @molvqingtai 你要解决的不就是接收不了离线消息的问题嘛
    molvqingtai
        32
    molvqingtai  
    OP
       347 天前
    @Dynesshely #31 可能我描述不太准确,应该是可以同步其他客户端本地存在的消息,但自己本地不存在的消息,要实现这个操作当然需要至少两个用户在线
    bli22ard
        33
    bli22ard  
       347 天前   1
    看过电报的群组聊天协议, 每条消息有一条 id ,这个 id 就是从 0 开始递增,每次+1 , 发消息,服务端每次+1 就可以了。对于客户端来说,它只要关注,群组 id 和 最后一条消息的 id 。客户端启动,建立 websocket 连接,带上自己的 last message id , 然后服务端,从这个 last id 开是这个客户端推送,这个同步要考虑,消息差了非常多,你需要设定一个阈值,超过这个阈值,就只取最后多少条消息。这块有个细节就是,要处理好,websocket 建立这段时间,出现新的消息,而导致的一些倒霉客户端没同步到这些最新消息的问题。你说的 etcd 这些用来实现存储一致性的,不适合你这种场景,你这种场景 mysql 就可以了,性能不够分库,就可以了。
    fano
        34
    fano  
       347 天前
    @DsuineGP 正解
    2Nfree
        35
    2Nfree  
       347 天前
    直接引入区块链吧,内容寻址、分块存储、P2P 传输、DHT 索引
    2Nfree
        36
    2Nfree  
       347 天前
    @2Nfree 但是这样还是解决不了全部节点下线消息不可用的问题
    molvqingtai
        37
    molvqingtai  
    OP
       347 天前
    @2Nfree #35 准备综合楼上的思路手撸了
    molvqingtai
        38
    molvqingtai  
    OP
       345 天前
    @all 感谢各位,目前已实现,逻辑如下:
    同步最大消息为 30 天内的历史记录,使用最后一条消息作为判断依据
    同步信息数量不一定是 30 天内所有的消息, 如果在同步之前,产生了新的时间点的信息,则不会同步


    A,B,C,D,E 5 个用户,A ,B 在线,C,D,E 离线

    A-B 聊天,产生信息两条 messageA, messageB

    A-B 离线

    C-D 上线,产生数据两条 messageC, messageD

    A-B 上线,C-D 将会给 A-B 推送两条消息 messageC 和 messageD ,但是 A-B 不会给 C-D 推送 messageA 和 messageB ,因为 C-D 的最新消息时间点比 A-B 早

    E 上线,A-B-C-D 均会给 E 推送消息 messageA, messageB, messageC, messageD

    最终结果:
    A-B 显示 4 条消息 messageC, messageD, messageA, messageB

    C-D 显示 2 条消息 messageA, messageB

    E 显示 4 条消息 messageA, messageB, messageC, messageD


    如上:
    C-D 没有同步到早于自己的消息
    一方面是,如果要全量同步 30 天,必然需要根据 30 天内的消息时间点 Diff ,然后插入,现在的实现只是增量追加,而且消息随着时间累积会越来越多

    先暂时这样,后续看看是否有必要将 30 天内的数据全量同步
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     1100 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 25ms UTC 23:10 PVG 07:10 LAX 16:10 JFK 19:10
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86