Skip to content
<

更友好的并发库-conc

要在 Go 语言中实现并发太容易了,对于初学者来说也容易掉入并发陷阱。社区中经验老道的 Gopher 为我们封装了一个并发工具包——conc,使用它可以轻松应对绝大多数并发场景。

conc 是由 sourcegraph 开源的一套有好的结构化并发工具包,其中总结了 sourcegraph 内部在编写并发代码时反复遇到的问题的解决方案。

conc.WaitGroup

conc 库中的WaitGroup是作用于并发的主要构建块。调用其Go方法能够生成 goroutine,调用其Wait方法可以确保在生成的 goroutine 都退出后再继续执行。这一些特性和标准库中的sync.WaitGroup一样。区别的地方在于conc.WaitGroup中子 goroutine 中的 panic 会被传递给Wait方法的调用方,这也正是我们需要conc.WaitGroup的原因,要不然我们就需要自己去 recover 子 goroutine 中的 panic。

conc.WaitGroup的使用非常简单,可以参考以下代码示例。

go
// waitGroupDemo conc.WaitGroup 并发示例
func waitGroupDemo() {
    var count atomic.Int64

    var wg.conc.WaitGroup
    // 开启 10 个 goroutine 并发执行 count.Add(1)
    for i := 0; i < 10; i++ {
        wg.Go(func() {
            count.Add(1)
        })
    }
    // 等待 10 个 goroutine 都执行完
    wg.Wait()

    fmt.Println(count.Load())
}

如果想要自动 recover 子 goroutine 可能传递出的 panic,可以使用WaitAndRecover方法。示例代码如下。

go
// waitGroupDemo 自动 recover 示例
func waitGroupDemo2() {
    var count atomic.Int64

    var wg conc.WaitGroup
    // 开启 10 个 goroutine 并发执行 count.Add(1)
    for i := 0; i < 10; i++ {
        wg.Go(func() {
            if i == 7 {
                panic("bad thing")
            }
            count.Add(1)
        })
    }
    // 等待 10 个 goroutine 都执行完
    wg.WaitAndRecover()

    fmt.Println(count.Load())
}

扩展:为什么要使用作用域并发?

有一种观点认为:Go 语言通过go关键字随时随地创建 goroutine 发起并发就像在代码中使用goto一样,让程序显得混乱和令人困惑。在并发代码中无论是出现 panic 还是出现其他错误,为了避免引发更大问题,通常的解决方式是将一场或者堆栈信息传递给调用方,这就需要有堆栈信息和明确的“调用者”为前提,所以我们应该使用结构化的并发编程——把 goroutine 放到托儿所里,而不是让他们像野孩子一样到处乱跑。

推荐阅读👉Notes on structured concurrency, or: Go statement considered harmful

goroutine 池

Pool是一个用于并发执行任务的 goroutines 池。conc 包中针对不同的业务场景定义了以下几种 goroutine 池。

pool.Pool

使用New()创建一个池对象,然后通过调用Go()提交要执行的任务。提交完所有任务后,必须调用Wait()来清理任何派生的 goroutines 并传播可能出现的 panic。

Pool中的 goroutine 是延迟启动的(用到的时候再启动),所以创建一个新的Pool是廉价的。产生的 goroutine 永远不会比提交的任务多。

创建Pool是高效的,但也不是零成本。它不适用于耗时非常短的任务。启动和拆卸的开销约为 1µs,每个任务的开销约 300ns。

对于创建得到的池对象,可以使用With系列函数进行配置。其中最常用的便是通过WithMaxGoroutines()指定池中最大 goroutine 数量。

例如下面的示例中使用WithMaxGoroutines(3)配置最大 goroutine 数量为 3。

go
// poolDemo goroutine 池示例
func poolDemo() {
    // 创建一个最大数量为 3 的 goroutine 池
    p := pool.New().WithMaxGoroutines(3)
    // 使用 p.Go() 提交 5 个任务
    for i := 0; i < 5; i++ {
        p.Go(func() {
            fmt.Println("Q1mi")
        })
    }
    p.Wait()
}
:对Pool使用Go()提交任务后不允许再调用With系列方法进行配置。

pool.ContextPool

使用WithContext可以创建一个传递 Context 的Pool,通过这个父 Context 来控制池中的 goroutine。默认情况下,在取消父 Context 之前,Pool中的 Context 不会取消。

想要在任何任务返回错误或出现 panic 时立即取消其 Context,可以通过WithCancelOnError进行配置。

go
// poolWithContextDemoCancelOnError 支持 context 的池
// goroutine 中出错时取消 context
func poolWithContextDemoCancelOnError() {
	p := pool.New().
		WithMaxGoroutines(4).
		WithContext(context.Background()).
		WithCancelOnError() // 出错时取消所有 goroutine
	// 提交 3 个任务
	for i := 0; i < 3; i++ {
		i := i
		p.Go(func(ctx context.Context) error {
			if i == 2 {
				return errors.New("cancel all other tasks")
			}
			<-ctx.Done()
			return nil
		})
	}
	err := p.Wait()
	fmt.Println(err)
}

pool.WithErrors

当提交的任务有可能返回错误时,可以使用WithErrors得到一个ErrorPool,并通过Wait()获取可能返回的错误。

go
func poolWithError() {
	p := pool.New().WithErrors()
	for i := 0; i < 3; i++ {
		i := i
		p.Go(func() error {
			if i == 2 {
				return errors.New("oh no!")
			}
			return nil
		})
	}
	err := p.Wait()
	fmt.Println(err)
}

pool.ResultPool

ResultPool是一个执行返回泛型结果的任务池。使用Go()在池中执行任务,然后由Wait()返回任务的结果。

go
// poolWithResult 执行返回结果的任务池
func poolWithResult() {
	// 创建一个任务池,其中任务返回的结果为 int
	p := pool.NewWithResults[int]()
	for i := 0; i < 10; i++ {
		i := i
		p.Go(func() int {
			return i * 2
		})
	}
	res := p.Wait()
	// 结果的顺序是不确定的, 所以这里先排序再打印
	sort.Ints(res)
	fmt.Println(res)
}

pool.ResultContextPool

ResultContextPool中执行的任务接受一个 context 参数并返回结果。

pool.ResultErrorPool

ResultErrorPool中执行的任务会返回一个泛型结果和错误。

Stream

Pool中并发执行任务后返回结果是无序的,conc 中有一个 stream 包提供任务并发、结果有序的实现,适用于在维护结果顺序的同时并发执行任务流。

使用Stream时提交的每个人物都返回一个回调函数。每个任务都将在任务池中同时执行,但是回调函数将按照任务提交的顺序依次执行。

等到所有任务都提交后,必须调用Wait()来等待正在运行的 goroutine 运行完。当任务执行过程中或回调函数执行期间出现 panic 时,所有其他任务和回调仍将执行。当调用Wait()时,panic 将传给调用方。

Pool一样,Stream也不适用于非常短的任务。启动和拆卸会增加几微秒的开销,每个任务的开销大约是 500ns。对于任何需要网络通话的任务来说,这性能都足够好了。

go
// streamDemo 并发的流式任务示例
func streamDemo() {
	times := []int{20, 52, 16, 45, 4, 80}

	s := stream.New()
	for _, millis := range times {
		dur := time.Duration(millis) * time.Millisecond
		// 提交任务
		s.Go(func() stream.Callback {
			time.Sleep(dur)
			// 虽然上一行通过 sleep 增加了时间
			// 但最终结果仍按任务提交(s.Go)的顺序打印
			return func() { fmt.Println(dur) }
		})
	}
	s.Wait()
}

输出结果:

20ms
52ms
16ms
45ms
4ms
80ms

iter

conc 包还提供了一个 iter 包中提供了基于泛型实现的iteratormapper,封装了以下四种日常开发中常用的迭代方法。

go
func ForEach[T any](input []T, f func(*T))
func ForEachIdx[T any](input []T, f func(int, *T))
func Map[T, R any](input []T, f func(*T) R) []R
func MapErr[T, R any](input []T, f func(*T) (R, error)) ([]R, error)

初始状态下可直接调用iter包的上述函数。

go
// iterDemo 迭代器ForEach示例
func iterDemo() {
	input := []int{1, 2, 3, 4}
	// 可直接调用 iter 包的 ForEach 函数
	iter.ForEach(input, func(v *int) {
		if *v%2 != 0 {
			*v = -1
		}
	})
	fmt.Println(input)
}

如果想要更定制化的功能,可以自行构造iteratormapper

iterator 示例

iterator适用于对序列中的每个元素执行统一操作的场景。

go
// iteratorDemo 迭代器示例
func iteratorDemo() {
	input := []int{1, 2, 3, 4}
	// 创建一个最大 goroutine 个数为输入元素一半的迭代器
	iterator := iter.Iterator[int]{
		MaxGoroutines: len(input) / 2,
	}

	iterator.ForEach(input, func(v *int) {
		if *v%2 != 0 {
			*v = -1
		}
	})

	fmt.Println(input)
}

输出:

[-1 2 -1 4]

mapper 示例

mapper是一个带有结果类型的迭代器,适用于遍历序列中的每个元素执行统一操作后拿到返回结果的场景。

go
// mapperDemo mapper 示例
func mapperDemo() {
    input := []int{1, 2, 3, 4}
    // 创建一个最大 goroutine 个数为输入元素一半的映射器
    mapper := iter.Mapper[int, bool]{
        MaxGoroutines: len(input) / 2,
    }

    results := mapper.Map(input, func(v *int) bool {
        return *v%2 == 0
    })
    fmt.Println(results)
}

输出:

[false true false true]

panics

conc 下提供了一个处理 panic 相关的 panics 包。其中panics.Catcher是用来捕获任务运行时可能出现的 panic。具体使用方法是通过Try()执行任务函数,该函数将捕获任何产生的 panic。Try()可以在任意 goroutine 中被调用任意次数。一旦所有调用完成后,使用Recovered()获取第一个 panic(如果有的话)的值,或者使用Repanic()传递 panic(重新 panic)。

go
// panicDemo recover 可能出现的异常
func panicDemo() {
	var pc panics.Catcher
	i := 0
	pc.Try(func() { i += 1 })
	pc.Try(func() { panic("abort!") })
	pc.Try(func() { i += 1 })

	// recover 可能出现的 panic
	rc := pc.Recovered()
	// 重新 panic
	// pc.Repanic()

	fmt.Println(i)
	fmt.Println(rc.Value.(string))
}

总结

对于不太熟悉并发编程的初学者来说 conc 提供了一套简单易用的并发工具,同时其代码实现也比较简洁,所以很适合用来学习源码。