Don't communicate by sharing memory;share memory bu communicating. 不要通过共享内存来通信,而应该通过通信来共享内存。
通道与goroutine共同代表Go语言独有的并发编程模式和编程哲学,利用通道在多个goroutine之间传递数据。
通道类型的值,本身就是并发安全的。这是Go语言自带的唯一一个可以满足并发安全性的类型。
在声明并初始化一个通道的时候,需要使用Go内建函数make()
,传给这个函数的第一个参数应该是代表了通道的具体类型的类型字面量,第二个参数是一个int类型的值,不能小于0,表示通道的容量(该参数可选)。
非缓冲通道和缓冲通道有不同的数据传递方式。
声明一个通道类型变量的时候,首先要确定该通道类型的元素类型,这决定了通过这个通道传递声明类型的数据。
chan int
// chan 表示通道类型的关键字
// int 说明该通道类型的元素类型
ch1 := make(chan int,3)
一个通道相当一个FIFO队列,通道中各个元素严格按照发送顺序排列,元素值的发送和接收都用到操作符<-
,称为接送操作符,该符号形象的表示了元素值的传输方向。
对发送与接收操作的基本特性:
对同一个通道,发送操作之间是互斥的,接收操作之间也是互斥的:
在同一时刻,Go语言运行时系统只会执行对同一个通道的任意个发送操作中的某一个。直到这个元素值被完全复制进该通道之后,其他针对该通道的发送操作才可能被执行。接收操作也是一样的,即使操作是并发执行的也是如此。
对于通道内的同一个元素值,发送操作和接收操作之间也是互斥的。即使一个正在被复制进通道但还未复制完成的元素值,也绝不会被想接收它的一方看到和取走。
元素值从外界进入通道时是被复制,即进入通道的并不是接收操作符右边的那个元素值,而是它的副本。
元素从通道进入外界时会被移动:
发送操作和接收操作中对元素值的处理都是不可分割的:
不可分割表示处理通道中元素的操作是一个原子操作:
发送操作和接收操作在完全完成之前会被阻塞
发送操作:
接收操作:
在所有步骤完全完成前,发起该操作的代码会一直阻塞,直到该代码所在goroutine收到了运行时系统的通知并重新获得运行机会为止。
如此阻塞代码就是为了实现操作的互斥和元素值的完整。
由于发送操作在这种情况下被阻塞后,它们所在的goroutine会顺序地进入通道内部的发送等待队列,所以通知的顺序是公平的。
因此而等待的所有接收操作所在的goroutine都会按照先后顺序被放入通道内部的接收等待队列。
缓冲通道作为收发双方的中间件,元素值先从发送方复制到缓冲通道,之后再由缓冲通道复制给接收方。当发送操作在执行的时候发现空的通道中,正好有等待的接收操作,那么它会直接把元素值复制给接收方。
无论是发送操作还是接收操作,一开始执行就会被阻塞,直到配对的操作也开始执行,才会继续传递。
非缓冲通道是在用同步的方式传递数据,只有收发双方对接上了,数据才会被传递。
数据直接从发送方复制到接收方,中间并没有非缓冲通道做中转,相比之下,缓冲通道则在用异步的方式传递数据。
对值为nil
的通道,不论它的具体类型是什么,对它的发送和接收操作都会永久地处于阻塞状态。它们所属的goroutine中的任何代码都不再会被执行。
通道类型是引用,所以它的零值就是nil,只声明该类型的变量但没有用
make()
函数对它初始化时,该变量的值就是nil。
接收操作返回两个值:
如果通道关闭时,里面有元素值未取出,接收操作会把通道中的值按顺序取值之后,在返回通道已关闭的false判断。因此,通过接收操作的返回值的第二个值来判断通道是否关闭有延迟。
除非有特殊保证,否则让发送方关闭通道,而不是接收方。
一个通道是双向还是单向,由类型字面量体现。
var uselessChan = make(chan<- int,1) // 发送通道,只能发(往通道中发送)
uselessChan = make(<-chan int,1) // 接收通道,只能收(从通道中接收)
站在操作通道的代码的角度,看单向通道是发送通道还是接收通道。
单向通道最主要的用途是约束其他代码的行为。
例子:
func SendInt(ch chan<- int) {
ch <- rand.Intn(1000)
}
// SendInt函数,只能接受一个发送通道,函数中的代码只能向通道中发送元素值,而不能从通道中接收元素值
在实际场景中,约束一般出现在接口类型声明中的某个方法定义上,或者,声明函数类型时,如果使用单向通道,相当于约束所有实现这个函数类型的函数。在编写模板代码或可扩展的程序库是很有用。
type Notifier interface {
SendInt(ch chan<- int)
}
// 定义SendInt方法,参数是一个发送通道
// 该接口的所实现类型中的SendInt方法都受到限制
// 在调用SendInt函数时,只需要将一个双向通道作为参数传递给它,Go语言会自定把它转换为所需的单向通道
在接口类型声明的花括号中,每一行代表一个方法的定义。
接口中方法定义与函数声明很类似,只包含方法名,参数列表和结果列表。一个类型如果想要成为一个接口的实现类型,必须实现接口中定义的所有方法。因此某个方法中定义了单向通道,那么相当于对它的所有实现做出约束。
func getIntChan() <-chan int {
num := 5
ch := make(chan int, num)
for i := 0; i < num; i++ {
ch <- i
}
close(ch)
return ch
}
// 在函数的结果类别中使用单向通道
// 得到该通道的程序,只能从通道中接收元素值,这是对函数调用方法的一种约束
intChan2 := getIntChan()
for elem := range intChan2 {
fmt.Printf("The element in intChan2: %v\n", elem)
}
// for语句循环的从单向通道中取出元素值
对上述for语句的解释:
上述三点是带range子句的for循环与通道的联系,Go还有专门操作通道的select语句。
select语句只能与通道联用,一般由若干个分支组成,每次执行select语句的时候,只有一个分支中的代码会被执行。
select语句的分支:
case
开头,后面是一个case表达式和一个冒号,从下一行开始写入,分支被选中时需要执行的语句default case
,当且仅当没有候选分支被选中时,它才会被执行,default开头后面直接是冒号,从下一行开始写入要执行的语句select语句是专门为通道而设计的,每个case表达式中只能包含操作通道的表达式,如接收表达式。
例子:
// 准备好几个通道。
intChannels := [3]chan int{
make(chan int, 1),
make(chan int, 1),
make(chan int, 1),
}
// 随机选择一个通道,并向它发送元素值。
index := rand.Intn(3)
fmt.Printf("The index: %d\n", index)
intChannels[index] <- index
// 哪一个通道中有可取的元素值,哪个对应的分支就会被执行。
select {
case <-intChannels[0]:
fmt.Println("The first candidate case is selected.")
case <-intChannels[1]:
fmt.Println("The second candidate case is selected.")
case elem := <-intChannels[2]:
fmt.Printf("The third candidate case is selected, the element is %d.\n", elem)
default:
fmt.Println("No candidate case is selected!")
}
select语句的注意点:
intChan := make(chan int, 1)
// 一秒后关闭通道。
time.AfterFunc(time.Second, func() {
close(intChan)
})
select {
case _, ok := <-intChan:
if !ok {
fmt.Println("The candidate case is closed.")
break
}
fmt.Println("The candidate case is selected.")
}
time.AfterFunc()
<标识>
来结束(标识定义在for循环之外)下面每一种模式的设计都依赖于 channel。
barrier 屏障模式故名思义就是一种屏障,用来阻塞直到聚合所有 goroutine 返回结果。可以使用 channel 来实现。
/*
* Barrier
*/
type barrierResp struct {
Err error
Resp string
Status int
}
// 构造请求
func makeRequest(out chan<- barrierResp, url string) {
res := barrierResp{}
client := http.Client{
Timeout: time.Duration(2*time.Microsecond),
}
resp, err := client.Get(url)
if resp != nil {
res.Status = resp.StatusCode
}
if err != nil {
res.Err = err
out <- res
return
}
byt, err := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
res.Err = err
out <- res
return
}
res.Resp = string(byt)
out <- res
}
// 合并结果
func barrier(endpoints ...string) {
requestNumber := len(endpoints)
in := make(chan barrierResp, requestNumber)
response := make([]barrierResp, requestNumber)
defer close(in)
for _, endpoints := range endpoints {
go makeRequest(in, endpoints)
}
var hasError bool
for i := 0; i < requestNumber; i++ {
resp := <-in
if resp.Err != nil {
fmt.Println("ERROR: ", resp.Err, resp.Status)
hasError = true
}
response[i] = resp
}
if !hasError {
for _, resp := range response {
fmt.Println(resp.Status)
}
}
}
func main() {
barrier([]string{"https://www.baidu.com", "http://www.sina.com", "https://segmentfault.com/"}...)
}
Barrier 模式也可以使用 golang.org/x/sync/errgroup
扩展库来实现,这样更加简单明了。这个包有点类似于 sync.WaitGroup
,但是区别是当其中一个任务发生错误时,可以返回该错误。
func barrier(endpoints ...string) {
var g errgroup.Group
var mu sync.Mutex
response := make([]barrierResp, len(endpoints))
for i, endpoint := range endpoints {
i, endpoint := i, endpoint // create locals for closure below
g.Go(func() error {
res := barrierResp{}
resp, err := http.Get(endpoint)
if err != nil {
return err
}
byt, err := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
return err
}
res.Resp = string(byt)
mu.Lock()
response[i] = res
mu.Unlock()
return err
})
}
if err := g.Wait(); err != nil {
fmt.Println(err)
}
for _, resp := range response {
fmt.Println(resp.Status)
}
}
常用在异步处理也称为 Promise 模式,采用一种 fire-and-forget 的方式,是指主 goroutine 不等子 goroutine 执行完就直接返回了,然后等到未来执行完的时候再去取结果。在 Go 中由于 goroutine 的存在,实现这种模式是挺简单的。
/*
* Future
*/
type Function func(string) (string, error)
type Future interface {
SuccessCallback() error
FailCallback() error
Execute(Function) (bool, chan struct{})
}
type AccountCache struct {
Name string
}
func (a *AccountCache) SuccessCallback() error {
fmt.Println("It's success~")
return nil
}
func (a *AccountCache) FailCallback() error {
fmt.Println("It's fail~")
return nil
}
func (a *AccountCache) Execute(f Function) (bool, chan struct{}){
// 空 struct 在 Go 中占的内存是最少的
done := make(chan struct{})
go func(a *AccountCache) {
_, err := f(a.Name)
if err != nil {
_ = a.FailCallback()
} else {
_ = a.SuccessCallback()
}
done <- struct{}{}
}(a)
return true, done
}
func NewAccountCache(name string) *AccountCache {
return &AccountCache{
name,
}
}
func testFuture() {
var future Future
future = NewAccountCache("Tom")
updateFunc := func(name string) (string, error){
fmt.Println("cache update:", name)
return name, nil
}
_, done := future.Execute(updateFunc)
defer func() {
<-done
}()
}
func main() {
var future Future
future = NewAccountCache("Tom")
updateFunc := func(name string) (string, error){
fmt.Println("cache update:", name)
return name, nil
}
_, done := future.Execute(updateFunc)
defer func() {
<-done
}()
// do something
}
注意和 Barrire 模式不同的是,它是按顺序的,类似于流水线,通过 buffer channel 将多个goroutine串起来,只要前序 goroutine 处理完一部分数据,就往下传递,达到并行的目的。
/*
* Pipeline 模式
*
* 实现一个功能,给定一个切片,然后求它的子项的平方和。
*
* 例如,[1, 2, 3] -> 1^2 + 2^2 + 3^2 = 14。
*
* 正常的逻辑,遍历切片,然后求平方累加。使用 pipeline 模式,可以把求和和求平方拆分出来并行计算。
*/
func generator(max int) <-chan int{
out := make(chan int, 100)
go func() {
for i := 1; i <= max; i++ {
out <- i
}
close(out)
}()
return out
}
func power(in <-chan int) <-chan int{
out := make(chan int, 100)
go func() {
for v := range in {
out <- v * v
}
close(out)
}()
return out
}
func sum(in <-chan int) <-chan int{
out := make(chan int, 100)
go func() {
var sum int
for v := range in {
sum += v
}
out <- sum
close(out)
}()
return out
}
func main() {
// [1, 2, 3]
fmt.Println(<-sum(power(generator(3))))
}
在 Go 中 goroutine 已经足够轻量,甚至 net/http server 的处理方式也是 goroutine-per-connection 的,所以比起其他语言来说可能场景稍微少一些。每个 goroutine 的初始内存消耗在 2~8kb,当我们有大批量任务的时候,需要起很多 goroutine 来处理,这会给系统代理很大的内存开销和 GC 压力,这个时候就可以考虑一下协程池。
/*
* Worker pool
*/
type TaskHandler func(interface{})
type Task struct {
Param interface{}
Handler TaskHandler
}
type WorkerPoolImpl interface {
AddWorker() // 增加 worker
SendTask(Task) // 发送任务
Release() // 释放
}
type WorkerPool struct {
wg sync.WaitGroup
inCh chan Task
}
func (d *WorkerPool) AddWorker() {
d.wg.Add(1)
go func(){
for task := range d.inCh {
task.Handler(task.Param)
}
d.wg.Done()
}()
}
func (d *WorkerPool) Release() {
close(d.inCh)
d.wg.Wait()
}
func (d *WorkerPool) SendTask(t Task) {
d.inCh <- t
}
func NewWorkerPool(buffer int) WorkerPoolImpl {
return &WorkerPool{
inCh: make(chan Task, buffer),
}
}
func main() {
bufferSize := 100
var workerPool = NewWorkerPool(bufferSize)
workers := 4
for i := 0; i < workers; i++ {
workerPool.AddWorker()
}
var sum int32
testFunc := func (i interface{}) {
n := i.(int32)
atomic.AddInt32(&sum, n)
}
var i, n int32
n = 1000
for ; i < n; i++ {
task := Task{
i,
testFunc,
}
workerPool.SendTask(task)
}
workerPool.Release()
fmt.Println(sum)
}
发布订阅模式是一种消息通知模式,发布者发送消息,订阅者接收消息。
/*
* Pub/Sub
*/
type Subscriber struct {
in chan interface{}
id int
topic string
stop chan struct{}
}
func (s *Subscriber) Close() {
s.stop <- struct{}{}
close(s.in)
}
func (s *Subscriber) Notify(msg interface{}) (err error) {
defer func() {
if rec := recover(); rec != nil {
err = fmt.Errorf("%#v", rec)
}
}()
select {
case s.in <-msg:
case <-time.After(time.Second):
err = fmt.Errorf("Timeout\n")
}
return
}
func NewSubscriber(id int) SubscriberImpl {
s := &Subscriber{
id: id,
in: make(chan interface{}),
stop: make(chan struct{}),
}
go func() {
for{
select {
case <-s.stop:
close(s.stop)
return
default:
for msg := range s.in {
fmt.Printf("(W%d): %v\n", s.id, msg)
}
}
}}()
return s
}
// 订阅者需要实现的方法
type SubscriberImpl interface {
Notify(interface{}) error
Close()
}
// sub 订阅 pub
func Register(sub Subscriber, pub *publisher){
pub.addSubCh <- sub
return
}
// pub 结果定义
type publisher struct {
subscribers []SubscriberImpl
addSubCh chan SubscriberImpl
removeSubCh chan SubscriberImpl
in chan interface{}
stop chan struct{}
}
// 实例化
func NewPublisher () *publisher{
return &publisher{
addSubCh: make(chan SubscriberImpl),
removeSubCh: make(chan SubscriberImpl),
in: make(chan interface{}),
stop: make(chan struct{}),
}
}
// 监听
func (p *publisher) start() {
for {
select {
// pub 发送消息
case msg := <-p.in:
for _, sub := range p.subscribers{
_ = sub.Notify(msg)
}
// 移除指定 sub
case sub := <-p.removeSubCh:
for i, candidate := range p.subscribers {
if candidate == sub {
p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
candidate.Close()
break
}
}
// 增加一个 sub
case sub := <-p.addSubCh:
p.subscribers = append(p.subscribers, sub)
// 关闭 pub
case <-p.stop:
for _, sub := range p.subscribers {
sub.Close()
}
close(p.addSubCh)
close(p.in)
close(p.removeSubCh)
return
}
}
}
func main() {
// 测试代码
pub := NewPublisher()
go pub.start()
sub1 := NewWriterSubscriber(1)
Register(sub1, pub)
sub2 := NewWriterSubscriber(2)
Register(sub2, pub)
commands:= []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
for _, c := range commands {
pub.in <- c
}
pub.stop <- struct{}{}
time.Sleep(time.Second*1)
}