multiple concurrent threadsafe curl requests

This commit is contained in:
Sorunome 2019-10-28 14:37:31 +01:00
parent 4f37d6b23a
commit ec282dab0d
No known key found for this signature in database
GPG key ID: 63E31F7B5993A9C4
2 changed files with 48 additions and 191 deletions

View file

@ -38,19 +38,12 @@ enum struct RequestError : u8 {
timeout, timeout,
}; };
enum struct ForceRequest : u8 {
none,
curl,
httpc,
};
typedef void (*eventCallback)(std::string roomId, json_t* event); typedef void (*eventCallback)(std::string roomId, json_t* event);
typedef void (*roomInfoCallback)(std::string roomId, RoomInfo info); typedef void (*roomInfoCallback)(std::string roomId, RoomInfo info);
typedef void (*roomLimitedCallback)(std::string roomId, std::string prevBatch); typedef void (*roomLimitedCallback)(std::string roomId, std::string prevBatch);
class Client { class Client {
private: private:
public:
std::string hsUrl; std::string hsUrl;
std::string token; std::string token;
Store* store; Store* store;
@ -70,16 +63,14 @@ public:
void processSync(json_t* sync); void processSync(json_t* sync);
void registerFilter(); void registerFilter();
json_t* doSync(std::string token, std::string filter, u32 timeout); json_t* doSync(std::string token, std::string filter, u32 timeout);
void syncLoop(); json_t* doRequest(const char* method, std::string path, json_t* body = NULL, u32 timeout = 5);
json_t* doRequest(const char* method, std::string path, json_t* body = NULL, u32 timeout = 5, ForceRequest forceRequest = ForceRequest::none);
json_t* doRequestCurl(const char* method, std::string url, json_t* body, u32 timeout); json_t* doRequestCurl(const char* method, std::string url, json_t* body, u32 timeout);
json_t* doRequestHttpc(const char* method, std::string url, json_t* body, u32 timeout);
public: public:
Client(std::string homeserverUrl, std::string matrixToken = "", Store* clientStore = NULL); Client(std::string homeserverUrl, std::string matrixToken = "", Store* clientStore = NULL);
std::string getToken(); std::string getToken();
bool login(std::string username, std::string password); bool login(std::string username, std::string password);
void logout(); void logout();
std::string getUserId(ForceRequest forceRequest = ForceRequest::none); std::string getUserId();
std::string resolveRoom(std::string alias); std::string resolveRoom(std::string alias);
std::vector<std::string> getJoinedRooms(); std::vector<std::string> getJoinedRooms();
RoomInfo getRoomInfo(std::string roomId); RoomInfo getRoomInfo(std::string roomId);
@ -106,6 +97,7 @@ public:
void setInviteRoomCallback(eventCallback cb); void setInviteRoomCallback(eventCallback cb);
void setRoomInfoCallback(roomInfoCallback cb); void setRoomInfoCallback(roomInfoCallback cb);
void setRoomLimitedCallback(roomLimitedCallback cb); void setRoomLimitedCallback(roomLimitedCallback cb);
void syncLoop();
}; };
}; // namespace Matrix }; // namespace Matrix

View file

@ -10,6 +10,7 @@
#include "memorystore.h" #include "memorystore.h"
#include <string> #include <string>
#include <vector> #include <vector>
#include <map>
#include <sys/socket.h> #include <sys/socket.h>
@ -92,22 +93,11 @@ void Client::logout() {
} }
} }
std::string Client::getUserId(ForceRequest forceRequest) { std::string Client::getUserId() {
if (forceRequest == ForceRequest::none) {
if (userIdCache != "") { if (userIdCache != "") {
return userIdCache; return userIdCache;
} }
std::string userIdHttpc = getUserId(ForceRequest::httpc); json_t* ret = doRequest("GET", "/_matrix/client/r0/account/whoami");
std::string userIdCurl = getUserId(ForceRequest::curl);
haveHttpcSupport = userIdHttpc == userIdCurl;
if (haveHttpcSupport) {
printf_top("httpc support present\n");
} else {
printf_top("httpc support not present\n");
}
return userIdCurl;
}
json_t* ret = doRequest("GET", "/_matrix/client/r0/account/whoami", NULL, 5, forceRequest);
const char* userIdCStr = json_object_get_string_value(ret, "user_id"); const char* userIdCStr = json_object_get_string_value(ret, "user_id");
if (!userIdCStr) { if (!userIdCStr) {
if (ret) json_decref(ret); if (ret) json_decref(ret);
@ -726,6 +716,7 @@ json_t* Client::doSync(std::string token, std::string filter, u32 timeout) {
} }
size_t DoRequestWriteCallback(char *contents, size_t size, size_t nmemb, void *userp) { size_t DoRequestWriteCallback(char *contents, size_t size, size_t nmemb, void *userp) {
// printf_top("----\n%s\n", ((std::string*)userp)->c_str());
((std::string*)userp)->append((char*)contents, size * nmemb); ((std::string*)userp)->append((char*)contents, size * nmemb);
return size * nmemb; return size * nmemb;
} }
@ -733,50 +724,35 @@ size_t DoRequestWriteCallback(char *contents, size_t size, size_t nmemb, void *u
bool doingCurlRequest = false; bool doingCurlRequest = false;
bool doingHttpcRequest = false; bool doingHttpcRequest = false;
json_t* Client::doRequest(const char* method, std::string path, json_t* body, u32 timeout, ForceRequest forceRequest) { json_t* Client::doRequest(const char* method, std::string path, json_t* body, u32 timeout) {
std::string url = hsUrl + path; std::string url = hsUrl + path;
requestId++; requestId++;
if (forceRequest == ForceRequest::curl) { return doRequestCurl(method, url, body, timeout);
while(doingCurlRequest) { }
svcSleepThread((u64)1000000ULL);
CURLM* curl_multi_handle;
std::map<CURLM*, CURLcode> curl_handles_done;
Thread curl_multi_loop_thread;
void curl_multi_loop(void* p) {
int openHandles = 0;
while(true) {
CURLMcode mc = curl_multi_perform(curl_multi_handle, &openHandles);
if (mc != CURLM_OK) {
printf_top("curl multi fail: %u\n", mc);
} }
doingCurlRequest = true; // curl_multi_wait(curl_multi_handle, NULL, 0, 1000, &openHandles);
json_t* ret = doRequestCurl(method, url, body, timeout); if (!openHandles) {
doingCurlRequest = false; svcSleepThread((u64)1000000ULL * 100);
return ret;
} }
if (forceRequest == ForceRequest::httpc) { CURLMsg* msg;
while(doingHttpcRequest) { int msgsLeft;
svcSleepThread((u64)1000000ULL); while ((msg = curl_multi_info_read(curl_multi_handle, &msgsLeft))) {
if (msg->msg == CURLMSG_DONE) {
curl_handles_done[msg->easy_handle] = msg->data.result;
} }
doingHttpcRequest = true;
json_t* ret = doRequestHttpc(method, url, body, timeout);
doingHttpcRequest = false;
return ret;
} }
if (haveHttpcSupport) {
while(doingCurlRequest && doingHttpcRequest) {
svcSleepThread((u64)1000000ULL);
}
if (!doingCurlRequest) {
doingCurlRequest = true;
json_t* ret = doRequestCurl(method, url, body, timeout);
doingCurlRequest = false;
return ret;
} else {
doingHttpcRequest = true;
json_t* ret = doRequestHttpc(method, url, body, timeout);
doingHttpcRequest = false;
return ret;
}
} else {
while(doingCurlRequest) {
svcSleepThread((u64)1000000ULL);
}
doingCurlRequest = true;
json_t* ret = doRequestCurl(method, url, body, timeout);
doingCurlRequest = false;
return ret;
} }
} }
@ -791,10 +767,13 @@ json_t* Client::doRequestCurl(const char* method, std::string url, json_t* body,
if (socInit(SOC_buffer, POST_BUFFERSIZE) != 0) { if (socInit(SOC_buffer, POST_BUFFERSIZE) != 0) {
return NULL; return NULL;
} }
curl_multi_handle = curl_multi_init();
s32 prio = 0;
svcGetThreadPriority(&prio, CUR_THREAD_HANDLE);
curl_multi_loop_thread = threadCreate(curl_multi_loop, NULL, 8*1024, prio-1, -2, true);
} }
CURL* curl = curl_easy_init(); CURL* curl = curl_easy_init();
CURLcode res;
if (!curl) { if (!curl) {
printf_top("curl init failed\n"); printf_top("curl init failed\n");
return NULL; return NULL;
@ -824,9 +803,18 @@ json_t* Client::doRequestCurl(const char* method, std::string url, json_t* body,
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout); curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout);
curl_multi_add_handle(curl_multi_handle, curl);
while (curl_handles_done.count(curl) == 0) {
svcSleepThread((u64)1000000ULL * 1);
}
CURLcode res = curl_handles_done[curl];
curl_handles_done.erase(curl);
curl_multi_remove_handle(curl_multi_handle, curl);
// curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); // curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
// curl_easy_setopt(curl, CURLOPT_STDERR, stdout); // curl_easy_setopt(curl, CURLOPT_STDERR, stdout);
res = curl_easy_perform(curl);
curl_easy_cleanup(curl); curl_easy_cleanup(curl);
if (bodyStr) free(bodyStr); if (bodyStr) free(bodyStr);
if (res != CURLE_OK) { if (res != CURLE_OK) {
@ -837,7 +825,7 @@ json_t* Client::doRequestCurl(const char* method, std::string url, json_t* body,
return NULL; return NULL;
} }
// printf_top("%s\n", readBuffer.c_str()); // printf_top("++++\n%s\n", readBuffer.c_str());
printf_top("Body size: %d\n", readBuffer.length()); printf_top("Body size: %d\n", readBuffer.length());
json_error_t error; json_error_t error;
json_t* content = json_loads(readBuffer.c_str(), 0, &error); json_t* content = json_loads(readBuffer.c_str(), 0, &error);
@ -848,127 +836,4 @@ json_t* Client::doRequestCurl(const char* method, std::string url, json_t* body,
return content; return content;
} }
json_t* Client::doRequestHttpc(const char* method, std::string url, json_t* body, u32 timeout) {
printf_top("Opening Request %d with HTTPC\n%s\n", requestId, url.c_str());
if (!HTTPC_inited) {
if (httpcInit(POST_BUFFERSIZE) != 0) {
return NULL;
}
HTTPC_inited = true;
}
Result ret = 0;
httpcContext context;
HTTPC_RequestMethod methodReal = HTTPC_METHOD_GET;
u32 statusCode = 0;
u32 contentSize = 0, readsize = 0, size = 0;
u8* buf, *lastbuf;
if (strcmp(method, "GET") == 0) {
methodReal = HTTPC_METHOD_GET;
} else if (strcmp(method, "PUT") == 0) {
methodReal = HTTPC_METHOD_PUT;
} else if (strcmp(method, "POST") == 0) {
methodReal = HTTPC_METHOD_POST;
} else if (strcmp(method, "DELETE") == 0) {
methodReal = HTTPC_METHOD_DELETE;
}
do {
httpcOpenContext(&context, methodReal, url.c_str(), 1);
httpcSetSSLOpt(&context, SSLCOPT_DisableVerify);
httpcSetKeepAlive(&context, HTTPC_KEEPALIVE_ENABLED);
httpcAddRequestHeaderField(&context, "User-Agent", "3ds");
if (token != "") {
httpcAddRequestHeaderField(&context, "Authorization", ("Bearer " + token).c_str());
}
httpcAddRequestHeaderField(&context, "Connection", "Keep-Alive");
char* bodyStr = NULL;
if (body) {
httpcAddRequestHeaderField(&context, "Content-Type", "application/json");
bodyStr = json_dumps(body, JSON_ENSURE_ASCII | JSON_ESCAPE_SLASH);
httpcAddPostDataRaw(&context, (u32*)bodyStr, strlen(bodyStr));
}
ret = httpcBeginRequest(&context);
if (bodyStr) free(bodyStr);
if (ret) {
printf_top("Failed to perform request %lu\n", ret);
httpcCloseContext(&context);
return NULL;
}
httpcGetResponseStatusCode(&context, &statusCode);
if ((statusCode >= 301 && statusCode <= 303) || (statusCode >= 307 && statusCode <= 308)) {
char newUrl[0x100];
ret = httpcGetResponseHeader(&context, "Location", newUrl, 0x100);
url = std::string(newUrl);
}
} while ((statusCode >= 301 && statusCode <= 303) || (statusCode >= 307 && statusCode <= 308));
// printf_top("Status code: %lu\n", statusCode);
ret = httpcGetDownloadSizeState(&context, NULL, &contentSize);
if (ret != 0) {
printf_top("Failed getDownloadSizeState %lu\n", ret);
httpcCloseContext(&context);
return NULL;
}
// Start with a single page buffer
buf = (u8*)malloc(0x1000);
if (!buf) {
httpcCloseContext(&context);
return NULL;
}
u64 timeoutReal = timeout * 1000000000ULL;
do {
// This download loop resizes the buffer as data is read.
u64 timeStartMs = osGetTime();
ret = httpcDownloadDataTimeout(&context, buf+size, 0x1000, &readsize, timeoutReal);
u64 timeDifMs = osGetTime() - timeStartMs;
timeoutReal -= timeDifMs*1000000ULL;
size += readsize;
if (ret == (s32)HTTPC_RESULTCODE_DOWNLOADPENDING) {
lastbuf = buf; // Save the old pointer, in case realloc() fails.
buf = (u8*)realloc(buf, size + 0x1000);
if (!buf) {
httpcCloseContext(&context);
free(lastbuf);
return NULL;
}
}
} while (ret == (s32)HTTPC_RESULTCODE_DOWNLOADPENDING);
if (ret) {
printf_top("httpc res not ok %lu\n", ret);
// let's just assume it was a timeout...
// TODO: better detection
lastRequestError = RequestError::timeout;
httpcCloseContext(&context);
free(buf);
return NULL;
}
// Resize the buffer back down to our actual final size
lastbuf = buf;
buf = (u8*)realloc(buf, size + 1); // +1 for zero-termination
if (!buf) { // realloc() failed.
httpcCloseContext(&context);
free(lastbuf);
return NULL;
}
buf[size] = '\0'; // zero-terminate
httpcCloseContext(&context);
// printf_top("%s\n", buf);
printf_top("Body size: %lu\n", size);
json_error_t error;
json_t* content = json_loads((char*)buf, 0, &error);
free(buf);
if (!content) {
printf_top("Failed to parse json\n");
return NULL;
}
return content;
}
}; // namespace Matrix }; // namespace Matrix