Skip to content

《Go语言实战》入门实战系列 07:并发模式——生命周期控制、资源池与工作池

约 3024 字大约 10 分钟

《Go语言实战》入门实战系列云原生

2026-04-01

开篇引导

在第6章中,我们学习了Go语言并发的基本概念——goroutine、通道、竞争检测以及基本的同步手段。但在实际项目中,仅仅知道如何启动一个goroutine是远远不够的。我们需要的是可复用的并发模式,这些模式能帮助我们优雅地控制程序的生命周期、高效地管理共享资源、安全地处理大量并发任务。

本章将深入三个实用的并发模式,分别对应三个可复用的包:

  • runner:通过通道和select管理程序执行超时和中断信号,适合需要严格控制运行时间的后台任务。
  • pool:使用有缓冲通道构建资源池,复用数据库连接、内存缓冲区等昂贵资源。
  • work:基于无缓冲通道实现固定大小的goroutine工作池,确保任务不堆积、不丢失。

学完本篇,你将掌握如何将通道与select组合成实用的并发模式,并能够将这些模式直接应用到自己的项目中。

【本篇核心收获】

  • 掌握通过超时通道和中断信号控制程序生命周期的runner模式
  • 学会使用有缓冲通道实现资源池,管理可复用的io.Closer资源
  • 理解无缓冲通道如何构建工作池,确保任务提交即执行
  • 熟练运用带default分支的select实现非阻塞通道操作
  • 掌握sync.WaitGroup在多goroutine协作中的经典用法

1. runner——程序生命周期管理

runner包演示了如何利用通道来监视程序的执行时间,并在超时或收到操作系统中断信号时优雅地终止程序。这种模式特别适合cron作业、云任务等需要控制运行时长的后台程序。

1.1 Runner结构体与错误定义

type Runner struct {
    interrupt chan os.Signal      // 接收操作系统中断信号
    complete  chan error          // 报告任务完成
    timeout   <-chan time.Time    // 超时通道
    tasks     []func(int)         // 顺序执行的任务函数列表
}

var ErrTimeout = errors.New("received timeout")
var ErrInterrupt = errors.New("received interrupt")
  • interrupt:容量为1的通道,用于接收os.Interrupt信号。缓冲区大小为1保证信号不会丢失。
  • complete:无缓冲通道,用于接收任务执行返回的错误(nil表示成功)。
  • timeout:只读通道,由time.After返回,在指定时间后自动接收到一个time.Time值。
  • tasks:任务函数切片,每个函数接收一个int参数(任务ID),按顺序执行。

1.2 工厂函数与任务添加

func New(d time.Duration) *Runner {
    return &Runner{
        interrupt: make(chan os.Signal, 1),
        complete:  make(chan error),
        timeout:   time.After(d),
    }
}

func (r *Runner) Add(tasks ...func(int)) {
    r.tasks = append(r.tasks, tasks...)
}

New函数创建Runner实例,time.After(d)d时间后向timeout通道发送一个值。Add方法允许我们添加任意数量的任务。

1.3 核心逻辑:run与gotInterrupt

func (r *Runner) run() error {
    for id, task := range r.tasks {
        if r.gotInterrupt() {
            return ErrInterrupt
        }
        task(id)
    }
    return nil
}

func (r *Runner) gotInterrupt() bool {
    select {
    case <-r.interrupt:
        signal.Stop(r.interrupt)   // 停止接收后续中断
        return true
    default:
        return false
    }
}

run方法顺序执行所有任务,每次执行前通过gotInterrupt检查是否收到中断信号。gotInterrupt使用带defaultselect实现非阻塞检查:如果interrupt通道有值,则返回true并停止接收后续信号;否则返回false继续执行。

1.4 启动与事件选择

func (r *Runner) Start() error {
    signal.Notify(r.interrupt, os.Interrupt)   // 订阅中断信号
    go func() {
        r.complete <- r.run()
    }()

    select {
    case err := <-r.complete:
        return err
    case <-r.timeout:
        return ErrTimeout
    }
}

Start方法首先注册要接收的中断信号(这里只监听os.Interrupt,即Ctrl+C)。然后启动一个goroutine执行run,并将结果发送到complete通道。主goroutine在select中等待两个事件:

  • complete通道返回:任务正常完成或收到中断信号(返回对应的错误)。
  • timeout通道触发:超时,返回ErrTimeout

1.5 测试示例

const timeout = 3 * time.Second

func main() {
    r := runner.New(timeout)
    r.Add(createTask(), createTask(), createTask())

    if err := r.Start(); err != nil {
        switch err {
        case runner.ErrTimeout:
            log.Println("Terminating due to timeout.")
            os.Exit(1)
        case runner.ErrInterrupt:
            log.Println("Terminating due to interrupt.")
            os.Exit(2)
        }
    }
    log.Println("Process ended.")
}

func createTask() func(int) {
    return func(id int) {
        log.Printf("Processor - Task %d.", id)
        time.Sleep(time.Duration(id) * time.Second)
    }
}

这里创建了三个任务,分别休眠1、2、3秒。总耗时6秒,而超时时间只有3秒,因此会触发超时退出。如果在运行期间按下Ctrl+C,则会触发中断退出。

模块小结:runner模式通过两个通道(complete和timeout)与select语句,优雅地管理了程序的超时和中断。带default分支的select实现了非阻塞的中断检测。

2. pool——资源池

pool包演示了如何使用有缓冲通道来管理一组可复用的资源(如数据库连接、内存缓冲区)。资源池允许任意数量的goroutine安全地获取和归还资源,当池中没有空闲资源时,自动创建新的资源;当资源被归还时,如果池已满则直接关闭该资源。

2.1 资源池结构体

type Pool struct {
    m         sync.Mutex
    resources chan io.Closer
    factory   func() (io.Closer, error)
    closed    bool
}

var ErrPoolClosed = errors.New("Pool has been closed.")
  • m:互斥锁,保护closed标志和通道关闭操作。
  • resources:有缓冲通道,存储空闲资源(类型为io.Closer)。
  • factory:工厂函数,用于创建新资源。
  • closed:标识池是否已关闭。

2.2 创建池

func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
    if size <= 0 {
        return nil, errors.New("Size value too small.")
    }
    return &Pool{
        factory:   fn,
        resources: make(chan io.Closer, size),
    }, nil
}

New函数接收工厂函数和池大小,返回一个已初始化的池。池的大小决定了有缓冲通道的容量。

2.3 获取资源

func (p *Pool) Acquire() (io.Closer, error) {
    select {
    case r, ok := <-p.resources:
        if !ok {
            return nil, ErrPoolClosed
        }
        return r, nil
    default:
        return p.factory()
    }
}

Acquire使用带defaultselect:如果通道中有空闲资源,直接取出返回;否则(通道为空)执行default分支,调用工厂函数创建一个新资源返回。注意,当池已关闭时,通道会被关闭,此时从通道接收会得到零值且okfalse,返回ErrPoolClosed

2.4 释放资源

func (p *Pool) Release(r io.Closer) {
    p.m.Lock()
    defer p.m.Unlock()

    if p.closed {
        r.Close()
        return
    }

    select {
    case p.resources <- r:
        // 资源放回队列
    default:
        // 队列已满,关闭资源
        r.Close()
    }
}

Release先将资源放回通道,如果通道已满(即池中已有size个空闲资源),则直接关闭该资源。这里使用了互斥锁来保护对closed标志的读取,防止在关闭池的同时向已关闭的通道发送数据。

2.5 关闭池

func (p *Pool) Close() {
    p.m.Lock()
    defer p.m.Unlock()

    if p.closed {
        return
    }
    p.closed = true
    close(p.resources)   // 必须先关闭通道,再遍历关闭资源

    for r := range p.resources {
        r.Close()
    }
}

关闭池时,先加锁设置closed标志,然后关闭通道。关闭通道后,for range会取出通道中所有剩余资源并逐一关闭,确保所有资源都被清理。

2.6 测试示例

type dbConnection struct {
    ID int32
}

func (dbConn *dbConnection) Close() error {
    log.Println("Close: Connection", dbConn.ID)
    return nil
}

var idCounter int32

func createConnection() (io.Closer, error) {
    id := atomic.AddInt32(&idCounter, 1)
    log.Println("Create: New Connection", id)
    return &dbConnection{id}, nil
}

func main() {
    const maxGoroutines = 25
    const pooledResources = 2

    p, _ := pool.New(createConnection, pooledResources)

    var wg sync.WaitGroup
    wg.Add(maxGoroutines)

    for query := 0; query < maxGoroutines; query++ {
        go func(q int) {
            performQueries(q, p)
            wg.Done()
        }(query)
    }

    wg.Wait()
    p.Close()
}

func performQueries(query int, p *pool.Pool) {
    conn, err := p.Acquire()
    if err != nil {
        log.Println(err)
        return
    }
    defer p.Release(conn)

    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
}

该示例创建了一个大小为2的资源池,然后启动25个goroutine并发获取资源。由于只有2个空闲资源,前两个goroutine会从池中获取已有资源,后续的goroutine会触发工厂函数创建新资源。当资源释放回池时,如果池未满则放入通道,否则直接关闭该资源。最终程序会显示所有资源被创建和关闭的过程。

模块小结:资源池模式通过有缓冲通道存储空闲资源,利用select的非阻塞特性快速创建新资源,同时用互斥锁保护关闭操作,实现了一个线程安全的资源复用管理器。

3. work——工作池

work包演示了如何使用无缓冲通道来创建固定大小的goroutine池,确保每个提交的任务都能被一个工作goroutine立即执行。与有缓冲通道不同,无缓冲通道在提交任务时会阻塞直到工作goroutine准备好接收,这保证了任务不会在队列中堆积,也不会丢失。

3.1 Worker接口与Pool结构

type Worker interface {
    Task()
}

type Pool struct {
    work chan Worker
    wg   sync.WaitGroup
}
  • Worker:只需实现一个Task方法,即可被提交到工作池执行。
  • Pool:包含一个无缓冲的work通道,以及一个WaitGroup用于等待所有工作goroutine退出。

3.2 创建工作池

func New(maxGoroutines int) *Pool {
    p := Pool{
        work: make(chan Worker),
    }
    p.wg.Add(maxGoroutines)
    for i := 0; i < maxGoroutines; i++ {
        go func() {
            for w := range p.work {
                w.Task()
            }
            p.wg.Done()
        }()
    }
    return &p
}

New函数创建指定数量(maxGoroutines)的工作goroutine。每个goroutine在for range循环中从work通道接收Worker任务并执行其Task方法。当通道关闭时,for range循环退出,goroutine调用Done递减WaitGroup。

3.3 提交与关闭

func (p *Pool) Run(w Worker) {
    p.work <- w
}

func (p *Pool) Shutdown() {
    close(p.work)
    p.wg.Wait()
}

Run方法将任务发送到无缓冲通道。由于通道无缓冲,发送会阻塞直到某个工作goroutine准备好接收。这保证了任务提交时即被处理。Shutdown先关闭通道,再等待所有工作goroutine退出。

3.4 测试示例

type namePrinter struct {
    name string
}

func (m *namePrinter) Task() {
    log.Println(m.name)
    time.Sleep(time.Second)
}

func main() {
    p := work.New(2)   // 创建两个工作goroutine

    var wg sync.WaitGroup
    wg.Add(100 * len(names))

    for i := 0; i < 100; i++ {
        for _, name := range names {
            np := namePrinter{name: name}
            go func() {
                p.Run(&np)   // 提交任务
                wg.Done()
            }()
        }
    }

    wg.Wait()
    p.Shutdown()
}

这里创建了2个工作goroutine,并提交了500个任务(5个名字×100次)。由于Run是同步的(阻塞直到工作goroutine开始处理),每个提交任务的goroutine会等待对应的任务被执行后才继续,从而控制了任务提交的速率。最终所有任务都会被两个工作goroutine处理完毕。

模块小结:工作池模式利用无缓冲通道的特性,实现了“提交即执行”的语义,避免了任务堆积。固定数量的工作goroutine可以控制并发度,适合CPU密集型任务或需要限制并发资源使用的场景。

4. 本篇核心知识点速记

  • 生命周期管理(runner):通过time.After创建超时通道,通过signal.Notify接收中断信号,使用select同时等待任务完成、超时或中断,实现可控制的程序退出。
  • 资源池(pool):有缓冲通道作为资源容器,selectdefault分支实现非阻塞获取/归还,互斥锁保护关闭标志,确保资源复用安全。
  • 工作池(work):无缓冲通道确保任务提交即执行,固定数量的工作goroutine控制并发度,for range接收任务,关闭通道优雅停止。
  • select与defaultdefault分支让通道操作变成非阻塞,常用于尝试获取资源或检查信号。
  • sync.WaitGroup:在三种模式中都用于等待一组goroutine完成,保证主程序在资源释放或关闭前不退出。

文末小结

本章我们学习了三种经典的并发模式:runner用于控制程序生命周期,pool用于管理可复用的资源,work用于固定大小的任务处理池。这三种模式分别对应了实际开发中常见的需求:后台任务超时控制、连接池管理、并发任务调度。通过通道与select的组合,我们可以用简洁、安全的方式实现复杂的并发逻辑,而无需直接使用锁和条件变量。