-
Notifications
You must be signed in to change notification settings - Fork 2
/
index.js
61 lines (48 loc) · 1.16 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
 * Work queue with bounded concurrency that coalesces tasks by `task.id`.
 *
 * Must be invoked with `new`: the public methods are attached to `this`.
 *
 * While a task with a given id is being processed, any further task pushed
 * with the same id is not processed again; its callback is attached to the
 * in-progress run and receives the same completion arguments.
 *
 * @param {function(Object, function(...*))} processor - Called as
 *   `processor(task, done)`. Whatever arguments are passed to `done` are
 *   forwarded to every callback registered for that task id. Calls to `done`
 *   after the first one are ignored.
 * @param {number} [concurrency=1] - Maximum number of tasks processed at once
 *   (any falsy value falls back to 1).
 */
function namedQueue(processor, concurrency) {
  concurrency = concurrency || 1
  var waiting = []
  // Null-prototype dictionary: ids such as "constructor" or "toString" must
  // not be mistaken for in-progress entries inherited from Object.prototype.
  var inProg = Object.create(null)
  var count = 0

  // Throws unless `task` is an object carrying its own `id` property.
  function assertHasId(task) {
    if (task == null || !Object.prototype.hasOwnProperty.call(task, 'id')) {
      throw new Error('no task.id')
    }
  }

  // Starts processing one queued entry and wires up its completion handler.
  function start(entry) {
    // Capture the id once so completion cleans up the same key even if the
    // processor mutates the task object.
    var id = entry.task.id
    var finished = false
    inProg[id] = [entry.cb]
    count++
    processor(entry.task, function() {
      // Per-run flag: a stale second call must not complete a later run that
      // happens to share the same id.
      if (finished) return
      finished = true
      var args = arguments
      var callbacks = inProg[id]
      // Release the slot and the id BEFORE invoking callbacks, so that a
      // callback re-pushing the same id enqueues a fresh run (instead of being
      // appended to a list that is about to be discarded), and so that a
      // throwing callback cannot leave the queue wedged.
      delete inProg[id]
      count--
      setImmediate(update)
      callbacks.forEach(function(cb) {
        if (typeof cb === 'function') cb.apply(null, args)
      })
    })
  }

  // Drains the waiting list while there is spare concurrency.
  function update() {
    while (waiting.length && count < concurrency) {
      var entry = waiting.shift()
      var running = inProg[entry.task.id]
      if (running) {
        // Same id started while this entry was waiting: piggyback on it.
        running.push(entry.cb)
      } else {
        start(entry)
      }
    }
  }

  // Shared implementation of push/unshift.
  function enqueue(task, cb, atFront) {
    assertHasId(task)
    var running = inProg[task.id]
    if (running) {
      running.push(cb)
      return
    }
    var entry = { task: task, cb: cb }
    if (atFront) {
      waiting.unshift(entry)
    } else {
      waiting.push(entry)
    }
    setImmediate(update)
  }

  /**
   * Appends a task to the end of the queue.
   * @param {Object} task - Must have its own `id` property.
   * @param {function(...*)} [cb] - Receives the processor's completion arguments.
   * @throws {Error} 'no task.id' when the task has no own `id` property.
   */
  this.push = function(task, cb) {
    enqueue(task, cb, false)
  }

  /**
   * Inserts a task at the front of the queue.
   * @param {Object} task - Must have its own `id` property.
   * @param {function(...*)} [cb] - Receives the processor's completion arguments.
   * @throws {Error} 'no task.id' when the task has no own `id` property.
   */
  this.unshift = function(task, cb) {
    enqueue(task, cb, true)
  }

  /**
   * @returns {number} Number of tasks waiting to start (tasks currently being
   *   processed are not counted).
   */
  this.length = function() {
    return waiting.length
  }
}
// Expose the namedQueue function as this module's sole export (CommonJS).
module.exports = namedQueue