The Way to Go(八)

协程(goroutine)与通道(channel)

什么是协程

一个应用程序是运行在机器上的一个进程;进程是一个运行在自己内存地址空间里的独立执行体。一个进程由一个或多个操作系统线程组成,这些线程其实是共享同一个内存地址空间的一起工作的执行体。几乎所有’正式’的程序都是多线程的,以便让用户或计算机不必等待,或者能够同时服务多个请求(如 Web 服务器),或增加性能和吞吐量。一个并发程序可以在一个处理器或者内核上使用多个线程来执行任务,但是只有同一个程序在某个时间点同时运行在多核或者多处理器上才是真正的并行。

并行是一种通过使用多处理器以提高速度的能力。所以并发程序可以是并行的,也可以不是。

公认的,使用多线程的应用难以做到准确,最主要的问题是内存中的数据共享,它们会被多线程以无法预知的方式进行操作,导致一些无法重现或者随机的结果(称作 竞态)。

不要使用全局变量或者共享内存,它们会给你的代码在并发运算的时候带来危险。

解决之道在于同步不同的线程,对数据加锁,这样同时就只有一个线程可以变更数据。在 Go 的标准库 sync 中有一些工具用来在低级别的代码中实现加锁;

不过过去的软件开发经验告诉我们这会带来更高的复杂度,更容易使代码出错以及更低的性能,所以这个经典的方法明显不再适合现代多核/多处理器编程:thread-per-connection 模型不够有效。

在 Go 中,应用程序并发处理的部分被称作 goroutines(协程),它可以进行更有效的并发运算。在协程和操作系统线程之间并无一对一的关系:协程是根据一个或多个线程的可用性,映射(多路复用,执行于)在他们之上的;协程调度器在 Go 运行时很好的完成了这个工作。

协程工作在相同的地址空间中,所以共享内存的方式一定是同步的;这个可以使用sync 包来实现,不过我们很不鼓励这样做:Go 使用 channels 来同步协程

当系统调用(比如等待 I/O)阻塞协程时,其他协程会继续在其他线程上工作。协程的设计隐藏了许多线程创建和管理方面的复杂工作。

存在两种并发方式:确定性的(明确定义排序)和非确定性的(加锁/互斥从而未定义排序)。Go 的协程和通道理所当然的支持确定性的并发方式(例如通道具有一个 sender 和一个 receiver)。

协程是通过使用关键字go 调用(或执行)一个函数或者方法来实现的(也可以是匿名或者 lambda 函数)。这样会在当前的计算过程中开始一个同时进行的函数,在相同的地址空间中并且分配了独立的栈,比如:go sum(bigArray),在后台计算总和。

协程的栈会根据需要进行伸缩,不会出现栈溢出;开发者无需关心栈的大小。当协程结束的时候,它会静默退出:用来启动这个协程的函数也不会得到任何的返回值。

任何 Go 程序都必须有的main() 函数也可以看做是一个协程,尽管它并没有通过go 来启动。协程可以在程序初始化的过程中运行(在 init() 函数中)。

协程间的信道

Go有一个特殊的类型,通道(channel),像是通道(管道),可以通过它们发送类型化的数据在协程之间通信,可以避开所有内存共享导致的坑;通道的通信方式保证了同步性。数据通过通道:同一时间只有一个协程可以访问数据:所以不会出现数据竞争,设计如此。数据的归属(可以读写数据的能力)被传递。

工厂的传送带是个很有用的例子。一个机器(生产者协程)在传送带上放置物品,另外一个机器(消费者协程)拿到物品并打包。

通道服务于通信的两个目的:值的交换,同步的,保证了两个计算(协程)任何时候都是可知状态。

通常使用这样的格式来声明通道:

1
var identifier chan datatype

未初始化的通道的值是nil。

通道只能传输一种类型的数据,比如 chan int 或者 chan string,所有的类型都可以用于通道,空接口interface{} 也可以。甚至可以(有时非常有用)创建通道的通道。

通道实际上是类型化消息的队列:使数据得以传输。它是先进先出(FIFO)的结构所以可以保证发送给他们的元素的顺序。通道也是引用类型,所以我们使用 make() 函数来给它分配内存。这里先声明了一个字符串通道 ch1,然后创建了它(实例化):

1
2
var ch1 chan string
ch1 = make(chan string)

当然可以更短:

1
ch1 := make(chan string)。

int通道的通道: chanOfChans := make(chan int)

函数通道:funcChan := chan func()

通信操作符 <-

这个操作符直观的标示了数据的传输:信息按照箭头的方向流动。

  • 流向通道(发送)

ch <- int1 表示:用通道ch发送变量 int1(双目运算符,中缀 = 发送)

  • 从通道流出(接收),三种方式:

int2 = <- ch 表示:变量 int2从通道 ch(一元运算的前缀操作符,前缀 = 接收)接收数据(获取新值);假设 int2 已经声明过了,如果没有的话可以写成:int2 := <- ch

<- ch可以单独调用获取通道的(下一个)值,当前值会被丢弃,但是可以用来验证,所以以下代码是合法的:

1
2
3
if <- ch != 1000{
...
}

下面的示例展示了通信操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"time"
)

func main() {
ch := make(chan string)

go sendData(ch)
go getData(ch)

time.Sleep(1e9)
}

func sendData(ch chan string) {
ch <- "Washington"
ch <- "Tripoli"
ch <- "London"
ch <- "Beijing"
ch <- "Tokio"
}

func getData(ch chan string) {
var input string
// time.Sleep(2e9)
for {
input = <-ch
fmt.Printf("%s ", input)
}
}

输出:

1
Washington Tripoli London Beijing Tokio

main()函数中启动了两个协程:sendData()通过通道ch 发送了 5 个字符串,getData()按顺序接收它们并打印出来。

  • main() 等待了 1 秒让两个协程完成,如果不这样,sendData() 就没有机会输出。

  • getData()使用了无限循环:它随着 sendData()的发送完成和ch 变空也结束了。

  • 如果我们移除一个或所有 go关键字,程序无法运行,Go 运行时会抛出 panic

为什么会这样?运行时会检查所有的协程(也许只有一个是这种情况)是否在等待(可以读取或者写入某个通道),意味着程序无法处理。这是死锁(deadlock)形式,运行时可以检测到这种情况。

注意:不要使用打印状态来表明通道的发送和接收顺序:由于打印状态和通道实际发生读写的时间延迟会导致和真实发生的顺序不同。

通道阻塞

默认情况下,通信是同步且无缓冲的:在有接受者接收数据之前,发送不会结束。可以想象一个无缓冲的通道在没有空间来保存数据的时候:必须要一个接收者准备好接收通道的数据然后发送者可以直接把数据发送给接收者。所以通道的发送/接收操作在对方准备好之前是阻塞的:

1)对于同一个通道,发送操作(协程或者函数中的),在接收者准备好之前是阻塞的:如果ch中的数据无人接收,就无法再给通道传入其他数据:新的输入无法在通道非空的情况下传入。所以发送操作会等待 ch 再次变为可用状态:就是通道值被接收时(可以传入变量)。

2)对于同一个通道,接收操作是阻塞的(协程或函数中的),直到发送者可用:如果通道中没有数据,接收者就阻塞了。

以下程序验证了以上理论,一个协程在无限循环中给通道发送整数数据。不过因为没有接收者,只输出了一个数字 0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import "fmt"

func main() {
ch1 := make(chan int)
go pump(ch1) // pump hangs
fmt.Println(<-ch1) // prints only 0
}

func pump(ch chan int) {
for i := 0; ; i++ {
ch <- i
}
}

输出:

1
0

pump() 函数为通道提供数值,也被叫做生产者。

为通道解除阻塞定义了 suck 函数来在无限循环中读取通道:

1
2
3
4
5
func suck(ch chan int) {
for {
fmt.Println(<-ch)
}
}

main() 中使用协程开始它:

1
2
3
go pump(ch1)
go suck(ch1)
time.Sleep(1e9)

给程序 1 秒的时间来运行:输出了上万个整数。

同步通道-使用带缓冲的通道

一个无缓冲通道只能包含 1 个元素,有时显得很局限。我们给通道提供了一个缓存,可以在扩展的make命令中设置它的容量,如下:

1
2
buf := 100
ch1 := make(chan string, buf)

buf 是通道可以同时容纳的元素(这里是 string)个数

在缓冲满载(缓冲被全部使用)之前,给一个带缓冲的通道发送数据是不会阻塞的,而从通道读取数据也不会阻塞,直到缓冲空了。

缓冲容量和类型无关,所以可以(尽管可能导致危险)给一些通道设置不同的容量,只要他们拥有同样的元素类型。内置的 cap 函数可以返回缓冲区的容量。

如果容量大于 0,通道就是异步的了:缓冲满载(发送)或变空(接收)之前通信不会阻塞,元素会按照发送的顺序被接收。如果容量是0或者未设置,通信仅在收发双方准备好的情况下才可以成功。

同步:ch :=make(chan type, value)

  • value == 0 -> synchronous, unbuffered (阻塞)
  • value > 0 -> asynchronous, buffered(非阻塞)取决于value元素

使用 select 切换协程

从不同的并发执行的协程中获取值可以通过关键字select来完成,它和switch控制语句非常相似,也被称作通信开关;它的行为像是“你准备好了吗”的轮询机制;select监听进入通道的数据,也可以是用通道发送值的时候。

1
2
3
4
5
6
7
8
9
select {
case u:= <- ch1:
...
case v:= <- ch2:
...
...
default: // no value ready to be received
...
}

default 语句是可选的;fallthrough行为,和普通的 switch 相似,是不允许的。在任何一个 case中执行 break或者 returnselect就结束了。

select 做的就是:选择处理列出的多个通信情况中的一个。

  • 如果都阻塞了,会等待直到其中一个可以处理

  • 如果多个可以处理,随机选择一个

  • 如果没有通道操作可以处理并且写了 default 语句,它就会执行:default 永远是可运行的(这就是准备好了,可以执行)。

select 中使用发送操作并且有 default可以确保发送不被阻塞!如果没有 caseselect 就会一直阻塞。

select 语句实现了一种监听模式,通常用在(无限)循环中;在某种情况下,通过 break 语句使循环退出。

以下程序中有 2 个通道ch1ch2,三个协程 pump1()pump2()suck()。这是一个典型的生产者消费者模式。在无限循环中,ch1ch2 通过 pump1()pump2() 填充整数;suck() 也是在无限循环中轮询输入的,通过 select 语句获取ch1ch2 的整数并输出。选择哪一个case 取决于哪一个通道收到了信息。程序在 main执行 1 秒后结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"fmt"
"time"
)

func main() {
ch1 := make(chan int)
ch2 := make(chan int)

go pump1(ch1)
go pump2(ch2)
go suck(ch1, ch2)

time.Sleep(1e9)
}

func pump1(ch chan int) {
for i := 0; ; i++ {
ch <- i * 2
}
}

func pump2(ch chan int) {
for i := 0; ; i++ {
ch <- i + 5
}
}

func suck(ch1, ch2 chan int) {
for {
select {
case v := <-ch1:
fmt.Printf("Received on channel 1: %d\n", v)
case v := <-ch2:
fmt.Printf("Received on channel 2: %d\n", v)
}
}
}

输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
Received on channel 2: 5
Received on channel 2: 6
Received on channel 1: 0
Received on channel 2: 7
Received on channel 2: 8
Received on channel 2: 9
Received on channel 2: 10
Received on channel 1: 2
Received on channel 2: 11
...
Received on channel 2: 47404
Received on channel 1: 94346
Received on channel 1: 94348

新旧模型对比:任务和worker

假设我们需要处理很多任务;一个worker处理一项任务。任务可以被定义为一个结构体

1
2
3
type Task struct {
// some state
}
  • 旧模式:使用共享内存进行同步

由各个任务组成的任务池共享内存;为了同步各个worker以及避免资源竞争,我们需要对任务池进行加锁保护:

1
2
3
4
type Pool struct {
Mu sync.Mutex
Tasks []Task
}

worker代码可能这样写:

1
2
3
4
5
6
7
8
9
10
11
func Worker(pool *Pool) {
for {
pool.Mu.lock()
// begin critical section:
task := pool.Task[0] // take the first task
pool.Tasks = pool.Task[1:] // update the pool of tasks
// end critical section
pool.Mu.Unlock()
process(task)
}
}

这些worker有许多都可以并发执行;他们可以在go协程中启动。一个worker先将pool锁定,从pool获取第一项任务,再解锁和处理任务。加锁保证了同一时间只有一个go协程可以进入到pool中:一项任务有且只能被赋予一个worer。如果不加锁,则工作协程可能会在task:=pool.Task[0]发生切换,导致pool.Tasks=pool.Task[1:]结果异常:一些worker获取不到任务,而一些任务可能被多个worker得到。加锁实现同步的方式在工作协程比较少时可以工作的很好,但是当工作协程数量很大,任务量也很多时,处理效率将会因为频繁的加锁/解锁开销而降低。当工作协程数增加到一个阈值时,程序效率会急剧下降,这就成为了瓶颈。

  • 新模式:使用通道
    使用通道进行同步:使用一个通道接受需要处理的任务,一个通道接受处理完成的任务(及其结果)。worker在协程中启动,其数量N应该根据任务数量进行调整。

主线程扮演着Master节点角色,可能写成如下形式:

1
2
3
4
5
6
7
8
func main() {
pending, done := make(chan *Task), make(chan *Task)
go sendWork(pending) // put tasks with work on the channel
for i := 0; i < N; i++ { // start N goroutines to do work
go Worker(pending, done)
}
consumeWork(done) // continue with the processed tasks
}

worker的逻辑比较简单:从pending通道拿任务,处理后将其放到done通道中:

1
2
3
4
5
6
7
func Worker(in, out chan *Task) {
for {
t := <-in
process(t)
out <- t
}
}

这里并不使用锁:从通道得到新任务的过程没有任何竞争。随着任务数量增加,worker数量也应该相应增加,同时性能并不会像第一种方式那样下降明显。在pending通道中存在一份任务的拷贝,第一个worker从pending通道中获得第一个任务并进行处理,这里并不存在竞争。某一个任务会在哪一个worker中被执行是不可知的,反过来也是。worker数量的增多也会增加通信的开销,这会对性能有轻微的影响。

从这个简单的例子中可能很难看出第二种模式的优势,但含有复杂锁运用的程序不仅在编写上显得困难,也不容易编写正确,使用第二种模式的话,就无需考虑这么复杂的东西了。

因此,第二种模式对比第一种模式而言,不仅性能是一个主要优势,而且还有个更大的优势:代码显得更清晰、更优雅。

  • 怎么选择是该使用锁还是通道?

使用锁的情景:

1) 访问共享数据结构中的缓存信息

2) 保存应用程序上下文和状态信息数据

使用通道的情景:

1) 与异步操作的结果进行交互

2) 分发任务

3) 传递数据所有权

当你发现你的锁使用规则变得很复杂时,可以反省使用通道会不会使问题变得简单些。

------ 本文结束 ------
0%