08-通道

Don't communicate by sharing memory;share memory bu communicating. 不要通过共享内存来通信,而应该通过通信来共享内存。

通道与goroutine共同代表Go语言独有的并发编程模式和编程哲学,利用通道在多个goroutine之间传递数据。

通道类型的值,本身就是并发安全的。这是Go语言自带的唯一一个可以满足并发安全性的类型。

在声明并初始化一个通道的时候,需要使用Go内建函数make(),传给这个函数的第一个参数应该是代表了通道的具体类型的类型字面量,第二个参数是一个int类型的值,不能小于0,表示通道的容量(该参数可选)。

  • 当容量 = 0 ,表示非缓冲通道
  • 当容量 > 0 ,表示缓冲通道

非缓冲通道和缓冲通道有不同的数据传递方式。

声明一个通道类型变量的时候,首先要确定该通道类型的元素类型,这决定了通过这个通道传递声明类型的数据。

chan int
// chan 表示通道类型的关键字
// int 说明该通道类型的元素类型

ch1 := make(chan int,3)

一个通道相当一个FIFO队列,通道中各个元素严格按照发送顺序排列,元素值的发送和接收都用到操作符<-,称为接送操作符,该符号形象的表示了元素值的传输方向

对发送与接收操作的基本特性:

  1. 对同一个通道,发送操作之间是互斥的,接收操作之间也是互斥的:

    在同一时刻,Go语言运行时系统只会执行对同一个通道的任意个发送操作中的某一个。直到这个元素值被完全复制进该通道之后,其他针对该通道的发送操作才可能被执行。接收操作也是一样的,即使操作是并发执行的也是如此。

    对于通道内的同一个元素值,发送操作和接收操作之间也是互斥的。即使一个正在被复制进通道但还未复制完成的元素值,也绝不会被想接收它的一方看到和取走。

    元素值从外界进入通道时是被复制,即进入通道的并不是接收操作符右边的那个元素值,而是它的副本

    元素从通道进入外界时会被移动:

    1. 生成正在通道中的这个元素值的副本,并准备给到接收方
    2. 删除在通道中的这个元素值
  2. 发送操作和接收操作中对元素值的处理都是不可分割的:

    不可分割表示处理通道中元素的操作是一个原子操作:

    1. 发送操作要么没复制值,要么已经复制完毕
    2. 接收操作在准备好元素值副本之后,一定会删除掉通道中的原值,绝不会出现有残留的情况
  3. 发送操作和接收操作在完全完成之前会被阻塞

    发送操作:

    1. 复制元素值
    2. 放置副本到通道内部

    接收操作:

    1. 复制通道内的元素值
    2. 放置副本到接收方
    3. 删除原值

    在所有步骤完全完成前,发起该操作的代码会一直阻塞,直到该代码所在goroutine收到了运行时系统的通知并重新获得运行机会为止。

    如此阻塞代码就是为了实现操作的互斥和元素值的完整

0.1. 长时间阻塞的发送和接收操作

0.1.1. 缓冲通道

  1. 如果通道已满,对它的所有发送操作都会被阻塞,直到通道中有元素值被接收走,此时通道会优先通知最早因此而等待的那个发送操作所在的goroutine,然后再次执行发送操作。

由于发送操作在这种情况下被阻塞后,它们所在的goroutine会顺序地进入通道内部的发送等待队列,所以通知的顺序是公平的。

  1. 如果通道已空,对它的所有接收操作都会被阻塞,直到通道中有新的元素出现,此时通道会优先通知最早等待的那个接收操作所在的goroutine,并使它再次执行接收操作。

因此而等待的所有接收操作所在的goroutine都会按照先后顺序被放入通道内部的接收等待队列。

缓冲通道作为收发双方的中间件,元素值先从发送方复制到缓冲通道,之后再由缓冲通道复制给接收方。当发送操作在执行的时候发现空的通道中,正好有等待的接收操作,那么它会直接把元素值复制给接收方

0.1.2. 非缓冲通道

无论是发送操作还是接收操作,一开始执行就会被阻塞,直到配对的操作也开始执行,才会继续传递。

非缓冲通道是在用同步的方式传递数据,只有收发双方对接上了,数据才会被传递。

数据直接从发送方复制到接收方,中间并没有非缓冲通道做中转,相比之下,缓冲通道则在用异步的方式传递数据。

0.1.3. 错误使用通道

对值为nil的通道,不论它的具体类型是什么,对它的发送和接收操作都会永久地处于阻塞状态。它们所属的goroutine中的任何代码都不再会被执行。

通道类型是引用,所以它的零值就是nil,只声明该类型的变量但没有用make()函数对它初始化时,该变量的值就是nil。

0.2. 引起panic的发送和接收操作

  1. 对已关闭的通道进行发送操作(接收操作可以感知到通道已经关闭,并安全退出)
  2. 关闭已经关闭的通道

接收操作返回两个值:

  • 元素值
  • 接收操作成功与否

如果通道关闭时,里面有元素值未取出,接收操作会把通道中的值按顺序取值之后,在返回通道已关闭的false判断。因此,通过接收操作的返回值的第二个值来判断通道是否关闭有延迟。

除非有特殊保证,否则让发送方关闭通道,而不是接收方。

0.3. 单向通道

  • 通常说的通道都是双向通道,可以发也可以收。
  • 单向通道:只能发或者之只能收。

一个通道是双向还是单向,由类型字面量体现。

var uselessChan = make(chan<- int,1)    // 发送通道,只能发(往通道中发送)

uselessChan = make(<-chan int,1)        // 接收通道只能收从通道中接收

站在操作通道的代码的角度,看单向通道是发送通道还是接收通道

0.3.1. 单向通道的价值

单向通道最主要的用途是约束其他代码的行为

例子:

func SendInt(ch chan<- int) {
    ch <- rand.Intn(1000)
}
// SendInt函数只能接受一个发送通道函数中的代码只能向通道中发送元素值而不能从通道中接收元素值

在实际场景中,约束一般出现在接口类型声明中的某个方法定义上,或者,声明函数类型时,如果使用单向通道,相当于约束所有实现这个函数类型的函数。在编写模板代码或可扩展的程序库是很有用

type Notifier interface {
    SendInt(ch chan<- int)
}
// 定义SendInt方法,参数是一个发送通道
// 该接口的所实现类型中的SendInt方法都受到限制

// 在调用SendInt函数时只需要将一个双向通道作为参数传递给它Go语言会自定把它转换为所需的单向通道

在接口类型声明的花括号中,每一行代表一个方法的定义。

接口中方法定义与函数声明很类似,只包含方法名,参数列表和结果列表。一个类型如果想要成为一个接口的实现类型,必须实现接口中定义的所有方法。因此某个方法中定义了单向通道,那么相当于对它的所有实现做出约束

func getIntChan() <-chan int {
    num := 5
    ch := make(chan int, num)
    for i := 0; i < num; i++ {
        ch <- i
    }
    close(ch)
    return ch
}

// 在函数的结果类别中使用单向通道
// 得到该通道的程序,只能从通道中接收元素值,这是对函数调用方法的一种约束

intChan2 := getIntChan()
for elem := range intChan2 {
    fmt.Printf("The element in intChan2: %v\n", elem)
}

// for语句循环的从单向通道中取出元素值

对上述for语句的解释:

  1. for语句会不断尝试从initChan2取出元素值,即使通道被关闭,也会取出所有剩余的元素值之后再结束执行
  2. 单向通道中没有元素值时,代码会被阻塞在for关键字那一行
  3. initChan2的值为nil,代码会被永远阻塞在for关键字那一行

上述三点是带range子句的for循环与通道的联系,Go还有专门操作通道的select语句。

0.4. select语句

select语句只能与通道联用,一般由若干个分支组成,每次执行select语句的时候,只有一个分支中的代码会被执行。

select语句的分支:

  1. 候选分支:以关键字case开头,后面是一个case表达式和一个冒号,从下一行开始写入,分支被选中时需要执行的语句
  2. 默认分支:default case当且仅当没有候选分支被选中时,它才会被执行,default开头后面直接是冒号,从下一行开始写入要执行的语句

select语句是专门为通道而设计的,每个case表达式中只能包含操作通道的表达式,如接收表达式。

例子:

// 准备好几个通道。
intChannels := [3]chan int{
    make(chan int, 1),
    make(chan int, 1),
    make(chan int, 1),
}

// 随机选择一个通道,并向它发送元素值。
index := rand.Intn(3)
fmt.Printf("The index: %d\n", index)
intChannels[index] <- index

// 哪一个通道中有可取的元素值,哪个对应的分支就会被执行。
select {
case <-intChannels[0]:
    fmt.Println("The first candidate case is selected.")
case <-intChannels[1]:
    fmt.Println("The second candidate case is selected.")
case elem := <-intChannels[2]:
    fmt.Printf("The third candidate case is selected, the element is %d.\n", elem)
default:
    fmt.Println("No candidate case is selected!")
}

select语句的注意点:

  1. 设置默认分支后,无论涉及通道操作的表达式是否有阻塞,select语句都不会被阻塞。
  2. 如果没有默认分支,一旦所有case表达式都没有满足求值条件,那么select语句就会被阻塞,直到至少有一个case表达式满足条件为止。
  3. 当通道关闭后,会从通道中接收到其元素类型的零值,所以需要接收表达式的第二个结果值来判断通道是否关闭。一旦发现某个通道关闭了,应该及时屏蔽对应的分支或采取其他措施。
  4. select语句只能对其中的每一个case表达式各求值一次。如果连续或定时地操作其中的通道,就需要通过for语句中嵌入select语句的方式实现。简单地在select语句的分支中使用break语句,只能结束当前的select语句的执行,而并不会对外层的for语句产生作用。这种错误的用法可能会让这个for语句无休止的运行下去。如下面的例子。
intChan := make(chan int, 1)
// 一秒后关闭通道。
time.AfterFunc(time.Second, func() {
    close(intChan)
})
select {
case _, ok := <-intChan:
    if !ok {
        fmt.Println("The candidate case is closed.")
        break
    }
    fmt.Println("The candidate case is selected.")
}

0.4.1. 分支选择规则

  1. 每一个case表达式,至少有一个发送或接收操作,也可以包含其他的表达式。多个表达式从左到右顺序被求值
  2. select语句包含的候选分支中的case表达式会在该语句执行时先被求值,求值顺序从代码编写的顺序从上往下,所有分支都会被求值,从上到下,从左往右
  3. case表达式中的发送或接收操作处于阻塞状态时,该case表达式的求值就不成功,即候选分支不满足条件
  4. 只有当所有case表达式都被求值完成后,才开始选择候选分支。只会挑选满足条件的候选分支执行
    1. 所有候选分支都不满足条件,选择默认分支
    2. 没有默认分支,select语句处于阻塞状态,直到至少有一个候选分支满足条件为止
  5. 如果同时有多个候选分支满足条件,用伪随机算法在候选分支中选择一个,然后执行
  6. 一个select语句只有一个默认分支,并且默认分支只在无候选分支可选的时候才会执行,与编写位置无关
  7. select语句的每次执行,包括case表达式求值和分支选择都是独立的,它的执行是否并发安全,要看其中的case表达式以及分支中是否包含并发不安全的代码

0.4.2. 四大用法

  • 满足条件的case是随机选择的
  • 增加超时机制time.AfterFunc()
  • 检查channel是否已满
  • for+select,要在 select 区间直接结束掉 for 循环,只能使用 break <标识> 来结束(标识定义在for循环之外)

0.5. 并发设计模式

下面每一种模式的设计都依赖于 channel。

0.5.1. Barrier模式

barrier 屏障模式故名思义就是一种屏障,用来阻塞直到聚合所有 goroutine 返回结果。可以使用 channel 来实现。

0.5.1.1. 使用场景

  • 多个网络请求并发,聚合结果
  • 粗粒度任务拆分并发执行,聚合结果
/*
* Barrier
*/
type barrierResp struct {
   Err error
   Resp string
   Status int
}

// 构造请求
func makeRequest(out chan<- barrierResp, url string) {
   res := barrierResp{}

   client := http.Client{
       Timeout: time.Duration(2*time.Microsecond),
  }

   resp, err := client.Get(url)
   if resp != nil {
       res.Status = resp.StatusCode
  }
   if err != nil {
       res.Err = err
       out <- res
       return
  }

   byt, err := ioutil.ReadAll(resp.Body)
   defer resp.Body.Close()
   if err != nil {
       res.Err = err
       out <- res
       return
  }

   res.Resp = string(byt)
   out <- res
}

// 合并结果
func barrier(endpoints ...string) {
   requestNumber := len(endpoints)

   in := make(chan barrierResp, requestNumber)
   response := make([]barrierResp, requestNumber)

   defer close(in)

   for _, endpoints := range endpoints {
       go makeRequest(in, endpoints)
  }

   var hasError bool
   for i := 0; i < requestNumber; i++ {
       resp := <-in
       if resp.Err != nil {
           fmt.Println("ERROR: ", resp.Err, resp.Status)
           hasError = true
      }
       response[i] = resp
  }
   if !hasError {
       for _, resp := range response {
           fmt.Println(resp.Status)
      }
  }
}

func main() {
   barrier([]string{"https://www.baidu.com", "http://www.sina.com", "https://segmentfault.com/"}...)
}

Barrier 模式也可以使用 golang.org/x/sync/errgroup 扩展库来实现,这样更加简单明了。这个包有点类似于 sync.WaitGroup,但是区别是当其中一个任务发生错误时,可以返回该错误。

func barrier(endpoints ...string) {
   var g errgroup.Group
   var mu sync.Mutex

   response := make([]barrierResp, len(endpoints))

   for i, endpoint := range endpoints {
       i, endpoint := i, endpoint // create locals for closure below
       g.Go(func() error {
           res := barrierResp{}
           resp, err := http.Get(endpoint)
           if err != nil {
               return err
          }

           byt, err := ioutil.ReadAll(resp.Body)
           defer resp.Body.Close()
           if err != nil {
               return err
          }

           res.Resp = string(byt)
           mu.Lock()
           response[i] = res
           mu.Unlock()
           return err
      })
  }
   if err := g.Wait(); err != nil {
      fmt.Println(err)
  }
   for _, resp := range response {
       fmt.Println(resp.Status)
  }
}

0.5.2. Future模式

常用在异步处理也称为 Promise 模式,采用一种 fire-and-forget 的方式,是指主 goroutine 不等子 goroutine 执行完就直接返回了,然后等到未来执行完的时候再去取结果。在 Go 中由于 goroutine 的存在,实现这种模式是挺简单的。

0.5.2.1. 使用场景

  • 异步
/*
* Future
*/
type Function func(string) (string, error)

type Future interface {
   SuccessCallback() error
   FailCallback()    error
   Execute(Function) (bool, chan struct{})
}

type AccountCache struct {
   Name string
}

func (a *AccountCache) SuccessCallback() error {
   fmt.Println("It's success~")
   return nil
}

func (a *AccountCache) FailCallback() error {
   fmt.Println("It's fail~")
   return nil
}

func (a *AccountCache) Execute(f Function) (bool, chan struct{}){
    // 空  struct 在 Go 中占的内存是最少的
   done := make(chan struct{})
   go func(a *AccountCache) {
       _, err := f(a.Name)
       if err != nil {
           _ = a.FailCallback()
      } else {
           _ = a.SuccessCallback()
      }
       done <- struct{}{}
  }(a)
   return true, done
}

func NewAccountCache(name string) *AccountCache {
   return &AccountCache{
       name,
  }
}

func testFuture() {
   var future Future
   future = NewAccountCache("Tom")
   updateFunc := func(name string) (string, error){
       fmt.Println("cache update:", name)
       return name, nil
  }
   _, done := future.Execute(updateFunc)
   defer func() {
       <-done
  }()
}

func main() {
   var future Future
   future = NewAccountCache("Tom")
   updateFunc := func(name string) (string, error){
       fmt.Println("cache update:", name)
       return name, nil
  }
   _, done := future.Execute(updateFunc)
   defer func() {
       <-done
  }()
   // do something
}

0.5.3. Pipeline模式

注意和 Barrire 模式不同的是,它是按顺序的,类似于流水线,通过 buffer channel 将多个goroutine串起来,只要前序 goroutine 处理完一部分数据,就往下传递,达到并行的目的。

0.5.3.1. 使用场景

  • 利用多核的优势把一段粗粒度逻辑分解成多个 goroutine 执行
/*
* Pipeline 模式
*
* 实现一个功能,给定一个切片,然后求它的子项的平方和。
*
* 例如,[1, 2, 3] -> 1^2 + 2^2 + 3^2 = 14。
*
* 正常的逻辑,遍历切片,然后求平方累加。使用 pipeline 模式,可以把求和和求平方拆分出来并行计算。
*/

func generator(max int) <-chan int{
   out := make(chan int, 100)
   go func() {
       for i := 1; i <= max; i++ {
           out <- i
      }
       close(out)
  }()
   return out
}

func power(in <-chan int) <-chan int{
   out := make(chan int, 100)
   go func() {
       for v := range in {
           out <- v * v
      }
       close(out)
  }()
   return out
}

func sum(in <-chan int) <-chan int{
   out := make(chan int, 100)
   go func() {
       var sum int
       for v := range in {
           sum += v
      }
       out <- sum
       close(out)
  }()
   return out
}

func main() {
   // [1, 2, 3]
   fmt.Println(<-sum(power(generator(3))))
}

Worker Pool模式

使用场景

  • 高并发任务

在 Go 中 goroutine 已经足够轻量,甚至 net/http server 的处理方式也是 goroutine-per-connection 的,所以比起其他语言来说可能场景稍微少一些。每个 goroutine 的初始内存消耗在 2~8kb,当我们有大批量任务的时候,需要起很多 goroutine 来处理,这会给系统代理很大的内存开销和 GC 压力,这个时候就可以考虑一下协程池。

/*
* Worker pool
*/
type TaskHandler func(interface{})

type Task struct {
   Param   interface{}
   Handler TaskHandler
}

type WorkerPoolImpl interface {
   AddWorker()                  // 增加 worker
   SendTask(Task)               // 发送任务
   Release()                    // 释放
}

type WorkerPool struct {
   wg   sync.WaitGroup
   inCh chan Task
}

func (d *WorkerPool) AddWorker() {
   d.wg.Add(1)
   go func(){
       for task := range d.inCh {
           task.Handler(task.Param)
      }
       d.wg.Done()
  }()
}

func (d *WorkerPool) Release() {
   close(d.inCh)
   d.wg.Wait()
}

func (d *WorkerPool) SendTask(t Task) {
   d.inCh <- t
}

func NewWorkerPool(buffer int) WorkerPoolImpl {
   return &WorkerPool{
       inCh: make(chan Task, buffer),
  }
}

func main() {
   bufferSize := 100
   var workerPool = NewWorkerPool(bufferSize)
   workers := 4
   for i := 0; i < workers; i++ {
       workerPool.AddWorker()
  }

   var sum int32
   testFunc := func (i interface{}) {
       n := i.(int32)
       atomic.AddInt32(&sum, n)
  }
   var i, n int32
   n = 1000
   for ; i < n; i++ {
       task := Task{
           i,
           testFunc,
      }
       workerPool.SendTask(task)
  }
   workerPool.Release()
   fmt.Println(sum)
}

Pub/Sub模式

发布订阅模式是一种消息通知模式,发布者发送消息,订阅者接收消息。

使用场景

  • 消息队列
/*
* Pub/Sub
*/
type Subscriber struct {
   in     chan interface{}
   id     int
   topic  string
   stop   chan struct{}
}

func (s *Subscriber) Close() {
   s.stop <- struct{}{}
   close(s.in)
}

func (s *Subscriber) Notify(msg interface{}) (err error) {
   defer func() {
       if rec := recover(); rec != nil {
           err = fmt.Errorf("%#v", rec)
      }
  }()
   select {
   case s.in <-msg:
   case <-time.After(time.Second):
       err = fmt.Errorf("Timeout\n")
  }
   return
}

func NewSubscriber(id int) SubscriberImpl {
   s := &Subscriber{
       id: id,
       in: make(chan interface{}),
       stop: make(chan struct{}),
  }
   go func() {
       for{
           select {
           case <-s.stop:
               close(s.stop)
               return
           default:
               for msg := range s.in {
                   fmt.Printf("(W%d): %v\n", s.id, msg)
              }
          }
  }}()
   return s
}

// 订阅者需要实现的方法
type SubscriberImpl interface {
   Notify(interface{}) error
   Close()
}

// sub 订阅 pub
func Register(sub Subscriber, pub *publisher){
   pub.addSubCh <- sub
   return
}

// pub 结果定义
type publisher struct {
   subscribers []SubscriberImpl
   addSubCh    chan SubscriberImpl
   removeSubCh chan SubscriberImpl
   in          chan interface{}
   stop        chan struct{}
}

// 实例化
func NewPublisher () *publisher{
   return &publisher{
       addSubCh: make(chan SubscriberImpl),
       removeSubCh: make(chan SubscriberImpl),
       in: make(chan interface{}),
       stop: make(chan struct{}),
  }
}

// 监听
func (p *publisher) start() {
   for {
       select {
       // pub 发送消息
       case msg := <-p.in:
           for _, sub := range p.subscribers{
               _ = sub.Notify(msg)
          }
       // 移除指定 sub
       case sub := <-p.removeSubCh:
           for i, candidate := range p.subscribers {
               if candidate == sub {
                   p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
                   candidate.Close()
                   break
              }
          }
       // 增加一个 sub
       case sub := <-p.addSubCh:
           p.subscribers = append(p.subscribers, sub)
       // 关闭 pub
       case <-p.stop:
           for _, sub := range p.subscribers {
               sub.Close()
          }
           close(p.addSubCh)
           close(p.in)
           close(p.removeSubCh)
           return
      }
  }
}


func main() {
   // 测试代码
   pub := NewPublisher()
   go pub.start()

   sub1 := NewWriterSubscriber(1)
   Register(sub1, pub)

   sub2 := NewWriterSubscriber(2)
   Register(sub2, pub)

   commands:= []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
   for _, c := range commands {
       pub.in <- c
  }

   pub.stop <- struct{}{}
   time.Sleep(time.Second*1)
}

注意事项

  • 同步问题,尤其同步原语和 channel 一起用时,容易出现死锁
  • goroutine 崩溃问题,如果子 goroutine panic 没有 recover 会引起主 goroutine 异常退出
  • goroutine 泄漏问题,确保 goroutine 能正常关闭
上次修改: 25 November 2019