From 0ea3e6dbe2288854d9d4a971fc6539c2e740a95a Mon Sep 17 00:00:00 2001 From: Kahrl Date: Thu, 29 Aug 2013 05:04:56 +0200 Subject: [PATCH] Implement httpfetch module and initialize it from main() Add curl_parallel_limit setting that will replace media_fetch_threads in a later commit. Fix a typo in MutexedQueue::pop_back() that made it impossible to compile code that used this function. (Noticed this while implementing httpfetch.) --- src/CMakeLists.txt | 11 +- src/defaultsettings.cpp | 1 + src/httpfetch.cpp | 718 ++++++++++++++++++++++++++++++++++++++++ src/httpfetch.h | 126 +++++++ src/main.cpp | 7 + src/util/container.h | 3 +- 6 files changed, 860 insertions(+), 6 deletions(-) create mode 100644 src/httpfetch.cpp create mode 100644 src/httpfetch.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a30dc985..104e5640 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -188,7 +188,7 @@ message (STATUS "LuaJIT library: ${LUA_LIBRARY}") message (STATUS "LuaJIT headers: ${LUA_INCLUDE_DIR}") set(USE_LUAJIT 0) -if(LUA_LIBRARY AND LUA_INCLUDE_DIR) +if(LUA_LIBRARY AND LUA_INCLUDE_DIR) message (STATUS "LuaJIT found.") set(USE_LUAJIT 1) else(LUA_LIBRARY AND LUA_INCLUDE_DIR) @@ -307,6 +307,7 @@ set(common_SRCS pathfinder.cpp convert_json.cpp gettext.cpp + httpfetch.cpp ${JTHREAD_SRCS} ${common_SCRIPT_SRCS} ${UTIL_SRCS} @@ -500,7 +501,7 @@ if(MSVC) # Flags for C files (sqlite) # /MT = Link statically with standard library stuff set(CMAKE_C_FLAGS_RELEASE "/O2 /Ob2 /MT") - + if(BUILD_SERVER) set_target_properties(${PROJECT_NAME}server PROPERTIES COMPILE_DEFINITIONS "SERVER") @@ -508,13 +509,13 @@ if(MSVC) else() # Probably GCC - + if(WARN_ALL) set(RELEASE_WARNING_FLAGS "-Wall") else() set(RELEASE_WARNING_FLAGS "") endif() - + if(NOT APPLE AND NOT "${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang") CHECK_CXX_COMPILER_FLAG("-Wno-unused-but-set-variable" HAS_UNUSED_BUT_SET_VARIABLE_WARNING) if(HAS_UNUSED_BUT_SET_VARIABLE_WARNING) @@ -537,7 +538,7 @@ else() if(USE_GPROF) set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -pg") endif() - + if(BUILD_SERVER) set_target_properties(${PROJECT_NAME}server PROPERTIES COMPILE_DEFINITIONS "SERVER") diff --git a/src/defaultsettings.cpp b/src/defaultsettings.cpp index 9b407b1c..662717c8 100644 --- a/src/defaultsettings.cpp +++ b/src/defaultsettings.cpp @@ -140,6 +140,7 @@ void set_default_settings(Settings *settings) settings->setDefault("enable_particles", "true"); settings->setDefault("media_fetch_threads", "8"); + settings->setDefault("curl_parallel_limit", "8"); settings->setDefault("serverlist_url", "servers.minetest.net"); settings->setDefault("serverlist_file", "favoriteservers.txt"); diff --git a/src/httpfetch.cpp b/src/httpfetch.cpp new file mode 100644 index 00000000..4342a8b2 --- /dev/null +++ b/src/httpfetch.cpp @@ -0,0 +1,718 @@ +/* +Minetest +Copyright (C) 2013 celeron55, Perttu Ahola + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation; either version 2.1 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License along +with this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +*/ + +#include "httpfetch.h" +#include +#include +#include +#include +#include +#include "jthread/jevent.h" +#include "config.h" +#include "exceptions.h" +#include "debug.h" +#include "log.h" +#include "util/container.h" +#include "util/thread.h" +#include "socket.h" // for select() + +JMutex g_httpfetch_mutex; +std::map > g_httpfetch_results; + +static void httpfetch_deliver_result(const HTTPFetchResult &fetchresult) +{ + unsigned long caller = fetchresult.caller; + if (caller != HTTPFETCH_DISCARD) { + JMutexAutoLock lock(g_httpfetch_mutex); + g_httpfetch_results[caller].push_back(fetchresult); + } +} + +static void httpfetch_request_clear(unsigned long caller); + +unsigned long httpfetch_caller_alloc() +{ + JMutexAutoLock lock(g_httpfetch_mutex); + + // Check each caller ID except HTTPFETCH_DISCARD + const unsigned long discard = HTTPFETCH_DISCARD; + for (unsigned long caller = discard + 1; caller != discard; ++caller) { + std::map >::iterator + it = g_httpfetch_results.find(caller); + if (it == g_httpfetch_results.end()) { + verbosestream<<"httpfetch_caller_alloc: allocating " + < >::iterator + it = g_httpfetch_results.find(caller); + if (it == g_httpfetch_results.end()) + return false; + + // Check that result queue is nonempty + std::list &callerresults = it->second; + if (callerresults.empty()) + return false; + + // Pop first result + fetchresult = callerresults.front(); + callerresults.pop_front(); + return true; +} + +#if USE_CURL +#include + +/* + USE_CURL is on: use cURL based httpfetch implementation +*/ + +static size_t httpfetch_writefunction( + char *ptr, size_t size, size_t nmemb, void *userdata) +{ + std::ostringstream *stream = (std::ostringstream*)userdata; + size_t count = size * nmemb; + stream->write(ptr, count); + return count; +} + +static size_t httpfetch_discardfunction( + char *ptr, size_t size, size_t nmemb, void *userdata) +{ + return size * nmemb; +} + +class CurlHandlePool +{ + std::list handles; + +public: + CurlHandlePool() {} + ~CurlHandlePool() + { + for (std::list::iterator it = handles.begin(); + it != handles.end(); ++it) { + curl_easy_cleanup(*it); + } + } + CURL * alloc() + { + CURL *curl; + if (handles.empty()) { + curl = curl_easy_init(); + if (curl == NULL) { + errorstream<<"curl_easy_init returned NULL"<alloc(); + if (curl != NULL) { + // Set static cURL options + curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1); + curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1); + curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1); + curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 1); + +#if LIBCURL_VERSION_NUM >= 0x071304 + // Restrict protocols so that curl vulnerabilities in + // other protocols don't affect us. + // These settings were introduced in curl 7.19.4. + long protocols = + CURLPROTO_HTTP | + CURLPROTO_HTTPS | + CURLPROTO_FTP | + CURLPROTO_FTPS; + curl_easy_setopt(curl, CURLOPT_PROTOCOLS, protocols); + curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS, protocols); +#endif + + // Set cURL options based on HTTPFetchRequest + curl_easy_setopt(curl, CURLOPT_URL, + request.url.c_str()); + curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, + request.timeout); + curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, + request.connect_timeout); + // Set up a write callback that writes to the + // ostringstream ongoing->oss, unless the data + // is to be discarded + if (request.caller == HTTPFETCH_DISCARD) { + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, + httpfetch_discardfunction); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, NULL); + } + else { + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, + httpfetch_writefunction); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &oss); + } + // Set POST (or GET) data + if (request.post_fields.empty()) { + curl_easy_setopt(curl, CURLOPT_HTTPGET, 1); + } + else { + curl_easy_setopt(curl, CURLOPT_POST, 1); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, + request.post_fields.size()); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, + request.post_fields.c_str()); + // request.post_fields must now *never* be + // modified until CURLOPT_POSTFIELDS is cleared + } + // Set additional HTTP headers + for (size_t i = 0; i < request.extra_headers.size(); ++i) { + httpheader = curl_slist_append( + httpheader, + request.extra_headers[i].c_str()); + } + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, httpheader); + } + } + + CURLcode start(CURLM *multi_) + { + if (curl == NULL) + return CURLE_FAILED_INIT; + + if (multi_) { + // Multi interface (async) + CURLMcode mres = curl_multi_add_handle(multi_, curl); + if (mres != CURLM_OK) { + errorstream<<"curl_multi_add_handle" + <<" returned error code "<free(curl); + } +}; + +class CurlFetchThread : public SimpleThread +{ +protected: + enum RequestType { + RT_FETCH, + RT_CLEAR, + RT_WAKEUP, + }; + + struct Request { + RequestType type; + HTTPFetchRequest fetchrequest; + Event *event; + }; + + CURLM *m_multi; + MutexedQueue m_requests; + size_t m_parallel_limit; + + // Variables exclusively used within thread + std::vector m_all_ongoing; + std::list m_queued_fetches; + +public: + CurlFetchThread(int parallel_limit) + { + if (parallel_limit >= 1) + m_parallel_limit = parallel_limit; + else + m_parallel_limit = 1; + } + + void requestFetch(const HTTPFetchRequest &fetchrequest) + { + Request req; + req.type = RT_FETCH; + req.fetchrequest = fetchrequest; + req.event = NULL; + m_requests.push_back(req); + } + + void requestClear(unsigned long caller, Event *event) + { + Request req; + req.type = RT_CLEAR; + req.fetchrequest.caller = caller; + req.event = event; + m_requests.push_back(req); + } + + void requestWakeUp() + { + Request req; + req.type = RT_WAKEUP; + req.event = NULL; + m_requests.push_back(req); + } + +protected: + // Handle a request from some other thread + // E.g. new fetch; clear fetches for one caller; wake up + void processRequest(const Request &req) + { + if (req.type == RT_FETCH) { + // New fetch, queue until there are less + // than m_parallel_limit ongoing fetches + m_queued_fetches.push_back(req.fetchrequest); + + // see processQueued() for what happens next + + } + else if (req.type == RT_CLEAR) { + unsigned long caller = req.fetchrequest.caller; + + // Abort all ongoing fetches for the caller + for (std::vector::iterator + it = m_all_ongoing.begin(); + it != m_all_ongoing.end();) { + if ((*it)->request.caller == caller) { + delete (*it); + it = m_all_ongoing.erase(it); + } + else + ++it; + } + + // Also abort all queued fetches for the caller + for (std::list::iterator + it = m_queued_fetches.begin(); + it != m_queued_fetches.end();) { + if ((*it).caller == caller) + it = m_queued_fetches.erase(it); + else + ++it; + } + } + else if (req.type == RT_WAKEUP) { + // Wakeup: Nothing to do, thread is awake at this point + } + + if (req.event != NULL) + req.event->signal(); + } + + // Start new ongoing fetches if m_parallel_limit allows + void processQueued(CurlHandlePool *pool) + { + while (m_all_ongoing.size() < m_parallel_limit && + !m_queued_fetches.empty()) { + HTTPFetchRequest request = m_queued_fetches.front(); + m_queued_fetches.pop_front(); + + // Create ongoing fetch data and make a cURL handle + // Set cURL options based on HTTPFetchRequest + HTTPFetchOngoing *ongoing = + new HTTPFetchOngoing(request, pool); + + // Initiate the connection (curl_multi_add_handle) + CURLcode res = ongoing->start(m_multi); + if (res == CURLE_OK) { + m_all_ongoing.push_back(ongoing); + } + else { + ongoing->complete(res); + httpfetch_deliver_result(ongoing->result); + delete ongoing; + } + } + } + + // Process CURLMsg (indicates completion of a fetch) + void processCurlMessage(CURLMsg *msg) + { + // Determine which ongoing fetch the message pertains to + size_t i = 0; + bool found = false; + for (i = 0; i < m_all_ongoing.size(); ++i) { + if (m_all_ongoing[i]->curl == msg->easy_handle) { + found = true; + break; + } + } + if (msg->msg == CURLMSG_DONE && found) { + // m_all_ongoing[i] succeeded or failed. + HTTPFetchOngoing *ongoing = m_all_ongoing[i]; + ongoing->complete(msg->data.result); + httpfetch_deliver_result(ongoing->result); + delete ongoing; + m_all_ongoing.erase(m_all_ongoing.begin() + i); + } + } + + // Wait for a request from another thread, or timeout elapses + void waitForRequest(long timeout) + { + if (m_queued_fetches.empty()) { + try { + Request req = m_requests.pop_front(timeout); + processRequest(req); + } + catch (ItemNotFoundException &e) {} + } + } + + // Wait until some IO happens, or timeout elapses + void waitForIO(long timeout) + { + fd_set read_fd_set; + fd_set write_fd_set; + fd_set exc_fd_set; + int max_fd; + long select_timeout = -1; + struct timeval select_tv; + CURLMcode mres; + + FD_ZERO(&read_fd_set); + FD_ZERO(&write_fd_set); + FD_ZERO(&exc_fd_set); + + mres = curl_multi_fdset(m_multi, &read_fd_set, + &write_fd_set, &exc_fd_set, &max_fd); + if (mres != CURLM_OK) { + errorstream<<"curl_multi_fdset" + <<" returned error code "< timeout) + select_timeout = timeout; + + if (select_timeout > 0) { + select_tv.tv_sec = select_timeout / 1000; + select_tv.tv_usec = (select_timeout % 1000) * 1000; + int retval = select(max_fd + 1, &read_fd_set, + &write_fd_set, &exc_fd_set, + &select_tv); + if (retval == -1) { + #ifdef _WIN32 + errorstream<<"select returned error code " + <setRun(false); + g_httpfetch_thread->requestWakeUp(); + g_httpfetch_thread->stop(); + delete g_httpfetch_thread; + + curl_global_cleanup(); +} + +void httpfetch_async(const HTTPFetchRequest &fetchrequest) +{ + g_httpfetch_thread->requestFetch(fetchrequest); + if (!g_httpfetch_thread->IsRunning()) + g_httpfetch_thread->Start(); +} + +static void httpfetch_request_clear(unsigned long caller) +{ + if (g_httpfetch_thread->IsRunning()) { + Event event; + g_httpfetch_thread->requestClear(caller, &event); + event.wait(); + } + else { + g_httpfetch_thread->requestClear(caller, NULL); + } +} + +void httpfetch_sync(const HTTPFetchRequest &fetchrequest, + HTTPFetchResult &fetchresult) +{ + // Create ongoing fetch data and make a cURL handle + // Set cURL options based on HTTPFetchRequest + CurlHandlePool pool; + HTTPFetchOngoing ongoing(fetchrequest, &pool); + // Do the fetch (curl_easy_perform) + CURLcode res = ongoing.start(NULL); + // Update fetchresult + ongoing.complete(res); + fetchresult = ongoing.result; +} + +#else // USE_CURL + +/* + USE_CURL is off: + + Dummy httpfetch implementation that always returns an error. +*/ + +void httpfetch_init(int parallel_limit) +{ +} + +void httpfetch_cleanup() +{ +} + +void httpfetch_async(const HTTPFetchRequest &fetchrequest) +{ + errorstream<<"httpfetch_async: unable to fetch "< + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation; either version 2.1 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License along +with this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +*/ + +#ifndef HTTPFETCH_HEADER +#define HTTPFETCH_HEADER + +#include +#include +#include "config.h" + +// Can be used in place of "caller" in asynchronous transfers to discard result +// (used as default value of "caller") +#define HTTPFETCH_DISCARD 0 + +struct HTTPFetchRequest +{ + std::string url; + + // Identifies the caller (for asynchronous requests) + // Ignored by httpfetch_sync + unsigned long caller; + + // Some number that identifies the request + // (when the same caller issues multiple httpfetch_async calls) + unsigned long request_id; + + // Timeout for the whole transfer, in milliseconds + long timeout; + + // Timeout for the connection phase, in milliseconds + long connect_timeout; + + // POST data (should be application/x-www-form-urlencoded + // unless a Content-Type header is specified in extra_headers) + // If this is empty a GET request is done instead. + std::string post_fields; + + // If not empty, should contain entries such as "Accept: text/html" + std::vector extra_headers; + + HTTPFetchRequest() + { + url = ""; + caller = HTTPFETCH_DISCARD; + request_id = 0; + timeout = 0; + connect_timeout = 0; + } +}; + +struct HTTPFetchResult +{ + bool succeeded; + bool timeout; + long response_code; + std::string data; + // The caller and request_id from the corresponding HTTPFetchRequest. + unsigned long caller; + unsigned long request_id; + + HTTPFetchResult() + { + succeeded = false; + timeout = false; + response_code = 0; + data = ""; + caller = HTTPFETCH_DISCARD; + request_id = 0; + } + + HTTPFetchResult(const HTTPFetchRequest &fetchrequest) + { + succeeded = false; + timeout = false; + response_code = 0; + data = ""; + caller = fetchrequest.caller; + request_id = fetchrequest.request_id; + } +}; + +// Initializes the httpfetch module +void httpfetch_init(int parallel_limit); + +// Stops the httpfetch thread and cleans up resources +void httpfetch_cleanup(); + +// Starts an asynchronous HTTP fetch request +void httpfetch_async(const HTTPFetchRequest &fetchrequest); + +// If any fetch for the given caller ID is complete, removes it from the +// result queue, sets fetchresult and returns true. Otherwise returns false. +bool httpfetch_async_get(unsigned long caller, HTTPFetchResult &fetchresult); + +// Allocates a caller ID for httpfetch_async +// Not required if you want to set caller = HTTPFETCH_DISCARD +unsigned long httpfetch_caller_alloc(); + +// Frees a caller ID allocated with httpfetch_caller_alloc +// Note: This can be expensive, because the httpfetch thread is told +// to stop any ongoing fetches for the given caller. +void httpfetch_caller_free(unsigned long caller); + +// Performs a synchronous HTTP request. This blocks and therefore should +// only be used from background threads. +void httpfetch_sync(const HTTPFetchRequest &fetchrequest, + HTTPFetchResult &fetchresult); + + +#endif // !HTTPFETCH_HEADER diff --git a/src/main.cpp b/src/main.cpp index 57c2f06a..239d6824 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -77,6 +77,7 @@ with this program; if not, write to the Free Software Foundation, Inc., #include "subgame.h" #include "quicktune.h" #include "serverlist.h" +#include "httpfetch.h" #include "guiEngine.h" #include "mapsector.h" @@ -1001,6 +1002,9 @@ int main(int argc, char *argv[]) assert(res == CURLE_OK); #endif + // Initialize HTTP fetcher + httpfetch_init(g_settings->getS32("curl_parallel_limit")); + /* Run unit tests */ @@ -1858,6 +1862,9 @@ int main(int argc, char *argv[]) } } + // Stop httpfetch thread (if started) + httpfetch_cleanup(); + END_DEBUG_EXCEPTION_HANDLER(errorstream) debugstreams_deinit(); diff --git a/src/util/container.h b/src/util/container.h index fdd76cc5..e83c3cd3 100644 --- a/src/util/container.h +++ b/src/util/container.h @@ -297,7 +297,8 @@ public: if(!m_list.empty()) { - typename std::list::iterator last = m_list.back(); + typename std::list::iterator last = m_list.end(); + last--; T t = *last; m_list.erase(last); return t;