声明:内容来自《Go语言编程》
[TOC]
1 并发基础
目前几种主流的实现模式:
多进程
多进程是在操作系统层面进行并发的基本模式。同时也是开销最大的模式。
多线程
多线程在大部分操作系统上都属于系统层面的并发模式,也是我们使用最多的 最有效的一种模式。它比多进程 的开销小很多,但是其开销依旧比较大,且在高并发模式下,效率会有影响。
基于回调的非阻塞/异步IO
使用多线程模式会很快耗尽服务器的内存和CPU资源。而这 种模式通过事件驱动的方式使用异步IO,使服务器持续运转,且尽可能地少用线程,降 低开销,它目前在Node.js中得到了很好的实践。
协程
协程(Coroutine)本质上是一种用户态线程,不需要操作系统来进行抢占式调度, 且在真正的实现中寄存于线程中,因此,系统开销极小,可以有效提高线程的任务并发 性,而避免多线程的缺点。
2 协程
Go语言在语言级别支持轻量级线程,叫goroutine。Go 语言标准库提供的所有系统调用操作(当然也包括所有同步IO操作),都会出让CPU给其他goroutine。轻量级线程的切换管理不依赖于系统的线程和进程,也不依赖于CPU的核心数量。
3 goroutine
goroutine是Go语言中的轻量级线程实现,由Go运行时(runtime)管理。
func Add(x, y int) { z := x + y fmt.Println(z) } go Add(1,1)
需要注意的是,如果这个函数有返回值,那么这个 返回值会被丢弃。
package main import "fmt" func Add(x, y int) { z := x + y fmt.Println(z) } func main() { for i := 0; i < 10; i++ { go Add(i, i) } }
Go程序从初始化main package并执行main()函数开始,当main()函数返回时,程序退出, 且程序并不等待其他goroutine(非主goroutine)结束。
4 并发通信
并发编程的难度在于协调,而协调就要通过交流。在工程上,有两种最常见的并发通信模型:共享数据和消息。
共享数据
指多个并发单元分别保持对同一个数据的引用,实现对该数据的共享。被共享的 数据可能有多种形式,比如内存数据块、磁盘文件、网络数据等。在实际工程应用中最常见的无 疑是内存了,也就是常说的共享内存。
消息机制
Go语言提供的是另一种通信模型,即以消息机制而非共享内存作为通信方式。消息机制认为每个并发单元是自包含的、独立的个体,并且都有自己的变量,但在不同并发 单元间这些变量不共享。每个并发单元的输入和输出只有一种,那就是消息。
5 channel
channel是进程内的通信方式,如果需要跨进程通信,我们建议用 分布式系统的方法来解决,比如使用Socket或者HTTP等通信协议。,一个channel只能传递一种类型的值,这个类型需要在声明channel时指定。
package main import "fmt" func Count(ch chan int) { ch <- 1 fmt.Println("Counting") } func main() { chs := make([]chan int, 10) for i := 0; i < 10; i++ { chs[i] = make(chan int) go Count(chs[i]) } for _, ch := range chs { <-ch } }
在这个例子中,我们定义了一个包含10个channel的数组(名为chs),并把数组中的每个 channel分配给10个不同的goroutine。在每个goroutine的Add()函数完成后,我们通过ch <- 1语 句向对应的channel中写入一个数据。在这个channel被读取前,这个操作是阻塞的。在所有的 goroutine启动完成后,我们通过<-ch语句从10个channel中依次读取数据。在对应的channel写入 数据前,这个操作也是阻塞的。这样,我们就用channel实现了类似锁的功能,进而保证了所有 goroutine完成后主函数才返回。
5.1 基本语法
var ch chan int var m map[string] chan bool //声明一个map,key是string,value是channel ch := make(chan int) //写入数据 ch <- value //读出数据 value := <-ch
向channel写入数据通常会导致程序阻塞,直到有其他goroutine从这个channel中读取数据。如果channel之前没有写入数据,那么从channel中读取数据也会导致程序阻塞,直到channel 中被写入数据为止。
5.2 select
通过调用select()函数来监控一系列的文件句柄,一旦其中一个文件句柄发生了IO动作,该select()调用就会被返回。后来该机制也被用于 实现高并发的Socket服务器程序。Go语言直接在语言级别支持select关键字,用于处理异步IO 问题。
select有比较多的 限制,其中最大的一条限制就是每个case语句里必须是一个IO操作。
select { case <-chan1: // 如果chan1成功读到数据,则进行该case处理语句 case chan2 <- 1: // 如果成功向chan2写入数据,则进行该case处理语句 default: // 如果上面都没有成功,则进入default处理流程 }
select的每个case语句都必须是一个面向channel的操作。
5.3 缓冲机制
给channel带上缓冲, 从而达到消息队列的效果。要创建一个带缓冲的channel,其实也非常容易:
c := make(chan int, 1024)
在调用make()时将缓冲区大小作为第二个参数传入即可,比如上面这个例子就创建了一个大小 为1024的int类型channel,即使没有读取方,写入方也可以一直往channel里写入,在缓冲区被 填完之前都不会阻塞。
5.4 超时机制
在并发编程的通信过程中,最需要处理的就是超时问题,即向channel写数据时发现channel 已满,或者从channel试图读取数据时发现channel为空。如果不正确处理这些情况,很可能会导 致整个goroutine锁死。
使用channel时需要小心,比如对于以下这个用法:
i:= <-ch
如果出现了一个错误情况,即永远都没有人往ch里写数据,那 么上述这个读取动作也将永远无法从ch中读取到数据,导致的结果就是整个goroutine永远阻塞并 没有挽回的机会。
Go语言没有提供直接的超时处理机制,但我们可以利用select机制。虽然select机制不是 专为超时而设计的,却能很方便地解决超时问题。因为select的特点是只要其中一个case已经 完成,程序就会继续往下执行,而不会考虑其他case的情况。
timeout := make(chan bool, 1) go func() { time.Sleep(1e9) // 等待1秒钟 timeout <- true }() // 然后我们把timeout这个channel利用起来 select { case <-ch: // 从ch中读取到数据 case <-timeout: // 一直没有从ch中读取到数据,但从timeout中读取到了数据 }
这样使用select机制可以避免永久等待的问题,是在Go语言开发中避免channel通信超时的最有效方法。
5.5 channel的传递
channel本身也是一个原生类型,具有可被传递的特性。
示例:定义一系列PipeData的数据结构并一起传递给一个函数,就可以达到流式处理数据的目的。
type PipeData struct { value int handler func(int) int next chan int } func handle(queue chan *PipeData) { for data := range queue { data.next <- data.handler(data.value) } }
5.6 单向channel
单向channel只能用于发送或者接收数据。channel本身必然是同时支持读写的, 否则根本没法用。在此,我们只是对其增加一些限制。
单向channel变量的声明非常简单,如下:
var ch1 chan int // ch1是一个正常的channel,不是单向的 var ch2 chan<- float64// ch2是单向channel,只用于写float64数据 var ch3 <-chan int // ch3是单向channel,只用于读取int数据
channel是一个原生类型,因此不仅 支持被传递,还支持类型转换。
初始化:
ch4 := make(chan int) ch5 := <-chan int(ch4) // ch5就是一个单向的读取channel ch6 := chan<- int(ch4) // ch6 是一个单向的写入channel
基于ch4,我们通过类型转换初始化了两个单向channel:单向读的ch5和单向写的ch6。
5.7 关闭channel
关闭channel非常简单,直接使用Go语言内置的close()函数即可:
close(ch)
如何判断一个channel是否已经被关 闭?我们可以在读取的时候使用多重返回值的方式:
x,ok := <- ch
6 多核并行化
下面我们来模拟一个完全可以并行的计算任务:计算N个整型数的总和。我们可以将所有整 型数分成M份,M即CPU的个数。让每个CPU开始计算分给它的那份计算任务,最后将每个CPU 的计算结果再做一次累加,这样就可以得到所有N个整型数的总和:
type Vector []float64 // 分配给每个CPU的计算任务 func (v Vector) DoSome(i, n int, u Vector, c chan int) { for ; i < n; i++ { v[i] += u.Op(v[i]) } c <- 1 // 发信号告诉任务管理者我已经计算完成了 } const NCPU = 16 // 假设总共有16核 func (v Vector) DoAll(u Vector) { c := make(chan int, NCPU) // 用于接收每个CPU的任务完成信号 for i := 0; i < NCPU; i++ { go v.DoSome(i*len(v)/NCPU, (i+1)*len(v)/NCPU, u, c) } // 等待所有CPU的任务完成 for i := 0; i < NCPU; i++ { <-c // 获取到一个数据,表示一个CPU计算完成了 } // 到这里表示所有计算已经结束 }
这或许可能只在一个核上跑。在Go语言升级到默认支持多CPU的某个版本之前,我们可以先通过设置环境变量 GOMAXPROCS的值来控制使用多少个CPU核心。具体操作方法是通过直接设置环境变量 GOMAXPROCS的值,或者在代码中启动goroutine之前先调用以下这个语句以设置使用16个CPU 核心:
runtime.GOMAXPROCS(16)
到底应该设置多少个CPU核心呢,其实runtime包中还提供了另外一个函数NumCPU()来获 取核心数。
7 出让时间片
我们可以在每个goroutine中控制何时主动出让时间片给其他goroutine,这可以使用runtime 包中的Gosched()函数实现。
如果要比较精细地控制goroutine的行为,就必须比较深入地了解Go语言开发包中 runtime包所提供的具体功能。
8 同步
Go语言包中的sync包提供了两种锁类型:sync.Mutex和sync.RWMutex。
当一个goroutine获得了Mutex后,其他goroutine就只能乖乖等 到这个goroutine释放该Mutex。RWMutex相对友好些,是经典的单写多读模型。
8.1 全局唯一性操作
Go语言提供了一个Once 类型来保证全局的唯一性操作,具体代码如下:
var a string var once sync.Once func setup() { a = "hello, world" } func doprint() { once.Do(setup) print(a) } func twoprint() { go doprint() go doprint() }
once的Do()方法可以保证在全局范围内只调用指定的函数一次(这里指 setup()函数),而且所有其他goroutine在调用到此语句时,将会先被阻塞,直至全局唯一的 once.Do()调用结束后才继续(继续执行此语句后的语句,此语句不再执行)。
为了更好地控制并行中的原子性操作,sync包中还包含一个atomic子包,它提供了对于一 些基础数据类型的原子操作函数,比如下面这个函数:
func CompareAndSwapUint64(val *uint64, old, new uint64) (swapped bool)
就提供了比较和交换两个uint64类型数据的操作。这让开发者无需再为这样的操作专门添加 Lock操作。