昨天看到 Jaana Dogan 创建了一个 broadcaster 的库, 话说美女 Jaana Dogan 又回到了 Google 了么。她的实现我们就当做 broadcaster 的第一个实现吧。
什么是 broadcaster?就是村口的大喇叭,一播音,全村都知道了。
Jaana Dogan 实现的这个 broadcaster 只有通知的功能,没有传递消息,也不能重用。我们就以这个库为基准,看看我们能够实现几种方式。
1、sync.Cond 实现
Jaana Dogan 使用
sync.Cond
实现。
package main
import (
"sync"
)
type Broadcaster struct {
mu *sync.Mutex
cond *sync.Cond
signaled bool
}
func NewBroadcaster() *Broadcaster {
var mu sync.Mutex
return &Broadcaster{
mu: &mu,
cond: sync.NewCond(&mu),
signaled: false,
}
}
func (b *Broadcaster) Go(fn func()) {
gofunc() {
b.cond.L.Lock()
defer b.cond.L.Unlock()
for !b.signaled {
b.cond.Wait()
}
fn()
}()
}
func (b *Broadcaster) Broadcast() {
b.cond.L.Lock()
b.signaled = true
b.cond.L.Unlock()
b.cond.Broadcast()
}
sync.Cond
是一个条件变量,可以用来实现广播通知。
sync.Cond
的
Wait
方法会阻塞当前所有调用的 goroutine,直到一个 goroutine 调用
Broadcast
方法。
我们可以写为它写一个单元测试,其他四种也使用同样的单元测试代码,就不赘述了。
package main
import (
"sync"
"testing"
)
func TestNewBroadcaster(t *testing.T) {
b := NewBroadcaster()
var wg sync.WaitGroup
wg.Add(2)
b.Go(func() {
t.Log("function 1 finished")
wg.Done()
})
b.Go(func() {
t.Log("function 2 finished")
wg.Done()
})
b.Broadcast()
wg.Wait()
}
我觉得直接使用
sync.Cond
也可以,只不过 Jaana Dogan 将它包装了一下,更方便使用。
-
-
Broadcast
方法用来通知所有等待的 goroutine
2、channel 实现
使用
channle
可以更简洁的实现。
package main
type Broadcaster struct {
signal chanstruct{}
}
func NewBroadcaster() *Broadcaster {
return &Broadcaster{
signal: make(chanstruct{}),
}
}
func (b *Broadcaster) Go(fn func()) {
gofunc() {
fn()
}()
}
func (b *Broadcaster) Broadcast() {
close(b.signal)
}
channel
的特性是可以关闭,关闭后的
channel
会一直返回零值,所以我们可以使用
close
来通知所有等待的 goroutine。
这是常常使用 channel 实现的一种通知机制。
3、context 实现
context
是 Go1.7 引入的一个标准库,用来传递请求的上下文,可以用来实现广播通知。
package broadcaster
import (
"context"
)
type Broadcaster struct {
ctx context.Context
cancel context.CancelFunc
}
func NewBroadcaster() *Broadcaster {
ctx, cancel := context.WithCancel(context.Background())
return &Broadcaster{
ctx: ctx,
cancel: cancel,
}
}
func (b *Broadcaster) Go(fn func()) {
gofunc() {
fn()
}()
}
func (b *Broadcaster) Broadcast() {
b.cancel()
}
平心而论,
context
也是一种不错的选择。
4、sync.WaitGroup 实现
sync.WaitGroup
也可以实现广播通知。
package main
import (
"sync"
"sync/atomic"
)
type Broadcaster struct {
done int32
trigger sync.WaitGroup
}
func NewBroadcaster() *Broadcaster {
b := &Broadcaster{}
b.trigger.Add(1)
return b
}
func (b *Broadcaster) Go(fn func()) {
gofunc() {
if atomic.LoadInt32(&b.done) == 1 {
return
}
b.trigger.Wait()
fn()
}()
}
func (b *Broadcaster) Broadcast() {
if atomic.CompareAndSwapInt32(&b.done, 0, 1) {
b.trigger.Done()
}
}
sync.WaitGroup
的
Wait
方法会阻塞等待
Done
方法的调用,所以我们可以使用
WaitGroup
来实现广播通知。一旦
Wait
被放行,所有阻塞在
Wait
的 goroutine 都会被放行。所以一开始我们将 WaitGroup 的计数器设置为 1,然后调用
Wait
方法,一旦
Broadcast
方法被调用,计数器减 1,
Wait
方法放行。
5、sync.RWMutex 实现
sync.RWMutex
也可以实现广播通知。
package main
import (
"sync"
)
type Broadcaster struct {
mu *sync.RWMutex
}
func NewBroadcaster() *Broadcaster {
var mu sync.RWMutex
mu.Lock()
return &Broadcaster{mu: &mu}
}
func (b *Broadcaster) Go(fn func()) {
gofunc() {
b.mu.RLock()
defer b.mu.RUnlock()
fn()
}()
}
func (b *Broadcaster) Broadcast() {
b.mu.Unlock()
}