《Go语言实战》入门实战系列 07:并发模式——生命周期控制、资源池与工作池
开篇引导
在第6章中,我们学习了Go语言并发的基本概念——goroutine、通道、竞争检测以及基本的同步手段。但在实际项目中,仅仅知道如何启动一个goroutine是远远不够的。我们需要的是可复用的并发模式,这些模式能帮助我们优雅地控制程序的生命周期、高效地管理共享资源、安全地处理大量并发任务。
本章将深入三个实用的并发模式,分别对应三个可复用的包:
- runner:通过通道和
select管理程序执行超时和中断信号,适合需要严格控制运行时间的后台任务。 - pool:使用有缓冲通道构建资源池,复用数据库连接、内存缓冲区等昂贵资源。
- work:基于无缓冲通道实现固定大小的goroutine工作池,确保任务不堆积、不丢失。
学完本篇,你将掌握如何将通道与select组合成实用的并发模式,并能够将这些模式直接应用到自己的项目中。
【本篇核心收获】
- 掌握通过超时通道和中断信号控制程序生命周期的runner模式
- 学会使用有缓冲通道实现资源池,管理可复用的io.Closer资源
- 理解无缓冲通道如何构建工作池,确保任务提交即执行
- 熟练运用带
default分支的select实现非阻塞通道操作 - 掌握
sync.WaitGroup在多goroutine协作中的经典用法
1. runner——程序生命周期管理
runner包演示了如何利用通道来监视程序的执行时间,并在超时或收到操作系统中断信号时优雅地终止程序。这种模式特别适合cron作业、云任务等需要控制运行时长的后台程序。
1.1 Runner结构体与错误定义
type Runner struct {
interrupt chan os.Signal // 接收操作系统中断信号
complete chan error // 报告任务完成
timeout <-chan time.Time // 超时通道
tasks []func(int) // 顺序执行的任务函数列表
}
var ErrTimeout = errors.New("received timeout")
var ErrInterrupt = errors.New("received interrupt")- interrupt:容量为1的通道,用于接收
os.Interrupt信号。缓冲区大小为1保证信号不会丢失。 - complete:无缓冲通道,用于接收任务执行返回的错误(nil表示成功)。
- timeout:只读通道,由
time.After返回,在指定时间后自动接收到一个time.Time值。 - tasks:任务函数切片,每个函数接收一个
int参数(任务ID),按顺序执行。
1.2 工厂函数与任务添加
func New(d time.Duration) *Runner {
return &Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: time.After(d),
}
}
func (r *Runner) Add(tasks ...func(int)) {
r.tasks = append(r.tasks, tasks...)
}New函数创建Runner实例,time.After(d)在d时间后向timeout通道发送一个值。Add方法允许我们添加任意数量的任务。
1.3 核心逻辑:run与gotInterrupt
func (r *Runner) run() error {
for id, task := range r.tasks {
if r.gotInterrupt() {
return ErrInterrupt
}
task(id)
}
return nil
}
func (r *Runner) gotInterrupt() bool {
select {
case <-r.interrupt:
signal.Stop(r.interrupt) // 停止接收后续中断
return true
default:
return false
}
}run方法顺序执行所有任务,每次执行前通过gotInterrupt检查是否收到中断信号。gotInterrupt使用带default的select实现非阻塞检查:如果interrupt通道有值,则返回true并停止接收后续信号;否则返回false继续执行。
1.4 启动与事件选择
func (r *Runner) Start() error {
signal.Notify(r.interrupt, os.Interrupt) // 订阅中断信号
go func() {
r.complete <- r.run()
}()
select {
case err := <-r.complete:
return err
case <-r.timeout:
return ErrTimeout
}
}Start方法首先注册要接收的中断信号(这里只监听os.Interrupt,即Ctrl+C)。然后启动一个goroutine执行run,并将结果发送到complete通道。主goroutine在select中等待两个事件:
complete通道返回:任务正常完成或收到中断信号(返回对应的错误)。timeout通道触发:超时,返回ErrTimeout。
1.5 测试示例
const timeout = 3 * time.Second
func main() {
r := runner.New(timeout)
r.Add(createTask(), createTask(), createTask())
if err := r.Start(); err != nil {
switch err {
case runner.ErrTimeout:
log.Println("Terminating due to timeout.")
os.Exit(1)
case runner.ErrInterrupt:
log.Println("Terminating due to interrupt.")
os.Exit(2)
}
}
log.Println("Process ended.")
}
func createTask() func(int) {
return func(id int) {
log.Printf("Processor - Task %d.", id)
time.Sleep(time.Duration(id) * time.Second)
}
}这里创建了三个任务,分别休眠1、2、3秒。总耗时6秒,而超时时间只有3秒,因此会触发超时退出。如果在运行期间按下Ctrl+C,则会触发中断退出。
模块小结:runner模式通过两个通道(complete和timeout)与
select语句,优雅地管理了程序的超时和中断。带default分支的select实现了非阻塞的中断检测。
2. pool——资源池
pool包演示了如何使用有缓冲通道来管理一组可复用的资源(如数据库连接、内存缓冲区)。资源池允许任意数量的goroutine安全地获取和归还资源,当池中没有空闲资源时,自动创建新的资源;当资源被归还时,如果池已满则直接关闭该资源。
2.1 资源池结构体
type Pool struct {
m sync.Mutex
resources chan io.Closer
factory func() (io.Closer, error)
closed bool
}
var ErrPoolClosed = errors.New("Pool has been closed.")- m:互斥锁,保护
closed标志和通道关闭操作。 - resources:有缓冲通道,存储空闲资源(类型为
io.Closer)。 - factory:工厂函数,用于创建新资源。
- closed:标识池是否已关闭。
2.2 创建池
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New("Size value too small.")
}
return &Pool{
factory: fn,
resources: make(chan io.Closer, size),
}, nil
}New函数接收工厂函数和池大小,返回一个已初始化的池。池的大小决定了有缓冲通道的容量。
2.3 获取资源
func (p *Pool) Acquire() (io.Closer, error) {
select {
case r, ok := <-p.resources:
if !ok {
return nil, ErrPoolClosed
}
return r, nil
default:
return p.factory()
}
}Acquire使用带default的select:如果通道中有空闲资源,直接取出返回;否则(通道为空)执行default分支,调用工厂函数创建一个新资源返回。注意,当池已关闭时,通道会被关闭,此时从通道接收会得到零值且ok为false,返回ErrPoolClosed。
2.4 释放资源
func (p *Pool) Release(r io.Closer) {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
r.Close()
return
}
select {
case p.resources <- r:
// 资源放回队列
default:
// 队列已满,关闭资源
r.Close()
}
}Release先将资源放回通道,如果通道已满(即池中已有size个空闲资源),则直接关闭该资源。这里使用了互斥锁来保护对closed标志的读取,防止在关闭池的同时向已关闭的通道发送数据。
2.5 关闭池
func (p *Pool) Close() {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
return
}
p.closed = true
close(p.resources) // 必须先关闭通道,再遍历关闭资源
for r := range p.resources {
r.Close()
}
}关闭池时,先加锁设置closed标志,然后关闭通道。关闭通道后,for range会取出通道中所有剩余资源并逐一关闭,确保所有资源都被清理。
2.6 测试示例
type dbConnection struct {
ID int32
}
func (dbConn *dbConnection) Close() error {
log.Println("Close: Connection", dbConn.ID)
return nil
}
var idCounter int32
func createConnection() (io.Closer, error) {
id := atomic.AddInt32(&idCounter, 1)
log.Println("Create: New Connection", id)
return &dbConnection{id}, nil
}
func main() {
const maxGoroutines = 25
const pooledResources = 2
p, _ := pool.New(createConnection, pooledResources)
var wg sync.WaitGroup
wg.Add(maxGoroutines)
for query := 0; query < maxGoroutines; query++ {
go func(q int) {
performQueries(q, p)
wg.Done()
}(query)
}
wg.Wait()
p.Close()
}
func performQueries(query int, p *pool.Pool) {
conn, err := p.Acquire()
if err != nil {
log.Println(err)
return
}
defer p.Release(conn)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
}该示例创建了一个大小为2的资源池,然后启动25个goroutine并发获取资源。由于只有2个空闲资源,前两个goroutine会从池中获取已有资源,后续的goroutine会触发工厂函数创建新资源。当资源释放回池时,如果池未满则放入通道,否则直接关闭该资源。最终程序会显示所有资源被创建和关闭的过程。
模块小结:资源池模式通过有缓冲通道存储空闲资源,利用
select的非阻塞特性快速创建新资源,同时用互斥锁保护关闭操作,实现了一个线程安全的资源复用管理器。
3. work——工作池
work包演示了如何使用无缓冲通道来创建固定大小的goroutine池,确保每个提交的任务都能被一个工作goroutine立即执行。与有缓冲通道不同,无缓冲通道在提交任务时会阻塞直到工作goroutine准备好接收,这保证了任务不会在队列中堆积,也不会丢失。
3.1 Worker接口与Pool结构
type Worker interface {
Task()
}
type Pool struct {
work chan Worker
wg sync.WaitGroup
}- Worker:只需实现一个
Task方法,即可被提交到工作池执行。 - Pool:包含一个无缓冲的
work通道,以及一个WaitGroup用于等待所有工作goroutine退出。
3.2 创建工作池
func New(maxGoroutines int) *Pool {
p := Pool{
work: make(chan Worker),
}
p.wg.Add(maxGoroutines)
for i := 0; i < maxGoroutines; i++ {
go func() {
for w := range p.work {
w.Task()
}
p.wg.Done()
}()
}
return &p
}New函数创建指定数量(maxGoroutines)的工作goroutine。每个goroutine在for range循环中从work通道接收Worker任务并执行其Task方法。当通道关闭时,for range循环退出,goroutine调用Done递减WaitGroup。
3.3 提交与关闭
func (p *Pool) Run(w Worker) {
p.work <- w
}
func (p *Pool) Shutdown() {
close(p.work)
p.wg.Wait()
}Run方法将任务发送到无缓冲通道。由于通道无缓冲,发送会阻塞直到某个工作goroutine准备好接收。这保证了任务提交时即被处理。Shutdown先关闭通道,再等待所有工作goroutine退出。
3.4 测试示例
type namePrinter struct {
name string
}
func (m *namePrinter) Task() {
log.Println(m.name)
time.Sleep(time.Second)
}
func main() {
p := work.New(2) // 创建两个工作goroutine
var wg sync.WaitGroup
wg.Add(100 * len(names))
for i := 0; i < 100; i++ {
for _, name := range names {
np := namePrinter{name: name}
go func() {
p.Run(&np) // 提交任务
wg.Done()
}()
}
}
wg.Wait()
p.Shutdown()
}这里创建了2个工作goroutine,并提交了500个任务(5个名字×100次)。由于Run是同步的(阻塞直到工作goroutine开始处理),每个提交任务的goroutine会等待对应的任务被执行后才继续,从而控制了任务提交的速率。最终所有任务都会被两个工作goroutine处理完毕。
模块小结:工作池模式利用无缓冲通道的特性,实现了“提交即执行”的语义,避免了任务堆积。固定数量的工作goroutine可以控制并发度,适合CPU密集型任务或需要限制并发资源使用的场景。
4. 本篇核心知识点速记
- 生命周期管理(runner):通过
time.After创建超时通道,通过signal.Notify接收中断信号,使用select同时等待任务完成、超时或中断,实现可控制的程序退出。 - 资源池(pool):有缓冲通道作为资源容器,
select的default分支实现非阻塞获取/归还,互斥锁保护关闭标志,确保资源复用安全。 - 工作池(work):无缓冲通道确保任务提交即执行,固定数量的工作goroutine控制并发度,
for range接收任务,关闭通道优雅停止。 - select与default:
default分支让通道操作变成非阻塞,常用于尝试获取资源或检查信号。 - sync.WaitGroup:在三种模式中都用于等待一组goroutine完成,保证主程序在资源释放或关闭前不退出。
文末小结
本章我们学习了三种经典的并发模式:runner用于控制程序生命周期,pool用于管理可复用的资源,work用于固定大小的任务处理池。这三种模式分别对应了实际开发中常见的需求:后台任务超时控制、连接池管理、并发任务调度。通过通道与select的组合,我们可以用简洁、安全的方式实现复杂的并发逻辑,而无需直接使用锁和条件变量。
