Update multi-uv example.

master
Alexey Melnichuk 2014-12-22 16:11:33 +04:00
parent 1738369579
commit b4a7579929
1 changed files with 161 additions and 79 deletions

View File

@ -1,12 +1,15 @@
local curl = require "cURL"
local uv = require "lluv"
local ut = require "lluv.utils"
local fprintf = function(f, ...) f:write((string.format(...))) end
local printf = function(...) fprintf(io.stdout, ...) end
local stderr = io.stderr
local timeout, curl_handle
local trace = false
trace = trace and print or function() end
local ACTION_NAMES = {
[curl.POLL_IN ] = "POLL_IN";
@ -15,39 +18,84 @@ local ACTION_NAMES = {
[curl.POLL_NONE ] = "POLL_NONE";
[curl.POLL_REMOVE ] = "POLL_REMOVE";
}
local EVENT_NAMES = {
[ uv.READABLE ] = "READABLE";
[ uv.WRITABLE ] = "WRITABLE";
[ uv.READABLE + uv.WRITABLE ] = "READABLE + WRITABLE";
}
local FLAGS = {
[ uv.READABLE ] = curl.CSELECT_IN;
[ uv.WRITABLE ] = curl.CSELECT_OUT;
[ uv.READABLE + uv.WRITABLE ] = curl.CSELECT_IN + curl.CSELECT_OUT;
local FLAGS = {
[ uv.READABLE ] = curl.CSELECT_IN;
[ uv.WRITABLE ] = curl.CSELECT_OUT;
[ uv.READABLE + uv.WRITABLE ] = curl.CSELECT_IN + curl.CSELECT_OUT;
}
local trace = true
local Context = ut.class() do
trace = trace and print or function() end
function Context:__init(fd)
self._fd = assert(fd)
self._poll = uv.poll_socket(fd)
self._poll.data = {context = self}
local CONTEXT = {}
assert(self._poll:fileno() == fd)
function create_curl_context(sockfd)
local context = {
sockfd = sockfd;
poll_handle = uv.poll_socket(sockfd);
}
context.poll_handle.data = context
return context
return self
end
function destroy_curl_context(context)
context.poll_handle:close()
end
function Context:close()
if not self._poll then return end
self._poll.data = nil
self._poll:close()
self._poll, self._fd = nil
end
function Context:poll(...)
self._poll:start(...)
end
function Context:fileno()
return self._fd
end
end
-- Number of parallel request
local MAX_REQUESTS
local timer, multi
local qtask = ut.Queue.new() -- wait tasks
local qfree = ut.Queue.new() -- avaliable easy handles
local qeasy = {} -- all easy handles
local function cleanup()
timer:close()
for i, easy in ipairs(qeasy) do
multi:remove_handle(easy)
easy:close()
end
multi:close()
end
local proceed_queue, add_download do
proceed_queue = function()
if qtask:empty() then return end
if qfree:empty() then
if #qeasy < MAX_REQUESTS then
local easy = assert(curl.easy())
qeasy[#qeasy + 1] = easy
qfree:push(easy)
else
return
end
end
local task = assert(qtask:pop())
local url, num = task[1], task[2]
function add_download(url, num)
local filename = tostring(num) .. ".download"
local file = io.open(filename, "w")
if not file then
@ -55,101 +103,133 @@ function add_download(url, num)
return
end
local handle = curl.easy{
local handle = assert(qfree:pop())
handle:setopt{
url = url;
writefunction = file;
}
handle.data = file
handle.data = { file = file }
multi:add_handle(handle)
curl_handle:add_handle(handle)
fprintf(stderr, "Added download %s -> %s\n", url, filename);
end
function check_multi_info()
add_download = function(url, num)
qtask:push{url, num}
proceed_queue()
end
end
local on_libuv_poll, on_libuv_timeout
local on_curl_timeout, on_curl_action do
on_curl_timeout = function(ms)
-- calls by curl --
trace("CURL::TIMEOUT", ms)
if ms <= 0 then ms = 1 end
timer:start(ms, 0, on_libuv_timeout)
end
on_curl_action = function(easy, fd, action)
local ok, err = pcall(function()
trace("CURL::SOCKET", easy, s, ACTION_NAMES[action] or action)
local context = easy.data.context
if (action == curl.POLL_IN) or (action == curl.POLL_OUT) then
if not context then
context = Context.new(fd)
easy.data.context = context
end
end
assert(context:fileno() == fd)
if action == curl.POLL_IN then context:poll(uv.READABLE, on_libuv_poll)
elseif action == curl.POLL_OUT then context:poll(uv.WRITABLE, on_libuv_poll)
elseif action == curl.POLL_REMOVE then
if context then
easy.data.context = nil
context:close()
end
end
end)
if not ok then uv.defer(function() error(err) end) end
end
end
-- on_libuv_poll, on_libuv_timeout
do
local curl_check_multi_info = function()
while true do
local easy, ok, err = curl_handle:info_read(true)
if not easy then curl_handle:close() error(err) end
local easy, ok, err = multi:info_read(true)
if not easy then
multi:close()
error(err)
end
if easy == 0 then break end
local context = CONTEXT[e]
if context then destroy_curl_context(context) end
local file = assert(easy.data)
file:close()
local done_url = easy:getinfo_effective_url()
easy:close()
local context = easy.data.context
if context then context:close() end
local file = assert(easy.data.file)
file:close()
easy.data = nil
qfree:push(easy)
if ok then
printf("%s DONE\n", done_url);
elseif data == "error" then
printf("%s ERROR - %s\n", done_url, tostring(err));
end
proceed_queue()
end
end
function curl_perform(handle, err, events)
-- calls by libuv --
on_libuv_poll = function(handle, err, events)
trace("UV::POLL", handle, err, EVENT_NAMES[events] or events)
local flags = assert(FLAGS[events], ("unknown event:" .. events))
context = handle.data
context = handle.data.context
curl_handle:socket_action(context.sockfd, flags)
multi:socket_action(context:fileno(), flags)
check_multi_info()
curl_check_multi_info()
end
function on_timeout(timer)
-- calls by libuv --
on_libuv_timeout = function(timer)
trace("UV::TIMEOUT", timer)
local running_handles, err = curl_handle:socket_action()
local running_handles, err = multi:socket_action()
check_multi_info()
curl_check_multi_info()
end
function start_timeout(timeout_ms)
-- calls by curl --
trace("CURL::TIMEOUT", timeout_ms)
-- 0 means directly call socket_action, but we'll do it in a bit
if timeout_ms <= 0 then timeout_ms = 1 end
timeout:stop():start(timeout_ms, 0, on_timeout)
end
function handle_socket(easy, s, action)
local ok, err = pcall(function()
-- calls by curl --
trace("CURL::SOCKET", easy, s, ACTION_NAMES[action] or action)
MAX_REQUESTS = 64
local curl_context = CONTEXT[easy] or create_curl_context(s)
CONTEXT[easy] = curl_context
timer = uv.timer()
assert(curl_context.sockfd == s)
if action == curl.POLL_IN then
curl_context.poll_handle:start(uv.READABLE, curl_perform)
elseif action == curl.POLL_OUT then
curl_context.poll_handle:start(uv.WRITABLE, curl_perform)
elseif action == curl.POLL_REMOVE then
CONTEXT[easy] = nil
destroy_curl_context(curl_context)
end
end)
if not ok then uv.defer(function() error(err) end) end
end
timeout = uv.timer()
curl_handle = curl.multi{
socketfunction = handle_socket;
timerfunction = start_timeout;
}
curl_handle = curl.multi{
socketfunction = handle_socket;
timerfunction = start_timeout;
multi = curl.multi{
timerfunction = on_curl_timeout;
socketfunction = on_curl_action;
}
for i = 1, math.huge do
@ -158,4 +238,6 @@ for i = 1, math.huge do
add_download(url, i)
end
uv.run(loop, UV_RUN_DEFAULT)
uv.run()
cleanup()