golang使用api启动或停止goroutine
ClearSky Drizzle Lv4

在 Go 语言中,Goroutine 是一种轻量级的线程实现,它使得并发编程变得非常容易。然而,在实际应用中,我们常常需要对这些 Goroutine 进行管理,例如启动和停止它们。本文将详细介绍如何通过 API 来启动和停止 Goroutine,同时会对提供的示例代码进行深入分析。

代码功能概述

我们的目标是创建一个可以通过 HTTP API 启动和停止特定任务的服务。这些任务以 Goroutine 的形式运行,并且会按照一定的时间间隔执行特定的操作。具体来说,我们有两个任务:SellerWellOrderWpsTask,它们会每 5 秒打印一条日志信息。

代码详细分析

数据结构定义

1
2
3
4
5
6
7
8
9
type GoroutineInfo struct {
stopCh chan struct{}
wg sync.WaitGroup
}

type GoroutineManager struct {
goroutines map[string]*GoroutineInfo
mu sync.Mutex
}
  • GoroutineInfo 结构体用于存储每个 Goroutine 的停止通道 stopCh 和等待组 wgstopCh 用于向 Goroutine 发送停止信号,wg 用于等待 Goroutine 执行完毕。
  • GoroutineManager 结构体用于管理所有的 Goroutine。goroutines 是一个映射,键为任务类型字符串,值为 GoroutineInfo 指针。mu 是一个互斥锁,用于保证对 goroutines 的并发安全访问。

初始化 Goroutine 管理器

1
2
3
4
5
func NewGoroutineManager() *GoroutineManager {
return &GoroutineManager{
goroutines: make(map[string]*GoroutineInfo),
}
}

NewGoroutineManager 函数用于创建一个新的 GoroutineManager 实例,并初始化 goroutines 映射。

任务函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func SellWellTask(stopCh chan struct{}) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-stopCh:
return
case <-ticker.C:
log.Println("SellerWellTask running...")
}
}
}

func WpsTaskS(stopCh chan struct{}) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-stopCh:
return
case <-ticker.C:
log.Println("WpsTask running...")
}
}
}
  • SellWellTaskWpsTaskS 是两个具体的任务函数,它们会在一个无限循环中运行。使用 time.Ticker 每 5 秒触发一次操作,打印相应的日志信息。当接收到 stopCh 通道的信号时,函数会退出循环并返回。

启动任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (m *GoroutineManager) StartTask(taskType string) error {
m.mu.Lock()
defer m.mu.Unlock()

if _, exists := m.goroutines[taskType]; exists {
return fmt.Errorf("task already running")
}

info := &GoroutineInfo{
stopCh: make(chan struct{}),
}
info.wg.Add(1)

m.goroutines[taskType] = info

go func(info *GoroutineInfo) {
defer func() {
log.Printf("%s task stopped", taskType)
info.wg.Done()
}()

switch taskType {
case SellerWellOrder:
SellWellTask(info.stopCh)
case WpsTask:
WpsTaskS(info.stopCh)
}
}(info)

return nil
}
  • StartTask 方法用于启动一个新的任务。首先,它会加锁以保证并发安全。然后检查该任务是否已经在运行,如果是则返回错误。接着创建一个新的 GoroutineInfo 实例,并将其添加到 goroutines 映射中。最后,启动一个新的 Goroutine 来执行相应的任务。

停止任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (m *GoroutineManager) StopTask(taskType string) bool {
m.mu.Lock()
info, exists := m.goroutines[taskType]
if !exists {
m.mu.Unlock()
return false
}
delete(m.goroutines, taskType)
m.mu.Unlock()

close(info.stopCh)
info.wg.Wait()
return true
}
  • StopTask 方法用于停止一个正在运行的任务。首先,它会加锁并检查该任务是否存在。如果存在,则从 goroutines 映射中删除该任务,并解锁。然后关闭 stopCh 通道,向 Goroutine 发送停止信号,并等待 Goroutine 执行完毕。

HTTP API 处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
func TestGoroutineManager(t *testing.T) {
manager := NewGoroutineManager()

http.HandleFunc("/start", authMiddleware(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

var req struct {
TaskType string `json:"task_type"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}

err := manager.StartTask(req.TaskType)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

json.NewEncoder(w).Encode(map[string]string{"status": "started", "task_type": req.TaskType})
}))

http.HandleFunc("/stop", authMiddleware(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

var req struct {
TaskType string `json:"task_type"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}

stopped := manager.StopTask(req.TaskType)
if !stopped {
http.Error(w, "Task not found", http.StatusNotFound)
return
}

json.NewEncoder(w).Encode(map[string]string{"status": "stopped"})
}))

log.Println("Server starting on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
  • /start 端点用于启动一个任务。它接收一个 JSON 格式的请求体,包含 task_type 字段。如果请求方法不是 POST 或者请求体解析失败,会返回相应的错误信息。如果任务启动成功,会返回一个 JSON 响应,包含任务状态和任务类型。
  • /stop 端点用于停止一个任务。同样接收一个 JSON 格式的请求体,包含 task_type 字段。如果任务不存在,会返回 Task not found 错误信息。如果任务停止成功,会返回一个 JSON 响应,包含任务状态。

总结

通过上述代码,我们实现了一个可以通过 HTTP API 启动和停止 Goroutine 的服务。核心要点包括使用通道来控制 Goroutine 的生命周期,使用互斥锁来保证并发安全,以及使用 HTTP 服务器来处理 API 请求。这种方法使得我们可以方便地管理多个并发任务,提高了系统的可维护性和灵活性。

 Comments
Comment plugin failed to load
Loading comment plugin
Powered by Hexo & Theme Keep
This site is deployed on
Unique Visitor Page View