跳转到帖子
登录关注  
墨香年少

Go语言实现线程池、MySQL连接池和任务池的示例

已推荐帖子

package main

import (
	"database/sql"
	"fmt"
	"log"
	"sync"
	"time"

	_ "github.com/go-sql-driver/mysql"
)

// Task represents a task that can be executed by the worker
type Task struct {
	Query      string
	MySQLConn  *sql.DB
}

// WorkerPool represents a simple worker pool
type WorkerPool struct {
	WorkerCount int
	JobQueue    chan Task
	WaitGroup   sync.WaitGroup
}

// NewWorkerPool creates a new WorkerPool
func NewWorkerPool(workerCount int) *WorkerPool {
	return &WorkerPool{
		WorkerCount: workerCount,
		JobQueue:    make(chan Task, workerCount),
	}
}

// Start initializes and starts the worker pool
func (wp *WorkerPool) Start() {
	for i := 0; i < wp.WorkerCount; i++ {
		go wp.worker()
	}
}

// Enqueue adds a task to the job queue
func (wp *WorkerPool) Enqueue(task Task) {
	wp.WaitGroup.Add(1)
	wp.JobQueue <- task
}

// StopAndWait stops the worker pool and waits for all tasks to complete
func (wp *WorkerPool) StopAndWait() {
	close(wp.JobQueue)
	wp.WaitGroup.Wait()
}

// worker is the actual worker that executes tasks
func (wp *WorkerPool) worker() {
	defer wp.WaitGroup.Done()

	for task := range wp.JobQueue {
		if err := task.Execute(); err != nil {
			log.Printf("Error executing task: %v", err)
		}
	}
}

// Execute executes the task (query) on the MySQL connection
func (t Task) Execute() error {
	_, err := t.MySQLConn.Exec(t.Query)
	return err
}

// MySQLConnectionPool represents a simple MySQL connection pool
type MySQLConnectionPool struct {
	MaxConnections int
	connections    chan *sql.DB
	mu             sync.Mutex
}

// NewMySQLConnectionPool creates a new MySQLConnectionPool
func NewMySQLConnectionPool(maxConnections int, dataSourceName string) (*MySQLConnectionPool, error) {
	pool := &MySQLConnectionPool{
		MaxConnections: maxConnections,
		connections:    make(chan *sql.DB, maxConnections),
	}

	for i := 0; i < maxConnections; i++ {
		conn, err := sql.Open("mysql", dataSourceName)
		if err != nil {
			return nil, fmt.Errorf("failed to open MySQL connection: %v", err)
		}
		pool.connections <- conn
	}

	return pool, nil
}

// GetConnection retrieves a connection from the pool
func (p *MySQLConnectionPool) GetConnection() *sql.DB {
	p.mu.Lock()
	defer p.mu.Unlock()
	return <-p.connections
}

// ReleaseConnection releases a connection back to the pool
func (p *MySQLConnectionPool) ReleaseConnection(conn *sql.DB) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.connections <- conn
}

func main() {
	// Replace the following MySQL connection details
	dataSourceName := "root:password@tcp(localhost:3306)/database"

	// Create MySQL connection pool
	mysqlPool, err := NewMySQLConnectionPool(5, dataSourceName)
	if err != nil {
		log.Fatalf("Failed to create MySQL connection pool: %v", err)
	}

	// Create and start worker pool
	workerPool := NewWorkerPool(4)
	workerPool.Start()

	// Use worker pool to execute tasks
	for i := 0; i < 10; i++ {
		query := fmt.Sprintf("INSERT INTO table_name (column1, column2) VALUES (%d, 'value')", i)
		conn := mysqlPool.GetConnection()
		task := Task{Query: query, MySQLConn: conn}
		workerPool.Enqueue(task)
		mysqlPool.ReleaseConnection(conn)
	}

	// Stop worker pool and wait for tasks to complete
	workerPool.StopAndWait()
}

 


目之所及,皆是回忆,心之所想,皆是过往

分享这篇帖子


链接帖子
分享到其他站点

创建帐户或登录来提出意见

你需要成为会员才能提出意见

创建帐户

注册成为会员。只要几个简单步骤!

注册帐户

登录

已有帐户? 请登录。

现在登录
登录关注  

×
×
  • 创建新的...

重要信息

注册必须使用2-8个中文汉字作为账号