晚上看了会 Medium 上的博客,发现了一篇关于使用 Go 来实现并发的获取数据。
我在作者原有的基础上改造了一下,限制了最大的数量,不知道有没有什么疏漏,请大家指教指教。
另外我也在思考关于 GC 的问题,如果代码中在某一次循环中走了<-ctx.Done()的分支,直接返回了结果,productsChan 会被怎么回收?我应该如何关闭相关的 channel ?还是让程序自己处理?
我的改造:
package main import "sync" import "runtime" import "fmt" var LIST_PRODUCT_TYPE = [100000]string{"food", "electronics", "clothing","...more"} // ......非常多的数据需要查询 type GetListProductResponse struct { Data []ProductListResponse `json:"data"` } type ProductListResponse struct { Code string `json:"code"` Name string `json:"name"` Price string `json:"price"` Status bool `json:"status"` } func getProducts(ctx context.Context, req *GetProductListRequest) (*GetListProductResponse, error) { // calling endpoint 3rd party // parse to response // and return the data return &productList, nil } func main() { ctx, cancelFunc := context.WithCancel(ctx) defer cancelFunc() wg := sync.WaitGroup{} doneChan := make(chan struct{}, 1) productsChan := make(chan *GetListProductResponse) errChan := make(chan error) // LIST_PRODUCT_TYPE 数量非常大,需要限制最大的并发数量 maxConcurrency := 5 semaphore := make(chan struct{}, maxConcurrency) wg.Add(len(LIST_PRODUCT_TYPE)) for key := range LIST_PRODUCT_TYPE { req := &GetProductListRequest{ ProductType: LIST_PRODUCT_TYPE[key], } select { case <-ctx.Done(): return nil, ctx.Err() case semaphore<-struct{}{}: go func() { defer wg.Done() defer func() { <-semaphore }() products, err := getProductList(ctx, req) if err != nil { errChan <- err return } productsChan <- products }() } } go func() { wg.Wait() doneChan <- struct{}{} }() var ( catalogues GetListProductResponse data []ProductListResponse ) for { select { case <-ctx.Done(): return nil, ctx.Err() case err := <-errChan: return nil, err case products := <-productsChan: data = append(data, products.Data...) catalogues.Data = data case <-doneChan: return &catalogues, nil } } }
![]() | 1 dcalsky 203 天前 ![]() 新手自己写容易出问题,推荐: https://github.com/sourcegraph/conc |
3 awanganddong 201 天前 ``` func worker(wg *sync.WaitGroup, jobs chan int, i int) { defer wg.Done() for job := range jobs { fmt.Printf("jobs:%v,goroutin:%v\n", job, i) } } func main() { numWorker := 3 numJobs := 1000 var wg sync.WaitGroup jobs := make(chan int) for i := 0; i < numWorker; i++ { wg.Add(1) go worker(&wg, jobs, i) } for j := 1; j <= numJobs; j++ { jobs <- j } close(jobs) wg.Wait() } 我能想到的就是执行任务这里做并发控制。 ``` |