原理简述
Go中的channel分为了带buffer和无buffer的。
带buffer的,底层是用了一个循环数组,消息加到数组的最后就会折返到数组的开始。数组满了的时候,就会阻塞发送操作。
无buffer的,底层就不依赖数组了。因为在无buffer的channel上收发消息行为是会直接阻塞的,这种情况处理不妥善会出现死锁问题。一般可以先把读channel读的goroutine先启动起来,再启动写channel。
// 这段代码就会出现死锁,解决方案就是把A和B对调
func main() {
c := make(chan int)
c <- 1 // A. 先写channel
go func() { // B. 再读channel
fmt.Println(<-c)
}()
time.Sleep(3 * time.Second)
}
当不能继续发送或读取时,就会出现阻塞。具体动作是把当前的goroutine挂起,挂到channel上。而channel是一个结构体(下文配源码)。里面有两个指针(recvq和sendq),分别指向阻塞读和阻塞写的goroutine队列。
在channel上发送和读取消息的逻辑有点不一样。
发送消息时:
channel已经关闭,那就不能发。panic掉。
看一下有没有阻塞在读操作上的goroutine,有的话,赶紧把数据复制给它。把它安排在下一次调度切换上。(p.runnext)
没有被阻塞的goroutine。如果带buffer,buffer还有空位,就放在buffer里。否则就阻塞挂起当前发送消息的goroutine。
所以要注意,不要依赖发送到channel的顺序。发送消息时遇到被阻塞读操作的goroutine时,会先满足它们,即使buffer里还有数据。
读取消息时:
channel已经关闭,也可以读,只是读出来的数据为空。
看一下有没有阻塞的写操作的goroutine,有的话唤醒它。读取它发送的数据(A)。
读取后的数据(A)放哪,视乎是带buffer还是无buffer。无buffer的话,就直接把写数据(A)给读取者。带buffer的话,就先看buffer里是否有数据(B),有就把数据(B)给读取者,再把数据(A)放到原来数据(B)空出来的位置上。
整个读写过程,都是需要加锁的。毕竟channel是在多个goroutine中被使用。
此外,无论是带buffer或无buffer,数据在 发送者-buffer-读取者 三者流转时。都是采用拷贝的方式(typedmemmove)。这也呼应了go里不存引用传递,都是值传递。所以使用channel时,避免使用过大的数据。而且要改变从channel里读取出来的值时,channel的类型要使用指针类型。
Go提倡通过通信来共享内存,而非通过共享内存来通信,正是使用了channel来达到这个效果。这个就是Hoare提出的CSP(Communicating Sequential Processes)。这种方式也在Occam和Erlang上进行了验证,是没问题的。所以Go也使用上。
了解原理后,会发现channel底层还是用了锁。但channel提供给开发者是一种更高层的并发编程思维和用法,它更易用和直接,减少bug。而非pthread那种基于信号量,锁等较底层的并发编程方式。
源码介绍
channel的数据结构
// src/runtime/chan.go
type hchan struct {
qcount uint // 通道的len
dataqsiz uint // 通道的cap
buf unsafe.Pointer // 指向buffer channel底层的数组
elemsize uint16 // channel里元素的大小
closed uint32 // 标记channel是否close,1 == closed
elemtype *_type // 通道类型
sendx uint // 指向下一个写位置
recvx uint // 同上,指向下一个读位置
recvq waitq // 阻塞在读上的g,双向链表
sendq waitq // 阻塞在写上的g,双向链表
// 用于并发控制
lock mutex
}
通过channel写数据
// src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
// 不能往一个nil的channel里写数据
if !block {
// 不阻塞就直接返回吧
return false
}
// 阻塞当前g,将当前g与m脱离
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
// ...
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加锁
lock(&c.lock)
// 不能给已关闭的channel发送数据
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 有别的g被阻塞着在等待,唤醒它,赶紧喂它,然后返回成功
// 所以使用无buffer channel时,必须要先启动进行读channel操作,否则就会出现死锁问题
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// buffer channel 还有位置,那就把数据放到buffer里
if c.qcount < c.dataqsiz {
// 通过指针运算,找到应该插入buf的位置
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 然后将发送的数据,拷贝到buf对应位置上。所以对于不论对于指针或非指针,都是值拷贝
typedmemmove(c.elemtype, qp, ep)
// 计算好下次插入的位置。底层是循环数组,要注意掉头
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 记录循环数组元素总数
c.qcount++
// 解锁
unlock(&c.lock)
// 返回成功
return true
}
// 发送不了,也不阻塞,就返回失败吧
if !block {
unlock(&c.lock)
return false
}
// buf里没有空间了,阻塞自己吧。把g自己放进channel的链表里等待投喂。然后gopark挂起自己这个g
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 把gp这个g放在p的runnext下,等待调度
goready(gp, skip+1)
}
通过channel读数据
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
if c == nil {
if !block {
return
}
gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加锁
lock(&c.lock)
// close 了的channel还能读的
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 有别的g被阻塞着,唤醒它,让它发送数据
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// qcount>0,说明这是个buffer channel,并且buffer里有数据
// 那就读出来返回
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// 无论是buffer还是unbuffer,都没数据读了,只好阻塞自己,加到chan的链表里了
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// unbuffer channel
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// 直接从g上把数据拷贝到ep的位置上
recvDirect(c.elemtype, sg, ep)
}
} else {
// buffer里有数据,把buffer里的读出来
// buffer channel
// buf里有数据,把它读出来
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
if ep != nil {
// 读出来的数据要写到ep对应的地址上
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
// 然后再将被阻塞写的g的数据到buffer里
// 然后再把阻塞着的g它对应的数据,放到上面buf空出来的位置上
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 好了,被阻塞写的g写完后,唤醒它吧
goready(gp, skip+1)
}
recvDirect和sendDirect
// 直接从被阻塞写的g上,把数据拷贝到dst的地址上
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
// sg被阻塞写的g,elem是对应栈或堆上变量的地址,上面存有要发送的数据
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// sg被阻塞读的g,elem是对应栈或堆上变量的地址
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}