35|即学即练:如何实现一个轻量级线程池?

你好,我是Tony Bai。

在这一讲的开始,首先恭喜你完成了这门课核心篇语法部分的学习。这一部分的篇幅不多,主要讲解了Go的两个核心语法知识点:接口与并发原语。它们分别是耦合设计与并发设计的主要参与者,Go应用的骨架设计离不开它们。

但理论和实践毕竟是两回事,学完了基本语法,也需要实操来帮助我们落地。所以,在这核心篇的最后一讲,我依然会用一个小实战项目,帮助你学会灵活运用这部分的语法点。

不过,关于接口类型做为“关节”作用的演示,我们前面的两个小实战项目中都有一定的体现了,只是那时还没有讲到接口类型,你现在可以停下来,回顾一下09讲27讲的代码,看看是否有更深刻的体会。

而且,接口类型对Go应用静态骨架的编织作用,在接口类型数量较多的项目中体现得更明显,由于篇幅有限,我很难找到一个合适的演示项目。

因此,这一讲的实战项目,我们主要围绕Go并发来做,实现一个轻量级线程池,也就是Goroutine池。

为什么要用到Goroutine池?

第31讲学习Goroutine的时候,我们就说过:相对于操作系统线程,Goroutine的开销十分小,一个Goroutine的起始栈大小为2KB,而且创建、切换与销毁的代价很低,我们可以创建成千上万甚至更多Goroutine。

所以和其他语言不同的是,Go应用通常可以为每个新建立的连接创建一个对应的新Goroutine,甚至是为每个传入的请求生成一个Goroutine去处理。这种设计还有一个好处,实现起来十分简单,Gopher们在编写代码时也没有很高的心智负担。

不过,Goroutine的开销虽然“廉价”,但也不是免费的

最明显的,一旦规模化后,这种非零成本也会成为瓶颈。我们以一个Goroutine分配2KB执行栈为例,100w Goroutine就是2GB的内存消耗。

其次,Goroutine从Go 1.4版本开始采用了连续栈的方案,也就是每个Goroutine的执行栈都是一块连续内存,如果空间不足,运行时会分配一个更大的连续内存空间作为这个Goroutine的执行栈,将原栈内容拷贝到新分配的空间中来。

连续栈的方案,虽然能避免Go 1.3采用的分段栈会导致的“hot split”问题,但连续栈的原理也决定了,一旦Goroutine的执行栈发生了grow,那么即便这个Goroutine不再需要那么大的栈空间,这个Goroutine的栈空间也不会被Shrink(收缩)了,这些空间可能会处于长时间闲置的状态,直到Goroutine退出。

另外,随着Goroutine数量的增加,Go运行时进行Goroutine调度的处理器消耗,也会随之增加,成为阻碍Go应用性能提升的重要因素。

那么面对这样的问题,常见的应对方式是什么呢?

Goroutine池就是一种常见的解决方案。这个方案的核心思想是对Goroutine的重用,也就是把M个计算任务调度到N个Goroutine上,而不是为每个计算任务分配一个独享的Goroutine,从而提高计算资源的利用率。

接下来,我们就来真正实现一个简单的Goroutine池,我们叫它 workerpool

workerpool的实现原理

workerpool的工作逻辑通常都很简单,所以即便是用于生产环境的workerpool实现,代码规模也都在千行左右。

当然,workerpool有很多种实现方式,这里为了更好地演示Go并发模型的应用模式,以及并发原语间的协作,我们采用完全基于channel+select的实现方案,不使用其他数据结构,也不使用sync包提供的各种同步结构,比如Mutex、RWMutex,以及Cond等。

workerpool的实现主要分为三个部分:

  • pool的创建与销毁;
  • pool中worker(Goroutine)的管理;
  • task的提交与调度。

其中,后两部分是pool的“精髓”所在,这两部分的原理我也用一张图表示了出来:

图片

我们先看一下图中pool对worker的管理。

capacity是pool的一个属性,代表整个pool中worker的最大容量。我们使用一个带缓冲的channel:active,作为worker的“计数器”,这种channel使用模式就是我们在第33讲中讲过的计数信号量,如果记不太清了可以复习一下第33讲中的相关内容。

当active channel可写时,我们就创建一个worker,用于处理用户通过Schedule函数提交的待处理的请求。当active channel满了的时候,pool就会停止worker的创建,直到某个worker因故退出,active channel又空出一个位置时,pool才会创建新的worker填补那个空位。

这张图里,我们把用户要提交给workerpool执行的请求抽象为一个Task。Task的提交与调度也很简单:Task通过Schedule函数提交到一个task channel中,已经创建的worker将从这个task channel中读取task并执行。

好了!“Talk is cheap,show me the code”!接下来,我们就来写一版workerpool的代码,来验证一下这里分析的原理是否可行。

workerpool的一个最小可行实现

我们先建立workerpool目录作为实战项目的源码根目录,然后为这个项目创建go module:

$mkdir workerpool1
$cd workerpool1
$go mod init github.com/bigwhite/workerpool

接下来,我们创建pool.go作为workpool包的主要源码文件。在这个源码文件中,我们定义了Pool结构体类型,这个类型的实例代表一个workerpool:

type Pool struct {
    capacity int         // workerpool大小

    active chan struct{} // 对应上图中的active channel
    tasks  chan Task     // 对应上图中的task channel

    wg   sync.WaitGroup  // 用于在pool销毁时等待所有worker退出
    quit chan struct{}   // 用于通知各个worker退出的信号channel
}

workerpool包对外主要提供三个API,它们分别是:

  • workerpool.New:用于创建一个pool类型实例,并将pool池的worker管理机制运行起来;
  • workerpool.Free:用于销毁一个pool池,停掉所有pool池中的worker;
  • Pool.Schedule:这是Pool类型的一个导出方法,workerpool包的用户通过该方法向pool池提交待执行的任务(Task)。

接下来我们就重点看看这三个API的实现。

我们先来看看workerpool.New是如何创建一个pool实例的:

func New(capacity int) *Pool {
    if capacity <= 0 {
        capacity = defaultCapacity
    }
    if capacity > maxCapacity { 
        capacity = maxCapacity
    } 

    p := &Pool{
        capacity: capacity,
        tasks:    make(chan Task),
        quit:     make(chan struct{}),
        active:   make(chan struct{}, capacity),
    }

    fmt.Printf("workerpool start\n")

    go p.run()

    return p
}

我们看到,New函数接受一个参数capacity用于指定workerpool池的容量,这个参数用于控制workerpool最多只能有capacity个worker,共同处理用户提交的任务请求。函数开始处有一个对capacity参数的“防御性”校验,当用户传入不合理的值时,函数New会将它纠正为合理的值。

Pool类型实例变量p完成初始化后,我们创建了一个新的Goroutine,用于对workerpool进行管理,这个Goroutine执行的是Pool类型的run方法:

func (p *Pool) run() { 
    idx := 0 

    for { 
        select { 
        case <-p.quit:
            return
        case p.active <- struct{}{}:
            // create a new worker
            idx++
            p.newWorker(idx)
        } 
    } 
}

run方法内是一个无限循环,循环体中使用select监视Pool类型实例的两个channel:quit和active。这种在for中使用select监视多个channel的实现,在Go代码中十分常见,是一种惯用法。

当接收到来自quit channel的退出“信号”时,这个Goroutine就会结束运行。而当active channel可写时,run方法就会创建一个新的worker Goroutine。 此外,为了方便在程序中区分各个worker输出的日志,我这里将一个从1开始的变量idx作为worker的编号,并把它以参数的形式传给创建worker的方法。

我们再将创建新的worker goroutine的职责,封装到一个名为newWorker的方法中:

func (p *Pool) newWorker(i int) {
    p.wg.Add(1)
    go func() {
        defer func() {
            if err := recover(); err != nil {
                fmt.Printf("worker[%03d]: recover panic[%s] and exit\n", i, err)
                <-p.active
            }
            p.wg.Done()
        }()

        fmt.Printf("worker[%03d]: start\n", i)

        for {
            select {
            case <-p.quit:
                fmt.Printf("worker[%03d]: exit\n", i)
                <-p.active
                return
            case t := <-p.tasks:
                fmt.Printf("worker[%03d]: receive a task\n", i)
                t()
            }
        }
    }()
}

我们看到,在创建一个新的worker goroutine之前,newWorker方法会先调用p.wg.Add方法将WaitGroup的等待计数加一。由于每个worker运行于一个独立的Goroutine中,newWorker方法通过go关键字创建了一个新的Goroutine作为worker。

新worker的核心,依然是一个基于for-select模式的循环语句,在循环体中,新worker通过select监视quit和tasks两个channel。和前面的run方法一样,当接收到来自quit channel的退出“信号”时,这个worker就会结束运行。tasks channel中放置的是用户通过Schedule方法提交的请求,新worker会从这个channel中获取最新的Task并运行这个Task。

Task是一个对用户提交的请求的抽象,它的本质就是一个函数类型:

type Task func()

这样,用户通过Schedule方法实际上提交的是一个函数类型的实例。

在新worker中,为了防止用户提交的task抛出panic,进而导致整个workerpool受到影响,我们在worker代码的开始处,使用了defer+recover对panic进行捕捉,捕捉后worker也是要退出的,于是我们还通过<-p.active更新了worker计数器。并且一旦worker goroutine退出,p.wg.Done也需要被调用,这样可以减少WaitGroup的Goroutine等待数量。

我们再来看workerpool提供给用户提交请求的导出方法Schedule:

var ErrWorkerPoolFreed    = errors.New("workerpool freed")       // workerpool已终止运行

func (p *Pool) Schedule(t Task) error {
    select {
    case <-p.quit:
        return ErrWorkerPoolFreed
    case p.tasks <- t:
        return nil
    }
}

Schedule方法的核心逻辑,是将传入的Task实例发送到workerpool的tasks channel中。但考虑到现在workerpool已经被销毁的状态,我们这里通过一个select,检视quit channel是否有“信号”可读,如果有,就返回一个哨兵错误ErrWorkerPoolFreed。如果没有,一旦p.tasks可写,提交的Task就会被写入tasks channel,以供pool中的worker处理。

这里要注意的是,这里的Pool结构体中的tasks是一个无缓冲的channel,如果pool中worker数量已达上限,而且worker都在处理task的状态,那么Schedule方法就会阻塞,直到有worker变为idle状态来读取tasks channel,schedule的调用阻塞才会解除。

至此,workerpool的最小可行实现的主要逻辑都实现完了。我们来验证一下它是否能按照我们的预期逻辑运行。

现在我们建立一个使用workerpool的项目demo1:

$mkdir demo1
$cd demo1
$go mod init demo1

由于我们要引用本地的module,所以我们需要手工修改一下demo1的go.mod文件,并利用replace指示符将demo1对workerpool的引用指向本地workerpool1路径:

module demo1

go 1.17

require github.com/bigwhite/workerpool v1.0.0

replace github.com/bigwhite/workerpool v1.0.0 => ../workerpool1

然后创建demo1的main.go文件,源码如下:

package main
  
import (
    "time"
    "github.com/bigwhite/workerpool"
)

func main() {
    p := workerpool.New(5)

    for i := 0; i < 10; i++ {
        err := p.Schedule(func() {
            time.Sleep(time.Second * 3)
        })
        if err != nil {
            println("task: ", i, "err:", err)
        }
    }

    p.Free()
}

这个示例程序创建了一个capacity为5的workerpool实例,并连续向这个workerpool提交了10个task,每个task的逻辑很简单,只是Sleep 3秒后就退出。main函数在提交完任务后,调用workerpool的Free方法销毁pool,pool会等待所有worker执行完task后再退出。

demo1示例的运行结果如下:

workerpool start
worker[005]: start
worker[005]: receive a task
worker[003]: start
worker[003]: receive a task
worker[004]: start
worker[004]: receive a task
worker[001]: start
worker[002]: start
worker[001]: receive a task
worker[002]: receive a task
worker[004]: receive a task
worker[005]: receive a task
worker[003]: receive a task
worker[002]: receive a task
worker[001]: receive a task
worker[001]: exit
worker[005]: exit
worker[002]: exit
worker[003]: exit
worker[004]: exit
workerpool freed

从运行的输出结果来看,workerpool的最小可行实现的运行逻辑与我们的原理图是一致的。

不过,目前的workerpool实现好比“铁板一块”,虽然我们可以通过capacity参数可以指定workerpool容量,但我们无法对workerpool的行为进行定制。

比如当workerpool中的worker数量已达上限,而且worker都在处理task时,用户调用Schedule方法将阻塞,如果用户不想阻塞在这里,以我们目前的实现是做不到的。

那我们可以怎么改进呢?我们可以尝试在上面实现的基础上,为workerpool添加功能选项(functional option)机制。

添加功能选项机制

功能选项机制,可以让某个包的用户可以根据自己的需求,通过设置不同功能选项来定制包的行为。Go语言中实现功能选项机制有多种方法,但Go社区目前使用最为广泛的一个方案,是Go语言之父Rob Pike在2014年在博文《自引用函数与选项设计》中论述的一种,这种方案也被后人称为“功能选项(functional option)”方案。

接下来,我们就来看看如何使用Rob Pike的这种“功能选项”方案,让workerpool支持行为定制机制。

首先,我们将workerpool1目录拷贝一份形成workerpool2目录,我们将在这个目录下为workerpool包添加功能选项机制。

然后,我们在workerpool2目录下创建option.go文件,在这个文件中,我们定义用于代表功能选项的类型Option:

type Option func(*Pool)

我们看到,这个Option实质是一个接受*Pool类型参数的函数类型。那么如何运用这个Option类型呢?别急,马上你就会知道。现在我们先要做的是,明确给workerpool添加什么功能选项。这里我们为workerpool添加两个功能选项:Schedule调用是否阻塞,以及是否预创建所有的worker。

为了支持这两个功能选项,我们需要在Pool类型中增加两个bool类型的字段,字段的具体含义,我也在代码中注释了:

type Pool struct {
    ... ...
    preAlloc bool // 是否在创建pool的时候就预创建workers,默认值为:false

    // 当pool满的情况下,新的Schedule调用是否阻塞当前goroutine。默认值:true
    // 如果block = false,则Schedule返回ErrNoWorkerAvailInPool
    block  bool
    ... ...
}

针对这两个字段,我们在option.go中添加两个功能选项,WithBlock与WithPreAllocWorkers:

func WithBlock(block bool) Option {
    return func(p *Pool) {
        p.block = block
    }
}

func WithPreAllocWorkers(preAlloc bool) Option {
    return func(p *Pool) {
        p.preAlloc = preAlloc
    }
}

我们看到,这两个功能选项实质上是两个返回闭包函数的函数。

为了支持将这两个Option传给workerpool,我们还需要改造一下workerpool包的New函数,改造后的New函数代码如下:

func New(capacity int, opts ...Option) *Pool {
    ... ...
    for _, opt := range opts {
        opt(p)
    }

    fmt.Printf("workerpool start(preAlloc=%t)\n", p.preAlloc)

    if p.preAlloc {
        // create all goroutines and send into works channel
        for i := 0; i < p.capacity; i++ {
            p.newWorker(i + 1)
            p.active <- struct{}{}
        }
    }

    go p.run()

    return p
}

新版New函数除了接受capacity参数之外,还在它的参数列表中增加了一个类型为Option的可变长参数opts。在New函数体中,我们通过一个for循环,将传入的Option运用到Pool类型的实例上。

新版New函数还会根据preAlloc的值来判断是否预创建所有的worker,如果需要,就调用newWorker方法把所有worker都创建出来。newWorker的实现与上一版代码并没有什么差异,这里就不再详说了。

但由于preAlloc选项的加入,Pool的run方法的实现有了变化,我们来看一下:

 func (p *Pool) run() {
     idx := len(p.active)
 
     if !p.preAlloc {
     loop:
         for t := range p.tasks {
             p.returnTask(t)
             select {
             case <-p.quit:
                 return
             case p.active <- struct{}{}:
                 idx++
                 p.newWorker(idx)
             default:
                 break loop
             }
         }
     }
 
     for {
         select {
         case <-p.quit:
             return
         case p.active <- struct{}{}:
             // create a new worker
             idx++
             p.newWorker(idx)
         }
     }
 }

新版run方法在preAlloc=false时,会根据tasks channel的情况在适合的时候创建worker(第4行~第18行),直到active channel写满,才会进入到和第一版代码一样的调度逻辑中(第20行~第29行)。

而且,提供给用户的Schedule函数也因WithBlock选项,有了一些变化:

 func (p *Pool) Schedule(t Task) error {
     select {
     case <-p.quit:
         return ErrWorkerPoolFreed
     case p.tasks <- t:
         return nil
     default:
         if p.block {
             p.tasks <- t
             return nil
         }
         return ErrNoIdleWorkerInPool
     }
 }

Schedule在tasks chanel无法写入的情况下,进入default分支。在default分支中,Schedule根据block字段的值,决定究竟是继续阻塞在tasks channel上,还是返回ErrNoIdleWorkerInPool错误。

和第一版worker代码一样,我们也来验证一下新增的功能选项是否好用。我们建立一个使用新版workerpool的项目demo2,demo2的go.mod与demo1的go.mod相似:

module demo2

go 1.17

require github.com/bigwhite/workerpool v1.0.0

replace github.com/bigwhite/workerpool v1.0.0 => ../workerpool2

demo2的main.go文件如下:

package main
  
import (
    "fmt"
    "time"

    "github.com/bigwhite/workerpool"
)

func main() {
    p := workerpool.New(5, workerpool.WithPreAllocWorkers(false), workerpool.WithBlock(false))

    time.Sleep(time.Second * 2)
    for i := 0; i < 10; i++ {
        err := p.Schedule(func() {
            time.Sleep(time.Second * 3)
        })
        if err != nil {
            fmt.Printf("task[%d]: error: %s\n", i, err.Error())
        }
    }

    p.Free()
}

在demo2中,我们使用workerpool包提供的功能选项,设置了我们期望的workerpool的运作行为,包括不要预创建worker,以及不要阻塞Schedule调用。

考虑到Goroutine调度的次序的不确定性,这里我在创建workerpool与真正开始调用Schedule方法之间,做了一个Sleep,尽量减少Schedule都返回失败的频率(但这仍然无法保证这种情况不会发生)。

运行demo2,我们会得到这个结果:

workerpool start(preAlloc=false)
task[1]: error: no idle worker in pool
worker[001]: start
task[2]: error: no idle worker in pool
task[4]: error: no idle worker in pool
task[5]: error: no idle worker in pool
task[6]: error: no idle worker in pool
task[7]: error: no idle worker in pool
task[8]: error: no idle worker in pool
task[9]: error: no idle worker in pool
worker[001]: receive a task
worker[002]: start
worker[002]: exit
worker[001]: receive a task
worker[001]: exit
workerpool freed(preAlloc=false)

不过,由于Goroutine调度的不确定性,这个结果仅仅是很多种结果的一种。我们看到,仅仅001这个worker收到了task,其余的worker都因为worker尚未创建完毕,而返回了错误,而不是像demo1那样阻塞在Schedule调用上。

小结

好了,今天的课讲到这里就结束了,现在我们一起来回顾一下吧。

在这一讲中,我们基于我们前面所讲的Go并发方面的内容,设计并实现了一个workerpool的最小可行实现,只用了不到200行代码。为了帮助你理解Go并发原语是如何运用的,这个workerpool实现完全基于channel+select,并没有使用到sync包提供的各种锁。

我们还基于workerpool的最小可行实现,为这个pool增加了功能选项的支持,我们采用的功能选项方案也是Go社区最为流行的方案,日常编码中如果你遇到了类似的需求可以重点参考。

最后我要提醒你:上面设计与实现的workerpool只是一个演示项目,不能作为生产项目使用。

思考题

关于workerpool这样的项目,如果让你来设计,你的设计思路是什么,不妨在留言区敞开谈谈?

欢迎你把这节课分享给更多感兴趣的朋友。我是Tony Bai,我们下节课见。

今天的项目源码在这里!

精选留言

  • ivhong

    2022-03-17 09:18:29

    非常感谢老师带着做了一次这样的实现,因为我自己也尝试过这种实现(纯粹是为了学习用)。有几个问题我不是特别明白,不知道是不是和老师理解的一样,望老师闲暇之余给予指正,谢谢!
    1. 这个是不是叫“协程池”,为什么叫做“线程池”?两者有什么区别呢?或者是到底什么是“协程”呢?
    2. 是不是这节课的实现,也纯粹是为了学习而实现的,个人理解,go实现Goroutine,就是为了解决“线程池”的繁琐,让“并发”实现的不用那么的麻烦,如果是超小“任务”,不用考虑线程频繁切换导致系统资源的浪费。如果再实现“协程池”的话,是不是丢失了这种优点?
    3. 常驻内存的Goroutine,反复使用,会导致这个Goroutine的内存越来越大,或者其他隐藏的风险么?
    作者回复

    好问题。

    1. 传统理解的coroutine一般是面向协作式,而非抢占式。像python中通过yield关键字创建的协程,与主routine之间是在一个线程上实现的切换执行,从设计角度是通过coroutine实现了并发(concurrency),但其实它们还是串行执行的,不会真正并行(paralellism),即便在多核处理器上。

    基于上面的理解,我们就可以意识到goroutine并非传统意义上的coroutine,是支持抢占的,而且也必须依赖抢占实现runtime对goroutine的调度。它更像thread,可以绑定不同的cpu核并行执行(如果是在多核处理器上的话)。同时基于goroutine的设计也会一种并发的设计。

    而goroutine与thread又不同,goroutine是在用户层(相较于os的内核层)调度的,os并不知道其存在,goroutine的切换相对轻量。而thread是os
    来调度的,切换代价更高一些。

    所以文中将goroutine称为“轻量级线程”,而不是协程。

    2. 你理解的没错。这节课是为了演示goroutine、channel之间的调度与通信机制而“设计”出来的。goroutine使用代价很低,通常不用考虑池化。但是在一些大型网络服务程序时,一旦goroutine数量过多,内存占用以及调度goroutine的代价就不能不考虑了。于是有了“池化”的思路。这与传统的线程池的思路的确是一脉相承的

    3. go是gc的,内存不会越来越大。

    2022-03-17 13:33:39

  • Darren

    2022-01-17 22:32:31

    老师以下几个问题哈:
    1、第一种实现中,这块是不是有点问题:

    go func() {
    defer func() {
    if err := recover(); err != nil {
    fmt.Printf("worker[%03d]: recover panic[%s] and exit\n", i, err)
    <-p.active
    }
    p.wg.Done()
    }()

    <-p.active是不是应该要放到if的外面,如果task执行本身没有出错,正常结束了,active没有减少的地方

    2、这块文字描述有点问题,p<-active应该是<-p.active
    “使用了 defer+recover 对 panic 进行捕捉,捕捉后 worker 也是要退出的,于是我们还通过p<-active更新了 worker 计数器”

    3、第二种实现中,当没有提前创建worker,那么当tasks中有任务的时候,p.returnTask方法是干啥的?文章中没有这个方法,且文字也没有说明呀

    func (p *Pool) run() {
    idx := len(p.active)

    if !p.preAlloc {
    loop:
    for t := range p.tasks {
    p.returnTask(t)
    select {
    case <-p.quit:
    return
    case p.active <- struct{}{}:
    idx++
    p.newWorker(idx)
    default:
    break loop
    }
    }
    }
    作者回复

    看的真细致!👍

    1. worker一旦创建后,除了panic和quit通知退出,worker是不会退出的,也就是没有所谓“正常退出”的情况。所以没在defer中调用<-p.active。
    2. 的确是笔误,感谢指出。
    3. 在文后有源码链接。这里的task仅是触发了worker创建,这里是调度循环,不处理task,所以要把task扔回tasks channel,等worker启动后再处理。

    2022-01-18 14:47:09

  • Aeins

    2022-06-05 00:08:23

    1. demo01 的实现,已经是一种预分配的实现,demo02 在 New 里面 去预分配,没有必要

    2. demo02 ,task 取出来,再放回去,总感觉不优雅。
  • $侯

    2022-01-17 22:36:25

    老师您好请教几个问题:
    第一个问题,demo1中没有看到p.Free的代码示例,Free方法只是向p.quit <- struct{}{}发送一个空结构体就可以吗,请教下Free方式该如何写
    第二个问题,demo1中好像也没看看到p.wg.Wait()
    作者回复

    原文中有源码的链接,在最后。

    源码在 https://github.com/bigwhite/publication/tree/master/column/timegeek/go-first-course/35

    看了后,就可以回答你的问题了。

    2022-01-18 11:04:32

  • Geek_0d5d37

    2023-04-06 15:05:43

    老师您好,这个段代码作用我也不太理解
    if !p.preAlloc {
    loop:
    for t := range p.tasks {
    p.returnTask(t)
    select {
    case <-p.quit:
    return
    case p.active <- struct{}{}:
    idx++
    p.newWorker(idx)
    default:
    break loop
    }
    }
    }

    您在留言中回答 当preAlloc=false时有用 ,如果是这样demo1 就是等于fasle的情况没使用这段代码的 ,请老师有空回答一下
    作者回复

    当preAlloc=false时,即不预分配时。这样就根据tasks的情况来创建worker。如果当前没有task,实际上系统中没有worker被创建出来,直到有task才会创建worker。一直到active channel满了!这样pool中所有worker都创建出来后,再跳出循环,进入下面的quit监听。

    不过workpool2的这段代码的确有refactor的空间😁。可以写的更好理解一些。

    2023-04-07 12:56:23

  • Six Days

    2023-03-09 15:48:49

    请教一下,池化的话,当前的demo1场景是不是没有考虑使用同一个worker进行t任务的处理,而是通过不断的创建 Goroutine实现的,通过capacity控制了处理任务Goroutine的数量,通过Go gc 来实现Goroutine的回收,是不是因为Goroutine 的占内存比较小,为此没有做Goroutine 的复用,所以采用不断创建,还是当前为了简单演示呢,实际还是需要复用Goroutine 的呢?
    作者回复

    demo1创建goroutine后没有回收,一直是复用的,最多创建capacity个goroutine,直到pool销毁。

    2023-03-10 16:07:45

  • Six Days

    2023-03-09 15:17:50

    请教一下,p.active 的chan 容量指定是capacity,而只有run的时候,才会通过p.active <- struct{}{} 往p.active中丢东西,p.active 才会变多,达到capacity时。任务T则阻塞,我理解 run 只会New的时候触发,请问是否与文中描述一致呢?
    func (p *Pool) run() {
    idx := 0

    for {
    select {
    case <-p.quit:
    return
    case p.active <- struct{}{}:
    // create a new worker
    idx++
    p.newWorker(idx)
    }
    }
    }
    作者回复

    run在New中调用,active满了,代表capacity个worker goroutine已经创建完毕,后续将重用这些goroutine。这时候run会阻塞在select上直到quit channel有数据才会退出。p.active 那个case后续没用了。

    2023-03-10 16:19:21

  • demajiao

    2023-01-22 22:13:56

    if !p.preAlloc {
    loop:
    for t := range p.tasks {
    p.returnTask(t)
    select {
    case <-p.quit:
    return
    case p.active <- struct{}{}:
    idx++
    p.newWorker(idx)
    default:
    break loop
    }
    }
    }

    这段代码感觉没用呀。
    作者回复

    当preAlloc=false时,即不预分配时,有用。

    2023-01-26 08:23:15

  • 撕影

    2023-01-14 15:50:49

    为何关键变化不写出来?太仓促了吧,一篇最后一节以没看懂收场,对学生打击可不小啊老师
    作者回复

    “关键变化”,指的是?

    2023-01-16 16:18:57

  • Sunrise

    2022-11-24 08:38:28

    考虑到 Goroutine 调度的次序的不确定性,这里我在创建 workerpool 与真正开始调用 Schedule 方法之间,做了一个 Sleep,尽量减少 Schedule 都返回失败的频率
    这块也不太懂,为啥不加 Sleep 会全返回失败呢?
    作者回复

    最后一版Schedule加入了default分支,当pool资源不够又设置为non block时,schedule肯定会返回error啊。

    2022-11-27 03:33:23

  • Sunrise

    2022-11-23 15:32:56

    有几个问题不大理解,望老师抽空解答:
    1)自引用函数与选项设计是为了解决 go 函数没有默认参数和可选参数吗? go 函数为什么没有设计默认参数和可选参数呢?
    2)为什么下面的 for { select ... } 放到 goroutine 中 才会输出 ch2: 2 ch1: 1 done, 如果直接放到外面只会输出 done?
    func TestSelect(t *testing.T) {
    ch1 := make(chan int)
    ch2 := make(chan int)

    go func() {
    ch1 <- 1
    }()

    go func() {
    ch2 <- 2
    }()

    go func() {
    for {
    select {
    case i := <-ch1:
    fmt.Println("ch1:", i)
    case j := <-ch2:
    fmt.Println("ch2:", j)
    default:
    fmt.Println("done")
    return
    }
    }
    }()
    // ch2: 2 ch1: 1 done
    }
    作者回复

    问题1:主要是为了可选参数吧。为什么go没有原生支持默认参数与可选参数,我猜是因为go设计者压根就不想引入这个复杂性。

    问题2: 和goroutine的调度顺序有关。

    2022-11-24 09:03:22

  • 菠萝吹雪—Code

    2022-09-12 17:03:27

    中秋假期打卡
    作者回复

    👍

    2022-09-14 03:49:53

  • H

    2022-08-25 19:06:53

    有些没太懂:
    1. run里面 p.returnTask(t),相当于又把 task 异步扔p.tasks 里面了,然后创建worker等待从 p.tasks里面取task执行。因为returnTask和newWorker都是异步,所以无法保证是range p.tasks还是worker先执行
    2.在我理解,既然是想开5个容量的非预加载的线程池,10个循环中应该前5个应该都创建woker呀
    作者回复

    1. 是的。
    2. 原因就是问题1的描述

    2022-08-28 20:51:50

  • 骚动

    2022-08-19 21:52:45

    老师,请教一下,这种软件架构图是用什么画的啊?平时自己也想画画自己做的东西的架构图,但是没找到什么合适的工具
    作者回复

    在线工具,可以用processon.com,离线的我一般用draw.io。

    2022-08-23 09:15:33

  • MClink

    2022-07-21 08:03:21

    功能选项这种封装的方式确实拓展性很好,和传统的参数传入并自动绑定来说,不需要更改New 的代码,只需要在外拓展
    作者回复

    👍

    2022-07-21 19:10:47

  • 曾祥金

    2022-07-16 16:42:36

    感觉退出可以用context处理
  • Geek_a6104e

    2022-07-10 22:39:32

    请问tasks通道为什么是不带缓存区的
    作者回复

    示例原意就是设计一个同步提交的workerpool池。即如果Schedule方法返回成功,说明用户提交的task肯定被某个worker处理了。

    如果要设计为异步提交语义,可以使用带缓冲的tasks channel。那样schedule方法提交后,task可能一直在channel中存着,直到有worker处理。但当缓冲区满了,还是会退化为同步提交。

    2022-07-13 16:22:32

  • Hugh

    2022-05-17 22:44:40

    老师您好,请教个问题,之前说过在GMP模型中数据结构G是可以被复用的,那么协程池还有必要吗?因为协程池的一大优势就是复用协程,避免反复创建协程,那么其实go scheduler已经做到了
    作者回复

    这是两个层面的复用。GMP对G数据结构的复用是在runtime调度层面,但这种复用 用户层无法控制,G也不是总能被复用,就像sync.Pool中数据项,在一定时间内如果没有goroutine去从池中get,那么还是会被释放的。

    当然用户层面实现的goroutine 池也可以增加idle释放的特性(这一讲中的例子不支持),但这种释放是用户层可控的。

    2022-05-18 05:24:54

  • Geek_as

    2022-04-27 17:43:16

    老师,最后一个例子好像有问题,假如我设置的是当没有活动go worker的时候阻塞,然后当前线程不会预创建,这时候我往池里面添加一个任务,由于没有worker可以接收,导致它走到了default,由于我是设置了获取不到就阻塞,所以我的代码走到了
    if p.block{
    p.tasks <- t
    }
    这里,这时候run方法检测到task管道里有task,就接收下来,前面的p.tasks <- t阻塞释放了,会继续走下去,然后run的话,在接收了管道的task后,执行的下一步操作是returnTask,然后创建活动worker,去执行returnTask返回的task,但出现了神奇的一幕,run的执行流程卡在returnTask的p.tasks <- task中,导致后面的活动worker没法创建,一直卡在returnTask那里,后面我分析了一下,感觉应该是我的tasks是无缓冲队列,所以returnTask执行p.tasks <- task的时候就卡在这里,等待其他人的接收,但这样就有问题了,因为接收操作要么在run的range中,要么是由活动worker接收,因为卡在returnTask这里,所以没法创建活动worker,也就没worker消费,变成了类似于死锁的样子,假如我配置了阻塞,和不预创建,理论上我提交给池里面的所有任务都不会被执行,我试了一下,当提交的任务超过两个就会爆死锁异常,后面我将returnTask这一步动作移动到了创建worker之后,就解决了这个问题
    作者回复

    demo2使用Workpool2中的returnTask代码如下:

    func (p *Pool) returnTask(t Task) {
    go func() {
    p.tasks <- t
    }()
    }

    returnTask中创建了一个新goroutine来异步send task,returnTask怎么会卡住呢?

    2022-04-28 17:33:31

  • 木木

    2022-03-18 15:15:16

    学到很多!感谢
    作者回复

    👍

    2022-03-21 05:08:21