417 lines
19 KiB
Lua
Executable File

--------------------------------------------------------------------------------
---------------------- ## ##### ##### ###### -----------------------
---------------------- ## ## ## ## ## ## ## -----------------------
---------------------- ## ## ## ## ## ###### -----------------------
---------------------- ## ## ## ## ## ## -----------------------
---------------------- ###### ##### ##### ## -----------------------
---------------------- -----------------------
----------------------- Lua Object-Oriented Programming ------------------------
--------------------------------------------------------------------------------
-- Project: LOOP Class Library --
-- Release: 2.3 beta --
-- Title : Lua Socket Wrapper for Cooperative Scheduling --
-- Author : Renato Maia <maia@inf.puc-rio.br> --
--------------------------------------------------------------------------------
--[[VERBOSE]] local verbose = require("loop.thread.Scheduler").verbose
--[[VERBOSE]] verbose.groups.concurrency[#verbose.groups.concurrency+1] = "cosocket"
--[[VERBOSE]] verbose:newlevel{"cosocket"}
local ipairs = ipairs
local assert = assert
local setmetatable = setmetatable
local type = type
local next = next
local coroutine = require "coroutine"
local oo = require "loop.base"
local Wrapper = require "loop.object.Wrapper"
module("loop.thread.CoSocket", oo.class)
--------------------------------------------------------------------------------
-- Initialization Code ---------------------------------------------------------
--------------------------------------------------------------------------------
function __init(class, self, scheduler)
self = oo.rawnew(class, self)
self.readlocks = {}
self.writelocks = {}
if not self.scheduler then
self.scheduler = scheduler
end
return self
end
function __index(self, field)
return _M[field] or self.socketapi[field]
end
--------------------------------------------------------------------------------
-- Wrapping functions ----------------------------------------------------------
--------------------------------------------------------------------------------
local function wrappedsettimeout(self, timeout)
self.timeout = timeout or false
end
--------------------------------------------------------------------------------
local function wrappedconnect(self, host, port) --[[VERBOSE]] local verbose = self.cosocket.scheduler.verbose
local socket = self.__object --[[VERBOSE]] verbose:cosocket(true, "performing blocking connect")
socket:settimeout(-1)
local result, errmsg = socket:connect(host, port)
socket:settimeout(0) --[[VERBOSE]] verbose:cosocket(false, "blocking connect done")
return result, errmsg
end
--------------------------------------------------------------------------------
local function wrappedaccept(self)
local socket = self.__object
local timeout = self.timeout
local cosocket = self.cosocket
local readlocks = cosocket.readlocks
local scheduler = cosocket.scheduler --[[VERBOSE]] local verbose = scheduler.verbose
local current = scheduler:checkcurrent() --[[VERBOSE]] verbose:cosocket(true, "performing wrapped accept")
assert(socket, "bad argument #1 to `accept' (wrapped socket expected)")
assert(readlocks[socket] == nil, "attempt to read a socket in use")
local conn, errmsg = socket:accept()
if conn then --[[VERBOSE]] verbose:cosocket(false, "connection accepted without waiting")
return cosocket:wrap(conn)
elseif timeout == 0 or errmsg ~= "timeout" then --[[VERBOSE]] verbose:cosocket(false, "returning error ",errmsg," without waiting")
return nil, errmsg
end --[[VERBOSE]] verbose:cosocket(true, "waiting for results")
local sleeping = scheduler.sleeping
local reading = scheduler.reading
-- subscribing current thread for reading signal
reading:add(socket, current) --[[VERBOSE]] verbose:threads(current," subscribed for read signal")
-- lock socket for reading and wait for signal until timeout
readlocks[socket] = current
scheduler:suspend(timeout) --[[VERBOSE]] verbose:cosocket(false, "wrapped accept resumed")
readlocks[socket] = nil
-- if thread is still blocked for reading then waiting timed out
if reading[socket] == current then
reading:remove(socket) --[[VERBOSE]] verbose:threads(current," unsubscribed for read signal")
return nil, "timeout" --[[VERBOSE]] , verbose:cosocket(false, "waiting timed out")
elseif timeout then
sleeping:remove(current) --[[VERBOSE]] verbose:threads(current," removed from sleeping queue")
end --[[VERBOSE]] verbose:cosocket(false, "returing results after waiting")
return cosocket:wrap(socket:accept())
end
--------------------------------------------------------------------------------
local function wrappedreceive(self, pattern)
local socket = self.__object
local timeout = self.timeout
local readlocks = self.cosocket.readlocks
local scheduler = self.cosocket.scheduler --[[VERBOSE]] local verbose = scheduler.verbose
local current = scheduler:checkcurrent() --[[VERBOSE]] verbose:cosocket(true, "performing wrapped receive")
assert(socket, "bad argument #1 to `receive' (wrapped socket expected)")
assert(readlocks[socket] == nil, "attempt to read a socket in use")
-- get data already avaliable
local result, errmsg, partial = socket:receive(pattern)
-- check if job has completed
if not result and errmsg == "timeout" and timeout ~= 0 then --[[VERBOSE]] verbose:cosocket(true, "waiting for remaining of results")
local running = scheduler.running
local sleeping = scheduler.sleeping
local reading = scheduler.reading
-- set to be waken at timeout, if specified
if timeout and timeout > 0 then
sleeping:enqueue(current, scheduler:time() + timeout) --[[VERBOSE]] verbose:threads(current," registered for signal in ",timeout," seconds")
end
-- lock socket to avoid use by other coroutines
readlocks[socket] = true
-- block current thread on the socket
reading:add(socket, current) --[[VERBOSE]] verbose:threads(current," subscribed for read signal")
-- reduce the number of required bytes
if type(pattern) == "number" then
pattern = pattern - #partial --[[VERBOSE]] verbose:cosocket("amount of required bytes reduced to ",pattern)
end
repeat
-- stop current thread
running:remove(current, self.currentkey) --[[VERBOSE]] verbose:threads(current," suspended")
coroutine.yield() --[[VERBOSE]] verbose:cosocket(false, "wrapped receive resumed")
-- check if the socket is ready
if reading[socket] == current then
reading:remove(socket) --[[VERBOSE]] verbose:threads(current," unsubscribed for read signal")
errmsg = "timeout" --[[VERBOSE]] verbose:cosocket(false, "wrapped send timed out")
else --[[VERBOSE]] verbose:cosocket "reading more data from socket"
local newdata
result, errmsg, newdata = socket:receive(pattern)
if result then --[[VERBOSE]] verbose:cosocket "received all requested data"
result, errmsg, partial = partial..result, nil, nil --[[VERBOSE]] verbose:cosocket(false, "returning results after waiting")
else --[[VERBOSE]] verbose:cosocket "received only partial data"
partial = partial..newdata
if errmsg == "timeout" then
-- block current thread on the socket for more data
reading:add(socket, current) --[[VERBOSE]] verbose:threads(current," subscribed for another read signal")
-- reduce the number of required bytes
if type(pattern) == "number" then
pattern = pattern - #newdata --[[VERBOSE]] verbose:cosocket("amount of required bytes reduced to ",pattern)
end
-- cancel error message
errmsg = nil --[[VERBOSE]] else verbose:cosocket(false, "returning error ",errmsg," after waiting")
end
end
end
until result or errmsg
-- remove from sleeping queue if it was waken because of data on socket.
if timeout and timeout > 0 and errmsg ~= "timeout" then
sleeping:remove(current) --[[VERBOSE]] verbose:threads(current," removed from sleeping queue")
end
-- unlock socket to allow use by other coroutines
readlocks[socket] = nil --[[VERBOSE]] else verbose:cosocket(false, "returning results without waiting")
end
return result, errmsg, partial
end
--------------------------------------------------------------------------------
local function wrappedsend(self, data, i, j) --[[VERBOSE]] local verbose = self.cosocket.scheduler.verbose
local socket = self.__object --[[VERBOSE]] verbose:cosocket(true, "performing wrapped send")
local timeout = self.timeout
local writelocks = self.cosocket.writelocks
local scheduler = self.cosocket.scheduler
local current = scheduler:checkcurrent()
assert(socket, "bad argument #1 to `send' (wrapped socket expected)")
assert(writelocks[socket] == nil, "attempt to write a socket in use")
-- fill buffer space already avaliable
local sent, errmsg, lastbyte = socket:send(data, i, j)
-- check if job has completed
if not sent and errmsg == "timeout" and timeout ~= 0 then --[[VERBOSE]] verbose:cosocket(true, "waiting to send remaining data")
local running = scheduler.running
local sleeping = scheduler.sleeping
local writing = scheduler.writing
-- set to be waken at timeout, if specified
if timeout and timeout > 0 then
sleeping:enqueue(current, scheduler:time() + timeout) --[[VERBOSE]] verbose:threads(current," registered for signal in ",timeout," seconds")
end
-- lock socket to avoid use by other coroutines
writelocks[socket] = true
-- block current thread on the socket
writing:add(socket, current) --[[VERBOSE]] verbose:threads(current," subscribed for write signal")
repeat
-- stop current thread
running:remove(current, self.currentkey) --[[VERBOSE]] verbose:threads(current," suspended")
coroutine.yield() --[[VERBOSE]] verbose:cosocket "wrapped send resumed"
-- check if the socket is ready
if writing[socket] == current then
writing:remove(socket) --[[VERBOSE]] verbose:threads(current," unsubscribed for write signal")
errmsg = "timeout" --[[VERBOSE]] verbose:cosocket "wrapped send timed out"
else --[[VERBOSE]] verbose:cosocket "writing remaining data into socket"
sent, errmsg, lastbyte = socket:send(data, lastbyte+1, j)
if not sent and errmsg == "timeout" then
-- block current thread on the socket to write data
writing:add(socket, current) --[[VERBOSE]] verbose:threads(current," subscribed for another write signal")
-- cancel error message
errmsg = nil --[[VERBOSE]] elseif sent then verbose:cosocket "sent all supplied data" else verbose:cosocket("returning error ",errmsg," after waiting")
end
end
until sent or errmsg
-- remove from sleeping queue, if it was waken because of data on socket.
if timeout and timeout > 0 and errmsg ~= "timeout" then
sleeping:remove(current) --[[VERBOSE]] verbose:threads(current," removed from sleeping queue")
end
-- unlock socket to allow use by other coroutines
writelocks[socket] = nil --[[VERBOSE]] verbose:cosocket "send done after waiting" else verbose:cosocket(false, "send done without waiting")
end
return sent, errmsg, lastbyte
end
--------------------------------------------------------------------------------
-- Wrapped Socket API ----------------------------------------------------------
--------------------------------------------------------------------------------
function select(self, recvt, sendt, timeout)
local scheduler = self.scheduler --[[VERBOSE]] local verbose = scheduler.verbose
local current = scheduler:checkcurrent() --[[VERBOSE]] verbose:cosocket(true, "performing wrapped select")
if (recvt and #recvt > 0) or (sendt and #sendt > 0) then
local readlocks = self.readlocks
local writelocks = self.writelocks
-- assert that no thread is already blocked on these sockets
if recvt then
local new = {}
for index, wrapper in ipairs(recvt) do
local socket = wrapper.__object
assert(readlocks[socket] == nil, "attempt to read a socket in use")
new[index] = socket
new[socket] = wrapper
end
recvt = new
end
if sendt then
local new = {}
for index, wrapper in ipairs(sendt) do
local socket = wrapper.__object
assert(writelocks[socket] == nil, "attempt to write a socket in use")
new[index] = socket
new[socket] = wrapper
end
sendt = new
end
local readok, writeok, errmsg = scheduler.select(recvt, sendt, 0)
if
timeout ~= 0 and
errmsg == "timeout" and
next(readok) == nil and
next(writeok) == nil
then --[[VERBOSE]] verbose:cosocket(true, "waiting for ready socket selection")
local running = scheduler.running
local sleeping = scheduler.sleeping
local reading = scheduler.reading
local writing = scheduler.writing
-- block current thread on the sockets and lock them
if recvt then
for _, socket in ipairs(recvt) do
readlocks[socket] = current
reading:add(socket, current) --[[VERBOSE]] verbose:threads(current," subscribed for read signal")
end
end
if sendt then
for _, socket in ipairs(sendt) do
writelocks[socket] = current
writing:add(socket, current) --[[VERBOSE]] verbose:threads(current," subscribed for write signal")
end
end
-- set to be waken at timeout, if specified
if timeout and timeout > 0 then
sleeping:enqueue(current, scheduler:time() + timeout) --[[VERBOSE]] verbose:threads(current," registered for signal in ",timeout," seconds")
end
-- stop current thread
running:remove(current, self.currentkey) --[[VERBOSE]] verbose:threads(current," suspended")
coroutine.yield() --[[VERBOSE]] verbose:cosocket(false, "wrapped select resumed")
-- remove from sleeping queue, if it was waken because of data on socket.
if timeout and timeout > 0 then
if sleeping:remove(current)
then errmsg = nil --[[VERBOSE]] verbose:threads(current," removed from sleeping queue")
else errmsg = "timeout" --[[VERBOSE]] verbose:cosocket "wrapped select timed out"
end
end
-- check which sockets are ready and remove block for other sockets
if recvt then
for _, socket in ipairs(recvt) do
readlocks[socket] = nil
if reading[socket] == current then
reading:remove(socket) --[[VERBOSE]] verbose:threads(current," unsubscribed for read signal")
else
local wrapper = recvt[socket]
readok[#readok+1] = wrapper
readok[wrapper] = true
end
end
end
if sendt then
for _, socket in ipairs(sendt) do
writelocks[socket] = nil
if writing[socket] == current then
writing:remove(socket) --[[VERBOSE]] verbose:threads(current," unsubscribed for write signal")
else
local wrapper = sendt[socket]
writeok[#writeok+1] = wrapper
writeok[wrapper] = true
end
end
end
else
for index, socket in ipairs(readok) do
local wrapper = recvt[socket]
readok[index] = wrapper
readok[socket] = nil
readok[wrapper] = true
end
for index, socket in ipairs(writeok) do
local wrapper = sendt[socket]
writeok[index] = wrapper
writeok[socket] = nil
writeok[wrapper] = true
end
end --[[VERBOSE]] verbose:cosocket(false, "returning selected sockets after waiting")
return readok, writeok, errmsg
else --[[VERBOSE]] verbose:cosocket(false, "no sockets for selection")
return {}, {}
end
end
function sleep(self, timeout)
assert(timeout, "bad argument #1 to `sleep' (number expected)")
return self.scheduler:suspend(timeout)
end
function tcp(self)
return self:wrap(self.socketapi.tcp())
end
function udp(self)
return self:wrap(self.socketapi.udp())
end
function connect(self, address, port)
return self:wrap(self.socketapi.connect(address, port))
end
function bind(self, address, port)
return self:wrap(self.socketapi.bind(address, port))
end
function wrap(self, socket, ...) --[[VERBOSE]] self.scheduler.verbose:cosocket "new wrapped socket"
if socket then
socket:settimeout(0)
socket = Wrapper {
__object = socket,
cosocket = self,
timeout = false,
settimeout = wrappedsettimeout,
connect = wrappedconnect,
accept = wrappedaccept,
send = wrappedsend,
receive = wrappedreceive,
}
end
return socket, ...
end