Add TCP proxying to emsocket

This commit is contained in:
paradust7 2022-04-05 12:23:35 +00:00
parent 1e2e7d3b12
commit 1500883fa2
11 changed files with 1387 additions and 201 deletions

@ -1,6 +1,6 @@
add_library(emsocket emsocket.cpp)
add_library(emsocket emsocket.cpp emsocketctl.cpp ProxyLink.cpp VirtualSocket.cpp)
set_target_properties(emsocket PROPERTIES PUBLIC_HEADER "emsocket.h")
set_target_properties(emsocket PROPERTIES PUBLIC_HEADER "emsocket.h;emsocketctl.h")
INSTALL(TARGETS emsocket
LIBRARY DESTINATION lib
PUBLIC_HEADER DESTINATION include

38
src/emsocket/Link.h Normal file

@ -0,0 +1,38 @@
/*
MIT License
Copyright (c) 2022 paradust7
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#pragma once
#include <functional>
namespace emsocket {
class Link {
public:
using Receiver = std::function<void(const void *, size_t)>;
virtual ~Link() { }
virtual void send(const void *data, size_t len) = 0;
};
} // namespace

239
src/emsocket/ProxyLink.cpp Normal file

@ -0,0 +1,239 @@
/*
MIT License
Copyright (c) 2022 paradust7
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#define EMSOCKET_INTERNAL
#include <iostream>
#include <string>
#include <cassert>
#include <cstdint>
#include <memory.h>
#include <emscripten/websocket.h>
#include "VirtualSocket.h"
#include "ProxyLink.h"
#include "emsocketctl.h"
extern "C" {
// Callbacks to re-enter C/C++ from javascript
EMSCRIPTEN_KEEPALIVE
void proxylink_onopen(void *thisPtr);
EMSCRIPTEN_KEEPALIVE
void proxylink_onerror(void *thisPtr);
EMSCRIPTEN_KEEPALIVE
void proxylink_onclose(void *thisPtr);
EMSCRIPTEN_KEEPALIVE
void proxylink_onmessage(void *thisPtr, const void *buf, size_t n);
}
EM_JS(int, setup_proxylink_websocket, (const char* url, void *thisPtr), {
if (!self.hasOwnProperty('mywebsockets')) {
self.mywebsockets = [null];
self.w_proxylink_onopen = Module.cwrap('proxylink_onopen', null, ['number']);
self.w_proxylink_onerror = Module.cwrap('proxylink_onerror', null, ['number']);
self.w_proxylink_onclose = Module.cwrap('proxylink_onclose', null, ['number']);
self.w_proxylink_onmessage = Module.cwrap('proxylink_onmessage', null, ['number', 'number', 'number']);
}
const ws = new WebSocket(UTF8ToString(url));
const index = mywebsockets.length;
mywebsockets.push(ws);
ws.binaryType = "arraybuffer";
ws.onopen = (e) => {
w_proxylink_onopen(thisPtr);
};
ws.onerror = (e) => {
w_proxylink_onerror(thisPtr);
};
ws.onclose = (e) => {
w_proxylink_onclose(thisPtr);
};
ws.onmessage = (e) => {
var len = e.data.byteLength;
var buf = _malloc(len);
HEAPU8.set(new Uint8Array(e.data), buf);
w_proxylink_onmessage(thisPtr, buf, len);
_free(buf);
};
return index;
});
EM_JS(void, send_proxylink_websocket, (int index, const void *data, int len), {
const ws = mywebsockets[index];
if (ws) {
ws.send(new Uint8Array(HEAPU8.subarray(data, data + len)));
}
});
EM_JS(void, delete_proxylink_websocket, (int index), {
const ws = mywebsockets[index];
if (ws) {
delete mywebsockets[index];
ws.onopen = ws.onerror = ws.onclose = ws.onmessage = null;
ws.close();
}
});
static void* memdup(const void *data, size_t len) {
void* buf = malloc(len);
memcpy(buf, data, len);
return buf;
}
namespace emsocket {
class ProxyLink : public Link {
public:
ProxyLink() = delete;
ProxyLink(const ProxyLink &) = delete;
ProxyLink& operator=(const ProxyLink &) = delete;
ProxyLink(VirtualSocket *vs_, const std::string &proxyUrl, const SocketAddr &addr_, bool udp_)
: wsIndex(-1),
vs(vs_),
addr(addr_),
udp(udp_),
sentProxyRequest(false),
receivedProxyAuth(false)
{
//std::cerr << "Initialized proxy websocket" << std::endl;
emsocket_run_on_io_thread(true, [this, proxyUrl]() {
wsIndex = setup_proxylink_websocket(proxyUrl.c_str(), this);
});
assert(wsIndex > 0);
}
virtual ~ProxyLink() {
emsocket_run_on_io_thread(true, [this]() {
hangup();
});
}
// Called from external thread
virtual void send(const void *data, size_t len) {
// This can be called from another thread. Move it to the I/O thread.
int wsIndex_ = wsIndex;
void *dataCopy = memdup(data, len);
emsocket_run_on_io_thread(false, [wsIndex_, dataCopy, len]() {
send_proxylink_websocket(wsIndex_, dataCopy, len);
free(dataCopy);
});
}
public:
// Called from I/O thread
void onopen() {
// Send a proxy request
char buf[128];
sprintf(buf, "PROXY IPV4 %s %s %u", (udp ? "UDP" : "TCP"), addr.getIP().c_str(), addr.getPort());
send_proxylink_websocket(wsIndex, buf, strlen(buf));
//std::cerr << "Sent websocket PROXY handshake" << std::endl;
sentProxyRequest = true;
}
// Called from I/O thread
void onerror() {
//std::cerr << "ProxyLink got websocket error" << std::endl;
hangup();
}
// Called from I/O thread
void onclose() {
//std::cerr << "ProxyLink got websocket close" << std::endl;
hangup();
}
// Called from I/O thread
void onmessage(const void *buf, int n) {
if (!sentProxyRequest) {
//std::cerr << "ProxyLink got invalid message before proxy request" << std::endl;
hangup();
return;
}
if (!receivedProxyAuth) {
// Check for proxy auth
if (n > 16) {
//std::cerr << "ProxyLink unexpected auth message length (" << n << ")" << std::endl;
hangup();
return;
}
std::string response((const char*)buf, n);
if (response == "PROXY OK") {
receivedProxyAuth = true;
vs->linkConnected();
return;
}
//std::cerr << "ProxyLink received bad auth: '" << response << "' of length " << n << std::endl;
hangup();
return;
}
// Regular message
vs->linkReceived(addr, buf, n);
}
// Called from I/O thread
void hangup() {
delete_proxylink_websocket(wsIndex);
vs->linkShutdown();
}
private:
// vs is protected by vs_mutex
VirtualSocket *vs;
SocketAddr addr;
bool udp;
int wsIndex;
//EMSCRIPTEN_WEBSOCKET_T ws;
bool sentProxyRequest;
bool receivedProxyAuth;
};
Link* make_proxy_link(VirtualSocket* vs, const std::string &proxyUrl, const SocketAddr &addr, bool udp) {
return new ProxyLink(vs, proxyUrl, addr, udp);
}
} // namespace
using namespace emsocket;
EMSCRIPTEN_KEEPALIVE
void proxylink_onopen(void *thisPtr) {
return reinterpret_cast<ProxyLink*>(thisPtr)->onopen();
}
EMSCRIPTEN_KEEPALIVE
void proxylink_onerror(void *thisPtr) {
return reinterpret_cast<ProxyLink*>(thisPtr)->onerror();
}
EMSCRIPTEN_KEEPALIVE
void proxylink_onclose(void *thisPtr) {
return reinterpret_cast<ProxyLink*>(thisPtr)->onclose();
}
EMSCRIPTEN_KEEPALIVE
void proxylink_onmessage(void *thisPtr, const void *buf, size_t n) {
return reinterpret_cast<ProxyLink*>(thisPtr)->onmessage(buf, n);
}

35
src/emsocket/ProxyLink.h Normal file

@ -0,0 +1,35 @@
/*
MIT License
Copyright (c) 2022 paradust7
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#pragma once
#include "SocketAddr.h"
#include "Link.h"
namespace emsocket {
class VirtualSocket;
Link* make_proxy_link(VirtualSocket* vs, const std::string &proxyUrl, const SocketAddr &addr, bool udp);
} // namespace

109
src/emsocket/SocketAddr.h Normal file

@ -0,0 +1,109 @@
/*
MIT License
Copyright (c) 2022 paradust7
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#pragma once
#include <iostream>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string>
#include <cstring>
namespace emsocket {
class SocketAddr {
public:
SocketAddr() {
clear();
}
SocketAddr(const std::string &ip, uint16_t port) {
setIP(ip);
setPort(port);
}
SocketAddr(const struct sockaddr *addr, socklen_t addrlen) {
if (addr->sa_family == AF_INET && addrlen >= sizeof(sin)) {
memcpy(&sin, addr, sizeof(sin));
} else {
clear();
}
}
std::string getIP() const {
char buf[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &(sin.sin_addr), buf, sizeof(buf));
return std::string(buf);
}
bool setIP(std::string ip) {
if (1 == inet_pton(AF_INET, ip.c_str(), &(sin.sin_addr))) {
return true; // success
}
return false; // fail
}
uint16_t getPort() const {
return ntohs(sin.sin_port);
}
void setPort(uint16_t port) {
sin.sin_port = htons(port);
}
SocketAddr(const SocketAddr &o) {
memcpy(&sin, &o.sin, sizeof(sin));
}
SocketAddr& operator=(const SocketAddr &o) {
memcpy(&sin, &o.sin, sizeof(sin));
return *this;
}
void clear() {
memset(&sin, 0, sizeof(sin));
}
bool isLocalHost() const {
return (sin.sin_addr.s_addr == htonl(INADDR_LOOPBACK));
}
void copyTo(struct sockaddr *addr, socklen_t *addr_len) const {
if (!addr) {
return;
}
memcpy(addr, &sin, std::min((size_t)*addr_len, sizeof(sin)));
*addr_len = sizeof(sin);
}
private:
sockaddr_in sin;
bool valid = true;
};
static inline std::ostream& operator<<(std::ostream &os, const SocketAddr &addr) {
os << addr.getIP() << ":" << addr.getPort();
return os;
}
} // namespace

@ -0,0 +1,268 @@
/*
MIT License
Copyright (c) 2022 paradust7
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#define EMSOCKET_INTERNAL
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
#include <random>
#include <cassert>
#include <emscripten/threading.h>
#include "VirtualSocket.h"
#include "ProxyLink.h"
namespace emsocket {
// This mutex protects:
// 1) Allocating a new socket
// 2) Adding/removing a port binding
// 3) Adding/removing packets from a receive buffer
//
// The VirtualSocket's themselves are not protected by a mutex.
// POSIX sockets are not thread-safe, so it is assumed that
// every fd will be used only by a single thread at a time.
//
static std::mutex vs_mutex;
static std::condition_variable vs_event;
VirtualSocket VirtualSocket::sockets[EMSOCKET_NSOCKETS];
static std::unordered_map<uint16_t, VirtualSocket*> vs_port_map;
#define VSLOCK() const std::lock_guard<std::mutex> lock(vs_mutex)
static std::random_device rd;
void VirtualSocket::open(int fd, bool udp) {
assert(fd_ == -1);
reset();
fd_ = fd;
is_udp = udp;
}
void VirtualSocket::reset() {
fd_ = -1;
is_connected = false;
is_shutdown = false;
is_blocking = true;
bindAddr.clear();
remoteAddr.clear();
recvbuf.clear();
link = nullptr;
}
void VirtualSocket::close() {
VSLOCK();
uint16_t port = bindAddr.getPort();
if (link) {
delete link;
link = nullptr;
}
if (port != 0) {
vs_port_map.erase(port);
}
reset();
}
void VirtualSocket::linkConnected() {
is_connected = true;
vs_event.notify_all();
}
void VirtualSocket::linkShutdown() {
is_shutdown = true;
vs_event.notify_all();
}
void VirtualSocket::linkReceived(const SocketAddr& addr, const void *buf, size_t n) {
{
VSLOCK();
recvbuf.emplace_back(addr, buf, n);
}
vs_event.notify_all();
}
VirtualSocket* VirtualSocket::allocate(bool udp) {
VSLOCK();
for (int idx = 0; idx < EMSOCKET_NSOCKETS; idx++) {
if (sockets[idx].fd_ == -1) {
sockets[idx].open(EMSOCKET_BASE_FD + idx, udp);
return &sockets[idx];
}
}
return nullptr;
}
VirtualSocket* VirtualSocket::get(int fd) {
if (fd < EMSOCKET_BASE_FD || fd >= EMSOCKET_MAX_FD) {
return nullptr;
}
int idx = fd - EMSOCKET_BASE_FD;
VirtualSocket* vs = &sockets[idx];
assert(vs && vs->fd_ == fd);
return vs;
}
bool VirtualSocket::bind(const SocketAddr& addr) {
assert(bindAddr.getPort() == 0);
VSLOCK();
// TODO: Separate out TCP and UDP ports?
uint16_t port = addr.getPort();
if (port == 0) {
std::default_random_engine engine(rd());
std::uniform_int_distribution<int> randport(4096, 16384);
do {
port = randport(engine);
} while (vs_port_map.count(port));
} else if (vs_port_map.count(port)) {
return false;
}
bindAddr = addr;
bindAddr.setPort(port);
vs_port_map[port] = this;
return true;
}
bool VirtualSocket::canBlock() const {
// Out of necessity, sockets in the main browser thread will
// always act like non-blocking sockets.
return is_blocking; // && !emscripten_is_main_browser_thread();
}
void VirtualSocket::waitForData() {
VirtualSocket::waitFor([&]() {
return !recvbuf.empty();
}, -1);
}
void VirtualSocket::waitForConnect() {
VirtualSocket::waitFor([&]() {
return isConnected() || isShutdown();
}, -1);
}
bool VirtualSocket::runWithLock(const std::function<bool(void)>& predicate) {
std::unique_lock<std::mutex> lock(vs_mutex);
return predicate();
}
void VirtualSocket::waitFor(const std::function<bool(void)>& predicate, int64_t timeout) {
std::unique_lock<std::mutex> lock(vs_mutex);
if (timeout < 0) {
vs_event.wait(lock, predicate);
} else {
vs_event.wait_for(lock, std::chrono::milliseconds(timeout), predicate);
}
}
bool VirtualSocket::startLocalConnect(uint16_t port) {
std::cerr << "emsocket local TCP not yet supported" << std::endl;
return false;
}
bool VirtualSocket::startProxyConnect(const std::string &proxyUrl, const SocketAddr &dest) {
assert(!is_udp);
assert(!link);
assert(!is_connected);
assert(!is_shutdown);
assert(proxyUrl.size() > 0);
if (!isBound()) {
bind(SocketAddr()); // bind to random port
}
remoteAddr = dest;
link = make_proxy_link(this, proxyUrl, dest, is_udp);
return true;
}
// Stream read/write. Always non-blocking.
ssize_t VirtualSocket::read(void *buf, size_t n) {
assert(!is_udp);
VSLOCK();
char *cbuf = (char*)buf;
size_t pos = 0;
while (!recvbuf.empty() && pos < n) {
Packet pkt = std::move(recvbuf.front());
recvbuf.pop_front();
size_t take = std::min(n - pos, pkt.data.size());
memcpy(&cbuf[pos], &pkt.data[0], take);
pos += take;
if (take < pkt.data.size()) {
recvbuf.emplace_front(pkt.from, &pkt.data[take], pkt.data.size() - take);
break;
}
}
return pos;
}
void VirtualSocket::write(const void *buf, size_t n) {
assert(!is_udp);
assert(is_connected);
if (link) {
link->send(buf, n);
}
}
// Datagram read/write. Always non-blocking
ssize_t VirtualSocket::recvfrom(void *buf, size_t n, SocketAddr *from) {
assert(is_udp);
VSLOCK();
char *cbuf = (char*)buf;
if (!recvbuf.empty()) {
Packet pkt = std::move(recvbuf.front());
recvbuf.pop_front();
size_t take = std::min(n, pkt.data.size());
memcpy(&cbuf[0], &pkt.data[0], take);
*from = pkt.from;
return take;
}
return 0;
}
void VirtualSocket::sendto(const void *buf, size_t n, const SocketAddr& to) {
assert(is_udp);
assert(isBound());
SocketAddr sourceAddr("127.0.0.1", bindAddr.getPort());
if (to.isLocalHost()) {
bool sent = false;
{
VSLOCK();
uint16_t port = to.getPort();
auto it = vs_port_map.find(port);
if (it != vs_port_map.end()) {
it->second->recvbuf.emplace_back(sourceAddr, buf, n);
sent = true;
}
}
if (sent) {
vs_event.notify_all();
} else {
std::cerr << "sendto going nowhere" << std::endl;
}
return;
}
// Proxying to external UDP not implemented yet
abort();
}
} // namespace

@ -0,0 +1,118 @@
/*
MIT License
Copyright (c) 2022 paradust7
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#pragma once
#include <vector>
#include <list>
#include <string>
#include <mutex>
#include <atomic>
#include "SocketAddr.h"
#include "Link.h"
// This must match the same defines in emsocket.h
#define EMSOCKET_BASE_FD 512
#define EMSOCKET_MAX_FD 1024
#define EMSOCKET_NSOCKETS (EMSOCKET_MAX_FD - EMSOCKET_BASE_FD)
namespace emsocket {
struct Packet {
SocketAddr from;
std::vector<char> data;
Packet() = delete;
Packet(const Packet &) = delete;
Packet& operator=(const Packet &) = delete;
Packet(Packet &&p) : from(p.from), data(std::move(p.data)) { }
Packet(const SocketAddr &from_, const void *buf, size_t len)
: from(from_), data((char*)buf, ((char*)buf) + len) { }
};
class VirtualSocket {
public:
static VirtualSocket* allocate(bool udp);
static VirtualSocket* get(int fd);
int fd() const { return fd_; }
bool isUDP() const { return is_udp; }
bool isBlocking() const { return is_blocking; }
void setBlocking(bool value) { is_blocking = value; }
bool isBound() const { return bindAddr.getPort() != 0; }
bool bind(const SocketAddr &addr);
const SocketAddr& getBindAddr() { return bindAddr; }
const SocketAddr& getRemoteAddr() { return remoteAddr; }
bool isConnected() const { return is_connected; }
bool isShutdown() const { return is_shutdown; }
// This should only be called holding the lock
// (e.g. inside the predicate for waitFor)
bool hasData() const { return !recvbuf.empty(); }
bool canBlock() const;
void waitForData();
void waitForConnect();
static bool runWithLock(const std::function<bool(void)>& predicate);
static void waitFor(const std::function<bool(void)>& predicate, int64_t timeout);
bool startLocalConnect(uint16_t port);
bool startProxyConnect(const std::string &proxyUrl, const SocketAddr &dest);
// Stream read/write. Always non-blocking.
ssize_t read(void *buf, size_t n);
void write(const void *buf, size_t n);
// Datagram read/write. Always non-blocking
ssize_t recvfrom(void *buf, size_t n, SocketAddr *from);
void sendto(const void *buf, size_t n, const SocketAddr& to);
void close();
void linkConnected();
void linkShutdown();
void linkReceived(const SocketAddr &addr, const void *buf, size_t n);
private:
VirtualSocket()
: fd_(-1) {
}
VirtualSocket(const VirtualSocket &) = delete;
VirtualSocket& operator=(const VirtualSocket &) = delete;
void reset();
void open(int fd, bool udp);
int fd_;
bool is_udp;
bool is_blocking;
std::atomic<bool> is_connected;
std::atomic<bool> is_shutdown;
SocketAddr bindAddr;
SocketAddr remoteAddr;
std::list<Packet> recvbuf;
Link *link;
static VirtualSocket sockets[EMSOCKET_NSOCKETS];
};
} // namespace

@ -22,6 +22,8 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#define EMSOCKET_INTERNAL
#include <list>
#include <netinet/in.h>
#include <cassert>
@ -30,8 +32,6 @@ SOFTWARE.
#include <cstdlib>
#include <cstring>
#include <iomanip>
#include <chrono>
#include <random>
#include <unordered_map>
#include <cerrno>
#include <mutex>
@ -44,202 +44,270 @@ SOFTWARE.
#include <unistd.h>
#include <arpa/inet.h>
#define EMSOCKET_INTERNAL
#include "VirtualSocket.h"
#include "emsocket.h"
// Browsers don't allow websocket servers, so simulate servers on `localhost` with a buffer instead.
static bool is_localhost(const struct sockaddr *addr, socklen_t addrlen) {
assert(addr->sa_family == AF_INET);
if (((sockaddr_in*)addr)->sin_addr.s_addr == htonl(INADDR_LOOPBACK)) {
return true;
}
return false;
using namespace emsocket;
namespace emsocket {
extern std::string currentProxyUrl;
}
static uint16_t get_port(const struct sockaddr *addr, socklen_t addrlen) {
assert(addr->sa_family == AF_INET);
return ntohs(((sockaddr_in*)addr)->sin_port);
}
#if EMSOCKET_DEBUG
// sockfd -> receive packet buffer
struct pkt {
uint16_t source_port;
std::vector<char> data;
std::mutex dbg_mutex;
#define DBG(x) do { \
const std::lock_guard<std::mutex> lock(dbg_mutex); \
x \
} while(0)
pkt() = delete;
pkt(const pkt &) = delete;
pkt& operator=(const pkt &) = delete;
#else
pkt(uint16_t port, const char *buf, size_t len)
: source_port(port), data(&buf[0], &buf[len]) { }
};
#define DBG(x) do { } while (0)
struct VirtualSocket {
std::mutex mutex;
std::condition_variable cv;
bool open;
uint16_t sport;
std::list<pkt> recvbuf;
VirtualSocket() {
reset(false);
}
void reset(bool open_) {
open = open_;
sport = 0;
recvbuf.clear();
}
};
// Protects the maps and id generation
static std::mutex mutex;
static std::vector<VirtualSocket*> socket_map;
static std::unordered_map<uint16_t, VirtualSocket*> port_map;
static unsigned int seed = 0;
static std::default_random_engine generator;
static std::uniform_int_distribution<uint16_t> randport(4096, 32000);
// Must be called while holding mutex
static uint16_t random_port() {
if (seed == 0) {
seed = std::chrono::system_clock::now().time_since_epoch().count();
generator.seed(seed);
}
uint16_t port;
do {
port = randport(generator);
} while (port_map.count(port) != 0);
return port;
}
static inline void maybe_init_socket_map() {
if (socket_map.size() == 0) {
int count = EMSOCKET_MAX_FD - EMSOCKET_BASE_FD;
for (int idx = 0; idx < count; idx++) {
socket_map.push_back(new VirtualSocket());
}
}
}
// Must be called holding the mutex
static VirtualSocket *getvs(int fd) {
assert(fd >= EMSOCKET_BASE_FD && fd < EMSOCKET_MAX_FD);
maybe_init_socket_map();
int idx = fd - EMSOCKET_BASE_FD;
auto vs = socket_map[idx];
assert(vs && vs->open);
return vs;
}
#endif
/**************************************************************************************************/
#define VLOCK() const std::lock_guard<std::mutex> lock(mutex)
int emsocket_socket(int domain, int type, int protocol) {
VLOCK();
maybe_init_socket_map();
int idx = 0;
for (; idx < socket_map.size(); idx++) {
if (!socket_map[idx]->open) {
break;
}
DBG(std::cerr << "emsocket_socket " << domain << "," << type << "," << protocol << std::endl;);
if (domain != AF_INET) {
DBG(std::cerr << "emsocket_socket bad domain: " << domain << std::endl;);
errno = EINVAL;
return -1;
}
if (idx == socket_map.size()) {
if (type != SOCK_STREAM && type != SOCK_DGRAM) {
DBG(std::cerr << "emsocket_socket bad type: " << type << std::endl;);
errno = EINVAL;
return -1;
}
if (protocol == 0) {
if (type == SOCK_STREAM) protocol = IPPROTO_TCP;
if (type == SOCK_DGRAM) protocol = IPPROTO_UDP;
}
if (type == SOCK_DGRAM && protocol != IPPROTO_UDP) {
DBG(std::cerr << "emsocket_socket bad dgram protocol: " << protocol << std::endl;);
errno = EINVAL;
return -1;
}
if (type == SOCK_STREAM && protocol != IPPROTO_TCP) {
DBG(std::cerr << "emsocket_socket bad stream protocol: " << protocol << std::endl;);
errno = EINVAL;
return -1;
}
bool is_udp = (type == SOCK_DGRAM);
auto vs = VirtualSocket::allocate(is_udp);
if (!vs) {
errno = EMFILE;
return -1;
}
socket_map[idx]->reset(true);
return EMSOCKET_BASE_FD + idx;
DBG(std::cerr << "emsocket_socket returns fd=" << vs->fd() << ", udp=" << vs->isUDP() << std::endl;);
return vs->fd();
}
int emsocket_socketpair(int domain, int type, int protocol, int fds[2]);
int emsocket_bind(int fd, const struct sockaddr *addr, socklen_t len) {
VLOCK();
auto vs = getvs(fd);
assert(vs->sport == 0);
uint16_t port = get_port(addr, len);
if (port == 0) {
port = random_port();
}
if (port_map.count(port)) {
auto vs = VirtualSocket::get(fd);
if (!vs->bind(SocketAddr(addr, len))) {
DBG(std::cerr << "emsocket_bind failed" << std::endl;);
errno = EADDRINUSE;
return -1;
}
vs->sport = port;
port_map[port] = vs;
DBG(std::cerr << "emsocket_bind success fd=" << fd << " bindAddr=" << vs->getBindAddr() << std::endl;);
return 0;
}
int emsocket_getsockname(int fd, struct sockaddr *addr, socklen_t *len);
int emsocket_getsockname(int fd, struct sockaddr *addr, socklen_t *len) {
DBG(std::cerr << "emsocket_getsockname fd=" << fd << std::endl;);
auto vs = VirtualSocket::get(fd);
vs->getBindAddr().copyTo(addr, len);
DBG(std::cerr << " --> " << vs->getBindAddr() << std::endl;);
return 0;
}
int emsocket_connect(int fd, const struct sockaddr *addr, socklen_t len);
int emsocket_connect(int fd, const struct sockaddr *addr, socklen_t len) {
SocketAddr dest(addr, len);
DBG(std::cerr << "emsocket_connect: fd=" << fd << ", " << dest << std::endl;);
auto vs = VirtualSocket::get(fd);
if (vs->isUDP()) {
// connect() on a UDP socket actually has a particular meaning...
// but this is not implemented here.
DBG(std::cerr << "emsocket_connect: Unexpected UDP" << std::endl;);
errno = EPROTOTYPE;
return -1;
}
if (vs->isConnected() || vs->isShutdown()) {
DBG(std::cerr << "emsocket_connect: Already connected or shutdown" << std::endl;);
errno = EISCONN;
return -1;
}
if (dest.isLocalHost()) {
if (!vs->startLocalConnect(dest.getPort())) {
DBG(std::cerr << "emsocket_connect: StartLocalConnect failed" << std::endl;);
errno = ECONNREFUSED;
return -1;
}
} else if (currentProxyUrl.size() > 0) {
if (!vs->startProxyConnect(currentProxyUrl, dest)) {
DBG(std::cerr << "emsocket_connect: StartProxyConnect failed" << std::endl;);
errno = ECONNREFUSED;
return -1;
}
} else {
DBG(std::cerr << "emsocket_connect: Unreachable address" << std::endl;);
errno = ENETUNREACH;
return -1;
}
int emsocket_getpeername(int fd, struct sockaddr *addr, socklen_t *len);
if (vs->isConnected() && !vs->isShutdown()) {
return 0;
}
ssize_t emsocket_send(int fd, const void *buf, size_t n, int flags);
if (!vs->canBlock()) {
DBG(std::cerr << "emsocket_connect: Connection in progress" << std::endl;);
errno = EINPROGRESS;
return -1;
}
vs->waitForConnect();
if (!vs->isConnected() || vs->isShutdown()) {
DBG(std::cerr << "emsocket_connect: Connection failed after wait" << std::endl;);
errno = ECONNREFUSED;
return -1;
}
return 0;
}
ssize_t emsocket_recv(int fd, void *buf, size_t n, int flags);
int emsocket_getpeername(int fd, struct sockaddr *addr, socklen_t *len) {
DBG(std::cerr << "emsocket_getpeername: fd=" << fd << std::endl;);
auto vs = VirtualSocket::get(fd);
if (!vs->isConnected()) {
errno = ENOTCONN;
return -1;
}
vs->getRemoteAddr().copyTo(addr, len);
return 0;
}
ssize_t emsocket_send(int fd, const void *buf, size_t n, int flags) {
DBG(std::cerr << "emsocket_send: fd=" << fd << ", n = " << n << std::endl;);
auto vs = VirtualSocket::get(fd);
if (!vs->isConnected() || vs->isShutdown()) {
DBG(std::cerr << " --> not connected" << std::endl;);
errno = ENOTCONN;
return -1;
}
flags &= ~MSG_NOSIGNAL; // no signals anyway
if (flags != 0) {
DBG(std::cerr << "Unsupported flags in emsocket_send: " << flags << std::endl;);
errno = EOPNOTSUPP;
return -1;
}
vs->write(buf, n);
return n;
}
ssize_t emsocket_recv(int fd, void *buf, size_t n, int flags) {
DBG(std::cerr << "emsocket_recv: fd=" << fd << ", n=" << n << std::endl;);
auto vs = VirtualSocket::get(fd);
if (flags != 0) {
DBG(std::cerr << "Unsupported flags in emsocket_recv: " << flags << std::endl;);
errno = EOPNOTSUPP;
return -1;
}
// If there's data ready to go, give it back.
ssize_t bytes = vs->read(buf, n);
if (bytes > 0) {
DBG(std::cerr << " --> read " << bytes << std::endl;);
return bytes;
}
if (!vs->isConnected() || vs->isShutdown()) {
DBG(std::cerr << " --> not connected or shutdown" << std::endl;);
errno = ENOTCONN;
return -1;
}
if (!vs->canBlock()) {
DBG(std::cerr << " --> would block" << std::endl;);
errno = EWOULDBLOCK;
return -1;
}
vs->waitForData();
bytes = vs->read(buf, n);
DBG(std::cerr << " --> read " << bytes << std::endl;);
return bytes;
}
ssize_t emsocket_sendto(int fd, const void *buf, size_t n, int flags, const struct sockaddr *addr, socklen_t addr_len) {
if (!is_localhost(addr, addr_len)) {
// Sending to other than localhost not yet implemented
return 0;
}
uint16_t source_port;
uint16_t dest_port = get_port(addr, addr_len);
VirtualSocket* dest_vs = nullptr;
{
VLOCK();
source_port = getvs(fd)->sport;
auto it = port_map.find(dest_port);
if (it != port_map.end()) {
dest_vs = it->second;
}
}
assert(source_port && "sendto before bind()");
if (!dest_vs) {
// Nothing is listening on localhost?
return 0;
}
// Lock destination vs
{
const std::lock_guard<std::mutex> lock(dest_vs->mutex);
dest_vs->recvbuf.emplace_back(source_port, (const char*)buf, n);
}
dest_vs->cv.notify_all();
return n;
DBG(std::cerr << "emsocket_sendto fd=" << fd << ", n=" << n << std::endl;);
if (addr == NULL || addr_len < sizeof(sockaddr_in)) {
DBG(std::cerr << " --> EDESTADDRREQ" << std::endl;);
errno = EDESTADDRREQ;
return -1;
}
SocketAddr dest(addr, addr_len);
auto vs = VirtualSocket::get(fd);
if (flags != 0) {
DBG(std::cerr << " --> EOPNOTSUPP" << std::endl;);
errno = EOPNOTSUPP;
return -1;
}
if (!vs->isUDP()) {
if (addr != NULL || addr_len != 0) {
DBG(std::cerr << " --> EISCONN" << std::endl;);
errno = EISCONN;
return -1;
}
DBG(std::cerr << " --> forwarding to emsocket_send" << std::endl;);
return emsocket_send(fd, buf, n, flags);
}
if (!vs->isBound()) {
DBG(std::cerr << " --> autobinding" << std::endl;);
// Autobind to random port
if (!vs->bind(SocketAddr())) {
DBG(std::cerr << " --> EADDRINUSE" << std::endl;);
errno = EADDRINUSE;
return -1;
}
}
vs->sendto(buf, n, dest);
DBG(std::cerr << " --> sent" << std::endl;);
return n;
}
ssize_t emsocket_recvfrom(int fd, void *buf, size_t n, int flags, struct sockaddr *addr, socklen_t *addr_len) {
VirtualSocket *vs;
{
VLOCK();
vs = getvs(fd);
}
// For now, this should never be called in a blocking situation.
assert(vs->recvbuf.size() > 0);
const std::lock_guard<std::mutex> lock(vs->mutex);
const pkt &p = vs->recvbuf.front();
ssize_t written = std::min(p.data.size(), n);
bool truncated = (written != (ssize_t)p.data.size());
memcpy(buf, &p.data[0], written);
if (addr) {
struct sockaddr_in ai = {0};
ai.sin_family = AF_INET;
ai.sin_port = htons(p.source_port);
ai.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
memcpy(addr, &ai, std::min((size_t)*addr_len, sizeof(ai)));
*addr_len = sizeof(ai);
}
vs->recvbuf.pop_front();
if (truncated) errno = EMSGSIZE;
//std::cout << "sockfd=" << sockfd << " Received packet of size " << written << std::endl;
return written;
DBG(std::cerr << "emsocket_recvfrom fd=" << fd << ", n = " << n << std::endl;);
auto vs = VirtualSocket::get(fd);
if (flags != 0) {
DBG(std::cerr << "emsocket: unsupported flags in recvfrom" << std::endl;);
errno = EOPNOTSUPP;
return -1;
}
if (!vs->isUDP()) {
DBG(std::cerr << " --> forwarding to emsocket_recv" << std::endl;);
vs->getRemoteAddr().copyTo(addr, addr_len);
return emsocket_recv(fd, buf, n, 0);
}
// Common case: UDP
SocketAddr dest;
ssize_t bytes = vs->recvfrom(buf, n, &dest); // nonblock
if (bytes > 0) {
DBG(std::cerr << " --> got " << bytes << " from " << dest << std::endl;);
dest.copyTo(addr, addr_len);
return bytes;
}
if (!vs->canBlock()) {
DBG(std::cerr << " --> EWOULDBLOCK" << std::endl;);
errno = EWOULDBLOCK;
return -1;
}
vs->waitForData();
bytes = vs->recvfrom(buf, n, &dest);
dest.copyTo(addr, addr_len);
DBG(std::cerr << " --> got " << bytes << " from " << dest << " after wait" << std::endl;);
return bytes;
}
ssize_t emsocket_sendmsg(int fd, const struct msghdr *message, int flags);
int emsocket_sendmmsg(int fd, struct mmsghdr *vmessages, unsigned int vlen, int flags);
@ -248,15 +316,44 @@ ssize_t emsocket_recvmsg(int fd, struct msghdr *message, int flags);
int emsocket_recvmmsg(int fd, struct mmsghdr *vmessages, unsigned int vlen, int flags, struct timespec *tmo);
int emsocket_getsockopt(int fd, int level, int optname, void *optval, socklen_t *optlen);
int emsocket_setsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen) {
return -1;
int emsocket_getsockopt(int fd, int level, int optname, void *optval, socklen_t *optlen) {
std::cerr << "emsocket_getsockopt: level=" << level << ", optname=" << optname << std::endl;
if (level == SOL_SOCKET) {
if (optname == SO_ERROR) {
if (optval && optlen && *optlen == sizeof(int)) {
int *val = (int*)optval;
auto vs = VirtualSocket::get(fd);
if (!vs->isConnected() || vs->isShutdown()) {
*val = ECONNREFUSED;
} else {
*val = 0;
}
return 0;
} else {
errno = EINVAL;
return -1;
}
} else {
errno = ENOPROTOOPT;
return -1;
}
}
errno = ENOPROTOOPT;
return -1;
}
int emsocket_listen(int fd, int n);
int emsocket_setsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen) {
std::cerr << "emsocket_setsockopt: level=" << level << ", optname=" << optname << std::endl;
return -1;
}
int emsocket_accept(int fd, struct sockaddr *addr, socklen_t *addr_len);
int emsocket_listen(int fd, int n) {
abort();
}
int emsocket_accept(int fd, struct sockaddr *addr, socklen_t *addr_len) {
abort();
}
int emsocket_accept4(int fd, struct sockaddr *addr, socklen_t *addr_len, int flags);
@ -266,9 +363,14 @@ int emsocket_sockatmark(int fd);
int emsocket_isfdtype(int fd, int fdtype);
int emsocket_getaddrinfo(const char *node, const char *service, const struct addrinfo *hints, struct addrinfo **res);
int emsocket_getaddrinfo(const char *node, const char *service, const struct addrinfo *hints, struct addrinfo **res) {
/* There's no DNS lookup function in javascript, so it would need to be provided */
return EAI_FAIL;
}
void emsocket_freeaddrinfo(struct addrinfo *res);
void emsocket_freeaddrinfo(struct addrinfo *res) {
return;
}
const char* emsocket_gai_strerror(int errcode);
@ -296,34 +398,139 @@ int emsocket_gethostbyname_r(const char *name, struct hostent *ret, char *buf, s
int emsocket_gethostbyname2_r(const char *name, int af, struct hostent *ret, char *buf, size_t buflen, struct hostent **result, int *h_errnop);
// Cheap hack
// Only supports select on a single fd
int emsocket_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout) {
int sockfd = nfds - 1;
assert(FD_ISSET(sockfd, readfds));
VirtualSocket *vs;
{
VLOCK();
vs = getvs(sockfd);
}
std::unique_lock<std::mutex> lock(vs->mutex);
#define MAX_SELECT_FDS 64
if (timeout == NULL) {
vs->cv.wait(lock, [&]{ return vs->recvbuf.size() > 0; });
} else {
long ms = (timeout->tv_sec * 1000) + (timeout->tv_usec / 1000);
auto cvtimeout = std::chrono::milliseconds(ms);
vs->cv.wait_for(lock, cvtimeout, [&]{ return vs->recvbuf.size() > 0; });
}
if (vs->recvbuf.size() == 0) {
return 0;
}
return 1;
static void print_fd_set(int nfds, fd_set *x) {
if (x == NULL) {
std::cerr << "NULL";
} else {
std::cerr << "[ ";
for (int fd = 0; fd < nfds; fd++) {
if (FD_ISSET(fd, x)) {
std::cerr << fd << " ";
}
}
std::cerr << "]";
}
}
int emsocket_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout) {
#if EMSOCKET_DEBUG
DBG(
std::cerr << "emsocket_select nfds=" << nfds;
std::cerr << ", readfds="; print_fd_set(nfds, readfds);
std::cerr << ", writefs="; print_fd_set(nfds, writefds);
std::cerr << ", exceptfds="; print_fd_set(nfds, exceptfds);
std::cerr << ", timeout=";
if (timeout) {
std::cerr << timeout->tv_sec << "s " << timeout->tv_usec << "us";
} else {
std::cerr << "NULL";
}
std::cerr << std::endl;
);
#endif
// Convert to a call to poll() instead
struct pollfd fds[MAX_SELECT_FDS] = { 0 };
nfds_t count = 0;
for (int fd = 0; fd < nfds; fd++) {
bool check_read = readfds && FD_ISSET(fd, readfds);
bool check_write = writefds && FD_ISSET(fd, writefds);
bool check_except = exceptfds && FD_ISSET(fd, exceptfds);
if (check_read || check_write || check_except) {
if (count == MAX_SELECT_FDS) {
DBG(std::cerr << "emsocket select() called with too many fds" << std::endl;);
errno = EINVAL;
return -1;
}
fds[count].fd = fd;
fds[count].events = (check_read ? POLLIN : 0) | (check_write ? POLLOUT : 0);
count++;
}
}
if (readfds) FD_ZERO(readfds);
if (writefds) FD_ZERO(writefds);
if (exceptfds) FD_ZERO(exceptfds);
int poll_timeout = -1;
if (timeout) {
poll_timeout = (timeout->tv_sec * 1000) + (timeout->tv_usec / 1000);
}
int ret = emsocket_poll(fds, count, poll_timeout);
if (ret <= 0) {
//DBG(std::cerr << " --> returning " << ret << std::endl;);
return ret;
}
int bitcount = 0;
for (int i = 0; i < count; i++) {
int fd = fds[i].fd;
short revents = fds[i].revents;
if (readfds && (revents & (POLLIN|POLLHUP))) {
bitcount++;
FD_SET(fd, readfds);
}
if (writefds && (revents & POLLOUT)) {
bitcount++;
FD_SET(fd, writefds);
}
if (exceptfds && (revents & (POLLERR|POLLHUP))) {
bitcount++;
FD_SET(fd, exceptfds);
}
}
#if EMSOCKET_DEBUG
DBG(
std::cerr << " --> returning " << bitcount;
std::cerr << ", readfds="; print_fd_set(nfds, readfds);
std::cerr << ", writefs="; print_fd_set(nfds, writefds);
std::cerr << ", exceptfds="; print_fd_set(nfds, exceptfds);
std::cerr << std::endl;
);
#endif
return bitcount;
}
int emsocket_pselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const struct timespec *timeout, const sigset_t *sigmask);
int emsocket_poll(struct pollfd *fds, nfds_t nfds, int timeout);
#define MAX_POLL_FDS 64
int emsocket_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
int count = 0;
std::function<bool(void)> predicate = [&]() {
count = 0;
for (int i = 0; i < nfds; i++) {
if (fds[i].fd < 0) continue;
auto vs = VirtualSocket::get(fds[i].fd);
short revents = 0;
if (fds[i].events & POLLIN) {
if (vs->hasData()) {
revents |= POLLIN;
}
if (vs->isShutdown()) {
revents |= POLLIN | POLLHUP;
}
}
if (fds[i].events & POLLOUT) {
if (vs->isUDP() || vs->isConnected() || vs->isShutdown()) {
revents |= POLLOUT;
}
}
fds[i].revents = revents;
if (revents) {
count += 1;
}
}
// Keep waiting until count > 0
return (count > 0);
};
//if (emscripten_is_main_browser_thread()) {
// //DBG(std::cerr << "emsocket_poll fast exit due to being on main thread" << std::endl;);
// VirtualSocket::runWithLock(predicate);
// return count;
//}
VirtualSocket::waitFor(predicate, timeout);
return count;
}
int emsocket_ppoll(struct pollfd *fds, nfds_t nfds, const struct timespec *tmo_p, const sigset_t *sigmask);
@ -337,24 +544,46 @@ int emsocket_epoll_wait(int, struct epoll_event *, int, int);
int emsocket_epoll_pwait(int, struct epoll_event *, int, int, const sigset_t *);
ssize_t emsocket_read(int fd, void *buf, size_t count) {
if (fd < EMSOCKET_BASE_FD) return read(fd, buf, count);
return emsocket_recv(fd, buf, count, 0);
}
ssize_t emsocket_write(int fd, const void *buf, size_t count) {
if (fd < EMSOCKET_BASE_FD) return write(fd, buf, count);
return emsocket_send(fd, buf, count, 0);
}
int emsocket_close(int fd) {
if (fd < EMSOCKET_BASE_FD) {
return ::close(fd);
}
VLOCK();
auto vs = getvs(fd);
if (vs->sport) {
port_map.erase(vs->sport);
vs->sport = 0;
}
vs->reset(false);
auto vs = VirtualSocket::get(fd);
vs->close();
return 0;
}
int emsocket_fcntl(int fd, int cmd, ...) {
abort();
auto vs = VirtualSocket::get(fd);
if (cmd == F_GETFL) {
return vs->isBlocking() ? O_NONBLOCK : 0;
} else if (cmd == F_SETFL) {
va_list ap;
va_start(ap, cmd);
int flags = va_arg(ap, int);
vs->setBlocking((flags & O_NONBLOCK) ? false : true);
flags &= ~O_NONBLOCK;
if (flags) {
std::cerr << "emsocket_fcntl unrecognized flags=" << flags << std::endl;
}
va_end(ap);
return 0;
}
std::cerr << "emsocket_fcntl unknown fcntl cmd=" << cmd << std::endl;
return -1;
}
int emsocket_ioctl(int fd, unsigned long request, ...) {
std::cerr << "emsocket_ioctl not implemented, request=" << request << std::endl;
abort();
}

@ -37,6 +37,9 @@ SOFTWARE.
// but it redeclares ioctl(), so it needs to be here.
#include <stropts.h>
#include <emscripten/threading.h>
// This must match the same defines in VirtualSocket.h
#define EMSOCKET_BASE_FD 512
#define EMSOCKET_MAX_FD 1024
@ -118,6 +121,8 @@ extern int emsocket_epoll_wait(int, struct epoll_event *, int, int);
extern int emsocket_epoll_pwait(int, struct epoll_event *, int, int, const sigset_t *);
/* From unistd.h */
extern ssize_t emsocket_read(int fd, void *buf, size_t count);
extern ssize_t emsocket_write(int fd, const void *buf, size_t count);
extern int emsocket_close(int fd);
/* From fcntl.h */
@ -177,6 +182,8 @@ extern int emsocket_ioctl(int fd, unsigned long request, ...);
#define epoll_ctl emsocket_epoll_ctl
#define epoll_wait emsocket_epoll_wait
#define epoll_pwait emsocket_epoll_pwait
#define read emsocket_read
#define write emsocket_write
#define close emsocket_close
// Special macros needed to handle __VA_ARGS__ forwarding

@ -0,0 +1,98 @@
/*
MIT License
Copyright (c) 2022 paradust7
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#define EMSOCKET_INTERNAL
#include "emsocketctl.h"
#include <iostream>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <pthread.h>
#include <emscripten.h>
namespace emsocket {
std::string currentProxyUrl;
bool didInit;
pthread_t ioThread;
std::mutex ioMutex;
std::condition_variable ioCv;
std::vector<std::function<void()> > ioCallbacks;
uint64_t ioCounter = 0;
}
using namespace emsocket;
static void *io_thread_main(void *);
void emsocket_init() {
if (didInit) return;
didInit = true;
// Launch dedicated i/o thread
int rc = pthread_create(&ioThread, NULL, io_thread_main, NULL);
if (rc != 0) {
std::cerr << "emsocket_init: Failed to launch I/O thread" << std::endl;
abort();
}
}
void emsocket_set_proxy(const char *proxyUrl) {
currentProxyUrl = proxyUrl ? proxyUrl : "";
}
static void io_thread_reenter(void) {
std::vector<std::function<void()> > callbacks;
{
const std::lock_guard<std::mutex> lock(ioMutex);
callbacks = std::move(ioCallbacks);
ioCallbacks.clear();
ioCounter += 1;
}
ioCv.notify_all();
for (const auto &callback : callbacks) {
callback();
}
}
static void *io_thread_main(void *) {
emscripten_set_main_loop(io_thread_reenter, 100, EM_TRUE);
abort(); // unreachable
}
namespace emsocket {
// Returns the id of the callback.
// Use this id in emsocket_remove_io_callback()
void emsocket_run_on_io_thread(bool sync, std::function<void()> && callback) {
std::unique_lock<std::mutex> lock(ioMutex);
ioCallbacks.emplace_back(std::move(callback));
if (sync) {
// Wait for 2 counter clicks, that'll guarantee the callback has run.
uint64_t ioTarget = ioCounter + 2;
ioCv.wait(lock, [&](){ return ioCounter >= ioTarget; });
}
}
} // namespace emsocket

@ -0,0 +1,45 @@
/*
MIT License
Copyright (c) 2022 paradust7
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
void emsocket_init();
void emsocket_set_proxy(const char *proxyUrl);
#ifdef __cplusplus
}
#endif
#ifdef EMSOCKET_INTERNAL
#include <functional>
namespace emsocket {
// Will be called exactly once in the I/O thread.
void emsocket_run_on_io_thread(bool sync, std::function<void()> && callback);
} // namespace
#endif