最近在做一个项目,后端 Go ,前端 SSE 推流 LLM 的输出。遇到一个很烦的问题:用户刷新页面或者网络抖一下,流就断了,但后端还在跑,token 照烧不误。
更麻烦的是我们的 LLM worker 和 HTTP handler 不在同一个实例上,负载均衡一转发,重连过来的请求根本找不到原来那个流。
JS/TS 那边有 Vercel 的 resumable-stream 可以用,但 Go 这边翻了一圈啥也没有,就自己撸了一个:
https://github.com/gtoxlili/streamhub思路不复杂:
- Redis Streams 存 chunk ,断线重连的订阅者先 replay 历史再接实时数据
- Redis Pub/Sub 传 cancel 信号,用户在 A 节点点停止,B 节点上的生成就能收到
- 每个 producer 有个 generation ID 做 fencing token ,防止旧 producer 写脏数据
- 同一个 session 只允许一个 producer 注册,不会重复调 LLM
代码大概长这样:
```go
// 生产端
stream, created, err := hub.Register("chat:123", cancelFunc)
if !created {
return // 已经有人在跑了
}
defer stream.Close()
stream.Publish("hello")
// 消费端(任意实例)
chunks, unsub := hub.Get("chat:123").Subscribe(128)
defer unsub()
for chunk := range chunks {
// 先 replay 再 live
fmt.Fprint(w, chunk)
}
```
目前还比较早期,API 可能还会改。做类似场景的同学可以看看,有想法欢迎提 issue 。