17 | go语句及其执行规则(下)

你好,我是郝林,今天我们继续分享go语句执行规则的内容。

在上一篇文章中,我们讲到了goroutine在操作系统的并发编程体系,以及在Go语言并发编程模型中的地位和作用等一系列内容,今天我们继续来聊一聊这个话题。

知识扩展

问题1:怎样才能让主goroutine等待其他goroutine?

我刚才说过,一旦主goroutine中的代码执行完毕,当前的Go程序就会结束运行,无论其他的goroutine是否已经在运行了。那么,怎样才能做到等其他的goroutine运行完毕之后,再让主goroutine结束运行呢?

其实有很多办法可以做到这一点。其中,最简单粗暴的办法就是让主goroutine“小睡”一会儿。

for i := 0; i < 10; i++ {
	go func() {
		fmt.Println(i)
	}()
}
time.Sleep(time.Millisecond * 500)

for语句的后边,我调用了time包的Sleep函数,并把time.Millisecond * 500的结果作为参数值传给了它。time.Sleep函数的功能就是让当前的goroutine(在这里就是主goroutine)暂停运行一段时间,直到到达指定的恢复运行时间。

我们可以把一个相对的时间传给该函数,就像我在这里传入的“500毫秒”那样。time.Sleep函数会在被调用时用当前的绝对时间,再加上相对时间计算出在未来的恢复运行时间。显然,一旦到达恢复运行时间,当前的goroutine就会从“睡眠”中醒来,并开始继续执行后边的代码。

这个办法是可行的,只要“睡眠”的时间不要太短就好。不过,问题恰恰就在这里,我们让主goroutine“睡眠”多长时间才是合适的呢?如果“睡眠”太短,则很可能不足以让其他的goroutine运行完毕,而若“睡眠”太长则纯属浪费时间,这个时间就太难把握了。

你可能会想到,既然不容易预估时间,那我们就让其他的goroutine在运行完毕的时候告诉我们好了。这个思路很好,但怎么做呢?

你是否想到了通道呢?我们先创建一个通道,它的长度应该与我们手动启用的goroutine的数量一致。在每个手动启用的goroutine即将运行完毕的时候,我们都要向该通道发送一个值。

注意,这些发送表达式应该被放在它们的go函数体的最后面。对应的,我们还需要在main函数的最后从通道接收元素值,接收的次数也应该与手动启用的goroutine的数量保持一致。关于这些你可以到demo39.go文件中,去查看具体的写法。

其中有一个细节你需要注意。我在声明通道sign的时候是以chan struct{}作为其类型的。其中的类型字面量struct{}有些类似于空接口类型interface{},它代表了既不包含任何字段也不拥有任何方法的空结构体类型。

注意,struct{}类型值的表示法只有一个,即:struct{}{}。并且,它占用的内存空间是0字节。确切地说,这个值在整个Go程序中永远都只会存在一份。虽然我们可以无数次地使用这个值字面量,但是用到的却都是同一个值。

当我们仅仅把通道当作传递某种简单信号的介质的时候,用struct{}作为其元素类型是再好不过的了。顺便说一句,我在讲“结构体及其方法的使用法门”的时候留过一道与此相关的思考题,你可以返回去看一看。

再说回当下的问题,有没有比使用通道更好的方法?如果你知道标准库中的代码包sync的话,那么可能会想到sync.WaitGroup类型。没错,这是一个更好的答案。不过具体的使用方式我在后边讲sync包的时候再说。

问题2:怎样让我们启用的多个goroutine按照既定的顺序运行?

在很多时候,当我沿着上面的主问题以及第一个扩展问题一路问下来的时候,应聘者往往会被这第二个扩展问题难住。

所以基于上一篇主问题中的代码,怎样做到让从09这几个整数按照自然数的顺序打印出来?你可能会说,我不用goroutine不就可以了嘛。没错,这样是可以,但是如果我不考虑这样做呢。你应该怎么解决这个问题?

当然了,众多应聘者回答的其他答案也是五花八门的,有的可行,有的不可行,还有的把原来的代码改得面目全非。我下面就来说说我的思路,以及心目中的答案吧。这个答案并不一定是最佳的,也许你在看完之后还可以想到更优的答案。

首先,我们需要稍微改造一下for语句中的那个go函数,要让它接受一个int类型的参数,并在调用它的时候把变量i的值传进去。为了不改动这个go函数中的其他代码,我们可以把它的这个参数也命名为i

for i := 0; i < 10; i++ {
	go func(i int) {
		fmt.Println(i)
	}(i)
}

只有这样,Go语言才能保证每个goroutine都可以拿到一个唯一的整数。其原因与go函数的执行时机有关。

我在前面已经讲过了。在go语句被执行时,我们传给go函数的参数i会先被求值,如此就得到了当次迭代的序号。之后,无论go函数会在什么时候执行,这个参数值都不会变。也就是说,go函数中调用的fmt.Println函数打印的一定会是那个当次迭代的序号。

然后,我们在着手改造for语句中的go函数。

for i := uint32(0); i < 10; i++ {
	go func(i uint32) {
		fn := func() {
			fmt.Println(i)
		}
		trigger(i, fn)
	}(i)
}

我在go函数中先声明了一个匿名的函数,并把它赋给了变量fn。这个匿名函数做的事情很简单,只是调用fmt.Println函数以打印go函数的参数i的值。

在这之后,我调用了一个名叫trigger的函数,并把go函数的参数i和刚刚声明的变量fn作为参数传给了它。注意,for语句声明的局部变量igo函数的参数i的类型都变了,都由int变为了uint32。至于为什么,我一会儿再说。

再来说trigger函数。该函数接受两个参数,一个是uint32类型的参数i, 另一个是func()类型的参数fn。你应该记得,func()代表的是既无参数声明也无结果声明的函数类型。

trigger := func(i uint32, fn func()) {
	for {
		if n := atomic.LoadUint32(&count); n == i {
			fn()
			atomic.AddUint32(&count, 1)
			break
		}
		time.Sleep(time.Nanosecond)
	}
}

trigger函数会不断地获取一个名叫count的变量的值,并判断该值是否与参数i的值相同。如果相同,那么就立即调用fn代表的函数,然后把count变量的值加1,最后显式地退出当前的循环。否则,我们就先让当前的goroutine“睡眠”一个纳秒再进入下一个迭代。

注意,我操作变量count的时候使用的都是原子操作。这是由于trigger函数会被多个goroutine并发地调用,所以它用到的非本地变量count,就被多个用户级线程共用了。因此,对它的操作就产生了竞态条件(race condition),破坏了程序的并发安全性。

所以,我们总是应该对这样的操作加以保护,在sync/atomic包中声明了很多用于原子操作的函数。

另外,由于我选用的原子操作函数对被操作的数值的类型有约束,所以我才对count以及相关的变量和参数的类型进行了统一的变更(由int变为了uint32)。

纵观count变量、trigger函数以及改造后的for语句和go函数,我要做的是,让count变量成为一个信号,它的值总是下一个可以调用打印函数的go函数的序号。

这个序号其实就是启用goroutine时,那个当次迭代的序号。也正因为如此,go函数实际的执行顺序才会与go语句的执行顺序完全一致。此外,这里的trigger函数实现了一种自旋(spinning)。除非发现条件已满足,否则它会不断地进行检查。

最后要说的是,因为我依然想让主goroutine最后一个运行完毕,所以还需要加一行代码。不过既然有了trigger函数,我就没有再使用通道。

trigger(10, func(){})

调用trigger函数完全可以达到相同的效果。由于当所有我手动启用的goroutine都运行完毕之后,count的值一定会是10,所以我就把10作为了第一个参数值。又由于我并不想打印这个10,所以我把一个什么都不做的函数作为了第二个参数值。

总之,通过上述的改造,我使得异步发起的go函数得到了同步地(或者说按照既定顺序地)执行,你也可以动手自己试一试,感受一下。

总结

在本篇文章中,我们接着上一篇文章的主问题,讨论了当我们想让运行结果更加可控的时候,应该怎样去做。

主goroutine的运行若过早结束,那么我们的并发程序的功能就很可能无法全部完成。所以我们往往需要通过一些手段去进行干涉,比如调用time.Sleep函数或者使用通道。我们在后面的文章中还会讨论更高级的手段。

另外,go函数的实际执行顺序往往与其所属的go语句的执行顺序(或者说goroutine的启用顺序)不同,而且默认情况下的执行顺序是不可预知的。那怎样才能让这两个顺序一致呢?其实复杂的实现方式有不少,但是可能会把原来的代码改得面目全非。我在这里提供了一种比较简单、清晰的改造方案,供你参考。

总之,我希望通过上述基础知识以及三个连贯的问题帮你串起一条主线。这应该会让你更快地深入理解goroutine及其背后的并发编程模型,从而更加游刃有余地使用go语句。

思考题

1.runtime包中提供了哪些与模型三要素G、P和M相关的函数?(模型三要素内容在上一篇)

戳此查看Go语言专栏文章配套详细代码。

精选留言

  • 枫林火山

    2019-04-01 13:05:47

    老师,关于顺序打印的demo40.go的优化版本,同来碗绿豆汤同学的实现
    package main

    import "fmt"

    func main() {

    var num = 10
    sign := make(chan struct{}, 1)

    for i := 0; i < num; i++ {
    go func(i int) {
    fmt.Println(i)
    sign <- struct{}{}
    }(i)
    <-sign
    }

    }
    这样写为什么不能保证同步,能不能再详细解释下呢。这个实现和您demo39.go 相比,只是合并了两处for循环,我看好多同学也有这个疑问,向您求解。
    demo40.go的实现相当于实现了每个异步线程的一个轮询loop。 上面的实现相当于单步间加了一个barrier。执行1->等待->执行2->等待。 实在没理解为什么不能保证同步
    作者回复

    我又看了一下“来碗绿豆汤”同学写的代码。我可能当时没看清楚,或者没说清楚。

    他写的这段代码单从“顺序打印数字”的要求上看是可以的。但是这样做就变成纯同步的流程了,go函数就完全没必要写了。把go函数中的代码拿出来、删掉go函数,再把通道的相关代码也删掉,岂不是更直截了当?像这样:

    for i := 0; i < num; i++ {
    fmt.Println(i)
    }

    这个题目的要求是“使得在for循环中启用的多个goroutine按照既定的顺序运行”。你也可以把它理解为“在异步的情况下顺序的打印数字”。所以,“来碗绿豆汤”同学写的代码只满足了其中一个要求,而没有让go函数们自由的异步执行。

    我的那个版本demo40.go是让各个go函数(确切地说,是它们调用的trigger函数)自行地检查所需条件,然后再在条件允许的情况下打印数字。这也叫“自旋”。这与纯同步的流程是有本质上的区别的。

    2019-04-01 22:17:50

  • xiao豪

    2018-09-19 14:09:36

    回楼上,atomic的加操作和读操作只有32位和64位整数型,所以必须要把int转为intxx。之所以这么做是因为int位数是根据系统决定的,而原子级操作要求速度尽可能的快,所以明确了整数的位数才能最大地提高性能。
  • 来碗绿豆汤

    2018-09-19 11:28:03

    我有一个更简单的实现方式, 如下
    func main(){
    ch := make(chan struct{})
    for i:=0; i < 100; i++{
    go func(i int){
    fmt.Println(i)
    ch <- struct{}{}
    }(i)
    <-ch
    }
    }
    这样,每次循环都包装goroutine 执行结束才进入下一次循环,就可以保证顺序执行了
    作者回复

    这些go函数的真正执行谁先谁后是不可控的,所以这样做不行的。

    2018-09-20 19:22:16

  • Geek_3241ef

    2019-08-26 20:30:07

    你好,郝老师,请问这里为什么需要sleep呢,我理解的如果不加sleep,其中某个g会一直轮询count的值,当另一个g更改这个值时,那么第一个g就会判断相等才对呀。
    但实际上去掉sleep后,程序确实没有按照我理解的逻辑执行,请问这是为什么呢
    作者回复

    这主要是因为:Go 调度器在需要的时候只会对正在运行的 goroutine 发出通知,试图让它停下来。但是,它却不会也不能强行让一个 goroutine 停下来。

    所以,如果一条 for 语句过于简单的话,比如这里的 for 语句就很简单(因为里面只有一条 if 语句),那么当前的 goroutine 就可能不会去正常响应(或者说没有机会响应)Go 调度器的停止通知。

    因此,这里加一个 sleep 是为了:在任何情况下(如任何版本的 Go、任何计算平台下的 Go、任何的 CPU 核心数等),内含这条 for 语句的这些 goroutine 都能够正常地响应停止通知。

    2019-08-27 19:45:55

  • Askerlve

    2018-09-19 09:54:36

    package main

    import (
    "fmt"
    "sync/atomic"
    )

    func main() {
    var count uint32
    trigger := func(i uint32, fn func()) {
    for {
    if n := atomic.LoadUint32(&count); n == i {
    fn()
    atomic.AddUint32(&count, 1)
    break
    }
    }
    }
    for i := uint32(0); i < 10; i++ {
    go func(i uint32) {
    fn := func() {
    fmt.Println(i)
    }
    trigger(i, fn)
    }(i)
    }
    trigger(10, func() {})
    }

    测试了下,这个函数的输出不受控,并且好像永远也不会结束,有人能帮忙解释下吗,go小白~😀
    作者回复

    可以加个sleep

    2018-09-20 18:58:39

  • 老茂

    2018-10-15 08:15:08

    不加sleep程序不能正常结束的情况貌似跟cpu核数有关,我是4核cpu,打印0到2每次都可以正常执行;0到3以上就会有卡主的情况,卡主时cpu达到100%,load会超过4。猜测是不是此时所有cpu都在处理count==0的for循环,没有空闲的cpu执行atomic.AddUint32(&count, 1)?
    作者回复

    Go语言调度goroutine是准抢占式的,虽然会防止某个goroutine运行太久,并做换下处理。但是像简单的死循环这种有可能会换下失败,尤其是windows下,这跟操作系统的底层支持有关。不过一般情况下不用担心。

    2018-10-15 12:40:14

  • 新垣结裤

    2018-09-21 15:39:45

    func main() {
    num := 10
    chs := [num+1]chan struct{}{}
    for i := 0; i < num+1; i++ {
    chs[i] = make(chan struct{})
    }
    for i := 0; i < num; i++ {
    go func(i int) {
    <- chs[i]
    fmt.Println(i)
    chs[i+1] <- struct{}{}
    }(i)
    }
    chs[0] <- struct{}{}
    <- chs[num]
    }
    每个goroutine执行完通过channel通知下一个goroutine,在主goroutine里控制第一个goroutine的开始,并接收最后一个goroutine结束的信号
    作者回复

    搞这么多通道有些浪费啊。另外切片不是并发安全的数据类型,最好不要这样用。

    2018-09-23 16:17:49

  • 言午木杉

    2020-01-14 09:20:24

    这篇加了代码,一下子就容易很多了,老师前面的几篇都太多名词了,需要琢磨去好几遍
    作者回复

    正式学习一个新东西的首要任务就是“重要名词解析”。一旦熟悉了这些名词以及它们背后的深意,后面的学习效率就会高很多。更重要的是,后面会学得更扎实(或者说很稳)。因为你真正融入了这个新东西所处的世界,站在了它的地基之上。这也是我的一点学习经验。共勉。

    2020-01-14 10:19:51

  • Askerlve

    2018-09-19 09:52:55

    package main

    import (
    "fmt"
    "sync/atomic"
    )

    func main() {
    var count uint32
    trigger := func(i uint32, fn func()) {
    for {
    if n := atomic.LoadUint32(&count); n == i {
    fn()
    atomic.AddUint32(&count, 1)
    break
    }
    }
    }
    for i := uint32(0); i < 10; i++ {
    go func(i uint32) {
    fn := func() {
    fmt.Println(i)
    }
    trigger(i, fn)
    }(i)
    }
    trigger(10, func() {})
    }

    这个函数的执行还是不可控诶,并且好像永远也不会结束,是因为我的go版本问题吗?
    作者回复

    Win下可能会有问题,你在bif语句后边加一句time.sleep(time.Nanosecond)。github上的代码我已经更新了。

    2018-09-20 19:26:52

  • 肖恩

    2019-05-08 21:01:54

    第一遍看好多都看不懂,看到后边回过头来看,发现用自旋goroutine实现,真实奇妙;现在想想,除了文章中实现方式,可以用channel同步实现;还可以用sync.WaitGroup实现
    作者回复

    祝贺你升级了;)

    2019-05-08 22:24:59

  • 嗷大猫的鱼

    2018-09-20 17:43:47

    老师,最近从头学习,前面一直没跟着动手,也没自己总结。这几天在整理每章的重点!

    https://github.com/wenxuwan/go36

    刚写完第二章,突然发现自己动手总结和只看差好多。我会继续保持喜欢总结!
    作者回复

    很好,加油!

    2018-09-20 20:46:40

  • 志鑫

    2019-05-11 08:15:21

    //个人笔记:使用一个通道来控制
    package main

    import "fmt"

    func main() {
    const n = 10
    m := 0
    ch := make(chan int, 10) //通道长度0~10之间,能够影响性能
    for i := 0; i < n; i++ {
    go func(i int) {
    for v := <-ch; v != i; v = <-ch {
    m++
    ch <- v //如果不是自己的轮次,则把值再放回去
    }
    fmt.Println(i)
    ch <- i + 1
    }(i)
    }
    ch <- 0
    for v := <-ch; v != 10; v = <-ch {
    ch <- v
    }
    fmt.Println(m)
    }
  • sky

    2018-09-19 13:50:56

    win64版本:go1.10.2
    linux64版本:go1.11

    linux下实际运行和预期一样,但为何win下会一直运行不会停止呢,且CPU也已经是100% 表示不解呀
    作者回复

    可以加个sleep。

    2018-09-20 18:57:44

  • 冰激凌的眼泪

    2018-09-19 09:47:57

    ‘’否则,我们就先让当前的 goroutine“睡眠”一个纳秒再进入下一个迭代。‘’

    示例代码里没有这个睡眠代码
    作者回复

    代码已经更新了。

    2018-09-20 19:27:08

  • 传说中的成大大

    2020-03-19 16:27:40

    本节内容主要是讲了 goroutine同步那一块
    主要通过通道 信号 应该sync包中也会有同步相关的函数
    我又去把16讲那个G队列再想了一遍
    实际上可能存在多个G队列 G队列之间属于并发关系
    但是G队列当中他们又是队列储存 顺序执行的
    作者回复

    更确切地说,是顺序地送给某个M执行。不过。它们又可以是并发的,因为如果前一个G进入了等待状态,那么同一个G队列中的后一个G就很可能会获得运行的机会。这时,前一个G并没有运行完成,它可能是在等待计时器到时或者IO操作完成。所以,我们可以说,在这种情况下,前后这两个G就是在并发运行。

    进一步讲,同一个G队列中的G都可以如此。所以我们还可以说,Go调度器中的所有可运行G都是有并发运行的能力的,只不过实际上是怎样的,还要看实时调度的具体情况。

    2020-03-19 18:07:48

  • SuperP ❤ 飝

    2018-10-11 19:37:48

    runtime.GOMAXPROCS 这个应该能控制P的数量
    作者回复

    对,可以。

    2018-10-15 12:20:20

  • cygnus

    2018-09-19 14:50:11

    demo40的执行结果不是幂等的,程序经常无法正常结束退出,只有极少数几次有正确输出。
    作者回复

    你在win下执行的嘛?

    2018-09-20 18:57:23

  • jxs1211

    2021-12-01 09:28:19

    其他地方也有类似的spinning操作,说道其实是一种乐观锁,先通过原子操作读取值,再在修改值的时候检查,符合条件才能修改,我的问题是这段代码是否就是乐观锁的定义诠释,这里仍然读取了原值再修改,而没有直接修改,是否足够乐观,不太理解乐观的真是含义,另外如果用悲观锁的话应该怎么实现
    作者回复

    乐观锁:总是假设在“我”操作共享资源的过程中没有“其他人”竞争操作。如果发现“其他人”确实在此期间竞争了,也就是发现假设失败,那就等一等再操作。CAS原子操作基本上能够体现出这种思想。通常,低频的并发操作适合用乐观锁。乐观锁一般会用比较轻量级的同步方法(如原子操作),但也不是100%。注意,高频的操作用乐观锁的话反而有可能影响性能,因为多了一步“探查是否有人与我竞争”的操作(当然了,标准的CAS操作可以把这种影响降到最低)。

    悲观锁:总是假设在“我”操作共享资源的过程中一定有“其他人”竞争操作。所以“我”会先用某种同步方法(如互斥锁)保护我的操作。这样的话,“我”在将要操作的时候就没必要去探查是否有人与我竞争(因为“我”总是假设肯定有竞争,而且已经做好了保护)。通常,频次较高的并发操作适合用悲观锁。不过,如果并发操作的频次非常低,用悲观锁也是可以的,因为这种情况下对性能影响不大。

    最后,一定要注意,使用任何同步方法和异步方法都首先要考虑程序的正确性,并且还要考虑程序的性能。程序的正确性一定要靠功能测试来保障,程序的性能一定要靠性能测试来保障。

    2021-12-02 13:02:56

  • Geek_d6cfa7

    2021-04-21 16:17:00

    go 通道传递达到顺序执行目的:
    func TestTransmitChan(t *testing.T) {
    start, end := make(chan bool), make(chan bool)
    head := start
    for i := 0; i < 10; i++ {
    end = make(chan bool)
    temp := i
    go func(head, end chan bool, i int) {
    <-head
    t.Logf("goroutine %v doing", i)
    end <- true
    }(head, end, temp)
    head = end
    }
    start <- true
    <-end

    t.Logf("main goroutine is end")
    }
    作者回复

    变量值的改变也不是并发安全的,所以这样做可能会出问题。而且这个方案有些复杂了。

    2021-04-22 11:02:41

  • Geek_9b9769

    2021-04-04 17:09:18

    老师,您看下,我的思路是用一个通道数组控制

    func main() {

    //预先创建11个通道数组,最后一个阻塞主gorouting
    chArr := [11]chan struct{}{}
    for i := 0; i < 11; i++ {
    chArr[i] = make(chan struct{})
    }

    //不阻塞第一个通道
    go func() {
    chArr[0] <- struct{}{}
    }()

    //开始循环
    for i := 0; i < 10; i++ {

    go func(i2 int) {
    //阻塞当前通道
    <-chArr[i2]
    fmt.Println(i2)
    //每次执行完通知下一个通道
    chArr[i2+1] <- struct{}{}
    }(i)
    }

    //阻塞主gorouting
    <-chArr[10]

    }
    作者回复

    可以倒是可以,但是搞这么多通道出来,太重了。

    这类同步工具从重到轻:通道 => 读写锁 => 互斥锁 => 原子操作。

    能轻尽量轻啊。

    2021-04-06 12:22:05