golang select多路复用

作者: adm 分类: go 发布时间: 2023-04-01

在golang中,select一般是和chan一起工作的,用于同时监听多个chan的信息,其实用方法和switch差不多:

select {
case <-ch1:
// ...
case x := <-ch2:
// ...
case ch3 <- y:
// ...
default :
// ...
}
 

和switch不同的是,每个case语句都必须对应channel的读写操作,select语句会陷入阻塞,直到一个或者多个channel可以读写才能恢复

1. select的阻塞机制
1.1 select的随机选择
当多个channel都具备了读写能力的时候,也就是说多个case都可以执行的条件下,select会执行哪一个?答案是随机执行一个

我们可以写一个简单的demo,来看一下select实际的执行情况

func main() {
	c1 := make(chan int, 10)
	c2 := make(chan int, 10)
	for i := 0; i < 10; i++ {
		c1 <- i
		c2 <- i
	}
	for {
		select {
		case <-c1:
			fmt.Println("random 1")
		case <-c2:
			fmt.Println("random 2")
		default:
			//fmt.Println("default")
		}
	}
}
 

当两个channel c1和c2都可以读取数据时,select的执行选择是随机的

1.2 select的阻塞和控制
如果select中没有任何的channel准备好,那么当前的select所在的协程会陷入阻塞,直到有一个case满足条件

通常在实践中不想一直阻塞的话,为了避免这种情况可以加上default分支,或者加入一个超时定时器

c := make( chan int, 1)
select {
case <-c:
    fmt.Println( "got it" )
case <-time.After(10 * time.Second):
    fmt.Println( "timeout" )
}
 

加入定时器超时的方式在实际中很常用,可以与超时重试或者超时直接报错等方式结合

1.3 select循环等待
通常我们对于select的需求,就是想让它一直阻塞,比如我们想要监听一个chan所下达的任务

for select结构就是为此而生的,通常的做法下,select分支需要配合定时器来使用,实现超时通知或者定时任务等功能

func main() {
    c := make( chan int, 1)
    tick := time.Tick(time.Second)
 
    for {
        select {
        case <-c:
            fmt.Println("got it")
        case <-tick:
            fmt.Println("crontab")
        case <-time.After(800 * time.Millisecond):
            fmt.Println("timeout")
        }
    }
}

 

注意这里的两个定时器time.Tick和time.After

time.After在每次for中都会被重置,所以它在记录进入一次for循环的800ms时间

time.Tick是在for循环外部初始化的,所以它会按照时间累计,只要时间满1s就会执行一次定时任务

所以这两个定时器一个是为了超时重试,一个是为了执行一个间隔为1s的定时任务

1.4 select和nil channel
一个为nil的channel,读写都处于阻塞状态,如果它在case分支中,select将永远不会执行

nil channel这种特性让我们可以设计一些特殊的数据传输方法,比如现在的需求是轮流向两个channel发送数据

那么我们可以在给一个channel发送完数据之后,将其置nil

func main() {
    c1 := make( chan int)
    c2 := make( chan int)
    go func () {
        for i := 0; i < 2; i++ {
            select {
            case c1 <- 1:
                c1 = nil
            case c2 <- 2:
                c2 = nil
            }
        }
    }()
 
    fmt.Println(<-c1)
    fmt.Println(<-c2)
}
  

2. select的底层原理
select在运行时会调用核心函数selectgo

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
    pollorder := order1[:ncases:ncases]
    lockorder := order1[ncases:][:ncases:ncases]
    for i := 1; i < ncases; i++ {
        j := fastrandn(uint32(i + 1))
        pollorder[i] = pollorder[j]
        pollorder[j] = uint16(i)
    }
}
  

每一个case在运行时都是一个scase结构体,存放了chan和chan中的元素类型

type scase  struct {
    c  *hchan
    elem unsafe.Pointer
    kind uint16
    ...
}
  

其中的kind代表case的类型,主要有四种类型:

caseNil
caseRecv
caseSend
caseDefault

分别对应着四种case的操作,对于每一种分支,select会执行不同的函数

在selectgo中,有两个重要的序列结构:pollorder和lockorder

pollorder是一个乱序的case序列,就是函数体中那一段for循环代码,算法类似于洗牌算法,保证了select的随机性

lockorder是按照大小对chan地址排序的算法,对所有的scase按照其chan在堆区的地址大小,使用了小顶堆算法来排序

selectgo会按照该次序对select中的case加锁,按照地址排序的顺序加锁是为了防止多个协程并发产生死锁

当所有scase中的chan加锁完毕之后,就开始第一轮循环找出是否有准备好的分支:

如果是caseNil,忽略
如果是caseRecv,判断是否有正在等待写入的协程,如果有跳转到recv分支;判断缓冲区是否有数据,如果有则跳转bufrecv分支
如果是caseSend,判断是否有正在等待读取的协程,如果有跳转到send分支;判断缓冲区是否有空余,如果有跳转bufsend分支
如果是caseDefault,记录下来,当循环结束发现没有其他case准备好时,执行default

当select完成一轮循环不能直接退出时,意味着当前协程需要进入阻塞状态等到至少一个case具备执行条件

不管是读取还是写入chan都需要创建一个新的sudog并将其放入指定通道的等待队列,之后重新进入阻塞状态

当select case中任意一个case不再阻塞时,当前协程将会被唤醒

要注意的是,最后需要将sudog结构体在其他通道的等待队列中出栈,因为当前协程已经能够正常运行,不需要再被其他通道唤醒

如果觉得我的文章对您有用,请随意赞赏。您的支持将鼓励我继续创作!