Press "Enter" to skip to content

Go 并发编程篇(三):基于共享内存实现协程通信

上篇教程中,我们已经演示了如何通过协程在 Go 语言中实现并发编程,从语法结构来说,Go 语言的协程是非常简单的,只需要通过 go 关键字声明即可,难点在于并发引起的不确定性,以及为了协调这种不确定性在不同协程间所要进行的通信,在并发开篇教程中,我们也介绍过在工程上,常见的并发通信模型有两种:共享内存和消息传递。

接下来,学院君将详细展开介绍这两种通信模型,以及如何基于这些通信模型对 Go 并发编程进行编排。

首先,我们来看看如何通过共享内存来实现 Go 协程通信,并通过协程通信来重构上篇教程的并发代码,实现应用程序的优雅退出。

通过共享计数器控制协程退出

新建一个 03-add.go,并编写代码如下:

package main
​
import (
    "fmt"
    "runtime"
    "sync"
    "time"
)
​
var counter int = 0
​
func addV2(a, b int, lock *sync.Mutex) {
    lock.Lock()
    c := a + b
    counter++
    fmt.Printf("%d: %d + %d = %d\n", counter, a, b, c)
    lock.Unlock()
}
​
func main() {
    start := time.Now()
    lock := &sync.Mutex{}
    for i := 0; i < 10; i++ {
        go addV2(1, i, lock)
    }
​
    for {
        lock.Lock()
        c := counter
        lock.Unlock()
        runtime.Gosched() // 让出 CPU 时间片
        if c >= 10 {
            break
        }
    }
    end := time.Now()
    consume := end.Sub(start).Seconds()
    fmt.Println("程序执行耗时(s):", consume)
}

为了精确判断主协程退出时机问题,我们需要在所有子协程执行完毕后通知主协程,主协程在收到该信号后退出程序,通过共享内存的方式我们引入了一个全局的 counter 计数器,该计数器被所有协程共享,每执行一次子协程,该计数器的值加 1,当所有子协程执行完毕后,计数器的值应该是 10,我们在主协程中通过一个死循环来判断 counter 的值,只有当它大于等于 10 时,才退出循环,进而退出整个程序。

此外,由于 counter 变量会被所有协程共享,为了避免 counter 值被污染(两个协程同时操作计数器),我们还引入了锁机制,即 sync.Mutex,这是 Go 语言标准库提供的互斥锁,当一个协程调用其 Lock() 方法加锁后,其他协程必须等到这个协程调用同一个 sync.MutexUnlock() 方法解锁才能继续访问这个 sync.Mutex(通过指针传递到子协程,所以整个应用持有的是同一个互斥锁),我们可以通过这种方式保证所有 lock.Lock()lock.Unlock() 之间的代码是以同步阻塞方式串行执行的,从而保证对 counter 进行读取和更新操作时,同时只有一个协程在操作它(既保证了操作的原子性)。

最后,我们还统计了整个程序执行时间。

当我们执行这段代码时,打印结果如下:

image-20221002205921330

可以看到,实际执行时间远远小于1秒,这样一来,程序的整体执行效率相比于上篇教程的实现快了将近2000倍。

不过,代码也因此变得更复杂,更难以维护,这还只是个简单的加法运算实现,就要写这么多代码,要引入共享变量,还要引入互斥锁来保证操作的原子性,对于更加复杂的业务代码,如果到处都要加锁、解锁,显然对开发者和维护者来说都是噩梦,Go 语言既然以并发编程作为语言的核心优势,当然不至于将这样的问题用这么繁琐的方式来解决。

通过 sync.WaitGroup 控制协程退出

实际上,我们可以借助 Go 官方标准库 sync 包提供的 sync.WaitGroup 更加优雅地实现协程退出。

sync.WaitGroup 类型是开箱即用的,也是并发安全的。该类型提供了以下三个方法:

  • AddWaitGroup 类型有一个计数器,默认值是 0,我们可以通过 Add 方法来增加这个计数器的值,通常我们可以通过个方法来标记需要等待的子协程数量;
  • Done:当某个子协程执行完毕后,可以通过 Done 方法标记已完成,该方法会将所属 WaitGroup 类型实例计数器值减 1,通常可以通过 defer 语句来调用它;
  • WaitWait 方法的作用是阻塞当前协程,直到对应 WaitGroup 类型实例的计数器值归零,如果在该方法被调用的时候,对应计数器的值已经是 0,那么它将不会做任何事情。

至此,你可能已经看出来了,我们完全可以组合使用 sync.WaitGroup 类型提供的方法来替代之前自行实现计数器的实现方法,对应代码如下:

package main
​
import (
    "fmt"
    "sync"
    "time"
)
​
func addV3(a, b int, doneFunc func()) {
    defer func() {
        doneFunc() // 子协程执行完毕后将计数器-1
    }()
    c := a + b
    fmt.Printf("%d + %d = %d\n", a, b, c)
}
​
func main() {
    start := time.Now()
    wg := sync.WaitGroup{}
    wg.Add(10) // 初始化计数器数目为10
    for i := 0; i < 10; i++ {
        go addV3(1, i, wg.Done)
    }
​
    wg.Wait() // 等待子协程全部执行完毕退出
    end := time.Now()
    consume := end.Sub(start).Seconds()
    fmt.Println("程序执行耗时(s):", consume)
}

看起来代码简洁多了,我们首先在主协程中声明了一个 sync.WaitGroup 类型的 wg 变量,然后调用 Add 方法设置等待子协程数为 10,然后循环启动子协程,并将 wg.Done 作为 defer 函数传递过去,最后,我们通过 wg.Wait() 等到 sync.WaitGroup 计数器值为 0 时退出程序。

上述代码打印结果和之前通过通道实现的结果是一致的:

image-20221002210044779

对于某些动态程序,可能在运行时才能知道计数器的数目,这种情况下,sync.WaitGroup 会更加方便,我们可以这样在循环体内部动态增加计数器,而不是一开始就指定总数:

// wg.Add(10) // 初始化计数器数目为10
for i := 0; i < 10; i++ {
    wg.Add(1) // 循环体内动态增加计数器,每次+1
    go addV3(1, i, wg.Done)
}

这种情况下,需要确保 Add 方法增加的数量和 Done 方法减少的数量是一致的即可,通常我们调用一次 Add 方法,就在协程执行完毕后调用一次 Done 方法就可以保证,如果两者不是一一对应的,Add 调用次数大于 Done 调用次数,所有协程执行完毕计数器是正数,会导致协程阻塞:

fatal error: all goroutines are asleep - deadlock!

反过来,如果 Add 调用次数小于 Done 调用次数,会导致计数器出现负数,也会报错:

panic: sync: negative WaitGroup counter

小结

sync.WaitGroup 其实是 Go 底层封装的一个更加优雅的计数器实现,和自行实现的计数器一样,本质上都是通过计数信号量基于共享内存实现协程之间的同步和通信,在基于共享内存实现协程通信时,一个重要的、不可忽视的要素就是如何确保不同协程同时访问同一资源的数据竞争和并发安全问题,这在传统多线程并发编程中也是一大议题,下篇教程学院君将详细给大家介绍 Go 语言中基于共享内存实现协程通信的并发安全问题和解决方案。

One Comment

  1. bella
    bella 2023年12月7日

    for {
    lock.Lock()
    c := counter
    lock.Unlock()
    runtime.Gosched() // 让出 CPU 时间片
    if c >= 10 {
    break
    }
    }
    学院君,这里读counter的时候是不是没必要加锁

发表回复