原理简述

Go中的channel分为了带buffer和无buffer的。

带buffer的,底层是用了一个循环数组,消息加到数组的最后就会折返到数组的开始。数组满了的时候,就会阻塞发送操作。

无buffer的,底层就不依赖数组了。因为在无buffer的channel上收发消息行为是会直接阻塞的,这种情况处理不妥善会出现死锁问题。一般可以先把读channel读的goroutine先启动起来,再启动写channel。

// 这段代码就会出现死锁,解决方案就是把A和B对调
func main() {
    c := make(chan int)
    c <- 1  // A. 先写channel
    go func() {  // B. 再读channel
        fmt.Println(<-c)
    }()
    time.Sleep(3 * time.Second)
}

当不能继续发送或读取时,就会出现阻塞。具体动作是把当前的goroutine挂起,挂到channel上。而channel是一个结构体(下文配源码)。里面有两个指针(recvq和sendq),分别指向阻塞读和阻塞写的goroutine队列。

在channel上发送和读取消息的逻辑有点不一样。

发送消息时:

channel已经关闭,那就不能发。panic掉。

看一下有没有阻塞在读操作上的goroutine,有的话,赶紧把数据复制给它。把它安排在下一次调度切换上。(p.runnext)
没有被阻塞的goroutine。如果带buffer,buffer还有空位,就放在buffer里。否则就阻塞挂起当前发送消息的goroutine。
所以要注意,不要依赖发送到channel的顺序。发送消息时遇到被阻塞读操作的goroutine时,会先满足它们,即使buffer里还有数据。

读取消息时:

channel已经关闭,也可以读,只是读出来的数据为空。

看一下有没有阻塞的写操作的goroutine,有的话唤醒它。读取它发送的数据(A)。

读取后的数据(A)放哪,视乎是带buffer还是无buffer。无buffer的话,就直接把写数据(A)给读取者。带buffer的话,就先看buffer里是否有数据(B),有就把数据(B)给读取者,再把数据(A)放到原来数据(B)空出来的位置上。

整个读写过程,都是需要加锁的。毕竟channel是在多个goroutine中被使用。

此外,无论是带buffer或无buffer,数据在 发送者-buffer-读取者 三者流转时。都是采用拷贝的方式(typedmemmove)。这也呼应了go里不存引用传递,都是值传递。所以使用channel时,避免使用过大的数据。而且要改变从channel里读取出来的值时,channel的类型要使用指针类型。

Go提倡通过通信来共享内存,而非通过共享内存来通信,正是使用了channel来达到这个效果。这个就是Hoare提出的CSP(Communicating Sequential Processes)。这种方式也在Occam和Erlang上进行了验证,是没问题的。所以Go也使用上。

了解原理后,会发现channel底层还是用了锁。但channel提供给开发者是一种更高层的并发编程思维和用法,它更易用和直接,减少bug。而非pthread那种基于信号量,锁等较底层的并发编程方式。

源码介绍

channel的数据结构

// src/runtime/chan.go
type hchan struct {
    qcount   uint           // 通道的len
    dataqsiz uint           // 通道的cap
    buf      unsafe.Pointer // 指向buffer channel底层的数组
    elemsize uint16  // channel里元素的大小
    closed   uint32  // 标记channel是否close,1 == closed
    elemtype *_type // 通道类型
    sendx    uint   // 指向下一个写位置
    recvx    uint   // 同上,指向下一个读位置
    recvq    waitq  // 阻塞在读上的g,双向链表
    sendq    waitq  // 阻塞在写上的g,双向链表

    // 用于并发控制
    lock mutex
}

通过channel写数据

// src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        // 不能往一个nil的channel里写数据
        if !block {
            // 不阻塞就直接返回吧
            return false
        }
        // 阻塞当前g,将当前g与m脱离
        gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }

    // ...

    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加锁
    lock(&c.lock)

    // 不能给已关闭的channel发送数据
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    // 有别的g被阻塞着在等待,唤醒它,赶紧喂它,然后返回成功
    // 所以使用无buffer channel时,必须要先启动进行读channel操作,否则就会出现死锁问题
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // buffer channel 还有位置,那就把数据放到buffer里
    if c.qcount < c.dataqsiz {
        // 通过指针运算,找到应该插入buf的位置
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        // 然后将发送的数据,拷贝到buf对应位置上。所以对于不论对于指针或非指针,都是值拷贝
        typedmemmove(c.elemtype, qp, ep)
        // 计算好下次插入的位置。底层是循环数组,要注意掉头
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // 记录循环数组元素总数
        c.qcount++
        // 解锁
        unlock(&c.lock)
        // 返回成功
        return true
    }

    // 发送不了,也不阻塞,就返回失败吧
    if !block {
        unlock(&c.lock)
        return false
    }

    // buf里没有空间了,阻塞自己吧。把g自己放进channel的链表里等待投喂。然后gopark挂起自己这个g
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if raceenabled {
        if c.dataqsiz == 0 {
            racesync(c, sg)
        } else {
            qp := chanbuf(c, c.recvx)
            raceacquire(qp)
            racerelease(qp)
            raceacquireg(sg.g, qp)
            racereleaseg(sg.g, qp)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
    }
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // 把gp这个g放在p的runnext下,等待调度
    goready(gp, skip+1)
}

通过channel读数据

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // ...

    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }

    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加锁
    lock(&c.lock)

    // close 了的channel还能读的
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(unsafe.Pointer(c))
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

    // 有别的g被阻塞着,唤醒它,让它发送数据
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // qcount>0,说明这是个buffer channel,并且buffer里有数据
    // 那就读出来返回
    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    if !block {
        unlock(&c.lock)
        return false, false
    }

    // 无论是buffer还是unbuffer,都没数据读了,只好阻塞自己,加到chan的链表里了
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // unbuffer channel
    if c.dataqsiz == 0 {
        if raceenabled {
            racesync(c, sg)
        }
        if ep != nil {
            // 直接从g上把数据拷贝到ep的位置上
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // buffer里有数据,把buffer里的读出来
        // buffer channel
        // buf里有数据,把它读出来
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
            raceacquireg(sg.g, qp)
            racereleaseg(sg.g, qp)
        }
        if ep != nil {
            // 读出来的数据要写到ep对应的地址上
            typedmemmove(c.elemtype, ep, qp)
        }
        // copy data from sender to queue
        // 然后再将被阻塞写的g的数据到buffer里
        // 然后再把阻塞着的g它对应的数据,放到上面buf空出来的位置上
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // 好了,被阻塞写的g写完后,唤醒它吧
    goready(gp, skip+1)
}

recvDirect和sendDirect

// 直接从被阻塞写的g上,把数据拷贝到dst的地址上
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
    // sg被阻塞写的g,elem是对应栈或堆上变量的地址,上面存有要发送的数据
    src := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    // sg被阻塞读的g,elem是对应栈或堆上变量的地址
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

参考资料