Go语言中的WaitGroup类似Java的CountDownLatch,任务检查点,任务等待点,比如有一个主任务,执行到某一时刻需要执行两个子任务,主任务等待阻塞,两个子任务执行完毕后再执行主任务,这就需要一个任务阻塞检查点,等待子任务执行完毕
WaitGroup使用
示例:
1 2 3 4 5 6 7 8 9 10 11
| wg.Add(1) go createRand()
wg.Add(1) go sumResult() wg.Wait() for v := range resultChan { j := (*v).job s := (*v).sum fmt.Println("1111", j, s) }
|
创建两个协程方法,产生随机数,计算结果,主线程等待计算结果后,再打印结果
一共有三个方法
1 2 3
| (wg *WaitGroup) Add(delta int) (wg *WaitGroup) Done() (wg *WaitGroup) Wait()
|
Add
方法用于设置 WaitGroup 的计数值,可以理解为子任务的数量
Done
方法用于将 WaitGroup 的计数值减一,可以理解为完成一个子任务
Wait
方法用于阻塞调用者,直到 WaitGroup 的计数值为0,即所有子任务都完成
源码分析
WaitGroup结构
1 2 3 4
| type WaitGroup struct { noCopy noCopy state1 [3]uint32 }
|
state1 是个复合字段,会拆分为两部分: 64位(8个字节)的 statep 作为一个整体用于原子操作, 其中前面4个字节表示计数值,后面四个字节表示等待数量;剩余 32位(4个字节)semap 用于标识信号量。
Go语言中对于64位的变量进行原子操作,需要保证该变量是 64位对齐 的,也就是要保证这 8个字节 的首地址是 8 的整数倍。因此当 state1 的首地址是 8 的整数倍时,取前8个字节作为 statep ,后4个字节作为 semap;当 state1 的首地址不是 8 的整数倍时,取后8个字节作为 statep ,前4个字节作为 semap。
1 2 3 4 5 6 7 8 9 10
| func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2] } else { return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0] } }
|
Add
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 { panic("sync: negative WaitGroup counter") }
if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") }
if v > 0 || w == 0 { return }
if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") }
*statep = 0
for ; w != 0; w-- { runtime_Semrelease(semap, false, 0) } }
|
Done
1 2 3 4
| func (wg *WaitGroup) Done() { wg.Add(-1) }
|
Add 方法用于添加一个计数值(负数相当于减),当计数值变为0后, Wait 方法阻塞的所有等待者都会被释放,计数值变为负数是非法操作,产生 panic,当计数值为0时(初始状态),Add 方法不能和 Wait 方法并发调用,需要保证 Add 方法在 Wait 方法之前调用,否则会 panic
Wait
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for { state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 { return }
if atomic.CompareAndSwapUint64(statep, state, state+1) { runtime_Semacquire(semap)
if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } return } } }
|
注意事项
根据源码分析,我们可以得到如何使用waitGroup可以避免panic:
1、保证 Add 在 Wait 前调用
2、Add 中不传递负数
3、任务完成后不要忘记调用 Done 方法,建议使用 defer wg.Done()
4、不要复制使用 WaitGroup,函数传递时使用指针传递
5、尽量不复用 WaigGroup,减少出问题的风险