当通过
go
关键字创建一个新的goroutine的时候,它会优先被放入P的本地队列。为了运行goroutine,M需要持有(绑定)一个P,接着M会启动一个OS线程,循环从P的本地队列里取出一个goroutine并执行。当然还有上文提及的
work-stealing
调度算法:当M执行完了当前P的Local队列里的所有G后,P也不会就这么在那躺尸啥都不干,它会先尝试从Global队列寻找G来执行,如果Global队列为空,它会随机挑选另外一个P,从它的队列里中拿走一半的G到自己的队列中执行。
既然Go调度器已经这么优秀了,我们为什么还要自己去实现一个golang的 Goroutine Pool 呢?事实上,优秀不代表完美,任何不考虑具体应用场景的编程模式都是耍流氓!有基于G-P-M的Go调度器背书,go程序的并发编程中,可以任性地起大规模的goroutine来执行任务,官方也宣称用golang写并发程序的时候随便起个成千上万的goroutine毫无压力。
首先,即便每个goroutine只分配2KB的内存,但如果是恐怖如斯的数量,聚少成多,内存暴涨,就会对GC造成极大的负担,写过java的同学应该知道jvm GC那万恶的STW(Stop The World)机制,也就是GC的时候会挂起用户程序直到垃圾回收完,虽然Go1.8之后的GC已经去掉了STW以及优化成了并行GC,性能上有了不小的提升,但是,如果太过于频繁地进行GC,依然会有性能瓶颈;
type sig struct{}
type f func() error
// Pool accept the tasks from client,it limits the total
// of goroutines to a given number by recycling goroutines.
type Pool struct {
// capacity of the pool.
capacity int32
// running is the number of the currently running goroutines.
running int32
// freeSignal is used to notice pool there are available
// workers which can be sent to work.
freeSignal chan sig
// workers is a slice that store the available workers.
workers []*Worker
// release is used to notice the pool to closed itself.
release chan sig
// lock for synchronous operation
lock sync.Mutex
once sync.Once
}
Pool
是一个通用的协程池,支持不同类型的任务,亦即每一个任务绑定一个函数提交到池中,批量执行不同类型任务,是一种广义的协程池;本项目中还实现了另一种协程池 — 批量执行同类任务的协程池
PoolWithFunc
,每一个
PoolWithFunc
只会绑定一个任务函数
pf
,这种Pool适用于大批量相同任务的场景,因为每个Pool只绑定一个任务函数,因此
PoolWithFunc
相较于
Pool
会更加节省内存,但通用性就不如前者了,为了让大家更好地理解协程池的原理,这里我们用通用的
Pool
来分析。
// getWorker returns a available worker to run the tasks.
func (p *Pool) getWorker() *Worker {
var w *Worker
// 标志,表示当前运行的worker数量是否已达容量上限
waiting := false
// 涉及从workers队列取可用worker,需要加锁
p.lock.Lock()
workers := p.workers
n := len(workers) - 1
// 当前worker队列为空(无空闲worker)
if n < 0 {
// 运行worker数目已达到该Pool的容量上限,置等待标志
if p.running >= p.capacity {
waiting = true
// 否则,运行数目加1
} else {
p.running++
}
// 有空闲worker,从队列尾部取出一个使用
} else {
w = workers[n]
workers[n] = nil
p.workers = workers[:n]
}
p.lock.Unlock()
// 阻塞等待直到有空闲worker
if waiting {
// 队列有空闲worker通知信号
<-p.freeSignal
for {
p.lock.Lock()
workers = p.workers
l := len(workers) - 1
if l < 0 {
p.lock.Unlock()
continue
}
w = workers[l]
workers[l] = nil
p.workers = workers[:l]
p.lock.Unlock()
break
}
// 当前无空闲worker但是Pool还没有满,
// 则可以直接新开一个worker执行任务
} else if w == nil {
w = &Worker{
pool: p,
task: make(chan f),
}
w.run()
}
return w
}
上面的源码中加了较为详细的注释,结合前面的设计思路,相信大家应该能理解获取可用worker绑定任务执行这个协程池的核心操作,这里主要关注一个地方:达到Pool容量限制之后,额外的任务请求需要阻塞等待idle worker,这里是为了防止无节制地创建goroutine,事实上Go调度器有一个复用机制,每次使用
go
关键字的时候它会检查当前结构体M中的P中,是否有可用的结构体G。如果有,则直接从中取一个,否则,需要分配一个新的结构体G。如果分配了新的G,需要将它挂到runtime的相关队列中,但是调度器却没有限制goroutine的数量,这在瞬时性goroutine爆发的场景下就可能来不及复用G而依然创建了大量的goroutine,所以
ants
除了复用还做了限制goroutine数量。
其他部分可以依照注释理解,这里不再赘述。
任务执行
// Worker is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and
// performs function calls.
type Worker struct {
// pool who owns this worker.
pool *Pool
// task is a job should be done.
task chan f
}
// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *Worker) run() {
//atomic.AddInt32(&w.pool.running, 1)
go func() {
// 监听任务列表,一旦有任务立马取出运行
for f := range w.task {
if f == nil {
atomic.AddInt32(&w.pool.running, -1)
return
}
f()
// 回收复用
w.pool.putWorker(w)
}
}()
}
// stop this worker.
func (w *Worker) stop() {
w.sendTask(nil)
}
// sendTask sends a task to this worker.
func (w *Worker) sendTask(task f) {
w.task <- task
}
Worker回收(goroutine复用)
// putWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) putWorker(worker *Worker) {
p.lock.Lock()
p.workers = append(p.workers, worker)
p.lock.Unlock()
p.freeSignal <- sig{}
}
还记得前面我说除了通用的
Pool struct
之外,本项目还实现了一个
PoolWithFunc struct
—一个执行批量同类任务的协程池,
PoolWithFunc
相较于
Pool
,因为一个池只绑定一个任务函数,省去了每一次task都需要传送一个任务函数的代价,因此其性能优势比起
Pool
更明显,这里我们稍微讲一下一个协程池只绑定一个任务函数的细节:
上码!
type pf func(interface{}) error
// PoolWithFunc accept the tasks from client,it limits the total
// of goroutines to a given number by recycling goroutines.
type PoolWithFunc struct {
// capacity of the pool.
capacity int32
// running is the number of the currently running goroutines.
running int32
// freeSignal is used to notice pool there are available
// workers which can be sent to work.
freeSignal chan sig
// workers is a slice that store the available workers.
workers []*WorkerWithFunc
// release is used to notice the pool to closed itself.
release chan sig
// lock for synchronous operation
lock sync.Mutex
// pf is the function for processing tasks
poolFunc pf
once sync.Once
}
PoolWithFunc struct
中的大部分字段和
Pool struct
基本一致,重点关注
poolFunc pf
,这是一个函数类型,也就是该Pool绑定的指定任务函数,而client提交到这种类型的Pool的数据就不再是一个任务函数
task f
了,而是
poolFunc pf
任务函数的形参,然后交由
WorkerWithFunc
处理:
// WorkerWithFunc is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and
// performs function calls.
type WorkerWithFunc struct {
// pool who owns this worker.
pool *PoolWithFunc
// args is a job should be done.
args chan interface{}
}
// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *WorkerWithFunc) run() {
//atomic.AddInt32(&w.pool.running, 1)
go func() {
for args := range w.args {
if args == nil {
atomic.AddInt32(&w.pool.running, -1)
return
}
w.pool.poolFunc(args)
w.pool.putWorker(w)
}
}()
}