errgroup
是 Go 官方库 x 中提供的一个非常实用的工具,用于并发执行多个 goroutine,并且方便的处理错误。
我们知道,Go 标准库中有个
sync.WaitGroup
可以用来并发执行多个 goroutine,
errgroup
就是在其基础上实现了
errgroup.Group
。不过,
errgroup.Group
和
sync.WaitGroup
在功能上是有区别的,尽管它们都用于管理 goroutine 的同步。
errgroup 优势
与
sync.WaitGroup
相比,以下是设计
errgroup.Group
的原因和优势:
-
-
sync.WaitGroup
只负责等待 goroutine 完成,不处理 goroutine 的返回值或错误。
-
errgroup.Group
虽然目前也不能直接处理 goroutine 的返回值,但在 goroutine 返回错误时,可以立即取消其他正在运行的 goroutine,并在
Wait
方法中返回第一个非
nil
的错误。
-
errgroup
可以与
context.Context
配合使用,支持在某个 goroutine 出现错误时自动取消其他 goroutine,这样可以更好地控制资源,避免不必要的工作。
-
使用
errgroup
可以减少错误处理的样板代码,开发者不需要手动管理错误状态和同步逻辑,使得并发编程更简单、更易于维护。
-
errgroup
提供了便捷的接口来限制并发 goroutine 的数量,避免过载,而
sync.WaitGroup
没有这样的功能。
以上,
errgroup
为处理并发任务提供了更强大的错误管理和控制机制,因此在许多并发场景下是更优的选择。
随着本文接下来的深入讲解,你就能深刻体会到上面所说的优势了。
sync.WaitGroup 使用示例
在介绍
errgroup.Group
前,我们还是先来一起回顾下
sync.WaitGroup
的用法。
示例如下:
package main
import (
"fmt"
"net/http"
"sync"
)
func main() {
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/", // 这是一个错误的 URL,会导致任务失败
}
var err error
var wg sync.WaitGroup // 零值可用,不必显式初始化
for _, url := range urls {
wg.Add(1) // 增加 WaitGroup 计数器
// 启动一个 goroutine 来获取 URL
go func() {
defer wg.Done() // 当 goroutine 完成时递减 WaitGroup 计数器
resp, e := http.Get(url)
if e != nil { // 发生错误返回,并记录该错误
err = e
return
}
defer resp.Body.Close()
fmt.Printf("fetch url %s status %s\n", url, resp.Status)
}()
}
// 等待所有 goroutine 执行完成
wg.Wait()
if err != nil { // err 会记录最后一个错误
fmt.Printf("Error: %s\n", err)
}
}
示例中,我们使用
sync.WaitGroup
来启动 3 个 goroutine 并发访问 3 个不同的
URL
,并在成功时打印响应状态码,或失败时记录错误信息。
执行示例代码,得到如下输出:
$ go run waitgroup/main.go
fetch url http://www.google.com/ status 200 OK
fetch url http://www.golang.org/ status 200 OK
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
我们获取了两个成功的响应,并打印了一条错误信息。
根据示例,我们可以抽象出
sync.WaitGroup
最典型的惯用法:
var wg sync.WaitGroup
for ... {
wg.Add(1)
go func() {
defer wg.Done()
// do something
}()
}
wg.Wait()
errgroup.Group 使用示例
其实
errgroup.Group
的使用套路与
sync.WaitGroup
非常类似。
基本使用
errgroup
基本使用套路如下:
-
-
-
-
使用
Group.Wait
方法等待所有 goroutine 完成或有一个返回错误。
将前文中的
sync.WaitGroup
程序示例使用
errgroup.Group
重写为如下示例:
package main
import (
"fmt"
"net/http"
"golang.org/x/sync/errgroup"
)
func main() {
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/", // 这是一个错误的 URL,会导致任务失败
}
// 使用 errgroup 创建一个新的 goroutine 组
var g errgroup.Group // 零值可用,不必显式初始化
for _, url := range urls {
// 使用 errgroup 启动一个 goroutine 来获取 URL
g.Go(func() error {
resp, err := http.Get(url)
if err != nil {
return err // 发生错误,返回该错误
}
defer resp.Body.Close()
fmt.Printf("fetch url %s status %s\n", url, resp.Status)
return nil // 返回 nil 表示成功
})
}
// 等待所有 goroutine 完成并返回第一个错误(如果有)
if err := g.Wait(); err != nil {
fmt.Printf("Error: %s\n", err)
}
}
可以发现,这段程序与
sync.WaitGroup
示例很像,根据代码中的注释,很容易看懂。
执行示例代码,得到如下输出:
$ go run examples/main.go
fetch url http://www.google.com/ status 200 OK
fetch url http://www.golang.org/ status 200 OK
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
输出结果也没什么变化。
上下文取消
errgroup
提供了
errgroup.WithContext
可以附加取消功能,在任意一个 goroutine 返回错误时,可以立即取消其他正在运行的 goroutine,并在
Wait
方法中返回第一个非
nil
的错误。
示例如下:
package main
import (
"context"
"fmt"
"net/http"
"sync"
"golang.org/x/sync/errgroup"
)
func main() {
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/", // 这是一个错误的 URL,会导致任务失败
}
// 创建一个带有 context 的 errgroup
// 任何一个 goroutine 返回非 nil 的错误,或 Wait() 等待所有 goroutine 完成后,context 都会被取消
g, ctx := errgroup.WithContext(context.Background())
// 创建一个 map 来保存结果
var result sync.Map
for _, url := range urls {
// 使用 errgroup 启动一个 goroutine 来获取 URL
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return err // 发生错误,返回该错误
}
// 发起请求
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err // 发生错误,返回该错误
}
defer resp.Body.Close()
// 保存每个 URL 的响应状态码
result.Store(url, resp.Status)
return nil // 返回 nil 表示成功
})
}
// 等待所有 goroutine 完成并返回第一个错误(如果有)
if err := g.Wait(); err != nil {
fmt.Println("Error: ", err)
}
// 所有 goroutine 都执行完成,遍历并打印成功的结果
result.Range(func(key, value any) bool {
fmt.Printf("fetch url %s status %s\n", key, value)
return true
})
}
执行示例代码,得到如下输出:
$ go run examples/withcontext/main.go
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
fetch url http://www.google.com/ status 200 OK
由测试结果来看,对于
[http://www.google.com/](http://www.google.com/)
的请求可以接收到成功响应,由于对
[http://www.somestupidname.com/](http://www.somestupidname.com/)
请求报错,程序来不及等待
[http://www.golang.org/](http://www.golang.org/)
响应,就被取消了。
其实我们大致可以猜测到,取消功能应该是通过
context.cancelCtx
来实现的,我们暂且不必深究,稍后探索源码就能验证我们的猜想了。
限制并发数量
errgroup
提供了
errgroup.SetLimit
可以限制并发执行的 goroutine 数量。
示例如下:
package main
import (
"fmt"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
// 创建一个 errgroup.Group
var g errgroup.Group
// 设置最大并发限制为 3
g.SetLimit(3)
// 启动 10 个 goroutine
for i := 1; i <= 10; i++ {
g.Go(func() error {
// 打印正在运行的 goroutine
fmt.Printf("Goroutine %d is starting\n", i)
time.Sleep(2 * time.Second) // 模拟任务耗时
fmt.Printf("Goroutine %d is done\n", i)
return nil
})
}
// 等待所有 goroutine 完成
if err := g.Wait(); err != nil {
fmt.Printf("Encountered an error: %v\n", err)
}
fmt.Println("All goroutines complete.")
}
使用
g.SetLimit(3)
可以限制最大并发为 3 个 goroutine。
执行示例代码,得到如下输出:
$ go run examples/setlimit/main.go
Goroutine 3 is starting
Goroutine 1 is starting
Goroutine 2 is starting
Goroutine 2 is done
Goroutine 1 is done
Goroutine 5 is starting
Goroutine 3 is done
Goroutine 6 is starting
Goroutine 4 is starting
Goroutine 6 is done
Goroutine 5 is done
Goroutine 8 is starting
Goroutine 4 is done
Goroutine 7 is starting
Goroutine 9 is starting
Goroutine 9 is done
Goroutine 8 is done
Goroutine 10 is starting
Goroutine 7 is done
Goroutine 10 is done
All goroutines complete.
根据输出可以发现,虽然我们通过
for
循环启动了 10 个 goroutine,但程序执行时最多只允许同时启动 3 个 goroutine,当这 3 个 goroutine 中有某个执行完成并退出,才会有新的 goroutine 被启动。
尝试启动
errgroup
还提供了
errgroup.TryGo
可以
尝试启动一个任务
,它返回一个
bool
值,标识任务是否启动成功,
true
表示成功,
false
表示失败。
errgroup.TryGo
需要搭配
errgroup.SetLimit
一同使用,因为如果不限制并发数量,那么
errgroup.TryGo
始终返回
true
,当达到最大并发数量限制时,
errgroup.TryGo
返回
false
。
示例如下:
package main
import (
"fmt"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
// 创建一个 errgroup.Group
var g errgroup.Group
// 设置最大并发限制为 3
g.SetLimit(3)
// 启动 10 个 goroutine
for i := 1; i <= 10; i++ {
if g.TryGo(func() error {
// 打印正在运行的 goroutine
fmt.Printf("Goroutine %d is starting\n", i)
time.Sleep(2 * time.Second) // 模拟工作
fmt.Printf("Goroutine %d is done\n", i)
return nil
}) {
// 如果成功启动,打印提示
fmt.Printf("Goroutine %d started successfully\n", i)
} else {
// 如果达到并发限制,打印提示
fmt.Printf("Goroutine %d could not start (limit reached)\n", i)
}
}
// 等待所有 goroutine 完成
if err := g.Wait(); err != nil {
fmt.Printf("Encountered an error: %v\n", err)
}
fmt.Println("All goroutines complete.")
}
使用
g.SetLimit(3)
限制最大并发为 3 个 goroutine,调用
g.TryGo
如果启动任务成功,打印
Goroutine {i} started successfully
提示信息;启动任务失败,则打印
Goroutine {i} could not start (limit reached)
提示信息。
执行示例代码,得到如下输出:
$ go run examples/trygo/main.go
Goroutine 1 started successfully
Goroutine 1 is starting
Goroutine 2 is starting
Goroutine 2 started successfully
Goroutine 3 started successfully
Goroutine 4 could not start (limit reached)
Goroutine 5 could not start (limit reached)
Goroutine 6 could not start (limit reached)
Goroutine 7 could not start (limit reached)
Goroutine 8 could not start (limit reached)
Goroutine 9 could not start (limit reached)
Goroutine 10 could not start (limit reached)
Goroutine 3 is starting
Goroutine 2 is done
Goroutine 3 is done
Goroutine 1 is done
All goroutines complete.
因为限制最大并发数量为 3,所以前面 3 个 goroutine 启动成功,并且正常执行完成,其他几个 goroutine 全部执行失败。
以上就是
errgroup
的全部用法了,更多使用场景你可以在实践中去尝试和感悟。
源码解读
接下来,我们一起阅读下
errgroup
源码,以此来加深对
errgroup
的理解。
errgroup
源码非常少,仅有 3 个文件。这 3 个文件源码内容分别如下:
主逻辑代码:
https://github.com/golang/sync/blob/v0.8.0/errgroup/errgroup.go
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package errgroup provides synchronization, error propagation, and Context
// cancelation for groups of goroutines working on subtasks of a common task.
//
// [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks
// returning errors.
package errgroup
import (
"context"
"fmt"
"sync"
)
type
token struct{}
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
cancel func(error)
wg sync.WaitGroup
sem chan token
errOnce sync.Once
err error
}
func (g *Group) done() {
if g.sem != nil {
}
g.wg.Done()
}
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := withCancelCause(ctx)
return &Group{cancel: cancel}, ctx
}
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel(g.err)
}
return g.err
}
// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling WithContext. The error will be returned by Wait.
func (g *Group) Go(f func() error) {
if g.sem != nil {
g.sem }
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel(g.err)
}
})
}
}()
}
// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
if g.sem != nil {
select {
case g.sem // Note: this allows barging iff channels in general allow barging.
default:
return false
}
}
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel(g.err)
}
})
}
}()
return true
}
// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) {
if n 0 {
g.sem = nil
return
}
if len(g.sem) != 0 {
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
}
g.sem = make(chan token, n)
}
为 Go 1.20 及更高版本提供的
withCancelCause
函数实现:
https://github.com/golang/sync/blob/v0.8.0/errgroup/go120.go
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.20
package errgroup
import "context"
func withCancelCause(parent context.Context) (context.Context, func(error)) {
return context.WithCancelCause(parent)
}
为低于 Go 1.20 版本提供的
withCancelCause
函数实现:
https://github.com/golang/sync/blob/v0.8.0/errgroup/pre_go120.go
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !go1.20
package errgroup
import "context"
func withCancelCause(parent context.Context) (context.Context, func(error)) {
ctx, cancel := context.WithCancel(parent)
return ctx, func(error) { cancel() }
}
可以看到,
errgroup
全部源码加起来也不到 100 行,可谓短小精悍。
现在我们来分析下
errgroup
源码。
根据包注释我们可以知道,
errgroup
包提供了同步、错误传播和上下文取消功能,用于一组 goroutines 处理共同任务的子任务。
errgroup.Group
与
sync.WaitGroup
相关,增加了处理任务返回错误的能力。
为了提供以上功能,首先
errgroup
定义了
token
和
Group
两个结构体:
// 定义一个空结构体类型 token,会作为信号进行传递,用于控制并发数
type token struct{}
// Group 是一组协程的集合,这些协程处理同一整体任务的子任务
//
// 零值 Group 是有效的,对活动协程的数量没有限制,并且不会在出错时取消
type Group struct {
cancel func(error) // 取消函数,就是 context.CancelCauseFunc 类型
wg sync.WaitGroup // 内部使用了 sync.WaitGroup
sem chan token // 信号 channel,可以控制协程并发数量
errOnce sync.Once // 确保错误仅处理一次
err error // 记录子协程集中返回的第一个错误
}
token
被定义为空结构体,用来传递信号,这也是 Go 中空结构体的惯用法。
NOTE:
你可以在我的另一篇文章
《Go 中空结构体惯用法,我帮你总结全了!》
中查看空结构体的更多用法。
Group
是
errgroup
包提供的唯一公开结构体,其关联的方法承载了所有功能。
cancel
属性为一个函数,上下文取消时会被调用,其实就是
context.CancelCauseFunc
类型,调用
errgroup.WithContext
时被赋值。
wg
属性即为
sync.WaitGroup
,承担并发控制的主逻辑,
errgroup.Go
和
errgroup.TryGo
内部并发控制逻辑都会代理给
sync.WaitGroup
。
sem
属性是
token
类型的
channel
,用于限制并发数量,调用
errgroup.SetLimit
是被赋值。
err
会记录所有 goroutine 中出现的第一个错误,由
errOnce
确保错误错误仅处理一次,所以后面再出现更多的错误都会被忽略。
接下来我们先看
errgroup.SetLimit
方法定义:
// SetLimit 限制该 Group 中活动的协程数量最多为 n,负值表示没有限制
//
// 任何后续对 Go 方法的调用都将阻塞,直到可以在不超过限额的情况下添加活动协程
//
// 在 Group 中存在任何活动的协程时,限制不得修改
func (g *Group) SetLimit(n int) { // 传进来的 n 就是 channel 长度,以此来限制协程的并发数量
if n 0 { // 这里检查如果小于 0 则不限制协程并发数量。此外,也不要将其设置为 0,会产生死锁
g.sem = nil
return
}
if len(g.sem) != 0 { // 如果存在活动的协程,调用此方法将产生 panic
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active"
, len(g.sem)))
}
g.sem = make(chan token, n)
}
errgroup.SetLimit
方法可以限制并发属性,其内部逻辑很简单,不过要注意在调用
errgroup.Go
或
errgroup.TryGo
方法前调用
errgroup.SetLimit
,以防程序出现
panic
。
然后看下主逻辑
errgroup.Go
方法实现:
// Go 会在新的协程中调用给定的函数
// 它会阻塞,直到可以在不超过配置的活跃协程数量限制的情况下添加新的协程
//
// 首次返回非 nil 错误的调用会取消该 Group 的上下文(context),如果该 context 是通过调用 WithContext 创建的,该错误将由 Wait 返回
func (g *Group) Go(f func() error) {
if g.sem != nil { // 这个是限制并发数的信号通道
g.sem // 如果超过了配置的活跃协程数量限制,向 channel 发送 token 会阻塞
}
g.wg.Add(1) // 转发给 sync.WaitGroup.Add(1),将活动协程数加一
go func() {
defer g.done() // 当一个协程完成时,调用此方法,内部会将调用转发给 sync.WaitGroup.Done()
if err := f(); err != nil { // f() 就是我们要执行的任务
g.errOnce.Do(func() { // 仅执行一次,即只处理一次错误,所以会记录第一个非 nil 的错误,与协程启动顺序无关
g.err = err // 记录错误
if g.cancel != nil { // 如果 cancel 不为 nil,则调用取消函数,并设置 cause
g.cancel(g.err)
}
})
}
}()
}
首先会检测是否使用
errgroup.SetLimit
方法设置了并发限制,如果有限制,则使用
channel
来控制并发数量。
否则执行主逻辑,其实就是
sync.WaitGroup
的套路代码。
在
defer
中调用了
g.done()
,
done
方法定义如下:
// 当一个协程完成时,调用此方法
func (g *Group) done() {
// 如果设置了最大并发数,则 sem 不为 nil,从 channel 中消费一个 token,表示一个协程已完成
if g.sem != nil {
}
g.wg.Done() // 转发给 sync.WaitGroup.Done(),将活动协程数减一
}
另外,如果某个任务返回了错误,则通过
errOnce
确保错误只被处理一次,处理方式就是先记录错误,然后调用
cancel
方法。
cancel
实际上是在
errgroup.WithContext
方法中赋值的:
// WithContext 返回一个新的 Group 和一个从 ctx 派生的关联 Context
//
// 派生的 Context 会在传递给 Go 的函数首次返回非 nil 错误或 Wait 首次返回时被取消,以先发生者为准。
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := withCancelCause(ctx)
return &Group{cancel: cancel}, ctx
}
这里的
withCancelCause
有两种实现。
如果 Go 版本大于等于 1.20,提供的
withCancelCause
函数实现如下:
// 构建约束标识了这个文件是 Go 1.20 版本被加入的
//go:build go1.20
package errgroup
import "context"
// 代理到 context.WithCancelCause
func withCancelCause(parent context.Context) (context.Context, func(error)) {
return context.WithCancelCause(parent)
}
如果 Go 版本小于 1.20,提供的
withCancelCause
函数实现如下:
//go:build !go1.20