Golang errgroup设计及实现原理是什么(errgroup,golang,开发技术)

时间:2024-05-06 13:50:13 作者 : 石家庄SEO 分类 : 开发技术
  • TAG :

    Golang%C2%A0errgroup%E8%AE%BE%E8%AE%A1%E5%8F%8A%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86%E6%98%AF%E4%BB%80%E4%B9%88

接下来,请跟着小编一起来学习吧!

errgroup 定义在 golang.org/x/sync/errgroup,承载核心能力的结构体是 Group。

Group 就是对我们上面提到的一堆子任务执行计划的抽象。每一个子任务都会有自己对应的 goroutine 来执行。

通过这个结构我们也可以看出来,errgroup 底层实现多个 goroutine 调度,等待的能力还是基于 sync.WaitGroup。

我们可以调用 errgroup.WithContext 函数来创建一个 Group。

这里可以看到,Group 的 cancel 函数本质就是为了支持 context 的 cancel 能力,初始化的 Group 只有一个 cancel 属性,其他都是默认的。一旦有一个子任务返回错误,或者是 Wait 调用返回,这个新 Context 就会被 cancel。

本质上和 WaitGroup 的 Wait 方法语义一样,既然我们是个 group task,就需要等待所有任务都执行完,这个语义就由 Wait 方法提供。如果有多个子任务返回错误,它只会返回第一个出现的错误,如果所有的子任务都执行成功,就返回 nil。

Wait 的实现非常简单。一个前置的 WaitGroup Wait,结束后只做了两件事:

如果对于公共的 Context 有 cancel 函数,就将其 cancel,因为事情办完了;

返回公共的 Group 结构中的 err 给调用方。

Group 的核心能力就在于能够并发执行多个子任务,从调用者的角度,我们只需要传入要执行的函数,签名为:func() error即可,非常通用。如果任务执行成功,就返回 nil,否则就返回 error,并且会 cancel 那个新的 Context。底层的调度逻辑由 Group 的 Go 方法实现:

我们重点来分析下 Go 这里发生了什么。

WaitGroup 加 1 用作计数;

启动一个新的 goroutine 执行调用方传入的 f() 函数;

若 err 为 nil 说明执行正常;

若 err 不为 nil,说明执行出错,此时将这个返回的 err 赋值给全局 Group 的变量 err,并将 context cancel 掉。注意,这里的处理在 once 分支中,也就是只有第一个来的错误会被处理。

在 defer 语句中调用 Group 的 done 方法,底层依赖 WaitGroup 的 Done,说明这一个子任务结束。

这里也可以看到,其实所谓 errgroup,我们并不是将所有子任务的 error 拼成一个字符串返回,而是直接在 Go 方法那里将第一个错误返回,底层依赖了 once 的能力。

其实看到这里,你有没有觉得 errgroup 的功能有点鸡肋?底层核心技术都是靠 WaitGroup 完成的,自己只不过是起了个 goroutine 执行方法,err 还只能保留一个。而且 Group 里面的 sem 那个 chan 是用来干什么呢?

这一节我们就来看看,Golang 对 errgroup 能力的一次扩充。

到目前为止,errgroup 是可以做到一开始人们对它的期望的,即并发执行子任务。但问题在于,这里是每一个子任务都开了个goroutine,如果是在高并发的环境里,频繁创建大量goroutine 这样的用法很容易对资源负载产生影响。开发者们提出,希望有办法限制 errgroup 创建的 goroutine 数量,参照这个 proposal: #27837

SetLimit 的参数 n 就是我们对这个 Group 期望的最大 goroutine 数量,这里其实除去校验逻辑,只干了一件事:g.sem = make(chan token, n),即创建一个长度为 n 的 channel,赋值给 sem。

回忆一下我们在 Go 方法 defer 调用 done 中的表现,是不是清晰了很多?我们来理一下:

首先,Group 结构体就包含了 sem 变量,只作为通信,元素是空结构体,不包含实际意义:

如果你对整个 Group 的 Limit 没有要求,which is fine,你就直接忽略这个 SetLimit,errgroup 的原有能力就可以支持你的诉求。

但是,如果你希望保持 errgroup 的 goroutine 在一个上限之内,请在调用 Go 方法前,先 SetLimit,这样 Group 的 sem 就赋值为一个长度为 n 的 channel。

那么,当你调用 Go 方法时,注意下面的框架代码,此时 g.sem 不为 nil,所以我们会塞一个 token 进去,作为占坑,语义上表示我申请一个 goroutine 用。

当然,如果此时 goroutine 数量已经达到上限,这里就会 block 住,直到别的 goroutine 干完活,sem 输出了一个 token之后,才能继续往里面塞。

在每个 goroutine 执行完毕后 defer 的 g.done 方法,则是完成了这一点:

这样就把 sem 的用法串起来了。我们通过创建一个定长的channel来实现对于 goroutine 数量的管控,对于channel实际包含的元素并不关心,所以用一个空结构体省一省空间,这是非常优秀的设计,大家平常也可以参考。

TryGo 和 SetLimit 这套体系其实都是不久前欧长坤大佬提交到 errgroup 的能力。

一如既往,所有带 TryXXX的函数,都不会阻塞。 其实办的事情非常简单,和 Go 方法一样,传进来一个 func() error来执行。

Go 方法的区别在于,如果判断 limit 已经不够了,此时不再阻塞,而是直接 return false,代表执行失败。其他的部分完全一样。

这里我们先看一个最常见的用法,针对一组任务,每一个都单独起 goroutine 执行,最后获取 Wait 返回的 err 作为整个 Group 的 err。

你会发现,最后 err 打印出来就是第二个子任务的 err。

当然,上面这个 case 是我们正好只有一个报错,但是,如果实际有多个任务都挂了呢?

从完备性来考虑,有没有什么办法能够将多个任务的错误都返回呢?

这一点其实 errgroup 库并没有提供非常好的支持,需要开发者自行做一些改造。因为 Group 中只有一个 err 变量,我们不可能基于 Group 来实现这一点。

通常情况下,我们会创建一个 slice 来存储 f() 执行的 err。

可以看到,我们声明了一个 result slice,长度为 3。这里不用担心并发问题,因为每个 goroutine 读写的位置是确定唯一的。

本质上可以理解为,我们把 f() 返回的 err 不仅仅给了 Group 一份,还自己存了一份,当出错的时候,Wait 返回的错误我们不一定真的用,而是直接用自己错的这一个 error slice。

Go 官方文档中的利用 errgroup 实现 pipeline 的示例也很有参考意义:由一个子任务遍历文件夹下的文件,然后把遍历出的文件交给 20 个 goroutine,让这些 goroutine 并行计算文件的 md5。

这里贴出来简化代码学习一下.

本文:Golang errgroup设计及实现原理是什么的详细内容,希望对您有所帮助,信息来源于网络。
上一篇:Django怎么使用Redis进行缓存下一篇:

11 人围观 / 0 条评论 ↓快速评论↓

(必须)

(必须,保密)

阿狸1 阿狸2 阿狸3 阿狸4 阿狸5 阿狸6 阿狸7 阿狸8 阿狸9 阿狸10 阿狸11 阿狸12 阿狸13 阿狸14 阿狸15 阿狸16 阿狸17 阿狸18