28|调度引擎:负载均衡与调度器实战

你好,我是郑建勋。

在上一节课程中,我们实战了广度优先搜索算法,不过我们对网站的爬取都是在一个协程中进行的。在真实的实践场景中,我们常常需要爬取多个初始网站,我们希望能够同时爬取这些网站。这就需要合理调度和组织爬虫任务了。因此,这节课的重点就是实战任务调度的高并发模型,使资源得到充分的利用。

实战调度引擎

首先,我们新建一个文件夹engine用于存储调度引擎的代码,核心的调度逻辑位于ScheduleEngine.Run中。这部分的完整代码位于 tag v0.1.4,你可以对照代码进行查看。

调度引擎主要目标是完成下面几个功能:

  1. 创建调度程序,接收任务并将任务存储起来;
  2. 执行调度任务,通过一定的调度算法将任务调度到合适的worker中执行;
  3. 创建指定数量的worker,完成实际任务的处理;
  4. 创建数据处理协程,对爬取到的数据进行进一步处理。
func (s *ScheduleEngine) Run() {
	requestCh := make(chan *collect.Request)
	workerCh := make(chan *collect.Request)
	out := make(chan collect.ParseResult)
	s.requestCh = requestCh
	s.workerCh = workerCh
	s.out = out
	go s.Schedule()
}

Run方法首先初始化了三个通道。其中,requestCh负责接收请求,workerCh负责分配任务给worker,out负责处理爬取后的数据,完成下一步的存储操作。schedule函数会创建调度程序,负责的是调度的核心逻辑。

第一步我们来看看schedule函数如何接收任务并完成任务的调度。

schedule函数如下所示,其中,requestCh通道接收来自外界的请求,并将请求存储到reqQueue队列中。workerCh通道负责传送任务,后面每一个worker将获取该通道的内容,并执行对应的操作。

在这里,我们使用了for语句,让调度器循环往复地获取外界的爬虫任务,并将任务分发到worker中。如果任务队列reqQueue大于0,意味着有爬虫任务,这时我们获取队列中第一个任务,并将其剔除出队列。最后ch <- req 会将任务发送到workerCh通道中,等待worker接收。

func (s *Schedule) Schedule() {
	var reqQueue = s.Seeds
	go func() {
		for {
			var req *collect.Request
			var ch chan *collect.Request

			if len(reqQueue) > 0 {
				req = reqQueue[0]
				reqQueue = reqQueue[1:]
				ch = s.workerCh
			}
			select {
			case r := <-s.requestCh:
				reqQueue = append(reqQueue, r)

			case ch <- req:
			}
		}
	}()
}

通道还有一个特性,就是我们往nil通道中写入数据会陷入到堵塞的状态。因此,如果reqQueue为空,这时req和ch都是nil,当前协程就会陷入到堵塞的状态,直到接收到新的请求才会被唤醒。

我们可以用一个例子来验证这一特性:

func main() {
	var ch chan *int
	go func() {
		<-ch
	}()
	select {
	case ch <- nil:
		fmt.Println("it's time")
	}
}

在这个例子中,运行后会出现死锁,因为当前程序全部陷入到了无限堵塞的状态。

fatal error: all goroutines are asleep - deadlock!

调度引擎除了启动schedule函数,还需要安排多个实际干活的worker程序。

下一步,让我们创建指定数量的worker,完成实际任务的处理。其中WorkCount为执行任务的数量,可以灵活地去配置。

func (s *ScheduleEngine) Run() {
	...
	go s.Schedule()
	for i := 0; i < s.WorkCount; i++ {
		go s.CreateWork()
	}
}

这里的 CreateWork 创建出实际处理任务的函数,它又细分为下面几个步骤:

  • ←s.workerCh接收到调度器分配的任务;
  • s.fetcher.Get 访问服务器,r.ParseFunc 解析服务器返回的数据;
  • s.out ← result 将返回的数据发送到out通道中,方便后续的处理。
func (s *Schedule) CreateWork() {
	for {
		r := <-s.workerCh
		body, err := s.Fetcher.Get(r)
		if err != nil {
			s.Logger.Error("can't fetch ",
				zap.Error(err),
			)
			continue
		}
		result := r.ParseFunc(body, r)
		s.out <- result
	}
}

最后一步,我们需要单独安排一个函数来处理爬取并解析后的数据结构,完整的函数如下:

func (s *Schedule) HandleResult() {
	for {
		select {
		case result := <-s.out:
			for _, req := range result.Requesrts {
				s.requestCh <- req
			}
			for _, item := range result.Items {
				// todo: store
				s.Logger.Sugar().Info("get result", item)
			}
		}
	}
}

在HandleResult函数中,<-s.out接收所有worker解析后的数据,其中要进一步爬取的Requests列表将全部发送回s.requestCh通道,而result.Items里包含了我们实际希望得到的结果,所以我们先用日志把结果打印出来。

最后,让我们用之前介绍的爬取豆瓣小组的例子来验证调度器的功能。

在main函数中,生成初始网址列表作为种子任务。构建engine.Schedule,设置worker的数量,采集器Fetcher和日志Logger,并调用s.Run()运行调度器。

 func main(){
		var seeds []*collect.Request
		for i := 0; i <= 0; i += 25 {
			str := fmt.Sprintf("<https://www.douban.com/group/szsh/discussion?start=%d>", i)
			seeds = append(seeds, &collect.Request{
				Url:       str,
				WaitTime:  1 * time.Second,
				Cookie:    "xxx",
				ParseFunc: doubangroup.ParseURL,
			})
		}
	
		var f collect.Fetcher = &collect.BrowserFetch{
			Timeout: 3000 * time.Millisecond,
			Logger:  logger,
			Proxy:   p,
		}
	
		s := engine.Schedule{
			WorkCount: 5,
			Logger:    logger,
			Fetcher:   f,
			Seeds:     seeds,
		}
		s.Run()
}

输出结果为:

{"level":"INFO","ts":"2022-10-19T00:55:54.281+0800","caller":"engine/schedule.go:78","msg":"get result: <https://www.douban.com/group/topic/276978032/>"}
{"level":"INFO","ts":"2022-10-19T00:55:54.355+0800","caller":"engine/schedule.go:78","msg":"get result: <https://www.douban.com/group/topic/276973871/>"}

函数式选项模式

在上面的例子中,我们初始化engine.Schedule时将一系列的参数传递到了结构体当中。在实践中可能会有几十个参数等着我们赋值,从面向对象的角度来看,不同参数的灵活组合可能会带来不同的调度器类型。在实践中为了方便使用,开发者可能会创建非常多的 API 来满足不同场景的需要,如下所示:

// 基本调度器
func NewBaseSchedule() *Schedule {
	return &Schedule{
		WorkCount: 1,
		Fetcher:baseFetch,
	}
}
// 多worker调度器
func NewMultiWorkSchedule(workCount int) *Schedule {
	return &Schedule{
		WorkCount: workCount,
		Fetcher:baseFetch,
	}
}

// 代理调度器
func NewProxySchedule(proxy string) *Schedule {
	return &Schedule{
		WorkCount: 1,
		Fetcher:proxyFetch(proxy),
	}
}

随着参数的不断增多,这种API会变得越来越多,这就增加了开发者的心理负担。

另一种使用方式就是传递一个统一的Config配置结构,如下所示。这种方式只需要创建单个API,但是需要在内部对所有的变量进行判断,繁琐且不优雅。对于使用者来说,也很难确定自己需要使用哪一个字段。

type Config struct {
	WorkCount int
	Fetcher   collect.Fetcher
	Logger    *zap.Logger
	Seeds     []*collect.Request
}

func NewSchedule(c *Config) *Schedule {
	var s = &Schedule{}
	if c.Seeds != nil {
		s.Seeds = c.Seeds
	}
	if c.Fetcher != nil {
		s.Fetcher = c.Fetcher
	}

	if c.Logger != nil {
		s.Logger = c.Logger
	}
	...
	return s
}

那么有没有方法可以更加优雅地处理这种多参数配置问题呢?

Rob Pike 在2014年的一篇博客中提到了一种优雅的处理方法叫做函数式选项模式(Functional Options)。这种模式展示了闭包函数的有趣用途,目前在很多开源库中都能看到它的身影,我们项目中使用的日志库 Zap 也使用了这种模式。下面我以上面schedule的配置为例来说明函数式选项模式(完整代码请查看tag v0.1.5)。

第一步,我们要对schedule结构进行改造,把可以配置的参数放入到options 结构中:

type Schedule struct {
	requestCh chan *collect.Request
	workerCh  chan *collect.Request
	out       chan collect.ParseResult
	options
}

type options struct {
	WorkCount int
	Fetcher   collect.Fetcher
	Logger    *zap.Logger
	Seeds     []*collect.Request
}

第二步,我们需要书写一系列的闭包函数,这些函数的返回值是一个参数为 options 的函数:


type Option func(opts *options)

func WithLogger(logger *zap.Logger) Option {
	return func(opts *options) {
		opts.Logger = logger
	}
}
func WithFetcher(fetcher collect.Fetcher) Option {
	return func(opts *options) {
		opts.Fetcher = fetcher
	}
}

func WithWorkCount(workCount int) Option {
	return func(opts *options) {
		opts.WorkCount = workCount
	}
}

func WithSeeds(seed []*collect.Request) Option {
	return func(opts *options) {
		opts.Seeds = seed
	}
}

第三步,创建一个生成schedule的新函数,函数参数为Option的可变参数列表。defaultOptions为默认的Option,代表默认的参数列表,然后循环遍历可变函数参数列表并执行。

func NewSchedule(opts ...Option) *Schedule {
	options := defaultOptions
	for _, opt := range opts {
		opt(&options)
	}
	s := &Schedule{}
	s.options = options
	return s
}

第四步,在main函数中调用NewSchedule。让我们来看看函数式选项模式的效果:

func main(){
	s := engine.NewSchedule(
			engine.WithFetcher(f),
			engine.WithLogger(logger),
			engine.WithWorkCount(5),
			engine.WithSeeds(seeds),
		)
	s.Run()
}

从这个例子中,我们可以看到函数式选项模式的好处:

  • API 具有可扩展性,高度可配置化,新增参数不会破坏现有代码;
  • 参数列表非常简洁,并且可以使用默认的参数;
  • option函数使参数的含义非常清晰,易于开发者理解和使用;
  • 如果将 options 结构中的参数设置为小写,还可以限制这些参数的权限,防止这些参数在package 外部使用。

刚才,我们实战了 fan-in 和 fan-out 高并发模型,并深度使用了通道和select的机制。接下来让我们更进一步,看一下实现通道和select机制的原理,掌握这种高并发模型的底层图像。

通道的底层原理

通道的实现并没有想象中复杂。它利用互斥锁实现了并发安全,只不过Go运行时为我们屏蔽了底层的细节。通道包括两种类型,一种是无缓冲的通道,另一种是带缓冲区的通道。通道的结构如下:

图片

可以看到,通道中包含了数据的类型、大小、数量,堵塞协程队列,以及用于缓存区的诸多字段。

我们先来看看无缓冲区的通道是怎么实现的。

无缓冲区的通道

通道需要有多个协程分别完成读和写的功能,这样才能保证数据传输是顺畅的。对于无缓冲区的通道来说,如果有一个协程正在将数据写入通道,但是当前没有协程读取数据,那么写入协程将立即陷入到休眠状态。写入协程堵塞之前协程会被封装到sudog结构中,并存储到写入的堵塞队列sendq中,之后协程陷入休眠。

之前我们介绍过,协程的堵塞是位于用户态的,协程切换时,运行时会保存当前协程的状态、并调用 gopark 函数切换到g0完成新一轮的调度。如果之后有协程读取数据,那么读取协程会立即读取sendq队列中第一个等待的协程,并将该协程对应的元素拷贝到读取协程中,同时调用 goready 唤醒写入协程,将写入协程放入到可运行队列中等待被调度器调度。

图片

带缓冲区的通道

而对于带缓冲区的通道来说,假设缓存队列的数量为N,那么如果写入的数据量不大于N,写入协程就不会陷入到休眠状态,所有数据都会存储在缓冲队列中。

缓冲队列可以在一定程度上削峰填谷,加快处理速度。但是如果写入速度始终大于读取数据,那么缓冲区迟早也有写满的时候,到时候仍然会陷入堵塞,只是延迟了问题的暴露并带来内存的浪费。因此缓冲区的容量不可以过大,我们可以根据实际情况给出一个经验值。例如上面的爬虫案例中,我们就可以给接收任务的requestCh通道加上缓存区,先将缓存区设置为500,这样就不会频繁堵塞住调度器了。

对于有缓存的通道,存储在 buf 中的数据虽然是线性的数组,但是这些数组和序号recvx、recvq 模拟了一个环形队列。recvx 可以定位到 buf 是从哪个位置读取的通道中的元素,而 sendx 则能够找到写入时放入 buf 的位置,这样做主要是为了再次利用已经使用过的空间。从 recvx 到 sendx 的距离qcount就是通道队列中的元素数量。

图片

Select 机制的底层原理

在前面的实战案例中,我们还看到了大量channel与select结合使用的场景。

受到通道特性的限制,如果单个通道被堵塞,协程就无法继续执行了。那有没有一种机制可以像网络中的多路复用一样,监听多个通道,使后续处理协程能够及时地运行?

其实就和网络中把select用于Socket的多路复用机制一样,Go中也可以用select语句实现这样的多路复用机制。select语句中的每一个case都对应着一个待处理的读取或写入通道。举个简单实用的例子,下面的程序如果800毫秒之后也接受不到通道c中的数据,定时器time.After就会接收到数据,从而打印timeout。

select {
	case <-c:
		fmt.Println("random 01")
	case <-time.After(800 * time.Millisecond):
		fmt.Println("timeout")
	}

借助select可以实现许多有表现力的设计,那select是如何工作的呢?

Select底层调用了 selectgo 函数,它的工作可以分为三个部分:

  • 第一部分涉及到遍历。selectgo 首先循环查找当前准备就绪的通道,如果能够找到,则正常进行后续处理。在具体的实现方式上,由于 select内部的 scases 数组存储了所有需要管理的通道,所以很容易想到循环遍历 scases ,最终找到可用的通道。不过这可能导致一个问题,那就是如果前面的通道始终有数据,后面的通道将永远得不到执行的机会。为了解决这一问题,Go语言为select加入了随机性,会利用洗牌算法随机打散数组顺序,保证了所有通道都有执行的机会。
  • 第二部分涉及到协程的休眠。如果select找不到准备就绪的通道,这时和单个协程的堵塞一样,它会将当前协程放入到所有通道的等待队列中,并陷入到休眠状态。
  • 第三部分涉及到协程的唤醒。如果有任意一个通道准备就绪,当前的协程将会被唤醒,并到准备就绪的case处继续执行。 要注意的一点是,最后 selectgo 会将sudog结构体从其他通道的等待队列中移出,因为当前协程已经能够正常运行,不再需要被其他通道唤醒了。

总结

这节课,我们用fan-in、fan-out并发模式实战了爬虫的任务调度器。在实战中,我们频繁使用了通道与select结合的方式,还深入底层看到了通道与select的原理。最后我们还学习了函数选项模式在构建API时的优势。在后面的项目中,我们还会频繁地用到这种特性。

课后题

在我们的课程中,schedule函数其实有一个bug,您能看出来吗?你觉得可以用什么方式找出这样的Bug?

欢迎你在留言区与我交流讨论,我们下节课见!

精选留言

  • 周龙亭

    2023-01-25 22:29:13

    func (s *Schedule) Schedule() {
    var reqQueue = s.Seeds
    go func() {
    for {
    var req *collect.Request
    var ch chan *collect.Request

    if len(reqQueue) > 0 {
    req = reqQueue[0]
    ch = s.workerCh
    }
    select {
    case r := <-s.requestCh:
    reqQueue = append(reqQueue, r)

    case ch <- req:
    reqQueue = reqQueue[1:]
    }
    }
    }()
    }
  • shuff1e

    2022-12-15 08:54:16

    bug应该是,会丢失发给worker的任务。

    case r := <-s.requestCh:的情况下,如果req不是nil,应该把req再添加到reqQueue头部
    作者回复

    是的,后面有修复

    2023-02-12 23:02:12

  • Realm

    2022-12-13 21:10:31


    Seeds
    |
    | req ParseFunc(req) HandleResult()
    requestCh----> reqQueue -----> workerCh ----------> out-----------> result:
    ^ - item ==> 存储
    | - req |
    |---------------<----------------------<---------------------<------|
  • Geek_38ea75

    2024-03-07 19:59:23

    我有好几个问题
    1.为啥req和ch放在for循环内部声明,这样有个问题,就是work来不及执行的话,会丢失。
    2.如果requestCh中的任务很多的话,会导致work队列中没有能够运行的任务。
  • Alex

    2023-02-12 03:36:55

    有个小问题请教一下老师, 这个Seeds 是slice, 我觉得在取出任务的时候会有并发问题 如果没有请老师指教下
    作者回复

    目前只有初始化会写,运行时都是读,所以不会有并发问题。 随着课程的深入,运行时可以添加资源,到时会有并发冲突的考虑

    2023-02-12 22:11:17

  • 周龙亭

    2023-01-25 22:13:13

    修复下Schedule方法的bug:

    func (s *Schedule) schedule() {
    go func() {
    for r := range s.requestCh {
    s.reqQueueCond.L.Lock()
    s.reqQueue = append(s.reqQueue, r)
    s.reqQueueCond.Signal()
    s.reqQueueCond.L.Unlock()
    }
    }()

    go func() {
    for {
    s.reqQueueCond.L.Lock()
    for len(s.reqQueue) == 0 {
    s.reqQueueCond.Wait()
    }

    var movedReqQueue = s.reqQueue
    s.reqQueue = nil

    s.reqQueueCond.L.Unlock()

    for _, r := range movedReqQueue {
    s.workerCh <- r
    }
    }

    }()
    }
  • mantra

    2023-01-13 13:01:26

    本文档中的代码链接 tag v0.1.4 (之前的文档,也一样)对应的 URL 都是主库的地址(https://github.com/dreamerjackson/crawler),这是故意的吗?正确的应该是 https://github.com/dreamerjackson/crawler/archive/refs/tags/v0.1.4.tar.gz
    作者回复

    我所有的项目链接都放置的主库的地址,tag的地址就稍微点击一下吧

    2023-02-12 22:29:47

  • 7oty

    2022-12-20 22:56:11

    如何控制某个爬虫任务的启动,停止和任务的实时运行状态?
    作者回复

    后面文章会有介绍

    2023-02-12 23:04:07

  • 翡翠虎

    2022-12-20 03:58:56

    如果任务推送到worker,但又还没实施,这时候那台服务器停机了,或者进程退出了,任务会不会丢?怎么处理任务还没执行就丢了的这种情况呢?
    作者回复

    随着课程深入会看到,我们的任务是存储在etcd中的,所以不会丢失

    2023-02-12 23:09:34

  • Realm

    2022-12-14 11:00:03

    ```
    func (s *ScheduleEngine) Schedule() {
    var reqQueue = s.Seeds
    go func() {
    for {
    var req *collect.Request
    //var ch chan *collect.Request
    ch := make(chan *collect.Request)
    ```
    使用make创建ch,这样ch就不是nil了,即使reqQueue为空的时候,case ch <- req:就不是往nil通道中写数据了。
  • 老猫

    2022-12-13 23:28:06

    // s.requestCh = make(chan *collect.Request,100)
    // s.workerCh = make(chan *collect.Request,500)
    func (s *ScheduleEngine) Schedule() {
    var reqQueue = s.Seeds
    go func() {
    for _, req := range reqQueue {
    s.workerCh <- req
    }
    for {
    select {
    case r := <-s.requestCh:
    s.workerCh <- r
    }
    }
    }()
    }
  • 拾掇拾掇

    2022-12-13 22:17:15

    开启go run 的datarace参数吗?
  • Geek_a9ea01

    2022-12-13 17:45:51

    for { var req *collect.Request var ch chan *collect.Request if len(reqQueue) > 0 { req = reqQueue[0] reqQueue = reqQueue[1:] ch = s.workerCh } select { case r := <-s.requestCh: reqQueue = append(reqQueue, r) case ch <- req: } }
    有个问题:
    如果ch堵塞了 这时候又有requestCh请求上来;会不会导致ch数据丢失?
    作者回复

    这个是已知的问题,我在后面的课程中做了修复

    2023-02-12 23:47:59