Skip to content

Commit

Permalink
Use fiber.cond instead of fiber.wakeup, if possible
Browse files Browse the repository at this point in the history
  • Loading branch information
bigbes committed May 18, 2017
1 parent 94cd455 commit 078784d
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 34 deletions.
32 changes: 21 additions & 11 deletions queue/abstract.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ local fun = require('fun')
local fiber = require('fiber')

local state = require('queue.abstract.state')
local num_type = require('queue.compat').num_type
local str_type = require('queue.compat').str_type

local qc = require('queue.compat')
local num_type = qc.num_type
local str_type = qc.str_type

local session = box.session

Expand Down Expand Up @@ -70,6 +72,8 @@ function tube.put(self, data, opts)
return self.raw:normalize_task(task)
end

local conds = {}

function tube.take(self, timeout)
timeout = time(timeout or TIMEOUT_INFINITY)
local task = self.raw:take()
Expand All @@ -80,12 +84,15 @@ function tube.take(self, timeout)
while timeout > 0 do
local started = fiber.time64()
local time = event_time(timeout)
local tube_id = self.tube_id
local tid = self.tube_id
local fid = fiber.id()
local sid = session.id()

box.space._queue_consumers:insert{
session.id(), fiber.id(), tube_id, time, fiber.time64() }
fiber.sleep(tonumber(timeout) / 1000000)
box.space._queue_consumers:delete{ session.id(), fiber.id() }
box.space._queue_consumers:insert{sid, fid, tid, time, started}
conds[fid] = qc.waiter()
conds[fid]:wait(tonumber(timeout) / 1000000)
conds[fid]:free()
box.space._queue_consumers:delete{ sid, fid }

task = self.raw:take()

Expand Down Expand Up @@ -295,8 +302,11 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)

if consumer ~= nil then
if consumer[3] == tube_id then
fiber.find(consumer[2]):wakeup()
queue_consumers:delete{consumer[1], consumer[2]}
local cond = conds[consumer[2]]
if cond then
cond:signal(consumer[2])
end
end
end
-- task swicthed to taken - registry in taken space
Expand Down Expand Up @@ -340,9 +350,9 @@ function method._on_consumer_disconnect()
break
end
box.space._queue_consumers:delete{ waiter[1], waiter[2] }
fb = fiber.find(waiter[2])
if fb ~= nil and fb:status() ~= 'dead' then
fb:wakeup()
local cond = conds[waiter[2]]
if cond then
cond:signal(waiter[2])
end
end

Expand Down
17 changes: 7 additions & 10 deletions queue/abstract/driver/fifottl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ local log = require('log')
local fiber = require('fiber')
local state = require('queue.abstract.state')

local num_type = require('queue.compat').num_type
local str_type = require('queue.compat').str_type
local qc = require('queue.compat')
local num_type = qc.num_type
local str_type = qc.str_type

local tube = {}
local method = {}
Expand Down Expand Up @@ -147,8 +148,7 @@ local function fifottl_fiber_iteration(self, processed)
if estimated > 0 or processed > 1000 then
-- free refcounter
estimated = estimated > 0 and estimated or 0
processed = 0
fiber.sleep(estimated)
self.cond:wait(estimated)
end

return processed
Expand Down Expand Up @@ -183,18 +183,15 @@ function tube.new(space, on_task_change, opts)
space = space,
on_task_change = function(self, task, stats_data)
-- wakeup fiber
if task ~= nil then
if self.fiber ~= nil then
if self.fiber:id() ~= fiber.id() then
self.fiber:wakeup()
end
end
if task ~= nil and self.fiber ~= nil then
self.cond:signal(self.fiber:id())
end
on_task_change(task, stats_data)
end,
opts = opts,
}, { __index = method })

self.cond = qc.waiter()
self.fiber = fiber.create(fifottl_fiber, self)

return self
Expand Down
17 changes: 8 additions & 9 deletions queue/abstract/driver/utubettl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ local log = require('log')
local fiber = require('fiber')

local state = require('queue.abstract.state')
local num_type = require('queue.compat').num_type
local str_type = require('queue.compat').str_type

local qc = require('queue.compat')
local num_type = qc.num_type
local str_type = qc.str_type

local tube = {}
local method = {}
Expand Down Expand Up @@ -155,7 +157,7 @@ local function utubettl_fiber_iteration(self, processed)
-- free refcounter
estimated = processed > 1000 and 0 or estimated
estimated = estimated > 0 and estimated or 0
fiber.sleep(estimated)
self.cond:wait(estimated)
end

return processed
Expand Down Expand Up @@ -190,18 +192,15 @@ function tube.new(space, on_task_change, opts)
space = space,
on_task_change = function(self, task, stat_data)
-- wakeup fiber
if task ~= nil then
if self.fiber ~= nil then
if self.fiber:id() ~= fiber.id() then
self.fiber:wakeup()
end
end
if task ~= nil and self.fiber ~= nil then
self.cond:signal(self.fiber:id())
end
on_task_change(task, stat_data)
end,
opts = opts,
}, { __index = method })

self.cond = qc.waiter()
self.fiber = fiber.create(utubettl_fiber, self)

return self
Expand Down
61 changes: 57 additions & 4 deletions queue/compat.lua
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
local fun = require('fun')
local log = require('log')
local json = require('json')
local fun = require('fun')
local log = require('log')
local json = require('json')
local fiber = require('fiber')

local iter, op = fun.iter, fun.operator

function split(self, sep)
local function split(self, sep)
local sep, fields = sep or ":", {}
local pattern = string.format("([^%s]+)", sep)
self:gsub(pattern, function(c) table.insert(fields, c) end)
Expand Down Expand Up @@ -56,6 +57,57 @@ local function pack_args(...)
return check_version({1, 7}) and { ... } or ...
end

local waiter_list = {}

local function waiter_new()
return setmetatable({
cond = fiber.cond()
}, {
__index = {
wait = function(self, timeout)
self.cond:wait(timeout)
end,
signal = function(self, wfiber)
self.cond:signal()
end,
free = function(self)
if #waiter_list < 100 then
table.insert(waiter_list, self)
end
end
}
})
end

local function waiter_old()
return setmetatable({}, {
__index = {
wait = function(self, timeout)
fiber.sleep(timeout)
end,
signal = function(self, fid)
local wfiber = fiber.find(fid)
if wfiber ~= nil and
wfiber:status() ~= 'dead' and
wfiber:id() ~= fiber.id() then
wfiber:wakeup()
end
end,
free = function(self)
if #waiter_list < 100 then
table.insert(waiter_list, self)
end
end
}
})
end

local waiter_actual = check_version({1, 7, 2}) and waiter_new or waiter_old

local function waiter()
return table.remove(waiter_list) or waiter_actual()
end

return {
split_version = split_version,
check_version = check_version,
Expand All @@ -65,4 +117,5 @@ return {
snapdir_optname = get_optname_snapdir,
logger_optname = get_optname_logger,
pack_args = pack_args,
waiter = waiter
}

0 comments on commit 078784d

Please sign in to comment.