在 Go 语言中,Goroutine 是一种轻量级的线程实现,它使得并发编程变得非常容易。然而,在实际应用中,我们常常需要对这些 Goroutine 进行管理,例如启动和停止它们。本文将详细介绍如何通过 API 来启动和停止 Goroutine,同时会对提供的示例代码进行深入分析。
代码功能概述
我们的目标是创建一个可以通过 HTTP API 启动和停止特定任务的服务。这些任务以 Goroutine 的形式运行,并且会按照一定的时间间隔执行特定的操作。具体来说,我们有两个任务:SellerWellOrder 和 WpsTask,它们会每 5 秒打印一条日志信息。
代码详细分析
数据结构定义
1 2 3 4 5 6 7 8 9
| type GoroutineInfo struct { stopCh chan struct{} wg sync.WaitGroup }
type GoroutineManager struct { goroutines map[string]*GoroutineInfo mu sync.Mutex }
|
GoroutineInfo 结构体用于存储每个 Goroutine 的停止通道 stopCh 和等待组 wg。stopCh 用于向 Goroutine 发送停止信号,wg 用于等待 Goroutine 执行完毕。
GoroutineManager 结构体用于管理所有的 Goroutine。goroutines 是一个映射,键为任务类型字符串,值为 GoroutineInfo 指针。mu 是一个互斥锁,用于保证对 goroutines 的并发安全访问。
初始化 Goroutine 管理器
1 2 3 4 5
| func NewGoroutineManager() *GoroutineManager { return &GoroutineManager{ goroutines: make(map[string]*GoroutineInfo), } }
|
NewGoroutineManager 函数用于创建一个新的 GoroutineManager 实例,并初始化 goroutines 映射。
任务函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| func SellWellTask(stopCh chan struct{}) { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop()
for { select { case <-stopCh: return case <-ticker.C: log.Println("SellerWellTask running...") } } }
func WpsTaskS(stopCh chan struct{}) { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop()
for { select { case <-stopCh: return case <-ticker.C: log.Println("WpsTask running...") } } }
|
SellWellTask 和 WpsTaskS 是两个具体的任务函数,它们会在一个无限循环中运行。使用 time.Ticker 每 5 秒触发一次操作,打印相应的日志信息。当接收到 stopCh 通道的信号时,函数会退出循环并返回。
启动任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| func (m *GoroutineManager) StartTask(taskType string) error { m.mu.Lock() defer m.mu.Unlock()
if _, exists := m.goroutines[taskType]; exists { return fmt.Errorf("task already running") }
info := &GoroutineInfo{ stopCh: make(chan struct{}), } info.wg.Add(1)
m.goroutines[taskType] = info
go func(info *GoroutineInfo) { defer func() { log.Printf("%s task stopped", taskType) info.wg.Done() }()
switch taskType { case SellerWellOrder: SellWellTask(info.stopCh) case WpsTask: WpsTaskS(info.stopCh) } }(info)
return nil }
|
StartTask 方法用于启动一个新的任务。首先,它会加锁以保证并发安全。然后检查该任务是否已经在运行,如果是则返回错误。接着创建一个新的 GoroutineInfo 实例,并将其添加到 goroutines 映射中。最后,启动一个新的 Goroutine 来执行相应的任务。
停止任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func (m *GoroutineManager) StopTask(taskType string) bool { m.mu.Lock() info, exists := m.goroutines[taskType] if !exists { m.mu.Unlock() return false } delete(m.goroutines, taskType) m.mu.Unlock()
close(info.stopCh) info.wg.Wait() return true }
|
StopTask 方法用于停止一个正在运行的任务。首先,它会加锁并检查该任务是否存在。如果存在,则从 goroutines 映射中删除该任务,并解锁。然后关闭 stopCh 通道,向 Goroutine 发送停止信号,并等待 Goroutine 执行完毕。
HTTP API 处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| func TestGoroutineManager(t *testing.T) { manager := NewGoroutineManager()
http.HandleFunc("/start", authMiddleware(func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return }
var req struct { TaskType string `json:"task_type"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, "Invalid request body", http.StatusBadRequest) return }
err := manager.StartTask(req.TaskType) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return }
json.NewEncoder(w).Encode(map[string]string{"status": "started", "task_type": req.TaskType}) }))
http.HandleFunc("/stop", authMiddleware(func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return }
var req struct { TaskType string `json:"task_type"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, "Invalid request body", http.StatusBadRequest) return }
stopped := manager.StopTask(req.TaskType) if !stopped { http.Error(w, "Task not found", http.StatusNotFound) return }
json.NewEncoder(w).Encode(map[string]string{"status": "stopped"}) }))
log.Println("Server starting on :8080") log.Fatal(http.ListenAndServe(":8080", nil)) }
|
/start 端点用于启动一个任务。它接收一个 JSON 格式的请求体,包含 task_type 字段。如果请求方法不是 POST 或者请求体解析失败,会返回相应的错误信息。如果任务启动成功,会返回一个 JSON 响应,包含任务状态和任务类型。
/stop 端点用于停止一个任务。同样接收一个 JSON 格式的请求体,包含 task_type 字段。如果任务不存在,会返回 Task not found 错误信息。如果任务停止成功,会返回一个 JSON 响应,包含任务状态。
总结
通过上述代码,我们实现了一个可以通过 HTTP API 启动和停止 Goroutine 的服务。核心要点包括使用通道来控制 Goroutine 的生命周期,使用互斥锁来保证并发安全,以及使用 HTTP 服务器来处理 API 请求。这种方法使得我们可以方便地管理多个并发任务,提高了系统的可维护性和灵活性。