百万请求一分钟, Golang 轻松来搞定 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
darluc
V2EX    推广

百万请求一分钟, Golang 轻松来搞定

  •  1
     
  •   darluc 2018-02-28 15:26:48 +08:00 9667 次点击
    这是一个创建于 2832 天前的主题,其中的信息可能已经有所发展或是发生改变。

    点击此处阅读原文

    我在反广告、杀病毒、检木马等行业的不同软件公司里已经工作 15 年以上了,非常了解这类系统软件因每天处理海量数据而导致的复杂性。

    目前我作为 smsjunk.com 的 CEO 和 KnowBe4 的主架构师,在这两个网络安全领域的公司里工作。

    有趣的是,在过去的 10 年里,作为软件工程师,我接触到的 web 后端代码大多是用 Ruby on Rails 开发的。请不要误会,我很喜欢 Ruby on Railds 框架,而且我认为它是一套令人称赞的框架,不过时间一长,你就会习惯于使用 ruby 语言的方式思考和设计系统,会忘记利用多线程,并行化,快速执行和小的内存消耗,软件架构本可以如此高效且简单。很多年来,我也是一个 C/C++,Delphi 以及 C# 的使用者,而且我开始认识到使用正确的工具能让事情变得更简单。

    我对互联网上没完没了的语言框架之间的论战并不感冒。因为我相信解决方案的效能及代码可维护性主要倚仗于你的架构能做到多简单。

    实际问题

    在实现某个遥测分析系统时,我们遇到一个实际问题,要处理来自数百万终端的 POST 请求。其中的 web 请求处理过程会接收到一个 JSON 文档,它包含一个由许多荷载数据组成的集合,我们要把它写到 Amazon S3 存储中,之后我们的 map-reduce 系统就可以对这些数据进行处理。

    一般我们会利用如下的组件去创建一个有后台工作层的架构,如:

    • Sidekiq
    • Resque
    • DelayedJob
    • Elasticbeanstalk Worker Tier
    • RabbitMQ
    • 等等

    并且建立两个不同的服务集群,一个用作 web 前端接收数据,另一个执行具体的工作,这样我们就能动态调整后台处理工作的能力了。

    不过从项目伊始,我们的团队就认为应该用 Go 语言来实现这项工作,因为在讨论过程中我们发现这可能是一个流量巨大的系统。我已经使用 Go 语言快两年了,而且我们已经在工作中用它开发了一些系统,只是还没遇到过负载如此大的系统。

    我们从定义一些 web 的 POST 请求载荷数据结构开始,还有一个用于上传到 S3 存储的方法。

    type PayloadCollection struct { WindowsVersion string `json:"version"` Token string `json:"token"` Payloads []Payload `json:"data"` } type Payload struct { // [redacted] } func (p *Payload) UploadToS3() error { // the storageFolder method ensures that there are no name collision in // case we get same timestamp in the key name storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano()) bucket := S3Bucket b := new(bytes.Buffer) encodeErr := json.NewEncoder(b).Encode(payload) if encodeErr != nil { return encodeErr } // Everything we post to the S3 bucket should be marked 'private' var acl = s3.Private var cOntentType= "application/octet-stream" return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{}) } 

    Go routines 的傻瓜式用法

    起初我们实现了一个非常简单的 POST 处理接口,尝试用一个简单的 goroutine 并行工作处理过程:

    func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader( http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var cOntent= &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=") w.WriteHeader( http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { go payload.UploadToS3() // <----- DON'T DO THIS } w.WriteHeader( http.StatusOK) } 

    在普通负载的情况下,这段代码对于大多数人已经够用了,不过很快就被证明了不适合大流量的情形。当我们把第一个版本的代码部署到生产环境后,才发现实际情况远远超出我们的预期,系统流量比之前预计的大许多,我们低估了数据负载量。

    上面的处理方式从几个方面来看都有问题。我们无法办法控制创建的 go routines 的数量。而且我们每分钟收到一百万次的 POST 请求,代码必然很快就崩溃。

    再次尝试

    我们需要寻找别的出路。从一开始,我们就在讨论怎样保证请求处理时间较短,然后在后台进行工作处理。当然,在 Ruby on Rails 里必须这样做,否则你会阻塞掉所有的 web 处理进程,无论你是否使用了 puma,unicorn,passenger (我们这里就不讨论 JRuby 了)。然后我们可能会使用常见的解决方案,比如 Resque,Sidkiq,SQS,等等。有许多方法可以完成这个任务。

    所以第二次迭代采用了缓冲通道( buffered channel ),我们可以将一些工作先放入队列,再将它们上传至 S3,由于我们能够控制队列的大小,而且有充足的内存可用,所以我们以为将任务缓冲到 channel 队列中就可以了。

    var Queue chan Payload func init() { Queue = make(chan Payload, MAX_QUEUE) } func payloadHandler(w http.ResponseWriter, r *http.Request) { ... // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { Queue <- payload } ... } 

    然后将任务从队列中取出再进行处理,我们使用了类似下面的代码:

    func StartProcessor() { for { select { case job := <-Queue: job.payload.UploadToS3() // <-- 仍然不好使! } } } 

    老实说,我都不知道当时我们在想些什么。这一定是喝红牛熬夜导致的结果。这个方案没给我们带来任何好处,我们只是将一个有问题的并发过程替换为了一个缓冲队列,它只是将问题推后了而已。我们的同步处理过程每次只将一份载荷数据上传到 S3,由于接受到请求的速率远大于单例程上传到 S3 的能力,我们的缓冲队列很快就满了,导致请求处理过程阻塞,无法将更多的数据送入队列。

    我们傻乎乎地忽略了问题,最终开始了系统的死亡倒计时。在部署了这个问题版本之后几分钟里,系统的延迟以固定的速率不断增加。

    更好的解决方案

    我们决定使用 Go 通道的一种常用模式构建一个两层的通道系统,一个通道用作任务队列,另一个来控制处理任务时的并发量。

    这个办法是想以一种可持续的速率、并发地上传数据至 S3 存储,这样既不会把机器跑挂掉也不会产生 S3 的连接错误。因此我们选择使用了一种 Job/Worker 模式。如果你熟悉 Java,C# 等语言,可以认为这是使用通道以 Go 语言的方式实现了一个工作线程池。

    var ( MaxWorker = os.Getenv("MAX_WORKERS") MaxQueue = os.Getenv("MAX_QUEUE") ) // Job represents the job to be run type Job struct { Payload Payload } // A buffered channel that we can send work requests on. var JobQueue chan Job // Worker represents the worker that executes the job type Worker struct { WorkerPool chan chan Job JobChannel chan Job quit chan bool } func NewWorker(workerPool chan chan Job) Worker { return Worker{ WorkerPool: workerPool, JobChannel: make(chan Job), quit: make(chan bool)} } // Start method starts the run loop for the worker, listening for a quit channel in // case we need to stop it func (w Worker) Start() { go func() { for { // register the current worker into the worker queue. w.WorkerPool <- w.JobChannel select { case job := <-w.JobChannel: // we have received a work requet. if err := job.Payload.UploadToS3(); err != nil { log.Errorf("Error uploading to S3: %s", err.Error()) } case <-w.quit: // we have received a signal to stop return } } }() } // Stop signals the worker to stop listening for work requests. func (w Worker) Stop() { go func() { w.quit <- true }() } 

    我们修改了 web 请求处理过程,使用数据载荷创建了一个 Job 实例,然后将其送入 JobQueue 通道中供工作例程使用。

    func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader( http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var cOntent= &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=") w.WriteHeader( http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { // let's create a job with the payload work := Job{Payload: payload} // Push the work onto the queue. JobQueue <- work } w.WriteHeader( http.StatusOK) } 

    在 web 服务初始化的过程中,我们创建了一个 Dispatcher 实例,调用 Run() 方法创建了工作例程池,并且通过监听 JobQueue 获取工作任务。

    dispatcher := NewDispatcher(MaxWorker) dispatcher.Run() 

    下面的代码是任务分派器的具体实现:

    type Dispatcher struct { // A pool of workers channels that are registered with the dispatcher WorkerPool chan chan Job } func NewDispatcher(maxWorkers int) *Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{WorkerPool: pool} } func (d *Dispatcher) Run() { // starting n number of workers for i := 0; i < d.maxWorkers; i++ { worker := NewWorker(d.pool) worker.Start() } go d.dispatch() } func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: // a job request has been received go func(job Job) { // try to obtain a worker job channel that is available. // this will block until a worker is idle jobChannel := <-d.WorkerPool // dispatch the job to the worker job channel jobChannel <- job }(job) } } } 

    注意我们提供了一个最大数量的参数,用于控制工作池中初始的例程数量。因为这个项目使用了 Amazon Elasticbeanstalk 以及 docker 中的 Go 环境,所以我们努力遵循 12-factor 的方法,从环境变量中读取配置值,便于在生产环境中进行系统配置。通过这种方式,我们可以控制工作例程的数量和工作队列的长度,无需对集群进行重新部署,我们就能快速调整参数值。

    点击此处阅读全文

    45 条回复    2018-03-05 17:58:14 +08:00
    kslr
        1
    kslr  
       2018-02-28 15:32:51 +08:00
    如果直接上传到 S3 然后报告结果给 API 呢
    douglarek
        2
    douglarek  
       2018-02-28 15:34:33 +08:00
    mark
    Crabbbbb
        3
    Crabbbbb  
       2018-02-28 15:36:17 +08:00
    mark
    douglarek
        4
    douglarek  
       2018-02-28 15:36:20 +08:00
    原文标题可没说是 Golang 轻松来搞定
    chinvo
        5
    chinvo  
       2018-02-28 15:36:20 +08:00
    让客户端上传到 s3,然后发 s3 的结果给 API,这样做和文中方案有何利弊区分?
    fatjiong
        6
    fatjiong  
       2018-02-28 15:38:17 +08:00
    感谢分享。
    est
        7
    est  
       2018-02-28 15:38:24 +08:00   1
    > 从环境变量中读取配置值,便于在生产环境中进行系统配置。通过这种方式,我们可以控制工作例程的数量和工作队列的长度,无需对集群进行重新部署

    这特么也是需要重启一下进程才能实现的把???
    douglarek
        8
    douglarek  
       2018-02-28 15:43:34 +08:00
    看明白了,实现了个 sqs + 多进程消费 ?
    ljypaul2011
        9
    ljypaul2011  
       2018-02-28 15:57:24 +08:00
    mark
    Icezers
        11
    Icezers  
       2018-02-28 15:58:52 +08:00
    为什么不拆分成 生产者-任务队列-消费者的微服务模式呢,只要保证任务队列的数据库不炸就行了啊,如果生产者的生产速率恒大于消费者的消费速率,文章中的 JobQueue 一样会炸啊,无非是时间问题,内存可比硬盘贵多了
    xkeyideal
        12
    xkeyideal  
       2018-02-28 16:01:37 +08:00   1
    去年的老帖,动动脑再来发,作者说的最后一种高效方案写的真的好么?实现的复杂一逼,而且这种情况,后端服务堵了啥语言都白给。
    murmur
        13
    murmur  
       2018-02-28 16:02:04 +08:00
    Hndling 1 Million Requests per Minute with Golang
    又一个翻译标题党
    这需求太简单了吧,收集一大堆 json 然后整合起来上传到亚马逊云上 几乎没有什么处理操作 只要内存队列控制的好的语言应该都能完成
    janxin
        14
    janxin  
       2018-02-28 16:06:34 +08:00
    @est #7 环境变量那种还是 12factor 推荐的方式,不过我觉得还是配置文件好
    Icezers
        15
    Icezers  
       2018-02-28 16:11:46 +08:00   4
    重新看了一遍原文看明白了
    第一种方案,并发量过大导致上传任务奔溃
    第二种方案,使用了 channel 控制并发,导致协程阻塞
    于是作者的第三种方案就是写了一个类 Java 的线程池。。。。在高并发的时候把任务放入线程池,同时控制任务执行速率防止上传协程崩溃,
    问题来了,如果真按文中所述每分钟百万次请求,要么放大协程数量炸上传,要么 JobQueue 阻塞炸内存。。。。
    问题根源不是出在上传那里么,还是没有解决
    所以没有什么是加一台机器不能解决的,如果有,就加 2 台
    wph95
        16
    wph95  
       2018-02-28 16:12:47 +08:00 via iPhone   1
    aws kinesis 了解一下
    没看出来这文章哪里体现出一分钟一百万了
    python async 跑分还吊打 go ……
    Icezers
        17
    Icezers  
       2018-02-28 16:12:51 +08:00
    @xkeyideal 同意,我觉得这不是语言的问题,是业务模型没有设计好
    jswh
        18
    jswh  
       2018-02-28 16:20:18 +08:00
    看下来了,和 go 关系不大
    sagaxu
        19
    sagaxu  
       2018-02-28 16:25:50 +08:00
    一秒才一万多,逻辑也不复杂,随便哪个语言都能轻松搞定吧
    clippit
        20
    clippit  
       2018-02-28 16:26:46 +08:00
    就单纯上报的话,还可以试试 AWS Lambda (逃
    CoderGeek
        21
    CoderGeek  
       2018-02-28 16:29:57 +08:00 via iPhone
    好像真的没啥关系 微服务能很大程度避免这种问题吧
    verzhshq
        22
    verzhshq  
       2018-02-28 16:31:25 +08:00
    谢谢分享
    xkeyideal
        23
    xkeyideal  
       2018-02-28 16:31:30 +08:00   1
    @Icezers 这个帖子,去年我专门拿出来做过相关的剖析,前提是不考虑后端服务的处理能力。

    作者说了三种 goroutine 常用的方法,第一种最常见,第二种也还好,第三种设计的模型真的垃圾,简单代码复杂化,让人很难看懂,明明可以用个类 worker pool 的方式就能搞定,开多个 worker,每个 worker 去各自的 pool 中取任务即可,非要搞那么复杂,做的高级一点,还可以对 worker 的数量进行缩扩容。

    最后讨论了一下这个应用场景,结论就是后端的 S3 服务一睹,啥方案都白给。

    综合来看,这篇文章的价值很小,给一些刚入门的,不太理解 goroutine 的人看看还行
    pmispig
        24
    pmispig  
       2018-02-28 16:32:30 +08:00
    我来总结下吧。最后优化的方式是使用了 2 个 channel. 一个用来放 job 列队,一个用来控制 Go routines 的并发数量。
    rrfeng
        25
    rrfeng  
       2018-02-28 16:33:32 +08:00 via Android
    这个翻译文章问题太多了...

    大家去看原文吧。记得两年前就看过了,翻译出来根本不是原来的味道
    jimrok
        26
    jimrok  
       2018-02-28 16:34:33 +08:00
    把消费者也并发执行了,最终搞定。这模式 java,erlang 都很容易实现,golang 又拿出来炒一遍。
    shell314
        27
    shell314  
       2018-02-28 16:44:59 +08:00
    学习了
    xuanyuanaosheng
        28
    xuanyuanaosheng  
       2018-02-28 16:45:15 +08:00
    mark
    glacier2002
        29
    glacier2002  
       2018-02-28 16:52:36 +08:00
    学习了
    yc8332
        30
    yc8332  
       2018-02-28 17:01:49 +08:00
    怎么感觉很 low 的样子 (逃
    nnnoml
        31
    nnnoml  
       2018-02-28 17:07:51 +08:00
    mark
    soli
        32
    soli  
       2018-02-28 17:23:14 +08:00   3
    没仔细看代码,从描述来看,感觉作者及同事处理问题的步骤没啥大问题。
    但估计是作者没有真正理解每个环节所遇到的问题的本质,所以给人有种『吃了第十个馒头饱了,就觉得前九个馒头都没用』的感觉。

    简单梳理一下:

    1. 最开始他们遇到的问题是『要处理来自数百万终端的 POST 请求』。

    这其实是要解决并发数和响应时间的问题,顺带要考虑突发流量问题,即要『削峰』。

    这时候他们采用协程去解决,没啥问题。
    (有简单的解决方法,肯定用简单的哈。)

    2. 解决了并发数和响应时间问题后,导致了爆协程的问题。

    请求确实不少,协程处理没那么快的话,当然爆掉了。
    尤其是遇到突发流量的话,服务不死才怪。

    为了解决爆协程的问题(注意:这个阶段的问题已经和 1 中的问题不是同一个问题了),
    那就用队列吧。至少队列可以控制数量哈。

    这时候他们解决问题的方式,也没啥可指摘的。

    3. 这时候横着爆的问题变成了纵向爆了。

    用队列代替协程之后,协程数量爆的问题变成了队列长度爆的问题。

    这个阶段才把吞吐率的问题暴露出来。
    无论横着竖着都爆,那就是说明问题是数据处理不够快哈。
    那开个线程池,多放几个线程处理数据呗。

    这时候他们的解决方法,也没得说。


    最后,既然每一步都没啥问题,但整个过程为啥给人一种有问题的感觉?
    我认为,唯一的问题是整个团队没有有经验的人。
    有经验的人,遇到这种情况的时候,至少能把各个阶段面临的问题理解清楚,处理起来有的放矢。
    更好点的,可以预见后续会遇到的问题,从而从一开始就采用不同的解决方案。
    再好点的,可以迅速做出解决方案原型,并进行多项性能测试,从而采用最简单的方案把问题解决。
    如果原型+测试表明,用协程就可以解决了,那直接协程就好了,队列线程池什么的就属于过渡设计了;
    类似文中这种情况,原型+测试的方法可以极大地减少对生产环境的影响,
    也尽量避免了开发人员为了紧急应对线上问题而疲于奔命。
    Icezers
        33
    Icezers  
       2018-02-28 17:31:35 +08:00
    @soli 我觉得他们的本质问题是 UploadToS3 速率过慢的问题,就算『削峰』+开线程池控制协程,出水口依然没有变大,最后池子还是会有问题的
    soli
        34
    soli  
       2018-02-28 17:44:06 +08:00
    @Icezers 那是最终的问题哈。

    一开始如果连并发都没解决的话,前面一分钟只能接一万个请求,那『 UploadToS3 速率过慢』又从何而来呢?
    前面九个馒头不吃的话,怎么直接吃第十个馒头呢?
    AckywOw
        35
    AckywOw  
       2018-02-28 17:44:23 +08:00
    mark
    hugodotlau
        36
    hugodotlau  
       2018-02-28 18:06:18 +08:00
    马克华菲
    jinya
        37
    jinya  
       2018-02-28 19:01:39 +08:00
    m
    feverzsj
        38
    feverzsj  
       2018-02-28 19:24:44 +08:00
    还以为是百万每秒,原来是每分钟,这也值得拿来说事?
    angelshq
        39
    angelshq  
       2018-02-28 19:35:21 +08:00
    mark,学习完 go 在回头看看。
    rayjoy
        40
    rayjoy  
       2018-02-28 22:26:56 +08:00
    mark,go 学习中
    sen506
        41
    sen506  
       2018-02-28 22:40:05 +08:00 via iPhone
    1 秒 1 万多真心不高。。
    kunluanbudang
        42
    kunluanbudang  
       2018-02-28 23:08:54 +08:00 via Android
    @Livid

    麻烦看下
    searene
        43
    searene  
       2018-03-01 09:01:47 +08:00
    说句题外话,翻译口味有点浓,一看就能看出来
    puperSB
        44
    puperSB  
       2018-03-01 10:21:40 +08:00
    mark
    kaxi
        45
    kaxi  
       2018-03-05 17:58:14 +08:00
    @Icezers
    jobChannel := <-d.WorkerPool 这里的 jobChannel 是不是应该 d.jobChannel 才对,如果这样写 Dispatcher struct 中好像又缺少定义包括 d.maxWorkers 及 d.pool 都缺少定义?
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2829 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 31ms UTC 13:46 PVG 21:46 LAX 05:46 JFK 08:46
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86