go

Go 系列 并发编程

Posted by lichao modified on November 30, 2021

Go 中的 goroutine 之间没有父与子的关系,也就没有所谓子进程退出后的通知机制。多个 goroutine 都是平行地被调度,多个 goroutine 如何协作工作涉及通信、同步、通知和退出四个方面。

  • 通信:chan 通道当然是 goroutine 之间通信的基础(这里的通信是指程序的数据通道)。
  • 同步:不带缓冲的 chan 提供一个天然的同步等待机制;当然 sync.WaitGroup 也为多个 goroutine 协同工作提供一种同步等待机制。
  • 通知:通知通常不是业务数据,而是管理、控制流数据。可以使用 chan,并结合 select 收敛进行处理。
  • 推出:goroutine 之间没有 父子关系,可以利用 context 包提供多个 goroutine 之间的退出通知功能。

背景

抢占式调度

在 go 1.14 前,go 并非完全的抢占式调度,如下代码会陷入死循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main
import (
    "fmt"
    "runtime"
    "time"
)
func main() {
    runtime.GOMAXPROCS(1)

    fmt.Println("The program starts ...")

    go func() {
        for {
        }
    }()

    time.Sleep(time.Second)
    fmt.Println("I got scheduled!")
}

如果检测到某个 P 的状态为 Prunning,并且它已经运行了超过 10ms,则会将 P 的当前的 G 的 stackguard 设置为 StackPreempt。这个操作其实是相当于加上一个标记,通知这个 G 在合适时机进行调度。如果没有函数调用或者主动调用 runtime.Gosched() 的话将会陷入死循环。

Go 1.14 引入了基于系统信号的异步抢占调度,在循环中无函数调用的情况下也能被抢占出让 P,同样的代码则会正常运行。runtime 的 sysmon 负责进行运行时间的检查,在src/runtime/proc.go 中有定义变量 forcePreemptNS,在超过时限后会发送 SIGURG 信号,通知相应协程让出调度。

1
2
3
// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms

在 mac 本地运行测试,如图所示在循环中约每 20 ms goroutine 都会被中断一次。

Channel

channel 一般用于协程之间的通信,channel 也可以用于并发控制。比如主协程启动 N 个子协程,主协程等待所有子协程退出后再继续后续流程,这种场景下 channel 也可轻易实现。

场景示例

下面程序展示一个使用channel控制子协程的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main 

import ( 
    "time"
    "fmt" 
) 
func Process(ch chan int) { 
    // Do some work... 
    time.Sleep(time.Second) 
    ch <- 1 // 管道中写入一个元素表示当前协程已结束 
} 

func main() { 
    channels := make([]chan int, 10) // 创建一个10个元素的切片,元素类型为 channel 
    for i:= 0; i < 10; i++ { 
        channels[i] = make(chan int) // 切片中放入一个channel 
        go Process(channels[i]) // 启动协程,传一个管道用于通信 
    } 
    for i, ch := range channels { // 遍历切片,等待子协程结束
        <-ch 
        fmt.Println("Routine ", i, " quit!") 
    }
}

上面程序通过创建 N 个 channel 来管理 N 个协程,每个协程都有一个 channel 用于跟父协程通信,父协程创建完所有协程中等待所有协程结束。

这个例子中,父协程仅仅是等待子协程结束,其实父协程也可以向管道中写入数据通知子协程结束,这时子协程需要 定期的探测管道中是否有消息出现。

使用 channel 来控制子协程的优点是实现简单,缺点是当需要大量创建协程时就需要有相同数量的 channel,而且对于子协程继续派生出来的协程不方便控制。 后面继续介绍的 WaitGroup、Context 看起来比 channel 优雅一些,在各种开源组件中使用频率比 channel 高得多。

WaitGroup

WaitGroup 是 Golang 应用开发过程中经常使用的并发控制技术。

WaitGroup,可理解为 Wait-Goroutine-Group,即等待一组 goroutine 结束。比如某个 goroutine 需要等待其他几个 goroutine 全部完成,那么使用 WaitGroup 可以轻松实现。

例子

下面程序展示了一个 goroutine 等待另外两个 goroutine 结束的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main 
import ( 4. "fmt" 5. "time" 6. "sync" 7. ) 
func main() { 
    var wg sync.WaitGroup 
    wg.Add(2) //设置计数器,数值即为goroutine的个数 
    go func() { 
        //Do some work 15. time.Sleep(1*time.Second) 
        fmt.Println("Goroutine 1 finished!") 
        wg.Done() // goroutine执行结束后将计数器减1 
    }() 
    go func() { 
        // Do some work 23. time.Sleep(2*time.Second) 
        fmt.Println("Goroutine 2 finished!") 
        wg.Done() //goroutine执行结束后将计数器减1 
    }() 
    wg.Wait() //主goroutine阻塞等待计数器变为0 
    fmt.Printf("All Goroutine finished!") 
}

简单的说,上面程序中wg内部维护了一个计数器:

  1. 启动 goroutine 前将计数器通过 Add(2) 将计数器设置为待启动的 goroutine 个数。
  2. 启动 goroutine 后,使用 Wait() 方法阻塞自己,等待计数器变为 0。
  3. 每个 goroutine 执行结束通过 Done() 方法将计数器减 1。
  4. 计数器变为 0 后,阻塞的 goroutine 被唤醒。

其实 WaitGroup 也可以实现一组 goroutine 等待另一组 goroutine,这有点像玩杂技,很容出错,如果不了解其实现原理更是如此。实际上,WaitGroup 的实现源码非常简单。