墨香年少 32 发布于 3月2日 package main import ( "database/sql" "fmt" "log" "sync" "time" _ "github.com/go-sql-driver/mysql" ) // Task represents a task that can be executed by the worker type Task struct { Query string MySQLConn *sql.DB } // WorkerPool represents a simple worker pool type WorkerPool struct { WorkerCount int JobQueue chan Task WaitGroup sync.WaitGroup } // NewWorkerPool creates a new WorkerPool func NewWorkerPool(workerCount int) *WorkerPool { return &WorkerPool{ WorkerCount: workerCount, JobQueue: make(chan Task, workerCount), } } // Start initializes and starts the worker pool func (wp *WorkerPool) Start() { for i := 0; i < wp.WorkerCount; i++ { go wp.worker() } } // Enqueue adds a task to the job queue func (wp *WorkerPool) Enqueue(task Task) { wp.WaitGroup.Add(1) wp.JobQueue <- task } // StopAndWait stops the worker pool and waits for all tasks to complete func (wp *WorkerPool) StopAndWait() { close(wp.JobQueue) wp.WaitGroup.Wait() } // worker is the actual worker that executes tasks func (wp *WorkerPool) worker() { defer wp.WaitGroup.Done() for task := range wp.JobQueue { if err := task.Execute(); err != nil { log.Printf("Error executing task: %v", err) } } } // Execute executes the task (query) on the MySQL connection func (t Task) Execute() error { _, err := t.MySQLConn.Exec(t.Query) return err } // MySQLConnectionPool represents a simple MySQL connection pool type MySQLConnectionPool struct { MaxConnections int connections chan *sql.DB mu sync.Mutex } // NewMySQLConnectionPool creates a new MySQLConnectionPool func NewMySQLConnectionPool(maxConnections int, dataSourceName string) (*MySQLConnectionPool, error) { pool := &MySQLConnectionPool{ MaxConnections: maxConnections, connections: make(chan *sql.DB, maxConnections), } for i := 0; i < maxConnections; i++ { conn, err := sql.Open("mysql", dataSourceName) if err != nil { return nil, fmt.Errorf("failed to open MySQL connection: %v", err) } pool.connections <- conn } return pool, nil } // GetConnection retrieves a connection from the pool func (p *MySQLConnectionPool) GetConnection() *sql.DB { p.mu.Lock() defer p.mu.Unlock() return <-p.connections } // ReleaseConnection releases a connection back to the pool func (p *MySQLConnectionPool) ReleaseConnection(conn *sql.DB) { p.mu.Lock() defer p.mu.Unlock() p.connections <- conn } func main() { // Replace the following MySQL connection details dataSourceName := "root:password@tcp(localhost:3306)/database" // Create MySQL connection pool mysqlPool, err := NewMySQLConnectionPool(5, dataSourceName) if err != nil { log.Fatalf("Failed to create MySQL connection pool: %v", err) } // Create and start worker pool workerPool := NewWorkerPool(4) workerPool.Start() // Use worker pool to execute tasks for i := 0; i < 10; i++ { query := fmt.Sprintf("INSERT INTO table_name (column1, column2) VALUES (%d, 'value')", i) conn := mysqlPool.GetConnection() task := Task{Query: query, MySQLConn: conn} workerPool.Enqueue(task) mysqlPool.ReleaseConnection(conn) } // Stop worker pool and wait for tasks to complete workerPool.StopAndWait() } 目之所及,皆是回忆,心之所想,皆是过往 分享这篇帖子 链接帖子 分享到其他站点