项目结构:
/* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 17:56 # User : geovindu # Product : GoLand # Project : godesginpattern # File : logger.go */ package utils import ( "godesginpattern/workerpool/config" "io" "log" "os" ) var Logger *log.Logger func InitLogger() { // 打开日志文件 file, err := os.OpenFile(config.LogFileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { log.Fatal("日志文件创建失败:", err) } // 同时输出到:控制台 + 文件 multiWriter := io.MultiWriter(os.Stdout, file) // 初始化日志 Logger = log.New(multiWriter, "", config.LogFlag) } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 18:12 # User : geovindu # Product : GoLand # Project : godesginpattern # File : retry.go */ package utils import ( "godesginpattern/workerpool/config" "time" ) // Retry 执行函数并自动重试 func Retry(task func() error) error { var err error for i := 0; i < config.MaxRetryTimes; i++ { err = task() if err == nil { return nil } Logger.Printf("任务失败,第 %d 次重试,错误: %v", i+1, err) time.Sleep(500 * time.Millisecond) } return err } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 18:12 # User : geovindu # Product : GoLand # Project : godesginpattern # File : monitor.go */ package utils import "sync/atomic" type Monitor struct { Running int64 Waiting int64 Finished int64 Failed int64 } func (m *Monitor) IncRunning() { atomic.AddInt64(&m.Running, 1) } func (m *Monitor) DecRunning() { atomic.AddInt64(&m.Running, -1) } func (m *Monitor) IncWaiting() { atomic.AddInt64(&m.Waiting, 1) } func (m *Monitor) DecWaiting() { atomic.AddInt64(&m.Waiting, -1) } func (m *Monitor) IncFinished() { atomic.AddInt64(&m.Finished, 1) } func (m *Monitor) IncFailed() { atomic.AddInt64(&m.Failed, 1) } func (m *Monitor) Get() (running, waiting, finished, failed int64) { return atomic.LoadInt64(&m.Running), atomic.LoadInt64(&m.Waiting), atomic.LoadInt64(&m.Finished), atomic.LoadInt64(&m.Failed) } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 17:58 # User : geovindu # Product : GoLand # Project : godesginpattern # File : settings.go */ package config import "log" const ( WorkerCount = 3 QueueMaxSize = 100 MinTaskDelay = 0.3 MaxTaskDelay = 0.8 MaxRetryTimes = 3 LogFileName = "jewelry.log" DBPath = "jewelry.db" HTTPPort = ":8080" MonitorInterval = 2 ) const LogFlag = log.LstdFlags | log.Lmicroseconds /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 18:01 # User : geovindu # Product : GoLand # Project : godesginpattern # File : task.go */ package tasks import ( "godesginpattern/workerpool/config" "math/rand" "time" ) func RawMaterialCheck(orderID string) error { simulateDelay() return nil } func JewelryProcess(orderID string) error { simulateDelay() return nil } func FinishedGoodsCheck(orderID string) error { simulateDelay() return nil } func InventoryRecord(orderID string) error { simulateDelay() return nil } func OrderDelivery(orderID string) error { simulateDelay() return nil } var FullProcessTasks = []func(string) error{ RawMaterialCheck, JewelryProcess, FinishedGoodsCheck, InventoryRecord, OrderDelivery, } func simulateDelay() { rand.Seed(time.Now().UnixNano()) delay := config.MinTaskDelay + rand.Float64()*(config.MaxTaskDelay-config.MinTaskDelay) time.Sleep(time.Duration(delay*1000) * time.Millisecond) } /* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 18:01 # User : geovindu # Product : GoLand # Project : godesginpattern # File : worker_pool.go */ package core import ( "godesginpattern/workerpool/config" "godesginpattern/workerpool/utils" "os" "os/signal" "sync" "syscall" "time" ) type Task struct { OrderID string Func func(string) error } type WorkerPool struct { workerCount int taskChan chan *Task wg sync.WaitGroup monitor *utils.Monitor quit chan os.Signal } func NewWorkerPool(workerCnt int, queueSize int) *WorkerPool { wp := &WorkerPool{ workerCount: workerCnt, taskChan: make(chan *Task, queueSize), monitor: &utils.Monitor{}, quit: make(chan os.Signal, 1), } signal.Notify(wp.quit, syscall.SIGINT, syscall.SIGTERM) return wp } func (wp *WorkerPool) worker(id int) { utils.Logger.Printf("Worker %d 已启动", id) for { select { case task, ok := <-wp.taskChan: if !ok { utils.Logger.Printf("Worker %d 安全退出", id) return } wp.monitor.DecWaiting() wp.monitor.IncRunning() utils.Logger.Printf("Worker %d 开始任务: %s", id, task.OrderID) err := utils.Retry(func() error { return task.Func(task.OrderID) }) if err != nil { utils.Logger.Printf("Worker %d 任务失败: %s, 错误: %v", id, task.OrderID, err) } else { utils.Logger.Printf("Worker %d 完成任务: %s", id, task.OrderID) } wp.monitor.DecRunning() wp.monitor.IncFinished() wp.wg.Done() case <-wp.quit: utils.Logger.Printf("Worker %d 收到关闭信号,退出", id) return } } } func (wp *WorkerPool) Start() { utils.Logger.Println("工作池启动") for i := 1; i <= wp.workerCount; i++ { go wp.worker(i) } go wp.monitorLoop() } func (wp *WorkerPool) Submit(task *Task) { wp.wg.Add(1) wp.monitor.IncWaiting() wp.taskChan <- task } func (wp *WorkerPool) monitorLoop() { ticker := time.NewTicker(time.Duration(config.MonitorInterval) * time.Second) defer ticker.Stop() for { select { case <-ticker.C: r, w, f, failed := wp.monitor.Get() utils.Logger.Printf("[监控] 运行:%d | 等待:%d | 完成:%d | 失败:%d", r, w, f, failed) // 运行、等待都为0,代表所有任务全部完成 if r == 0 && w == 0 { utils.Logger.Println("所有任务处理完成,自动停止监控输出") return } case <-wp.quit: utils.Logger.Println("监控循环收到关闭信号,停止监控输出") return } } } func (wp *WorkerPool) Wait() { // 阻塞等待 Ctrl+C / kill 信号 <-wp.quit utils.Logger.Println("优雅关闭中...") // 关闭任务通道,worker 不再接收新任务 close(wp.taskChan) // 等待正在执行的任务全部处理完毕 wp.wg.Wait() utils.Logger.Println("✅ 所有任务执行完毕,服务安全退出") }调用:
/* # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : goLang 2024.3.6 go 26.2 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 18:02 # User : geovindu # Product : GoLand # Project : godesginpattern # File : workerpoolbll.go */ package bll import ( "fmt" "godesginpattern/workerpool/config" "godesginpattern/workerpool/core" "godesginpattern/workerpool/tasks" "godesginpattern/workerpool/utils" ) func WorkerPoolMain() { utils.InitLogger() logger := utils.Logger logger.Println("================================================") logger.Println(" 珠宝企业级生产系统(Go 企业级 Worker Pool)") logger.Println("================================================") pool := core.NewWorkerPool(config.WorkerCount, config.QueueMaxSize) pool.Start() totalOrder := 10 logger.Printf("开始提交 %d 个珠宝订单\n", totalOrder) for i := 1; i <= totalOrder; i++ { orderID := fmt.Sprintf("订单-%03d", i) for _, fn := range tasks.FullProcessTasks { pool.Submit(&core.Task{ OrderID: orderID, Func: fn, }) } } logger.Println("✅ 所有订单已提交,按 Ctrl+C 优雅关闭") pool.Wait() }输出: