
在开始之前,先提供 github 地址:FlowSync
说明: 本项目是 DocFlow 项目协同编辑功能的基础实现示例。如果你想学习更完整的协同编辑解决方案(包括富文本编辑、权限管理、AI 集成等企业级功能),请访问 DocFlow 项目。
如果想参与 DocFlow 开发或者想学习更详细的 Tiptap 和 Yjs 协同原理的,可以添加我微信获取更详细的了解:yunmz777
想象一下这样的场景:多个用户同时打开同一个流程图,拖动节点、创建连接、实时看到彼此的光标位置,就像在同一块白板上协作一样流畅。这就是我们要实现的效果。这个案例会涵盖协同编辑的核心场景:数据同步、状态管理、光标协同,以及前后端的完整实现。
让我们先看看整体架构。这个协同流程图应用由三个核心部分组成:前端使用 React + ReactFlow 构建可视化流程图界面,后端使用 NestJS 搭建 WebSocket 服务器处理协同逻辑,Yjs 作为协同引擎在前后端之间同步数据。

前端负责用户交互和视图渲染,Yjs Doc 存储流程图的节点和连接数据,WebsocketProvider 负责与服务器建立连接并同步数据。后端的 WebSocket Server 接收来自多个客户端的消息,协调它们之间的数据同步,WSSharedDoc 存储每个房间的文档状态。
让我们从后端开始,因为它是整个协同系统的中枢。后端的职责很清晰:接收客户端连接,转发同步消息,管理 Awareness 状态。
我们使用 NestJS 框架搭建后端服务,整个协同逻辑封装在一个 Service 中。首先创建项目结构:

在开始编写代码前,需要安装必要的依赖:
pnpm add yjs y-protocols ws lib0 pnpm add -D @types/ws 在 src/yjs/yjs.service.ts 文件中,首先导入需要的模块,然后定义 WSSharedDoc 类:
import { Injectable, OnModuleInit, Logger } from "@nestjs/common"; import * as WebSocket from "ws"; import * as Y from "yjs"; import * as syncProtocol from "y-protocols/sync"; import * as awarenessProtocol from "y-protocols/awareness"; import * as encoding from "lib0/encoding"; import * as decoding from "lib0/decoding"; import * as map from "lib0/map"; const wsReadyStateOpen = 1; const messageSync = 0; const messageAwareness = 1; const docs: Map<string, WSSharedDoc> = new Map(); 这些导入包括:NestJS 的装饰器和日志工具,WebSocket 库用于建立连接,Yjs 核心库和协议库,lib0 提供的编解码和 Map 工具。常量定义了消息类型和连接状态。
WSSharedDoc 类继承自 Y.Doc ,是服务端维护的共享文档:
class WSSharedDoc extends Y.Doc { name: string; conns: Map<WebSocket, Set<number>>; awareness: awarenessProtocol.Awareness; constructor(name: string) { super({ gc: true }); this.name = name; this.cOnns= new Map(); this.awareness = new awarenessProtocol.Awareness(this); } } 这个类扩展了 Yjs 文档,添加了连接管理和 Awareness 支持。name 是文档的唯一标识(通常是房间名),conns 维护了当前所有连接到这个文档的 WebSocket 客户端,awareness 用于同步用户的临时状态(如光标位置)。
在同一文件中,文档管理使用一个全局 Map ,确保同一个房间的所有客户端共享同一个文档实例:
const docs: Map<string, WSSharedDoc> = new Map(); const getYDoc = (docname: string): WSSharedDoc => map.setIfUndefined(docs, docname, () => new WSSharedDoc(docname)); 接下来在 YjsService 类中实现连接处理。当客户端连接时,我们需要建立 WebSocket 连接,从 URL 中提取房间名,然后设置消息处理逻辑:
onModuleInit() { this.wss = new WebSocket.Server({ port: 1234 }); this.wss.on('connection', (conn: WebSocket, req: any) => { const url = req.url || '/'; const docName = url.slice(1).split('?')[0] || 'default'; this.setupWSConnection(conn, docName); }); this.logger.log('WebSocket server initialized on port 1234'); } 这里从请求 URL 中提取房间名,比如 ws://localhost:1234/flow-room 会提取出 flow-room 作为文档名称。这样不同的房间使用不同的文档,互不干扰。
setupWSConnection 是整个服务端的核心,它处理客户端的所有消息:
private setupWSConnection(conn: WebSocket, docName: string) { conn.binaryType = 'arraybuffer'; const doc = getYDoc(docName); doc.conns.set(conn, new Set()); conn.on('message', (message: ArrayBuffer | Buffer) => { const uint8Array = message instanceof ArrayBuffer ? new Uint8Array(message) : new Uint8Array(message.buffer, message.byteOffset, message.byteLength); const encoder = encoding.createEncoder(); const decoder = decoding.createDecoder(uint8Array); const messageType = decoding.readVarUint(decoder); switch (messageType) { case messageSync: encoding.writeVarUint(encoder, messageSync); syncProtocol.readSyncMessage(decoder, encoder, doc, null); if (encoding.length(encoder) > 1) { send(doc, conn, encoding.toUint8Array(encoder)); } break; case messageAwareness: awarenessProtocol.applyAwarenessUpdate( doc.awareness, decoding.readVarUint8Array(decoder), conn, ); break; } }); } 消息分为两种类型:Sync 消息用于同步文档数据(节点、连接等持久化数据),Awareness 消息用于同步用户状态(光标位置等临时数据)。服务端收到 Sync 消息后,使用 y-protocols 的 readSyncMessage 处理,如果有响应数据就发送回客户端。收到 Awareness 消息后,应用更新并广播给其他客户端。
新客户端连接后,需要立即发送当前文档状态,让新用户能看到已有的内容:
setImmediate(() => { if (conn.readyState === wsReadyStateOpen) { const encoder = encoding.createEncoder(); encoding.writeVarUint(encoder, messageSync); syncProtocol.writeSyncStep1(encoder, doc); send(doc, conn, encoding.toUint8Array(encoder)); const awarenessStates = doc.awareness.getStates(); if (awarenessStates.size > 0) { const awarenessEncoder = encoding.createEncoder(); encoding.writeVarUint(awarenessEncoder, messageAwareness); encoding.writeVarUint8Array( awarenessEncoder, awarenessProtocol.encodeAwarenessUpdate( doc.awareness, Array.from(awarenessStates.keys()) ) ); send(doc, conn, encoding.toUint8Array(awarenessEncoder)); } } }); 使用 setImmediate 延迟发送,确保连接完全建立。首先发送 Sync Step1 消息,包含当前文档的完整状态向量,然后如果有其他用户在线,发送他们的 Awareness 状态,这样新用户能立即看到其他人的光标。
服务端需要监听文档的更新,并将更新广播给所有连接的客户端:
const updateHandler = (update: Uint8Array) => { const encoder = encoding.createEncoder(); encoding.writeVarUint(encoder, messageSync); syncProtocol.writeUpdate(encoder, update); const message = encoding.toUint8Array(encoder); doc.conns.forEach((_, c) => send(doc, c, message)); }; doc.on("update", updateHandler); const awarenessChangeHandler = ({ added, updated, removed }) => { const changedClients = added.concat(updated, removed); const cOnnControlledIDs= doc.conns.get(conn); if (connControlledIDs !== undefined) { added.forEach((clientID) => connControlledIDs.add(clientID)); removed.forEach((clientID) => connControlledIDs.delete(clientID)); } const encoder = encoding.createEncoder(); encoding.writeVarUint(encoder, messageAwareness); encoding.writeVarUint8Array( encoder, awarenessProtocol.encodeAwarenessUpdate(doc.awareness, changedClients) ); const buff = encoding.toUint8Array(encoder); doc.conns.forEach((_, c) => send(doc, c, buff)); }; doc.awareness.on("update", awarenessChangeHandler); 当文档有更新时(比如用户移动了节点),将增量更新编码后广播给所有客户端。当 Awareness 状态变化时(比如用户移动了光标),也广播给所有客户端。注意这里会追踪每个连接控制的客户端 ID ,用于后续清理。
当客户端断开连接时,需要清理相关资源,移除其 Awareness 状态,如果房间空了就销毁文档:
conn.on("close", () => { const cOntrolledIds= doc.conns.get(conn); doc.conns.delete(conn); doc.awareness.off("update", awarenesChangeHandler); doc.off("update", updateHandler); if (controlledIds) { awarenessProtocol.removeAwarenessStates( doc.awareness, Array.from(controlledIds), null ); } if (doc.conns.size === 0) { doc.destroy(); docs.delete(docName); } }); 这里移除事件监听器防止内存泄漏,清除该客户端的 Awareness 状态,如果是最后一个客户端离开,销毁文档释放内存。
这些逻辑都编写完成之后,我们就可以执行如下名来来启动项目:
pnpm start:dev 前端的实现更加复杂一些,因为要处理用户交互、UI 渲染和协同同步。我们使用 ReactFlow 作为流程图渲染引擎,Yjs 处理数据同步。
前端项目结构如下:

前端需要安装以下依赖:
pnpm add yjs y-websocket reactflow React Flow 是一个用于构建交互式图形和流程图的库,提供了丰富的功能,如拖拽节点、连接线、缩放、平移等,常用于可视化编辑器、流程设计器等应用中。它支持高度定制,可以很容易地与其他前端库和框架集成。
在 src/components/Canvas.tsx 文件中,首先导入需要的库:
import { useCallback, useEffect, useRef, useState } from "react"; import { ReactFlow, Controls, Background, useNodesState, useEdgesState, addEdge, BackgroundVariant, ConnectionMode, useReactFlow, ReactFlowProvider, type Connection, type Edge, type Node, NodeChange, EdgeChange, } from "reactflow"; import "reactflow/dist/style.css"; import * as Y from "yjs"; import { WebsocketProvider } from "y-websocket"; import Cursor from "./Cursor"; 然后在 Canvas 组件中创建 Yjs 文档和 WebSocket 连接:
useEffect(() => { const doc = new Y.Doc(); const wsProvider = new WebsocketProvider( "ws://localhost:1234", "flow-room", doc, { connect: false, resyncInterval: -1, } ); setTimeout(() => { wsProvider.connect(); }, 50); const nodesMap = doc.getMap("nodes"); const edgesMap = doc.getMap("edges"); ydoc.current = doc; provider.current = wsProvider; // ... 后续逻辑 }, []); 这里延迟连接是为了避免过快重连的警告,使用 resyncInterval: -1 禁用自动重新同步。我们使用两个 Y.Map 分别存储节点和连接,这样可以独立管理它们的增删改。
在同一个 useEffect 中,设置本地用户的 Awareness 状态,包括随机生成的颜色和客户端 ID:
const clientId = wsProvider.awareness.clientID; const userColor = useRef(generateRandomColor()); wsProvider.awareness.setLocalState({ cursor: null, color: userColor.current, clientId: clientId, }); wsProvider.awareness.on("change", () => { const states = new Map(wsProvider.awareness.getStates()); setCursors(states); }); 监听 Awareness 变化,当其他用户的光标移动时,更新本地状态并重新渲染光标。
继续在 useEffect 中,如果是首次打开,初始化一些默认节点,然后监听节点变化,将 Yjs 数据同步到 React 状态:
if (nodesMap.size === 0) { initialNodes.forEach((node) => { nodesMap.set(node.id, JSON.parse(JSON.stringify(node))); }); } nodesMap.observe(() => { const yNodes = Array.from(nodesMap.values()); const validNodes = yNodes.map((node) => ({ id: node.id, type: node.type || "default", data: node.data, position: { x: node.position.x, y: node.position.y, }, })); setNodes(validNodes); }); 使用 JSON.parse(JSON.stringify()) 确保存储的是纯数据对象,避免 Yjs 无法序列化 React 组件。observe 方法监听 Y.Map 的变化,任何修改都会触发回调,我们将 Yjs 数据转换为 ReactFlow 需要的格式。
当用户拖动节点时,需要将位置更新同步到 Yjs 文档:
const handleNodesChange = useCallback( (changes: NodeChange[]) => { onNodesChange(changes); if (!ydoc.current) return; changes.forEach((change) => { if (change.type === "position") { const node = nodes.find((n) => n.id === change.id); if (node) { const updatedNode = { ...node, position: change.position || node.position, }; ydoc.current ?.getMap("nodes") .set(change.id, JSON.parse(JSON.stringify(updatedNode))); } } }); }, [nodes, onNodesChange] ); ReactFlow 使用 changes 数组描述节点的变化,我们过滤出位置变化,更新到 Yjs 文档。这个更新会触发 observe 回调,同时通过 WebSocket 发送给其他客户端。
当用户连接两个节点时,创建一条边并同步到 Yjs:
const OnConnect= useCallback( (connection: Connection) => { if (!connection.source || !connection.target) return; const newEdge = { id: `e${connection.source}-${connection.target}`, source: connection.source, target: connection.target, sourceHandle: connection.sourceHandle || undefined, targetHandle: connection.targetHandle || undefined, }; if (ydoc.current) { ydoc.current.getMap("edges").set(newEdge.id, newEdge); } setEdges((eds) => addEdge(connection, eds)); }, [setEdges] ); 生成唯一的边 ID ,同时更新本地状态和 Yjs 文档。本地状态用于立即响应,Yjs 文档用于同步给其他客户端。
光标同步是协同编辑中很重要的用户体验细节,让用户能看到彼此的位置:
const handleMouseMove = useCallback( (e: React.MouseEvent) => { if (!provider.current?.awareness || !flowRef.current) return; const bounds = flowRef.current.getBoundingClientRect(); const x = e.clientX - bounds.left; const y = e.clientY - bounds.top; const flowPosition = reactFlowInstance.screenToFlowPosition({ x, y }); provider.current.awareness.setLocalState({ cursor: flowPosition, color: userColor.current, clientId: provider.current.awareness.clientID, }); }, [reactFlowInstance] ); const handleMouseLeave = useCallback(() => { if (!provider.current?.awareness) return; provider.current.awareness.setLocalState({ cursor: null, color: userColor.current, clientId: provider.current.awareness.clientID, }); }, []); 获取鼠标相对于容器的坐标,使用 ReactFlow 的 screenToFlowPosition 转换为画布坐标(考虑缩放和平移),然后更新到 Awareness 。当鼠标离开画布时,将光标设置为 null ,其他用户就看不到这个光标了。
最后在 Canvas 组件的 JSX 中渲染其他用户的光标,使用不同颜色区分:
{ Array.from(cursors.entries()).map(([clientId, state]) => { if (!state.cursor || clientId === currentClientId) { return null; } const screenPosition = reactFlowInstance.flowToScreenPosition({ x: state.cursor.x, y: state.cursor.y, }); return ( <Cursor key={clientId} x={screenPosition.x} y={screenPosition.y} color={state.color} /> ); }); } 过滤掉自己的光标,将画布坐标转换回屏幕坐标(因为光标是绝对定位在屏幕上的),使用每个用户的颜色渲染光标组件。
在 src/components/Cursor.tsx 中实现光标组件,我们使用的 SVG 的方式来实现这个光标组件:
interface CursorProps { x: number; y: number; color: string; } export default function Cursor({ x, y, color }: CursorProps) { return ( <div style={{ position: "absolute", left: x, top: y, pointerEvents: "none", zIndex: 9999, }} > <svg width="24" height="24" viewBox="0 0 24 24"> <path d="M5.65376 12.3673L5.46026 12.4668L5.70974 12.8619L11.3877 21.5659L12.6377 23.192L13.1897 21.2687L16.855 8.71373L19.0002 0.939087L10.5578 3.70771L0.458496 7.88092L-1.56467 8.73732L0.0837585 10.0768L5.65376 12.3673Z" fill={color} /> </svg> </div> ); } 这个组件接收坐标和颜色,使用 SVG 绘制一个指针形状,通过 pointerEvents: "none" 确保不会干扰用户的鼠标操作。
让我们通过一个完整的场景来理解整个协同过程。假设用户 A 拖动了一个节点,数据是如何同步到用户 B 的?

整个过程分为三个阶段:
第一阶段,用户 A 拖动节点,触发 handleNodesChange ,更新 Yjs 文档,Yjs 文档生成增量更新。
第二阶段,WebsocketProvider 监听到 update 事件,将增量编码为二进制消息,通过 WebSocket 发送给服务器,服务器应用更新到服务端文档。
第三阶段,服务器将更新广播给所有其他客户端,用户 B 的 WebsocketProvider 收到消息,解码后应用到本地文档,触发 observe 回调,更新 React 状态,ReactFlow 重新渲染,用户 B 看到节点移动了。
这个过程非常高效,因为传输的是增量数据而不是完整文档,一次节点移动可能只需要传输几十个字节。Yjs 的 CRDT 算法保证了即使有并发修改,最终所有客户端的状态也会一致。
因为我们项目使用的是 Monorepo 项目,所以我们只需要在终端上启动如下命令即可:
pnpm dev 如果看到这样的输出,说明我们的前后端都启动成功了:

现在我们需要在不同的浏览器都打开相同的链接:
http://localhost:5173/ 因为这是用 vite 创建的项目,所以默认启动的是 5173 (我要吃饭端口)


最终你能看到两端鼠标实时同步过来,并且拖动节点能同步到另外一段浏览器,这就是我们实现的最简单的协同编辑器。
基于这个简单的协同编辑器架构,我们可以进一步扩展和丰富其功能。例如,可以增加用户身份、编辑历史回溯、权限管理、节点间的关系建立等更多功能,进一步提升协作体验和工具的实用性。
在这个架构中,Yjs 和 WebSocket 各司其职,共同完成协同编辑。
Yjs 的职责是维护文档状态和处理冲突。它使用 CRDT 算法确保多个客户端并发编辑时数据最终一致,生成增量更新而不是完整快照,大大减少了网络传输量,提供了 Y.Map 、Y.Array 等数据结构,让我们能方便地组织复杂的文档数据。Yjs 不关心数据如何传输,它只负责生成和应用更新。
WebSocket 的职责是网络通信。它建立客户端和服务器之间的双向连接,传输 Yjs 生成的二进制消息,处理连接断开、重连等网络问题,维护房间隔离,确保不同房间的消息不会混淆。WebSocket 不关心消息的内容,它只负责可靠地传输数据。
y-websocket 将两者连接起来,监听 Yjs 文档的更新,自动发送给服务器,接收服务器的消息,自动应用到 Yjs 文档,处理同步协议( State Vector 、Update )的编解码。这种职责分离的设计非常优雅,我们可以轻松替换传输层(比如用 WebRTC 代替 WebSocket )而不影响 Yjs 的使用。
虽然手动实现 WebSocket 服务器能让我们深入理解协同原理,但在实际项目中,我们通常会使用更成熟的解决方案。Hocuspocus 就是 Yjs 官方推荐的协同后端框架,它提供了开箱即用的功能。
Hocuspocus 的核心优势在于功能完整和易于扩展。它内置了持久化支持,可以将文档保存到数据库(支持 Redis 、PostgreSQL 、SQLite 等),提供了身份验证和权限控制的钩子,支持文档历史和版本管理,内置了扩展系统,可以轻松添加自定义逻辑,还提供了监控和日志功能,便于调试和运维。
使用 Hocuspocus 搭建服务器非常简单:
import { Server } from "@hocuspocus/server"; const server = Server.configure({ port: 1234, async onAuthenticate({ token, documentName }) { // 验证用户身份 return { user: { id: 1, name: "John" } }; }, async onLoadDocument({ documentName }) { // 从数据库加载文档 return Y.encodeStateAsUpdate(doc); }, async onStoreDocument({ documentName, state }) { // 保存文档到数据库 await db.save(documentName, state); }, }); server.listen(); 前端使用 HocuspocusProvider 替代 WebsocketProvider:
import { HocuspocusProvider } from "@hocuspocus/provider"; const provider = new HocuspocusProvider({ url: "ws://localhost:1234", name: "flow-room", document: doc, token: "user-auth-token", }); Hocuspocus 还提供了很多实用的扩展,比如 Webhooks 扩展可以在文档变化时触发 HTTP 请求,Logger 扩展可以记录所有操作日志,Throttle 扩展可以限制更新频率防止滥用,Database 扩展提供了多种数据库的开箱即用支持。
对于生产环境的应用,建议使用 Hocuspocus 而不是自己实现 WebSocket 服务器,这样可以节省大量开发时间,获得更稳定可靠的服务。
通过这个实战案例,我们完整地实现了一个协同流程图编辑器,涵盖了协同编辑的核心要素。
首先是架构设计。前端使用 React + ReactFlow 构建 UI ,后端使用 NestJS + WebSocket 处理协同,Yjs 作为协同引擎在前后端之间同步数据。这种架构清晰地分离了视图层、业务逻辑层和传输层。
其次是实现细节。后端管理文档实例和连接,处理 Sync 和 Awareness 消息,监听文档更新并广播给所有客户端。前端初始化 Yjs 文档和 WebSocket 连接,使用 Y.Map 存储节点和连接,监听用户操作并更新 Yjs 文档,渲染其他用户的协同光标。
再看协同流程。用户操作触发 Yjs 文档更新,生成增量数据,通过 WebSocket 发送给服务器,服务器广播给其他客户端,其他客户端应用更新并重新渲染。整个过程高效且可靠,得益于 Yjs 的 CRDT 算法和增量同步机制。
最后是工程化建议。在理解了底层原理后,实际项目中建议使用 Hocuspocus 简化开发,它提供了持久化、权限控制、监控等生产级功能,让我们能专注于业务逻辑而不是底层实现。
通过这个案例,我们不仅学会了如何构建协同应用,更重要的是理解了协同编辑的核心思想:通过 CRDT 算法保证最终一致性,通过增量同步减少网络传输,通过 Awareness 实现实时协作体验。这些理念可以应用到各种协同场景中,无论是文档编辑、画板协作还是数据表格。
1 qW7bo2FbzbC0 6 天前 后端的数据结构可以介绍下吗 |
2 moment082 OP @qW7bo2FbzbC0 看你需求呀,这个后端目前只做转发,没有数据结构 |