-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjob_queue.go
More file actions
139 lines (124 loc) · 3.15 KB
/
Copy pathjob_queue.go
File metadata and controls
139 lines (124 loc) · 3.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package executor
import (
"fmt"
"sync"
"time"
)
type Handler func(key string, payloads []any)
// to reduce lock contention
const QUEUESHARD = 128
// JobQueue runs jobs in order
//
// Examples:
//
// queue := New(func(key string, jobs []any) {
// for _, job := range jobs {
// fmt.Println("EXECUTE", job, "FOR", key)
// time.Sleep(1 * time.Second)
// }
// })
//
// queue.Add("user1", "job1")
// queue.Add("user1", "job2")
// queue.Add("user2", "job3")
// queue.Add("user2", "job4")
type JobQueue struct {
state string // active || dead (for testing)
autoCleanDisabled bool // for testing
locks [QUEUESHARD]*sync.Mutex
workers [QUEUESHARD]map[string][]any
pendingCounts [QUEUESHARD]int
handler Handler
}
// Add pushs a new job to the queue
// when the job get executed, the handler f will be called.
// - two jobs with the same key will be executed in order, handler
// for the later job only called after handler for the former job
// finished
// - two jobs with difference keys are executed concurrently
func (queue *JobQueue) Add(key string, job any) {
shard := Fnv32(key) % QUEUESHARD
lock := queue.locks[shard]
lock.Lock()
defer lock.Unlock()
for queue.pendingCounts[shard] > 100_000 {
// too fast
lock.Unlock()
time.Sleep(1 * time.Second)
fmt.Println("TOO FAST")
lock.Lock()
}
jobs, has := queue.workers[shard][key]
jobs = append(jobs, job)
queue.pendingCounts[shard]++
queue.workers[shard][key] = jobs
if has {
return
}
// trigger only once for the first job the make queue non-nil
go func(key string) {
for {
lock.Lock()
workers := queue.workers[shard]
jobs := workers[key]
if len(jobs) == 0 {
// this queue is out of job, we should release the go-routine
// but before that, we must mark the queue as slept, this makes
// sure the next incomming job will start (trigger) a new go routine
delete(workers, key)
lock.Unlock()
return
}
workers[key] = []any{}
queue.pendingCounts[shard] -= len(jobs)
lock.Unlock()
if queue.state != "active" {
return
}
queue.handler(key, jobs)
}
}(key)
}
// for testing purpose
func (queue *JobQueue) Shutdown() {
queue.state = "dead"
}
// shrink map to save memory
// https://github.com/golang/go/issues/20135
func (queue *JobQueue) clean() {
for shard := range QUEUESHARD {
queue.locks[shard].Lock()
oldmap := queue.workers[shard]
newmap := map[string][]any{}
for k, v := range oldmap {
newmap[k] = v
}
queue.workers[shard] = newmap
queue.locks[shard].Unlock()
}
}
// New creates a new JobQueue object
// f will be call (almost immediately) after a new job is added to the queue
func New(f Handler) *JobQueue {
queue := &JobQueue{
state: "active",
locks: [QUEUESHARD]*sync.Mutex{},
workers: [QUEUESHARD]map[string][]any{},
handler: f,
}
for shard := range QUEUESHARD {
queue.locks[shard] = &sync.Mutex{}
queue.workers[shard] = map[string][]any{}
}
go func() {
time.Sleep(1 * time.Second)
for !queue.autoCleanDisabled && queue.state == "active" {
time.Sleep(30 * time.Minute)
if queue.state != "active" {
return
}
queue.clean()
}
}()
return queue
}