Golang的并发控制

作者: adm 分类: go 发布时间: 2024-08-11

我们考虑这么一种场景,协程A执行过程中需要创建子协程A1、A2、A3…An,协程A创建完子协程后就等待子协程退 出。针对这种场景,GO提供了三种解决方案:

Channel: 使用channel控制子协程
WaitGroup : 使用信号量机制控制子协程
Context: 使用上下文控制子协程

三种方案各有优劣,比如Channel优点是实现简单,清晰易懂,WaitGroup优点是子协程个数动态可调整,Context 优点是对子协程派生出来的孙子协程的控制。缺点是相对而言的,要结合实例应用场景进行选择。

channel
channel一般用于协程之间的通信,channel也可以用于并发控制。比如主协程启动N个子协程,主协程等待所有子 协程退出后再继续后续流程,这种场景下channel也可轻易实现。

场景示例
下面程序展示一个使用channel控制子协程的例子:

package main

import(

"time"
"fmt"
)
func Process(ch chan int){


//Do some work...
time.Sleep(time.Second)

ch <-1 //管道中写入一个元素表示当前协程已结束
}

func main(){
channels := make(chan int,10)//创建一个10个元素的切片,元素类型为channe]

for i:= 0;i<10;i++ {
   channels[i]= make(chan int)//切片中放入一个
   channelgo Process(channels[i])//启动协程,传一个管道用于通信
}
for i,ch := range channels{ //遍历切片,等待子协程结束<-ch
   fmt.Println("Routine ",i," quit!")
}
}

上面程序通过创建N个channel来管理N个协程,每个协程都有一个channel用于跟父协程通信,父协程创建完所有协 程中等待所有协程结束。
这个例子中,父协程仅仅是等待子协程结束,其实父协程也可以向管道中写入数据通知子协程结束,这时子协程需要 定期的探测管道中是否有消息出现。

总结
使用channel来控制子协程的优点是实现简单,缺点是当需要大量创建协程时就需要有相同数量的channel,而且对于子协程继续派生出来的协程不方便控制。
后面继续介绍的WaitGroup、Context看起来比channel优雅一些,在各种开源组件中使用频率比channel高得 多。

WaitGroup
WaitGroup是Golang应用开发过程中经常使用的并发控制技术。 WaitGroup,可理解为Wait-Goroutine-Group,即等待一组goroutine结束。比如某个goroutine需要等待其 他几个goroutine全部完成,那么使用WaitGroup可以轻松实现。
下面程序展示了一个goroutine等待另外两个goroutine结束的例子:

package ma1n
import(
"fmt"
"time"
"sync"
)

 func main(){
   var wg sync.waitGroup

   wg.Add(2)//设置计数器,数值即为goroutine的个数go func(){
   //Do some work
   time.sleep(1*time.second)
   fmt.Println("Goroutine 1 finished!")
   wg.Done()//goroutine执行结束后将计数器减1
}()

go func(){//Do some worktime.sleep(2*time.second)

fmt.Println("Goroutine 2 finished!")
wg.Done()//goroutine执行结束后将计数器减1
}()

wg.wait()//主goroutine阻塞等待计数器变为0
fmt.Printf("All Goroutine finished!"
}
简单的说,上面程序中wg内部维护了一个计数器:
1.启动goroutine前将计数器通过Add(2)将计数器设置为待启动的goroutine个数。
2. 启动goroutine后,使用Wait()方法阻塞自己,等待计数器变为0。
3. 每个goroutine执行结束通过Done()方法将计数器减1。
4. 计数器变为0后,阻塞的goroutine被唤醒。

其实WaitGroup也可以实现一组goroutine等待另一组goroutine,这有点像玩杂技,很容出错,如果不了解其实现原理更是如此。实际上,WaitGroup的实现源码非常简单。

信号量
信号量是Unix系统提供的一种保护共享资源的机制,用于防止多个线程同时访问某个资源。 可简单理解为信号量为一个数值:
当信号量>0时,表示资源可用,获取信号量时系统自动将信号量减1;
当信号量= =0时,表示资源暂不可用,获取信号量时,当前线程会进入睡眠,当信号量为正时被唤醒;
由于WaitGroup实现中也使用了信号量,在此做个简单介绍。

WaitGroup数据结构
源码包中 src/sync/waitgroup.go:WaitGroup 定义了其数据结构:
type WaitGroup struct { 
state1 [3]uint32
}

state1是个长度为3的数组,其中包含了state和一个信号量,而state实际上是两个计数器:
counter: 当前还未执行结束的goroutine计数器
waiter count: 等待goroutine-group结束的goroutine数量,即有多少个等候者 semaphore: 信号量

考虑到字节是否对齐,三者出现的位置不同,为简单起见,依照字节已对齐情况下,三者在内存中的位置如下所示:
在这里插入图片描述
WaitGroup对外提供三个接口:
Add(delta int): 将delta值加到counter中
Wait(): waiter递增1,并阻塞等待信号量
semaphore Done(): counter递减1,按照waiter数值释放相应次数信号量

下面分别介绍这三个函数的实现细节。

Add(delta int)
Add()做了两件事,一是把delta值累加到counter中,因为delta可以为负值,也就是说counter有可能变成0或 负值,所以第二件事就是当counter值变为0时,跟据waiter数值释放等量的信号量,把等待的goroutine全部唤 醒,如果counter变为负值,则panic

Add()伪代码如下:
在这里插入图片描述
在这里插入图片描述
Wait()
Wait()方法也做了两件事,一是累加waiter, 二是阻塞等待信号量
在这里插入图片描述
这里用到了CAS算法保证有多个goroutine同时执行Wait()时也能正确累加waiter。

Done()
Done()只做一件事,即把counter减1,我们知道Add()可以接受负值,所以Done实际上只是调用了Add(-1)。 源码如下:

func (wg *WaitGroup) Done() { 
wg.Add(-1) 
}

Done()的执行逻辑就转到了Add(),实际上也正是最后一个完成的goroutine把等待者唤醒的。

注意事项:
Add()操作必须早于Wait(), 否则会panic
Add()设置的值必须与实际等待的goroutine个数一致,否则会panic

context
Golang context是Golang应用开发常用的并发控制技术,它与WaitGroup最大的不同点是context对于派生 goroutine有更强的控制力,它可以控制多级的goroutine。
context翻译成中文是”上下文”,即它可以控制一组呈树状结构的goroutine,每个goroutine拥有相同的上下 文。
典型的使用场景如下图所示:
在这里插入图片描述
上图中由于goroutine派生出子goroutine,而子goroutine又继续派生新的goroutine,这种情况下使用 WaitGroup就不太容易,因为子goroutine个数不容易确定。而使用context就可以很容易实现。

Context实现原理
context实际上只定义了接口,凡是实现该接口的类都可称为是一种context,官方包中实现了几个常用的 context,分别可用于不同的场景。

接口定义
源码包中 src/context/context.go:Context 定义了该接口:
在这里插入图片描述
在这里插入图片描述
基础的context接口只定义了4个方法,下面分别简要说明一下:

Deadline()
该方法返回一个deadline和标识是否已设置deadline的bool值,如果没有设置deadline,则ok == false,此 时deadline为一个初始值的time.Time值

Done()
该方法返回一个channel,需要在select-case语句中使用,如”case <-context.Done():”。 当context关闭后,Done()返回一个被关闭的管道,关闭的管理仍然是可读的,据此goroutine可以收到关闭请 求;当context还未关闭时,Done()返回nil。 Err() 该方法描述context关闭的原因。关闭原因由context实现控制,不需要用户设置。比如Deadline context,关 闭原因可能是因为deadline,也可能提前被主动关闭,那么关闭原因就会不同: 因deadline关闭:“context deadline exceeded”; 因主动关闭: “context canceled”。 当context关闭后,Err()返回context的关闭原因;当context还未关闭时,Err()返回nil; Value() 有一种context,它不是用于控制呈树状分布的goroutine,而是用于在树状分布的goroutine间传递信息。 Value()方法就是用于此种类型的context,该方法根据key值查询map中的value。具体使用后面示例说明。 空context context包中定义了一个空的context, 名为emptyCtx,用于context的根节点,空的context只是简单的实现 了Context,本身不包含任何值,仅用于其他context的父节点。 emptyCtx类型定义如下代码所示: 在这里插入图片描述 在这里插入图片描述 context包中定义了一个公用的emptCtx全局变量,名为background,可以使用context.Background()获取 它,实现代码如下所示: 在这里插入图片描述 context包提供了4个方法创建不同类型的context,使用这四个方法时如果没有父context,都需要传入 backgroud,即backgroud作为其父节点:

WithCancel()
WithDeadline()
WithTimeout()
WithValue()

context包中实现Context接口的struct,除了emptyCtx外,还有cancelCtx、timerCtx和valueCtx三种,正 是基于这三种context实例,实现了上述4种类型的context。
context包中各context类型之间的关系,如下图所示:
在这里插入图片描述
struct cancelCtx、valueCtx、valueCtx都继承于Context,下面分别介绍这三个struct。

cancelCtx
源码包中 src/context/context.go:cancelCtx 定义了该类型context:
在这里插入图片描述
children中记录了由此context派生的所有child,此context被cancle时会把其中的所有child都cancle掉。
cancelCtx与deadline和value无关,所以只需要实现Done()和Err()接口外露接口即可。
Done()接口实现
按照Context定义,Done()接口只需要返回一个channel即可,对于cancelCtx来说只需要返回成员变量done即 可。
这里直接看下源码,非常简单:
在这里插入图片描述
由于cancelCtx没有指定初始化函数,所以cancelCtx.done可能还未分配,所以需要考虑初始化。 cancelCtx.done会在context被cancel时关闭,所以cancelCtx.done的值一般经历如三个阶段:nil —> chan struct{} —> closed chan。

Err()接口实现
按照Context定义,Err()只需要返回一个error告知context被关闭的原因。对于cancelCtx来说只需要返回成员 变量err即可。
还是直接看下源码:
在这里插入图片描述
在这里插入图片描述
cancelCtx.err默认是nil,在context被cancel时指定一个error变量: var Canceled = errors.New(“context canceled”) 。

cancel()接口实现
cancel()内部方法是理解cancelCtx的最关键的方法,其作用是关闭自己和其后代,其后代存储在 cancelCtx.children的map中,其中key值即后代对象,value值并没有意义,这里使用map只是为了方便查询而 已。
cancel方法实现伪代码如下所示:
在这里插入图片描述
实际上,WithCancel()返回的第二个用于cancel context的方法正是此cancel()。

WithCancel()方法实现
WithCancel()方法作了三件事:
初始化一个cancelCtx实例
将cancelCtx实例添加到其父节点的children中(如果父节点也可以被cancel的话)
返回cancelCtx实例和cancel()方法

其实现源码如下所示:
在这里插入图片描述
这里将自身添加到父节点的过程有必要简单说明一下:

如果父节点也支持cancel,也就是说其父节点肯定有children成员,那么把新context添加到children里 即可;
如果父节点不支持cancel,就继续向上查询,直到找到一个支持cancel的节点,把新context添加到 children里;
如果所有的父节点均不支持cancel,则启动一个协程等待父节点结束,然后再把当前context结束。
典型使用案例
一个典型的使用cancel context的例子如下所示:

package main

import (
	"context"
	"sync"
	"github.com/pkg/errors"
)

func Rpc(ctx context.Context, url string) error {
	result := make(chan int)
	err := make(chan error)

	go func() {
		// 进行RPC调用,并且返回是否成功,成功通过result传递成功信息,错误通过error传递错误信息
		isSuccess := true
		if isSuccess {
			result <- 1
		} else {
			err <- errors.New("some error happen")
		}
	}()

	select {
		case <- ctx.Done():
			// 其他RPC调用调用失败
			return ctx.Err()
		case e := <- err:
			// 本RPC调用失败,返回错误信息
			return e
		case <- result:
			// 本RPC调用成功,不返回错误信息
			return nil
	}
}


func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// RPC1调用
	err := Rpc(ctx, "http://rpc_1_url")
	if err != nil {
		return
	}

	wg := sync.WaitGroup{}

	// RPC2调用
	wg.Add(1)
	go func(){
		defer wg.Done()
		err := Rpc(ctx, "http://rpc_2_url")
		if err != nil {
			cancel()
		}
	}()

	// RPC3调用
	wg.Add(1)
	go func(){
		defer wg.Done()
		err := Rpc(ctx, "http://rpc_3_url")
		if err != nil {
			cancel()
		}
	}()

	// RPC4调用
	wg.Add(1)
	go func(){
		defer wg.Done()
		err := Rpc(ctx, "http://rpc_4_url")
		if err != nil {
			cancel()
		}
	}()

	wg.Wait()
}

上面代码中协程HandelRequest()用于处理某个请求,其又会创建两个协程:WriteRedis()、 WriteDatabase(),main协程创建创建context,并把context在各子协程间传递,main协程在适当的时机可以 cancel掉所有子协程。

程序输出如下所示:
在这里插入图片描述

timerCtx
源码包中 src/context/context.go:timerCtx 定义了该类型context:

type timerCtx struct { 
cancelCtx 
timer *time.Timer // Under cancelCtx.mu. 
deadline time.Time 
}

timerCtx在cancelCtx基础上增加了deadline用于标示自动cancel的最终时间,而timer就是一个触发自动 cancel的定时器。
由此,衍生出WithDeadline()和WithTimeout()。实现上这两种类型实现原理一样,只不过使用语境不一样:
deadline: 指定最后期限,比如context将2018.10.20 00:00:00之时自动结束 timeout: 指定最长存活时间,比如context将在30s后结束。
对于接口来说,timerCtx在cancelCtx基础上还需要实现Deadline()和cancel()方法,其中cancel()方法是重 写的。

Deadline()接口实现
Deadline()方法仅仅是返回timerCtx.deadline而矣。而timerCtx.deadline是WithDeadline()或 WithTimeout()方法设置的。

cancel()接口实现
cancel()方法基本继承cancelCtx,只需要额外把timer关闭。
timerCtx被关闭后,timerCtx.cancelCtx.err将会存储关闭原因:
如果deadline到来之前手动关闭,则关闭原因与cancelCtx显示一致;
如果deadline到来时自动关闭,则原因为:”context deadline exceeded”

WithDeadline()方法实现
WithDeadline()方法实现步骤如下:
初始化一个timerCtx实例
将timerCtx实例添加到其父节点的children中(如果父节点也可以被cancel的话)
启动定时器,定时器到期后会自动cancel本context
返回timerCtx实例和cancel()方法

也就是说,timerCtx类型的context不仅支持手动cancel,也会在定时器到来后自动cancel。

WithTimeout()方法实现
WithTimeout()实际调用了WithDeadline,二者实现原理一致。 看代码会非常清晰:

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) { 
return WithDeadline(parent, time.Now().Add(timeout)) 
}

典型使用案例
下面例子中使用WithTimeout()获得一个context并在其了协程中传递:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
主协程中创建一个10s超时的context,并将其传递给子协程,10s自动关闭context。程序输出如下:
在这里插入图片描述

valueCtx
源码包中 src/context/context.go:valueCtx 定义了该类型context:

type valueCtx struct { 
Context 
key, val interface{} 
}

valueCtx只是在Context基础上增加了一个key-value对,用于在各级协程间传递一些数据。
由于valueCtx既不需要cancel,也不需要deadline,那么只需要实现Value()接口即可。

Value()接口实现
由valueCtx数据结构定义可见,valueCtx.key和valueCtx.val分别代表其key和value值。 实现也很简单:
在这里插入图片描述
这里有个细节需要关注一下,即当前context查找不到key时,会向父节点查找,如果查询不到则最终返回
interface{}。也就是说,可以通过子context查询到父的value值。

WithValue()方法实现
WithValue()实现也是非常的简单, 伪代码如下:
在这里插入图片描述
典型使用案例
下面示例程序展示valueCtx的用法:
在这里插入图片描述
在这里插入图片描述
上例main()中通过WithValue()方法获得一个context,需要指定一个父context、key和value。然后通将该 context传递给子协程HandelRequest,子协程可以读取到context的key-value。 注意:本例中子协程无法自动结束,因为context是不支持cancle的,也就是说<-ctx.Done()永远无法返回。 如果需要返回,需要在创建context时指定一个可以cancel的context作为父节点,使用父节点的cancel()在适当的 时机结束整个context。 总结 Context仅仅是一个接口定义,跟据实现的不同,可以衍生出不同的context类型; cancelCtx实现了Context接口,通过WithCancel()创建cancelCtx实例; timerCtx实现了Context接口,通过WithDeadline()和WithTimeout()创建timerCtx实例; valueCtx实现了Context接口,通过WithValue()创建valueCtx实例; 三种context实例可互为父节点,从而可以组合成不同的应用形式;

如果觉得我的文章对您有用,请随意赞赏。您的支持将鼓励我继续创作!