diff --git a/src/network/core/http_curl.cpp b/src/network/core/http_curl.cpp --- a/src/network/core/http_curl.cpp +++ b/src/network/core/http_curl.cpp @@ -17,6 +17,7 @@ #include "../network_internal.h" #include "http.h" +#include "http_shared.h" #include #include @@ -44,6 +45,11 @@ static auto _certificate_directories = { }; #endif /* UNIX */ +static std::vector _http_callbacks; +static std::vector _new_http_callbacks; +static std::mutex _http_callback_mutex; +static std::mutex _new_http_callback_mutex; + /** Single HTTP request. */ class NetworkHTTPRequest { public: @@ -59,11 +65,19 @@ public: callback(callback), data(data) { + std::lock_guard lock(_new_http_callback_mutex); + _new_http_callbacks.push_back(&this->callback); } - const std::string uri; ///< URI to connect to. - HTTPCallback *callback; ///< Callback to send data back on. - const std::string data; ///< Data to send, if any. + ~NetworkHTTPRequest() + { + std::lock_guard lock(_http_callback_mutex); + _http_callbacks.erase(std::remove(_http_callbacks.begin(), _http_callbacks.end(), &this->callback), _http_callbacks.end()); + } + + const std::string uri; ///< URI to connect to. + HTTPThreadSafeCallback callback; ///< Callback to send data back on. + const std::string data; ///< Data to send, if any. }; static std::thread _http_thread; @@ -92,6 +106,20 @@ static std::string _http_ca_path = ""; /* static */ void NetworkHTTPSocketHandler::HTTPReceive() { + std::lock_guard lock(_http_callback_mutex); + + { + std::lock_guard lock_new(_new_http_callback_mutex); + if (!_new_http_callbacks.empty()) { + /* We delay adding new callbacks, as HandleQueue() below might add a new callback. */ + _http_callbacks.insert(_http_callbacks.end(), _new_http_callbacks.begin(), _new_http_callbacks.end()); + _new_http_callbacks.clear(); + } + } + + for (auto &callback : _http_callbacks) { + callback->HandleQueue(); + } } void HttpThread() @@ -163,11 +191,16 @@ void HttpThread() /* Setup our (C-style) callback function which we pipe back into the callback. */ curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, +[](char *ptr, size_t size, size_t nmemb, void *userdata) -> size_t { Debug(net, 4, "HTTP callback: {} bytes", size * nmemb); - HTTPCallback *callback = static_cast(userdata); - callback->OnReceiveData(ptr, size * nmemb); + HTTPThreadSafeCallback *callback = static_cast(userdata); + + /* Copy the buffer out of CURL. OnReceiveData() will free it when done. */ + std::unique_ptr buffer = std::make_unique(size * nmemb); + memcpy(buffer.get(), ptr, size * nmemb); + callback->OnReceiveData(std::move(buffer), size * nmemb); + return size * nmemb; }); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, request->callback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &request->callback); /* Create a callback from which we can cancel. Sadly, there is no other * thread-safe way to do this. If the connection went idle, it can take @@ -175,10 +208,10 @@ void HttpThread() * do about this. */ curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0L); curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION, +[](void *userdata, curl_off_t /*dltotal*/, curl_off_t /*dlnow*/, curl_off_t /*ultotal*/, curl_off_t /*ulnow*/) -> int { - const HTTPCallback *callback = static_cast(userdata); - return (callback->IsCancelled() || _http_thread_exit) ? 1 : 0; + const HTTPThreadSafeCallback *callback = static_cast(userdata); + return (callback->cancelled || _http_thread_exit) ? 1 : 0; }); - curl_easy_setopt(curl, CURLOPT_XFERINFODATA, request->callback); + curl_easy_setopt(curl, CURLOPT_XFERINFODATA, &request->callback); /* Perform the request. */ CURLcode res = curl_easy_perform(curl); @@ -187,15 +220,18 @@ void HttpThread() if (res == CURLE_OK) { Debug(net, 1, "HTTP request succeeded"); - request->callback->OnReceiveData(nullptr, 0); + request->callback.OnReceiveData(nullptr, 0); } else { long status_code = 0; curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status_code); /* No need to be verbose about rate limiting. */ - Debug(net, (request->callback->IsCancelled() || _http_thread_exit || status_code == HTTP_429_TOO_MANY_REQUESTS) ? 1 : 0, "HTTP request failed: status_code: {}, error: {}", status_code, curl_easy_strerror(res)); - request->callback->OnFailure(); + Debug(net, (request->callback.cancelled || _http_thread_exit || status_code == HTTP_429_TOO_MANY_REQUESTS) ? 1 : 0, "HTTP request failed: status_code: {}, error: {}", status_code, curl_easy_strerror(res)); + request->callback.OnFailure(); } + + /* Wait till the callback tells us all data is dequeued. */ + request->callback.WaitTillEmpty(); } curl_easy_cleanup(curl);