Compare commits

...

10 Commits

Author SHA1 Message Date
paradust7 91c3fe85d2 Add 'VPN' capability for playing with friends feature 2022-11-07 16:23:19 +00:00
paradust7 93a1dac259 Emscripten removed epoll.h 2022-10-05 11:57:57 +00:00
paradust7 01b29b520f Handle IP addresses in emsocket_getaddrinfo 2022-05-22 11:34:38 -07:00
paradust7 d380e360ab Move proxy logic into a separate js class file 2022-05-08 03:11:29 +00:00
paradust7 da642dfda0 Fix waitForData deadlock 2022-04-09 07:02:32 +00:00
paradust7 e33fe749b5 Better waiting mechanism, fixes deadlock 2022-04-08 00:49:40 +00:00
paradust7 9e804f1449 emsocket: Add DNS and UDP proxying 2022-04-06 02:13:05 +00:00
paradust7 1500883fa2 Add TCP proxying to emsocket 2022-04-05 12:23:35 +00:00
paradust7 1e2e7d3b12 Wrap almost all socket functions 2022-04-01 01:52:45 +00:00
paradust7 12679e7ef5 Add CMakeLists.txt project 2022-03-30 02:06:54 +00:00
21 changed files with 2246 additions and 240 deletions

View File

@ -1,4 +1,6 @@
cmake_minimum_required(VERSION 3.10)
project(webshims)
add_subdirectory(src)

View File

@ -1,2 +1 @@
add_subdirectory(emsocket)
add_subdirectory(emcurl)

View File

@ -1,7 +0,0 @@
add_library(emcurl curl.cpp)
set_target_properties(emcurl PROPERTIES PUBLIC_HEADER "curl.h")
INSTALL(TARGETS emcurl
LIBRARY DESTINATION lib
PUBLIC_HEADER DESTINATION include/curl
)

View File

@ -1,9 +0,0 @@
#include "curl.h"
struct curl_t {
int abc;
};
CURL *curl_easy_init() {
return nullptr;
}

View File

@ -1,9 +0,0 @@
#pragma once
extern "C" {
typedef struct curl_t CURL;
CURL *curl_easy_init();
}

View File

@ -1,7 +1,19 @@
add_library(emsocket emsocket.cpp)
set_target_properties(emsocket PROPERTIES PUBLIC_HEADER "emsocket.h")
INSTALL(TARGETS emsocket
LIBRARY DESTINATION lib
PUBLIC_HEADER DESTINATION include
add_custom_command(
OUTPUT proxyjs.gen.h
COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/gen.sh
ARGS ${CMAKE_CURRENT_SOURCE_DIR}/proxy.js proxyjs.gen.h
MAIN_DEPENDENCY ${CMAKE_CURRENT_SOURCE_DIR}/proxy.js
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/gen.sh
)
add_custom_target(proxyjs ALL DEPENDS proxyjs.gen.h)
add_library(emsocket emsocket.cpp emsocketctl.cpp ProxyLink.cpp VirtualSocket.cpp)
set_target_properties(emsocket PROPERTIES PUBLIC_HEADER "emsocket.h;emsocketctl.h")
target_include_directories(emsocket PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
add_dependencies(emsocket proxyjs)
INSTALL(
TARGETS emsocket
LIBRARY DESTINATION lib
PUBLIC_HEADER DESTINATION include
)

41
src/emsocket/Link.h Normal file
View File

@ -0,0 +1,41 @@
/*
MIT License
Copyright (c) 2022 paradust7
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#pragma once
#include <functional>
namespace emsocket {
class Link {
public:
using Receiver = std::function<void(const void *, size_t)>;
virtual ~Link() { }
virtual void connect(const SocketAddr &addr) = 0;
virtual void sendto(const void *data, size_t len, const SocketAddr &addr) = 0;
virtual void send(const void *data, size_t len) = 0;
};
} // namespace

43
src/emsocket/Packet.h Normal file
View File

@ -0,0 +1,43 @@
/*
MIT License
Copyright (c) 2022 paradust7
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#pragma once
#include <vector>
#include "SocketAddr.h"
namespace emsocket {
struct Packet {
SocketAddr from;
std::vector<char> data;
Packet() = delete;
Packet(const Packet &) = delete;
Packet& operator=(const Packet &) = delete;
Packet(Packet &&p) : from(p.from), data(std::move(p.data)) { }
Packet(const SocketAddr &from_, const void *buf, size_t len)
: from(from_), data((char*)buf, ((char*)buf) + len) { }
};
} // namespace

228
src/emsocket/ProxyLink.cpp Normal file
View File

@ -0,0 +1,228 @@
/*
MIT License
Copyright (c) 2022 paradust7
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#define EMSOCKET_INTERNAL
#include <iostream>
#include <string>
#include <cassert>
#include <cstdint>
#include <mutex>
#include <memory.h>
#include <emscripten/websocket.h>
#include "VirtualSocket.h"
#include "Packet.h"
#include "ProxyLink.h"
#include "emsocketctl.h"
extern "C" {
// Callbacks to re-enter C/C++ from javascript
EMSCRIPTEN_KEEPALIVE
void proxylink_onopen(void *thisPtr);
EMSCRIPTEN_KEEPALIVE
void proxylink_onerror(void *thisPtr);
EMSCRIPTEN_KEEPALIVE
void proxylink_onclose(void *thisPtr);
EMSCRIPTEN_KEEPALIVE
void proxylink_onmessage(void *thisPtr, const void *buf, size_t n, const char *ip, uint16_t port);
}
EM_JS(int, proxylink_new, (uint16_t bind_port, bool udp, void *thisPtr), {
if (!self.hasOwnProperty('w_proxylink_onopen')) {
self.w_proxylink_onopen = Module.cwrap('proxylink_onopen', null, ['number']);
self.w_proxylink_onerror = Module.cwrap('proxylink_onerror', null, ['number']);
self.w_proxylink_onclose = Module.cwrap('proxylink_onclose', null, ['number']);
self.w_proxylink_onmessage = Module.cwrap('proxylink_onmessage', null, ['number', 'number', 'number', 'number', 'number']);
}
const link = new ProxyLink(bind_port, udp);
link.onopen = () => { w_proxylink_onopen(thisPtr); };
link.onerror = () => { w_proxylink_onerror(thisPtr); };
link.onclose = () => { w_proxylink_onclose(thisPtr); };
link.onmessage = (data, ip, port) => {
var len = data.byteLength;
// TODO: Get rid of these allocations
var buf = _malloc(len);
HEAPU8.set(new Uint8Array(data), buf);
var ip_length = lengthBytesUTF8(ip) + 1;
var ip_buf = _malloc(ip_length);
stringToUTF8(ip, ip_buf, ip_length);
w_proxylink_onmessage(thisPtr, buf, len, ip_buf, port);
_free(buf);
_free(ip_buf);
};
return link.index;
});
EM_JS(void, proxylink_connect, (int index, const char* ip, uint16_t port), {
const link = ProxyLink.get(index);
if (link) {
link.connect(UTF8ToString(ip), port);
}
});
EM_JS(void, proxylink_sendto, (int index, const void *data, int len, const char *dest_ip, uint16_t dest_port), {
const link = ProxyLink.get(index);
if (link) {
link.sendto(HEAPU8.subarray(data, data + len), UTF8ToString(dest_ip), dest_port);
}
});
EM_JS(void, proxylink_send, (int index, const void *data, int len), {
const link = ProxyLink.get(index);
if (link) {
link.send(HEAPU8.subarray(data, data + len));
}
});
EM_JS(void, proxylink_close, (int index), {
const link = ProxyLink.get(index);
if (link) {
link.close();
}
});
namespace emsocket {
/*
* Wrapper around the javascript class of the same name
*/
class ProxyLink : public Link {
public:
ProxyLink() = delete;
ProxyLink(const ProxyLink &) = delete;
ProxyLink& operator=(const ProxyLink &) = delete;
ProxyLink(VirtualSocket *vs_, uint16_t bind_port_, bool udp_)
: vs(vs_),
bind_port(bind_port_),
udp(udp_),
wsIndex(-1)
{
emsocket_run_on_io_thread(true, [this]() {
wsIndex = proxylink_new(bind_port, udp, this);
});
assert(wsIndex > 0);
}
virtual ~ProxyLink() {
emsocket_run_on_io_thread(true, [this]() {
hangup();
});
}
// Called from external thread
virtual void connect(const SocketAddr &addr) {
// Move to I/O thread.
int wsIndex_ = wsIndex;
emsocket_run_on_io_thread(false, [wsIndex_, addr]() {
proxylink_connect(wsIndex_, addr.getIP().c_str(), addr.getPort());
});
}
// Called from external thread
virtual void sendto(const void *data, size_t len, const SocketAddr &addr) {
// Move to I/O thread.
int wsIndex_ = wsIndex;
Packet *pkt = new Packet(SocketAddr(), data, len);
emsocket_run_on_io_thread(false, [wsIndex_, pkt, addr]() {
proxylink_sendto(wsIndex_, &pkt->data[0], pkt->data.size(), addr.getIP().c_str(), addr.getPort());
delete pkt;
});
}
// Called from external thread
virtual void send(const void *data, size_t len) {
// Move to I/O thread.
int wsIndex_ = wsIndex;
Packet *pkt = new Packet(SocketAddr(), data, len);
emsocket_run_on_io_thread(false, [wsIndex_, pkt]() {
proxylink_send(wsIndex_, &pkt->data[0], pkt->data.size());
delete pkt;
});
}
public:
// Called from I/O thread
void onopen() {
vs->linkConnected();
}
// Called from I/O thread
void onerror() {
hangup();
}
// Called from I/O thread
void onclose() {
hangup();
}
// Called from I/O thread
void onmessage(const void *buf, int n, const char *ip, uint16_t port) {
SocketAddr addr(ip, port);
vs->linkReceived(addr, buf, n);
}
// Called from I/O thread
void hangup() {
proxylink_close(wsIndex);
vs->linkShutdown();
}
private:
VirtualSocket *vs;
uint16_t bind_port;
bool udp;
int wsIndex;
};
Link* make_proxy_link(VirtualSocket* vs, uint16_t bindport, bool udp) {
return new ProxyLink(vs, bindport, udp);
}
} // namespace
using namespace emsocket;
EMSCRIPTEN_KEEPALIVE
void proxylink_onopen(void *thisPtr) {
return reinterpret_cast<ProxyLink*>(thisPtr)->onopen();
}
EMSCRIPTEN_KEEPALIVE
void proxylink_onerror(void *thisPtr) {
return reinterpret_cast<ProxyLink*>(thisPtr)->onerror();
}
EMSCRIPTEN_KEEPALIVE
void proxylink_onclose(void *thisPtr) {
return reinterpret_cast<ProxyLink*>(thisPtr)->onclose();
}
EMSCRIPTEN_KEEPALIVE
void proxylink_onmessage(void *thisPtr, const void *buf, size_t n, const char *ip, uint16_t port) {
return reinterpret_cast<ProxyLink*>(thisPtr)->onmessage(buf, n, ip, port);
}

35
src/emsocket/ProxyLink.h Normal file
View File

@ -0,0 +1,35 @@
/*
MIT License
Copyright (c) 2022 paradust7
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#pragma once
#include "SocketAddr.h"
#include "Link.h"
namespace emsocket {
class VirtualSocket;
Link* make_proxy_link(VirtualSocket* vs, uint16_t bindport, bool udp);
} // namespace

141
src/emsocket/SocketAddr.h Normal file
View File

@ -0,0 +1,141 @@
/*
MIT License
Copyright (c) 2022 paradust7
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#pragma once
#include <iostream>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string>
#include <cstring>
namespace emsocket {
class SocketAddr {
public:
SocketAddr() {
clear();
}
SocketAddr(const std::string &ip, uint16_t port) {
clear();
setIP(ip);
setPort(port);
}
SocketAddr(const struct sockaddr *addr, socklen_t addrlen) {
clear();
if (addr->sa_family == AF_INET && addrlen >= sizeof(sin)) {
memcpy(&sin, addr, sizeof(sin));
}
}
bool isIPv4() const {
return true; // Only one supported at the moment
}
bool isIPv6() const {
return false;
}
std::string getIP() const {
char buf[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &(sin.sin_addr), buf, sizeof(buf));
return std::string(buf);
}
bool setIP(std::string ip) {
if (1 == inet_pton(AF_INET, ip.c_str(), &(sin.sin_addr))) {
sin.sin_family = AF_INET;
return true; // success
}
return false; // fail
}
uint16_t getPort() const {
return ntohs(sin.sin_port);
}
void setPort(uint16_t port) {
sin.sin_port = htons(port);
}
SocketAddr(const SocketAddr &o) {
memcpy(&sin, &o.sin, sizeof(sin));
}
SocketAddr& operator=(const SocketAddr &o) {
memcpy(&sin, &o.sin, sizeof(sin));
return *this;
}
void clear() {
memset(&sin, 0, sizeof(sin));
}
bool isLocalHost() const {
return (sin.sin_addr.s_addr == htonl(INADDR_LOOPBACK));
}
void copyTo(struct sockaddr *addr, socklen_t *addr_len) const {
if (!addr) {
return;
}
memcpy(addr, &sin, std::min((size_t)*addr_len, sizeof(sin)));
*addr_len = sizeof(sin);
}
bool operator==(const SocketAddr &o) const {
return (
sin.sin_family == o.sin.sin_family &&
sin.sin_port == o.sin.sin_port &&
sin.sin_addr.s_addr == o.sin.sin_addr.s_addr);
}
bool operator!=(const SocketAddr &o) const {
return !(*this == o);
}
const struct sockaddr* sockaddr_ptr() {
return (const struct sockaddr*)&sin;
}
socklen_t sockaddr_len() {
return sizeof(sin);
}
uint32_t addr32() const {
assert(sin.sin_family == AF_INET);
return sin.sin_addr.s_addr;
}
private:
sockaddr_in sin;
};
static inline std::ostream& operator<<(std::ostream &os, const SocketAddr &addr) {
os << addr.getIP() << ":" << addr.getPort();
return os;
}
} // namespace

View File

@ -0,0 +1,272 @@
/*
MIT License
Copyright (c) 2022 paradust7
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#define EMSOCKET_INTERNAL
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
#include <random>
#include <cassert>
#include <emscripten/threading.h>
#include "emsocketctl.h"
#include "VirtualSocket.h"
#include "ProxyLink.h"
#include "WaitList.h"
namespace emsocket {
// This mutex protects:
// 1) Allocating a new socket
// 2) Adding/removing a port binding
// 3) Adding/removing packets from a receive buffer
//
// The VirtualSocket's themselves are not protected by a mutex.
// POSIX sockets are not thread-safe, so it is assumed that
// every fd will be used only by a single thread at a time.
//
static std::mutex vs_mutex;
static std::condition_variable vs_event;
VirtualSocket VirtualSocket::sockets[EMSOCKET_NSOCKETS];
static std::unordered_map<uint16_t, VirtualSocket*> vs_port_map;
static WaitList<VirtualSocket*> wait_list;
#define VSLOCK() const std::lock_guard<std::mutex> lock(vs_mutex)
#define RECVBUFLOCK() const std::lock_guard<std::mutex> rblock(recvbufMutex);
static std::random_device rd;
void VirtualSocket::open(int fd, bool udp) {
assert(fd_ == -1);
reset();
fd_ = fd;
is_udp = udp;
}
void VirtualSocket::reset() {
fd_ = -1;
is_connected = false;
is_shutdown = false;
is_blocking = true;
bindAddr.clear();
remoteAddr.clear();
link = nullptr;
RECVBUFLOCK();
recvbuf.clear();
}
void VirtualSocket::close() {
if (link) {
delete link;
link = nullptr;
}
VSLOCK();
uint16_t port = bindAddr.getPort();
if (port != 0) {
vs_port_map.erase(port);
}
reset();
}
void VirtualSocket::linkConnected() {
is_connected = true;
wait_list.notify(this);
}
void VirtualSocket::linkShutdown() {
is_shutdown = true;
wait_list.notify(this);
}
void VirtualSocket::linkReceived(const SocketAddr& addr, const void *buf, size_t n) {
{
RECVBUFLOCK();
recvbuf.emplace_back(addr, buf, n);
}
wait_list.notify(this);
}
VirtualSocket* VirtualSocket::allocate(bool udp) {
VSLOCK();
for (int idx = 0; idx < EMSOCKET_NSOCKETS; idx++) {
if (sockets[idx].fd_ == -1) {
sockets[idx].open(EMSOCKET_BASE_FD + idx, udp);
return &sockets[idx];
}
}
return nullptr;
}
VirtualSocket* VirtualSocket::get(int fd) {
if (fd < EMSOCKET_BASE_FD || fd >= EMSOCKET_MAX_FD) {
return nullptr;
}
int idx = fd - EMSOCKET_BASE_FD;
VirtualSocket* vs = &sockets[idx];
assert(vs && vs->fd_ == fd);
return vs;
}
bool VirtualSocket::bind(const SocketAddr& addr) {
assert(bindAddr.getPort() == 0);
uint16_t port = addr.getPort();
std::default_random_engine engine(rd());
std::uniform_int_distribution<int> randport(4096, 16384);
{
VSLOCK();
// TODO: Separate out TCP and UDP ports?
if (port == 0) {
do {
port = randport(engine);
} while (vs_port_map.count(port));
} else if (vs_port_map.count(port)) {
return false;
}
vs_port_map[port] = this;
bindAddr = addr;
bindAddr.setPort(port);
}
link = make_proxy_link(this, port, is_udp);
return true;
}
bool VirtualSocket::hasData() const {
RECVBUFLOCK();
return !recvbuf.empty();
}
bool VirtualSocket::canBlock() const {
return is_blocking;
}
void VirtualSocket::waitForData() {
VirtualSocket::waitFor({this}, [&]() {
return isShutdown() || hasData();
}, -1);
}
void VirtualSocket::waitForConnect() {
VirtualSocket::waitFor({this}, [&]() {
return isConnected() || isShutdown();
}, -1);
}
void VirtualSocket::waitFor(
const std::vector<VirtualSocket*> &vslist,
const std::function<bool(void)>& predicate,
int64_t timeout) {
wait_list.waitFor(vslist, predicate, timeout);
}
bool VirtualSocket::startConnect(const SocketAddr &dest) {
if (dest.isLocalHost()) {
std::cerr << "emsocket local TCP not yet supported" << std::endl;
return false;
} else {
assert(!is_udp);
assert(!is_connected);
assert(!is_shutdown);
if (!isBound()) {
bind(SocketAddr()); // bind to random port
}
remoteAddr = dest;
link->connect(dest);
return true;
}
std::cerr << "emsocket no proxy set" << std::endl;
return false;
}
// Stream read/write. Always non-blocking.
ssize_t VirtualSocket::read(void *buf, size_t n) {
assert(!is_udp);
RECVBUFLOCK();
char *cbuf = (char*)buf;
size_t pos = 0;
while (!recvbuf.empty() && pos < n) {
Packet pkt = std::move(recvbuf.front());
recvbuf.pop_front();
size_t take = std::min(n - pos, pkt.data.size());
memcpy(&cbuf[pos], &pkt.data[0], take);
pos += take;
if (take < pkt.data.size()) {
recvbuf.emplace_front(pkt.from, &pkt.data[take], pkt.data.size() - take);
break;
}
}
return pos;
}
void VirtualSocket::write(const void *buf, size_t n) {
assert(!is_udp);
assert(is_connected);
if (link) {
link->send(buf, n);
}
}
// Datagram read/write. Always non-blocking
ssize_t VirtualSocket::recvfrom(void *buf, size_t n, SocketAddr *from) {
assert(is_udp);
char *cbuf = (char*)buf;
{
RECVBUFLOCK();
if (!recvbuf.empty()) {
Packet pkt = std::move(recvbuf.front());
recvbuf.pop_front();
size_t take = std::min(n, pkt.data.size());
memcpy(&cbuf[0], &pkt.data[0], take);
*from = pkt.from;
return take;
}
}
return 0;
}
void VirtualSocket::sendto(const void *buf, size_t n, const SocketAddr& to) {
assert(is_udp);
assert(isBound());
if (to.isLocalHost()) {
SocketAddr sourceAddr("127.0.0.1", bindAddr.getPort());
bool sent = false;
{
VSLOCK();
uint16_t port = to.getPort();
auto it = vs_port_map.find(port);
if (it != vs_port_map.end()) {
it->second->linkReceived(sourceAddr, buf, n);
sent = true;
}
}
if (!sent) {
std::cerr << "sendto going nowhere" << std::endl;
}
return;
}
link->sendto(buf, n, to);
}
} // namespace

View File

@ -0,0 +1,110 @@
/*
MIT License
Copyright (c) 2022 paradust7
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#pragma once
#include <vector>
#include <list>
#include <string>
#include <mutex>
#include <atomic>
#include "SocketAddr.h"
#include "Link.h"
#include "Packet.h"
// This must match the same defines in emsocket.h
#define EMSOCKET_BASE_FD 512
#define EMSOCKET_MAX_FD 1024
#define EMSOCKET_NSOCKETS (EMSOCKET_MAX_FD - EMSOCKET_BASE_FD)
namespace emsocket {
class VirtualSocket {
public:
static VirtualSocket* allocate(bool udp);
static VirtualSocket* get(int fd);
int fd() const { return fd_; }
bool isUDP() const { return is_udp; }
bool isBlocking() const { return is_blocking; }
void setBlocking(bool value) { is_blocking = value; }
bool isBound() const { return bindAddr.getPort() != 0; }
bool bind(const SocketAddr &addr);
const SocketAddr& getBindAddr() { return bindAddr; }
const SocketAddr& getRemoteAddr() { return remoteAddr; }
bool isConnected() const { return is_connected; }
bool isShutdown() const { return is_shutdown; }
// This should only be called holding the lock
// (e.g. inside the predicate for waitFor)
bool hasData() const;
bool canBlock() const;
void waitForData();
void waitForConnect();
static void waitFor(
const std::vector<VirtualSocket*> &waitlist,
const std::function<bool(void)>& predicate,
int64_t timeout);
bool startConnect(const SocketAddr &dest);
// Stream read/write. Always non-blocking.
ssize_t read(void *buf, size_t n);
void write(const void *buf, size_t n);
// Datagram read/write. Always non-blocking
ssize_t recvfrom(void *buf, size_t n, SocketAddr *from);
void sendto(const void *buf, size_t n, const SocketAddr& to);
void close();
void linkConnected();
void linkShutdown();
void linkReceived(const SocketAddr &addr, const void *buf, size_t n);
private:
VirtualSocket()
: fd_(-1) {
}
VirtualSocket(const VirtualSocket &) = delete;
VirtualSocket& operator=(const VirtualSocket &) = delete;
void reset();
void open(int fd, bool udp);
int fd_;
bool is_udp;
bool is_blocking;
std::atomic<bool> is_connected;
std::atomic<bool> is_shutdown;
SocketAddr bindAddr;
SocketAddr remoteAddr;
Link *link;
mutable std::mutex recvbufMutex; // only protects recvbuf
std::list<Packet> recvbuf;
static VirtualSocket sockets[EMSOCKET_NSOCKETS];
};
} // namespace

93
src/emsocket/WaitList.h Normal file
View File

@ -0,0 +1,93 @@
#pragma once
#include <chrono>
#include <condition_variable>
#include <list>
#include <mutex>
namespace emsocket {
template<typename T>
struct Waiter {
std::vector<T> ids;
std::condition_variable cv;
Waiter() = delete;
Waiter(const Waiter&) = delete;
Waiter& operator=(const Waiter&) = delete;
Waiter(const std::vector<T>& ids_) :
ids(ids_) { }
bool isWatching(const T id) const {
for (const auto &e : ids) {
if (e == id) {
return true;
}
}
return false;
}
void notify() {
cv.notify_all();
}
};
template<typename T>
class WaitList {
public:
WaitList() { }
WaitList(const WaitList&) = delete;
WaitList& operator=(const WaitList&) = delete;
void notify(const T id) {
const std::lock_guard<std::mutex> lock(mutex);
for (auto it = waiters.begin(), ie = waiters.end(); it != ie; ++it) {
auto waiter = *it;
if (waiter->isWatching(id)) {
waiter->notify();
}
}
}
bool waitFor(const std::vector<T>& ids,
const std::function<bool(void)>& predicate,
int64_t timeout) {
bool timedOut = (timeout == 0);
bool useUntil = (timeout > 0);
std::chrono::time_point<std::chrono::system_clock> until;
if (useUntil) {
until = std::chrono::system_clock::now() + std::chrono::milliseconds(timeout);
}
return waitInternal(ids, predicate, timedOut, useUntil, until);
}
bool waitInternal(const std::vector<T>& ids,
const std::function<bool(void)>& predicate,
bool timedOut,
bool useUntil,
const std::chrono::time_point<std::chrono::system_clock> &until) {
Waiter<T> self(ids);
std::unique_lock<std::mutex> ul(mutex);
for (;;) {
bool stop_waiting = predicate();
if (stop_waiting || timedOut) {
return stop_waiting;
}
auto it = waiters.insert(waiters.begin(), &self);
if (useUntil) {
timedOut = (self.cv.wait_until(ul, until) == std::cv_status::timeout);
} else {
self.cv.wait(ul);
}
waiters.erase(it);
}
}
private:
std::mutex mutex;
std::list<Waiter<T>*> waiters;
};
} // namespace

View File

@ -22,7 +22,9 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include <deque>
#define EMSOCKET_INTERNAL
#include <list>
#include <netinet/in.h>
#include <cassert>
#include <cstdio>
@ -30,8 +32,6 @@ SOFTWARE.
#include <cstdlib>
#include <cstring>
#include <iomanip>
#include <chrono>
#include <random>
#include <unordered_map>
#include <cerrno>
#include <mutex>
@ -44,205 +44,604 @@ SOFTWARE.
#include <unistd.h>
#include <arpa/inet.h>
#define EMSOCKET_INTERNAL
#include "VirtualSocket.h"
#include "emsocket.h"
// Browsers don't allow websocket servers, so simulate servers on `localhost` with a buffer instead.
static bool is_localhost(const struct sockaddr *addr, socklen_t addrlen) {
assert(addr->sa_family == AF_INET);
if (((sockaddr_in*)addr)->sin_addr.s_addr == htonl(INADDR_LOOPBACK)) {
return true;
}
return false;
}
using namespace emsocket;
static uint16_t get_port(const struct sockaddr *addr, socklen_t addrlen) {
assert(addr->sa_family == AF_INET);
return ntohs(((sockaddr_in*)addr)->sin_port);
}
#if EMSOCKET_DEBUG
// sockfd -> receive packet buffer
struct pkt {
uint16_t source_port;
std::vector<char> data;
std::mutex dbg_mutex;
#define DBG(x) do { \
const std::lock_guard<std::mutex> lock(dbg_mutex); \
x \
} while(0)
pkt() = delete;
pkt(const pkt &) = delete;
pkt& operator=(const pkt &) = delete;
#else
pkt(uint16_t port, const char *buf, size_t len)
: source_port(port), data(&buf[0], &buf[len]) { }
};
#define DBG(x) do { } while (0)
struct VirtualSocket {
std::mutex mutex;
std::condition_variable cv;
bool open = true;
uint16_t sport = 0;
std::deque<pkt> recvbuf;
};
// TODO: Reuse socket ids to avoid blowing up select
#define BASE_SOCKET_ID 100
// Protects the maps and id generation
static std::mutex mutex;
static std::vector<VirtualSocket*> socket_map;
static std::unordered_map<uint16_t, VirtualSocket*> port_map;
static unsigned int seed = 0;
static std::default_random_engine generator;
static std::uniform_int_distribution<uint16_t> randport(4096, 32000);
// Must be called while holding mutex
static uint16_t random_port() {
if (seed == 0) {
seed = std::chrono::system_clock::now().time_since_epoch().count();
generator.seed(seed);
}
uint16_t port;
do {
port = randport(generator);
} while (port_map.count(port) != 0);
return port;
}
static VirtualSocket *getvs(int sockfd) {
assert(sockfd >= BASE_SOCKET_ID);
int id = sockfd - BASE_SOCKET_ID;
assert(id < socket_map.size());
VirtualSocket* vs = socket_map[id];
assert(vs && vs->open);
return vs;
}
#endif
/**************************************************************************************************/
int emsocket_socket(int domain, int type, int protocol) {
const std::lock_guard<std::mutex> lock(mutex);
VirtualSocket* vs = new VirtualSocket();
int id = socket_map.size();
socket_map.push_back(vs);
return BASE_SOCKET_ID + id;
}
int emsocket_setsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen) {
return -1;
}
int emsocket_bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
const std::lock_guard<std::mutex> lock(mutex);
auto vs = getvs(sockfd);
assert(vs->sport == 0);
uint16_t port = get_port(addr, addrlen);
if (port == 0) {
port = random_port();
DBG(std::cerr << "emsocket_socket " << domain << "," << type << "," << protocol << std::endl;);
if (domain != AF_INET) {
DBG(std::cerr << "emsocket_socket bad domain: " << domain << std::endl;);
errno = EINVAL;
return -1;
}
if (port_map.count(port)) {
if (type != SOCK_STREAM && type != SOCK_DGRAM) {
DBG(std::cerr << "emsocket_socket bad type: " << type << std::endl;);
errno = EINVAL;
return -1;
}
if (protocol == 0) {
if (type == SOCK_STREAM) protocol = IPPROTO_TCP;
if (type == SOCK_DGRAM) protocol = IPPROTO_UDP;
}
if (type == SOCK_DGRAM && protocol != IPPROTO_UDP) {
DBG(std::cerr << "emsocket_socket bad dgram protocol: " << protocol << std::endl;);
errno = EINVAL;
return -1;
}
if (type == SOCK_STREAM && protocol != IPPROTO_TCP) {
DBG(std::cerr << "emsocket_socket bad stream protocol: " << protocol << std::endl;);
errno = EINVAL;
return -1;
}
bool is_udp = (type == SOCK_DGRAM);
auto vs = VirtualSocket::allocate(is_udp);
if (!vs) {
errno = EMFILE;
return -1;
}
DBG(std::cerr << "emsocket_socket returns fd=" << vs->fd() << ", udp=" << vs->isUDP() << std::endl;);
return vs->fd();
}
int emsocket_socketpair(int domain, int type, int protocol, int fds[2]);
int emsocket_bind(int fd, const struct sockaddr *addr, socklen_t len) {
auto vs = VirtualSocket::get(fd);
if (!vs->bind(SocketAddr(addr, len))) {
DBG(std::cerr << "emsocket_bind failed" << std::endl;);
errno = EADDRINUSE;
return -1;
}
vs->sport = port;
port_map[port] = vs;
DBG(std::cerr << "emsocket_bind success fd=" << fd << " bindAddr=" << vs->getBindAddr() << std::endl;);
return 0;
}
ssize_t emsocket_sendto(int sockfd, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen) {
if (!is_localhost(dest_addr, addrlen)) {
// Sending to other than localhost not yet implemented
return 0;
}
uint16_t source_port;
uint16_t dest_port = get_port(dest_addr, addrlen);
VirtualSocket* dest_vs = nullptr;
{
const std::lock_guard<std::mutex> lock(mutex);
source_port = getvs(sockfd)->sport;
auto it = port_map.find(dest_port);
if (it != port_map.end()) {
dest_vs = it->second;
}
}
assert(source_port && "sendto before bind()");
if (!dest_vs) {
// Nothing is listening on localhost?
return 0;
}
// Lock destination vs
{
const std::lock_guard<std::mutex> lock(dest_vs->mutex);
dest_vs->recvbuf.emplace_back(source_port, (const char*)buf, len);
}
dest_vs->cv.notify_all();
//std::cout << "sockfd=" << sockfd << " Sent packet of size " << len << std::endl;
return len;
}
ssize_t emsocket_recvfrom(int sockfd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen) {
VirtualSocket *vs;
{
const std::lock_guard<std::mutex> lock(mutex);
vs = getvs(sockfd);
}
// For now, this should never be called in a blocking situation.
assert(vs->recvbuf.size() > 0);
const std::lock_guard<std::mutex> lock(vs->mutex);
const pkt &p = vs->recvbuf.front();
ssize_t written = std::min(p.data.size(), len);
bool truncated = (written != (ssize_t)p.data.size());
memcpy(buf, &p.data[0], written);
if (src_addr) {
struct sockaddr_in ai = {0};
ai.sin_family = AF_INET;
ai.sin_port = htons(p.source_port);
ai.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
memcpy(src_addr, &ai, std::min((size_t)*addrlen, sizeof(ai)));
*addrlen = sizeof(ai);
}
vs->recvbuf.pop_front();
if (truncated) errno = EMSGSIZE;
//std::cout << "sockfd=" << sockfd << " Received packet of size " << written << std::endl;
return written;
}
// Cheap hack
// Only supports select on a single fd
int emsocket_select(
int nfds,
fd_set *readfds,
fd_set *writefds,
fd_set *exceptfds,
struct timeval *timeout) {
int sockfd = nfds - 1;
assert(FD_ISSET(sockfd, readfds));
VirtualSocket *vs;
{
const std::lock_guard<std::mutex> lock(mutex);
vs = getvs(sockfd);
}
std::unique_lock<std::mutex> lock(vs->mutex);
if (timeout == NULL) {
vs->cv.wait(lock, [&]{ return vs->recvbuf.size() > 0; });
} else {
long ms = (timeout->tv_sec * 1000) + (timeout->tv_usec / 1000);
auto cvtimeout = std::chrono::milliseconds(ms);
vs->cv.wait_for(lock, cvtimeout, [&]{ return vs->recvbuf.size() > 0; });
}
if (vs->recvbuf.size() == 0) {
return 0;
}
return 1;
}
int emsocket_close(int sockfd) {
const std::lock_guard<std::mutex> lock(mutex);
auto vs = getvs(sockfd);
if (vs->sport) {
port_map.erase(vs->sport);
vs->sport = 0;
}
vs->open = false;
vs->recvbuf.clear();
int emsocket_getsockname(int fd, struct sockaddr *addr, socklen_t *len) {
DBG(std::cerr << "emsocket_getsockname fd=" << fd << std::endl;);
auto vs = VirtualSocket::get(fd);
vs->getBindAddr().copyTo(addr, len);
DBG(std::cerr << " --> " << vs->getBindAddr() << std::endl;);
return 0;
}
int emsocket_connect(int fd, const struct sockaddr *addr, socklen_t len) {
SocketAddr dest(addr, len);
DBG(std::cerr << "emsocket_connect: fd=" << fd << ", " << dest << std::endl;);
auto vs = VirtualSocket::get(fd);
if (vs->isUDP()) {
// connect() on a UDP socket actually has a particular meaning...
// but this is not implemented here.
DBG(std::cerr << "emsocket_connect: Unexpected UDP" << std::endl;);
errno = EPROTOTYPE;
return -1;
}
if (vs->isConnected() || vs->isShutdown()) {
DBG(std::cerr << "emsocket_connect: Already connected or shutdown" << std::endl;);
errno = EISCONN;
return -1;
}
if (!vs->startConnect(dest)) {
DBG(std::cerr << "emsocket_connect: startConnect failed" << std::endl;);
errno = ECONNREFUSED;
return -1;
}
if (vs->isConnected() && !vs->isShutdown()) {
return 0;
}
if (!vs->canBlock()) {
DBG(std::cerr << "emsocket_connect: Connection in progress" << std::endl;);
errno = EINPROGRESS;
return -1;
}
vs->waitForConnect();
if (!vs->isConnected() || vs->isShutdown()) {
DBG(std::cerr << "emsocket_connect: Connection failed after wait" << std::endl;);
errno = ECONNREFUSED;
return -1;
}
return 0;
}
int emsocket_getpeername(int fd, struct sockaddr *addr, socklen_t *len) {
DBG(std::cerr << "emsocket_getpeername: fd=" << fd << std::endl;);
auto vs = VirtualSocket::get(fd);
if (!vs->isConnected()) {
errno = ENOTCONN;
return -1;
}
vs->getRemoteAddr().copyTo(addr, len);
return 0;
}
ssize_t emsocket_send(int fd, const void *buf, size_t n, int flags) {
DBG(std::cerr << "emsocket_send: fd=" << fd << ", n = " << n << std::endl;);
auto vs = VirtualSocket::get(fd);
if (!vs->isConnected() || vs->isShutdown()) {
DBG(std::cerr << " --> not connected" << std::endl;);
errno = ENOTCONN;
return -1;
}
flags &= ~MSG_NOSIGNAL; // no signals anyway
if (flags != 0) {
DBG(std::cerr << "Unsupported flags in emsocket_send: " << flags << std::endl;);
errno = EOPNOTSUPP;
return -1;
}
vs->write(buf, n);
return n;
}
ssize_t emsocket_recv(int fd, void *buf, size_t n, int flags) {
DBG(std::cerr << "emsocket_recv: fd=" << fd << ", n=" << n << std::endl;);
auto vs = VirtualSocket::get(fd);
if (flags != 0) {
DBG(std::cerr << "Unsupported flags in emsocket_recv: " << flags << std::endl;);
errno = EOPNOTSUPP;
return -1;
}
// If there's data ready to go, give it back.
ssize_t bytes = vs->read(buf, n);
if (bytes > 0) {
DBG(std::cerr << " --> read " << bytes << std::endl;);
return bytes;
}
if (!vs->isConnected() || vs->isShutdown()) {
DBG(std::cerr << " --> not connected or shutdown" << std::endl;);
errno = ENOTCONN;
return -1;
}
if (!vs->canBlock()) {
DBG(std::cerr << " --> would block" << std::endl;);
errno = EWOULDBLOCK;
return -1;
}
vs->waitForData();
bytes = vs->read(buf, n);
DBG(std::cerr << " --> read " << bytes << std::endl;);
return bytes;
}
ssize_t emsocket_sendto(int fd, const void *buf, size_t n, int flags, const struct sockaddr *addr, socklen_t addr_len) {
DBG(std::cerr << "emsocket_sendto fd=" << fd << ", n=" << n << std::endl;);
if (addr == NULL || addr_len < sizeof(sockaddr_in)) {
DBG(std::cerr << " --> EDESTADDRREQ" << std::endl;);
errno = EDESTADDRREQ;
return -1;
}
SocketAddr dest(addr, addr_len);
auto vs = VirtualSocket::get(fd);
if (flags != 0) {
DBG(std::cerr << " --> EOPNOTSUPP" << std::endl;);
errno = EOPNOTSUPP;
return -1;
}
if (!vs->isUDP()) {
if (addr != NULL || addr_len != 0) {
DBG(std::cerr << " --> EISCONN" << std::endl;);
errno = EISCONN;
return -1;
}
DBG(std::cerr << " --> forwarding to emsocket_send" << std::endl;);
return emsocket_send(fd, buf, n, flags);
}
if (!vs->isBound()) {
DBG(std::cerr << " --> autobinding" << std::endl;);
// Autobind to random port
if (!vs->bind(SocketAddr())) {
DBG(std::cerr << " --> EADDRINUSE" << std::endl;);
errno = EADDRINUSE;
return -1;
}
}
vs->sendto(buf, n, dest);
DBG(std::cerr << " --> sent" << std::endl;);
return n;
}
ssize_t emsocket_recvfrom(int fd, void *buf, size_t n, int flags, struct sockaddr *addr, socklen_t *addr_len) {
DBG(std::cerr << "emsocket_recvfrom fd=" << fd << ", n = " << n << std::endl;);
auto vs = VirtualSocket::get(fd);
if (flags != 0) {
DBG(std::cerr << "emsocket: unsupported flags in recvfrom" << std::endl;);
errno = EOPNOTSUPP;
return -1;
}
if (!vs->isUDP()) {
DBG(std::cerr << " --> forwarding to emsocket_recv" << std::endl;);
vs->getRemoteAddr().copyTo(addr, addr_len);
return emsocket_recv(fd, buf, n, 0);
}
// Common case: UDP
SocketAddr dest;
ssize_t bytes = vs->recvfrom(buf, n, &dest); // nonblock
if (bytes > 0) {
DBG(std::cerr << " --> got " << bytes << " from " << dest << std::endl;);
dest.copyTo(addr, addr_len);
return bytes;
}
if (!vs->canBlock()) {
DBG(std::cerr << " --> EWOULDBLOCK" << std::endl;);
errno = EWOULDBLOCK;
return -1;
}
vs->waitForData();
bytes = vs->recvfrom(buf, n, &dest);
dest.copyTo(addr, addr_len);
DBG(std::cerr << " --> got " << bytes << " from " << dest << " after wait" << std::endl;);
return bytes;
}
ssize_t emsocket_sendmsg(int fd, const struct msghdr *message, int flags);
int emsocket_sendmmsg(int fd, struct mmsghdr *vmessages, unsigned int vlen, int flags);
ssize_t emsocket_recvmsg(int fd, struct msghdr *message, int flags);
int emsocket_recvmmsg(int fd, struct mmsghdr *vmessages, unsigned int vlen, int flags, struct timespec *tmo);
int emsocket_getsockopt(int fd, int level, int optname, void *optval, socklen_t *optlen) {
DBG(std::cerr << "emsocket_getsockopt: level=" << level << ", optname=" << optname << std::endl;);
if (level == SOL_SOCKET) {
if (optname == SO_ERROR) {
if (optval && optlen && *optlen == sizeof(int)) {
int *val = (int*)optval;
auto vs = VirtualSocket::get(fd);
if (!vs->isConnected() || vs->isShutdown()) {
*val = ECONNREFUSED;
} else {
*val = 0;
}
return 0;
} else {
errno = EINVAL;
return -1;
}
} else {
errno = ENOPROTOOPT;
return -1;
}
}
errno = ENOPROTOOPT;
return -1;
}
int emsocket_setsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen) {
DBG(std::cerr << "emsocket_setsockopt: level=" << level << ", optname=" << optname << std::endl;);
return -1;
}
int emsocket_listen(int fd, int n) {
abort();
}
int emsocket_accept(int fd, struct sockaddr *addr, socklen_t *addr_len) {
abort();
}
int emsocket_accept4(int fd, struct sockaddr *addr, socklen_t *addr_len, int flags);
int emsocket_shutdown(int fd, int how);
int emsocket_sockatmark(int fd);
int emsocket_isfdtype(int fd, int fdtype);
static void _proxy_dns_query(const char *name, uint32_t *addr) {
*addr = 0;
SocketAddr dnsAddr("10.0.0.1", 53);
size_t namelen = strlen(name);
int fd = emsocket_socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
std::cerr << "emsocket_getaddrinfo: emsocket_socket failed, errno = " << errno << std::endl;
goto cleanup;
}
DBG(std::cerr << "CONNECTING TO DNS=" << dnsAddr << std::endl;);
if (emsocket_connect(fd, dnsAddr.sockaddr_ptr(), dnsAddr.sockaddr_len()) != 0) {
std::cerr << "emsocket_getaddrinfo: emsocket_connect failed, errno = " << errno << std::endl;
goto cleanup;
}
if (emsocket_send(fd, name, namelen, 0) != namelen) {
std::cerr << "emsocket_getaddrinfo: emsocket_send failed, errno = " << errno << std::endl;
goto cleanup;
}
if (emsocket_read(fd, addr, 4) != 4) {
std::cerr << "emsocket_getaddrinfo: emsocket_read failed, errno = " << errno << std::endl;
goto cleanup;
}
cleanup:
if (fd >= 0 && emsocket_close(fd) != 0) {
std::cerr << "emsocket_getaddrinfo: emsocket_close failed, errno = " << errno << std::endl;
}
}
int emsocket_getaddrinfo(const char *node, const char *service, const struct addrinfo *hints, struct addrinfo **res) {
DBG(std::cerr << "emsocket_getaddrinfo: node=" << (node ? node : "NULL") << ", service=" << (service ? service : "NULL") << std::endl;);
if (service != NULL) {
// Not supported.
std::cerr << "emsocket_getaddrinfo: service field not supported" << std::endl;
return EAI_SERVICE;
}
if (hints && hints->ai_family != AF_INET) {
// Not supported
std::cerr << "emsocket_getaddrinfo: only AF_INET supported" << std::endl;
return EAI_FAIL;
}
if (hints && hints->ai_flags != 0) {
// Not supported
std::cerr << "emsocket_getaddrinfo: ai_flags not supported" << std::endl;
return EAI_BADFLAGS;
}
uint32_t addr = 0;
// Check for IPv4 address
SocketAddr simpleAddr;
if (simpleAddr.setIP(node)) {
addr = simpleAddr.addr32();
} else {
_proxy_dns_query(node, &addr);
}
if (addr == 0) {
return EAI_AGAIN;
}
struct addrinfo *result = (struct addrinfo*)malloc(sizeof(struct addrinfo));
memset(result, 0, sizeof(struct addrinfo));
result->ai_family = AF_INET;
result->ai_socktype = hints ? hints->ai_socktype : 0;
result->ai_protocol = hints ? hints->ai_protocol : 0;
result->ai_addrlen = sizeof(sockaddr_in);
struct sockaddr_in* sin = (struct sockaddr_in*)malloc(sizeof(sockaddr_in));
memset(sin, 0, sizeof(sockaddr_in));
sin->sin_family = AF_INET;
sin->sin_addr.s_addr = addr;
result->ai_addr = (struct sockaddr*)sin;
*res = result;
return 0;
}
void emsocket_freeaddrinfo(struct addrinfo *res) {
if (res) {
free(res->ai_addr);
free(res);
}
return;
}
struct hostent *emsocket_gethostbyname(const char *name);
struct hostent *emsocket_gethostbyaddr(const void *addr, socklen_t len, int type);
void emsocket_sethostent(int stayopen);
void emsocket_endhostent(void);
void emsocket_herror(const char *s);
const char *emsocket_hstrerror(int err);
struct hostent *emsocket_gethostent(void);
struct hostent *emsocket_gethostbyname2(const char *name, int af);
int emsocket_gethostent_r(struct hostent *ret, char *buf, size_t buflen, struct hostent **result, int *h_errnop);
int emsocket_gethostbyaddr_r(const void *addr, socklen_t len, int type, struct hostent *ret, char *buf, size_t buflen, struct hostent **result, int *h_errnop);
int emsocket_gethostbyname_r(const char *name, struct hostent *ret, char *buf, size_t buflen, struct hostent **result, int *h_errnop);
int emsocket_gethostbyname2_r(const char *name, int af, struct hostent *ret, char *buf, size_t buflen, struct hostent **result, int *h_errnop);
#define MAX_SELECT_FDS 64
static void print_fd_set(int nfds, fd_set *x) {
if (x == NULL) {
std::cerr << "NULL";
} else {
std::cerr << "[ ";
for (int fd = 0; fd < nfds; fd++) {
if (FD_ISSET(fd, x)) {
std::cerr << fd << " ";
}
}
std::cerr << "]";
}
}
int emsocket_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout) {
#if EMSOCKET_DEBUG
DBG(
std::cerr << "emsocket_select nfds=" << nfds;
std::cerr << ", readfds="; print_fd_set(nfds, readfds);
std::cerr << ", writefs="; print_fd_set(nfds, writefds);
std::cerr << ", exceptfds="; print_fd_set(nfds, exceptfds);
std::cerr << ", timeout=";
if (timeout) {
std::cerr << timeout->tv_sec << "s " << timeout->tv_usec << "us";
} else {
std::cerr << "NULL";
}
std::cerr << std::endl;
);
#endif
// Convert to a call to poll() instead
struct pollfd fds[MAX_SELECT_FDS] = { 0 };
nfds_t count = 0;
for (int fd = 0; fd < nfds; fd++) {
bool check_read = readfds && FD_ISSET(fd, readfds);
bool check_write = writefds && FD_ISSET(fd, writefds);
bool check_except = exceptfds && FD_ISSET(fd, exceptfds);
if (check_read || check_write || check_except) {
if (count == MAX_SELECT_FDS) {
DBG(std::cerr << "emsocket select() called with too many fds" << std::endl;);
errno = EINVAL;
return -1;
}
fds[count].fd = fd;
fds[count].events = (check_read ? POLLIN : 0) | (check_write ? POLLOUT : 0);
count++;
}
}
if (readfds) FD_ZERO(readfds);
if (writefds) FD_ZERO(writefds);
if (exceptfds) FD_ZERO(exceptfds);
int poll_timeout = -1;
if (timeout) {
poll_timeout = (timeout->tv_sec * 1000) + (timeout->tv_usec / 1000);
}
int ret = emsocket_poll(fds, count, poll_timeout);
if (ret <= 0) {
//DBG(std::cerr << " --> returning " << ret << std::endl;);
return ret;
}
int bitcount = 0;
for (int i = 0; i < count; i++) {
int fd = fds[i].fd;
short revents = fds[i].revents;
if (readfds && (revents & (POLLIN|POLLHUP))) {
bitcount++;
FD_SET(fd, readfds);
}
if (writefds && (revents & POLLOUT)) {
bitcount++;
FD_SET(fd, writefds);
}
if (exceptfds && (revents & (POLLERR|POLLHUP))) {
bitcount++;
FD_SET(fd, exceptfds);
}
}
#if EMSOCKET_DEBUG
DBG(
std::cerr << " --> returning " << bitcount;
std::cerr << ", readfds="; print_fd_set(nfds, readfds);
std::cerr << ", writefs="; print_fd_set(nfds, writefds);
std::cerr << ", exceptfds="; print_fd_set(nfds, exceptfds);
std::cerr << std::endl;
);
#endif
return bitcount;
}
int emsocket_pselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const struct timespec *timeout, const sigset_t *sigmask);
#define MAX_POLL_FDS 64
int emsocket_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
int count = 0;
std::function<bool(void)> predicate = [&]() {
count = 0;
for (int i = 0; i < nfds; i++) {
if (fds[i].fd < 0) continue;
auto vs = VirtualSocket::get(fds[i].fd);
short revents = 0;
if (fds[i].events & POLLIN) {
if (vs->hasData()) {
revents |= POLLIN;
}
if (vs->isShutdown()) {
revents |= POLLIN | POLLHUP;
}
}
if (fds[i].events & POLLOUT) {
if (vs->isUDP() || vs->isConnected() || vs->isShutdown()) {
revents |= POLLOUT;
}
}
fds[i].revents = revents;
if (revents) {
count += 1;
}
}
// Keep waiting until count > 0
return (count > 0);
};
std::vector<VirtualSocket*> vslist;
for (int i = 0; i < nfds; i++) {
if (fds[i].fd < 0) continue;
auto vs = VirtualSocket::get(fds[i].fd);
vslist.push_back(vs);
}
VirtualSocket::waitFor(vslist, predicate, timeout);
return count;
}
int emsocket_ppoll(struct pollfd *fds, nfds_t nfds, const struct timespec *tmo_p, const sigset_t *sigmask);
int emsocket_epoll_create(int);
int emsocket_epoll_create1(int);
int emsocket_epoll_ctl(int, int, int, struct epoll_event *);
int emsocket_epoll_wait(int, struct epoll_event *, int, int);
int emsocket_epoll_pwait(int, struct epoll_event *, int, int, const sigset_t *);
ssize_t emsocket_read(int fd, void *buf, size_t count) {
if (fd < EMSOCKET_BASE_FD) return read(fd, buf, count);
return emsocket_recv(fd, buf, count, 0);
}
ssize_t emsocket_write(int fd, const void *buf, size_t count) {
if (fd < EMSOCKET_BASE_FD) return write(fd, buf, count);
return emsocket_send(fd, buf, count, 0);
}
int emsocket_close(int fd) {
if (fd < EMSOCKET_BASE_FD) {
return ::close(fd);
}
auto vs = VirtualSocket::get(fd);
vs->close();
return 0;
}
int emsocket_fcntl(int fd, int cmd, ...) {
auto vs = VirtualSocket::get(fd);
if (cmd == F_GETFL) {
return vs->isBlocking() ? O_NONBLOCK : 0;
} else if (cmd == F_SETFL) {
va_list ap;
va_start(ap, cmd);
int flags = va_arg(ap, int);
vs->setBlocking((flags & O_NONBLOCK) ? false : true);
flags &= ~O_NONBLOCK;
if (flags) {
std::cerr << "emsocket_fcntl unrecognized flags=" << flags << std::endl;
}
va_end(ap);
return 0;
}
std::cerr << "emsocket_fcntl unknown fcntl cmd=" << cmd << std::endl;
return -1;
}
int emsocket_ioctl(int fd, unsigned long request, ...) {
std::cerr << "emsocket_ioctl not implemented, request=" << request << std::endl;
abort();
}

View File

@ -23,36 +23,189 @@ SOFTWARE.
*/
#pragma once
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <poll.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/ioctl.h>
// No idea what this is (it appears emscripten or musl specific)
// but it redeclares ioctl(), so it needs to be here.
#include <stropts.h>
#include <emscripten/threading.h>
// This must match the same defines in VirtualSocket.h
#define EMSOCKET_BASE_FD 512
#define EMSOCKET_MAX_FD 1024
#ifdef __cplusplus
extern "C" {
#endif
int emsocket_socket(int domain, int type, int protocol);
int emsocket_setsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen);
int emsocket_bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
ssize_t emsocket_sendto(int sockfd, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen);
ssize_t emsocket_recvfrom(int sockfd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen);
int emsocket_select(
int nfds,
fd_set *readfds,
fd_set *writefds,
fd_set *exceptfds,
struct timeval *timeout);
int emsocket_close(int sockfd);
struct mmsghdr;
}
/* From sys/socket.h */
extern int emsocket_socket(int domain, int type, int protocol);
extern int emsocket_socketpair(int domain, int type, int protocol, int fds[2]);
extern int emsocket_bind(int fd, const struct sockaddr *addr, socklen_t len);
extern int emsocket_getsockname(int fd, struct sockaddr *addr, socklen_t *len);
extern int emsocket_connect(int fd, const struct sockaddr *addr, socklen_t len);
extern int emsocket_getpeername(int fd, struct sockaddr *addr, socklen_t *len);
extern ssize_t emsocket_send(int fd, const void *buf, size_t n, int flags);
extern ssize_t emsocket_recv(int fd, void *buf, size_t n, int flags);
extern ssize_t emsocket_sendto(int fd, const void *buf, size_t n, int flags, const struct sockaddr *addr, socklen_t addr_len);
extern ssize_t emsocket_recvfrom(int fd, void *buf, size_t n, int flags, struct sockaddr *addr, socklen_t *addr_len);
extern ssize_t emsocket_sendmsg(int fd, const struct msghdr *message, int flags);
/* GNU extension */
extern int emsocket_sendmmsg(int fd, struct mmsghdr *vmessages, unsigned int vlen, int flags);
extern ssize_t emsocket_recvmsg(int fd, struct msghdr *message, int flags);
/* GNU extension */
extern int emsocket_recvmmsg(int fd, struct mmsghdr *vmessages, unsigned int vlen, int flags, struct timespec *tmo);
extern int emsocket_getsockopt(int fd, int level, int optname, void *optval, socklen_t *optlen);
extern int emsocket_setsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen);
extern int emsocket_listen(int fd, int n);
extern int emsocket_accept(int fd, struct sockaddr *addr, socklen_t *addr_len);
/* GNU extension */
extern int emsocket_accept4(int fd, struct sockaddr *addr, socklen_t *addr_len, int flags);
extern int emsocket_shutdown(int fd, int how);
extern int emsocket_sockatmark(int fd);
extern int emsocket_isfdtype(int fd, int fdtype);
/* From netdb.h */
extern int emsocket_getaddrinfo(const char *node, const char *service, const struct addrinfo *hints, struct addrinfo **res);
extern void emsocket_freeaddrinfo(struct addrinfo *res);
extern struct hostent *emsocket_gethostbyname(const char *name);
extern struct hostent *emsocket_gethostbyaddr(const void *addr, socklen_t len, int type);
extern void emsocket_sethostent(int stayopen);
extern void emsocket_endhostent(void);
extern void emsocket_herror(const char *s);
extern const char *emsocket_hstrerror(int err);
/* System V/POSIX extension */
extern struct hostent *emsocket_gethostent(void);
/* GNU extensions */
extern struct hostent *emsocket_gethostbyname2(const char *name, int af);
extern int emsocket_gethostent_r(struct hostent *ret, char *buf, size_t buflen, struct hostent **result, int *h_errnop);
extern int emsocket_gethostbyaddr_r(const void *addr, socklen_t len, int type, struct hostent *ret, char *buf, size_t buflen, struct hostent **result, int *h_errnop);
extern int emsocket_gethostbyname_r(const char *name, struct hostent *ret, char *buf, size_t buflen, struct hostent **result, int *h_errnop);
extern int emsocket_gethostbyname2_r(const char *name, int af, struct hostent *ret, char *buf, size_t buflen, struct hostent **result, int *h_errnop);
/* From sys/select.h */
extern int emsocket_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
extern int emsocket_pselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const struct timespec *timeout, const sigset_t *sigmask);
/* From poll.h */
extern int emsocket_poll(struct pollfd *fds, nfds_t nfds, int timeout);
/* GNU extension */
extern int emsocket_ppoll(struct pollfd *fds, nfds_t nfds, const struct timespec *tmo_p, const sigset_t *sigmask);
/* From sys/epoll.h */
extern int emsocket_epoll_create(int);
extern int emsocket_epoll_create1(int);
extern int emsocket_epoll_ctl(int, int, int, struct epoll_event *);
extern int emsocket_epoll_wait(int, struct epoll_event *, int, int);
extern int emsocket_epoll_pwait(int, struct epoll_event *, int, int, const sigset_t *);
/* From unistd.h */
extern ssize_t emsocket_read(int fd, void *buf, size_t count);
extern ssize_t emsocket_write(int fd, const void *buf, size_t count);
extern int emsocket_close(int fd);
/* From fcntl.h */
extern int emsocket_fcntl(int fd, int cmd, ...);
/* From sys/ioctl.h */
extern int emsocket_ioctl(int fd, unsigned long request, ...);
#ifdef __cplusplus
} // extern "C"
#endif
#ifndef EMSOCKET_INTERNAL
#define socket emsocket_socket
#define socketpair emsocket_socketpair
#define bind emsocket_bind
#define getsockname emsocket_getsockname
#define connect emsocket_connect
#define getpeername emsocket_getpeername
#define send emsocket_send
#define recv emsocket_recv
#define sendto emsocket_sendto
#define recvfrom emsocket_recvfrom
#define sendmsg emsocket_sendmsg
#define sendmmsg emsocket_sendmmsg
#define recvmsg emsocket_recvmsg
#define recvmmsg emsocket_recvmmsg
#define getsockopt emsocket_getsockopt
#define setsockopt emsocket_setsockopt
#define listen emsocket_listen
#define accept emsocket_accept
#define accept4 emsocket_accept4
#define shutdown emsocket_shutdown
#define sockatmark emsocket_sockatmark
#define isfdtype emsocket_isfdtype
#define getaddrinfo emsocket_getaddrinfo
#define freeaddrinfo emsocket_freeaddrinfo
#define gethostbyname emsocket_gethostbyname
#define gethostbyaddr emsocket_gethostbyaddr
#define sethostent emsocket_sethostent
#define endhostent emsocket_endhostent
#define herror emsocket_herror
#define hstrerror emsocket_hstrerror
#define gethostent emsocket_gethostent
#define gethostbyname2 emsocket_gethostbyname2
#define gethostent_r emsocket_gethostent_r
#define gethostbyaddr_r emsocket_gethostbyaddr_r
#define gethostbyname_r emsocket_gethostbyname_r
#define gethostbyname2_r emsocket_gethostbyname2_r
#define select emsocket_select
#define pselect emsocket_pselect
#define poll emsocket_poll
#define ppoll emsocket_ppoll
#define epoll_create emsocket_epoll_create
#define epoll_create1 emsocket_epoll_create1
#define epoll_ctl emsocket_epoll_ctl
#define epoll_wait emsocket_epoll_wait
#define epoll_pwait emsocket_epoll_pwait
#define read emsocket_read
#define write emsocket_write
#define close emsocket_close
#define socket emsocket_socket
#define setsockopt emsocket_setsockopt
#define bind emsocket_bind
#define recvfrom emsocket_recvfrom
#define sendto emsocket_sendto
#define select emsocket_select
#define close emsocket_close
// Special macros needed to handle __VA_ARGS__ forwarding
#define fcntl(_fd, ...) ({ \
int __fcntl_fd = (_fd); \
int __fcntl_rc; \
if (__fcntl_fd < EMSOCKET_BASE_FD) { \
__fcntl_rc = fcntl(__fcntl_fd, __VA_ARGS__); \
} else { \
__fcntl_rc = emsocket_fcntl(__fcntl_fd, __VA_ARGS__); \
} \
__fcntl_rc; \
})
#define ioctl(_fd, ...) ({ \
int __ioctl_fd = (_fd); \
int __ioctl_rc; \
if (__ioctl_fd < EMSOCKET_BASE_FD) { \
__ioctl_rc = ioctl(__ioctl_fd, __VA_ARGS__); \
} else { \
__ioctl_rc = emsocket_ioctl(__ioctl_fd, __VA_ARGS__); \
} \
__ioctl_rc; \
})
#endif

View File

@ -0,0 +1,128 @@
/*
MIT License
Copyright (c) 2022 paradust7
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#define EMSOCKET_INTERNAL
#include "emsocketctl.h"
#include <iostream>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <pthread.h>
#include <emscripten.h>
#include "proxyjs.gen.h"
namespace emsocket {
bool didInit;
pthread_t ioThread;
std::mutex ioMutex;
std::condition_variable ioCv;
std::vector<std::function<void()> > ioCallbacks;
uint64_t ioCounter = 0;
}
using namespace emsocket;
static void *io_thread_main(void *);
EMSCRIPTEN_KEEPALIVE
extern "C"
void emsocket_init(void) {
if (didInit) return;
didInit = true;
// Launch dedicated i/o thread
int rc = pthread_create(&ioThread, NULL, io_thread_main, NULL);
if (rc != 0) {
std::cerr << "emsocket_init: Failed to launch I/O thread" << std::endl;
abort();
}
}
EM_JS(void, _set_proxy, (const char *url), {
setProxy(UTF8ToString(url));
});
EMSCRIPTEN_KEEPALIVE
extern "C"
void emsocket_set_proxy(const char *url) {
char *urlcopy = strdup(url);
emsocket_run_on_io_thread(false, [urlcopy]() {
_set_proxy(urlcopy);
free(urlcopy);
});
}
EM_JS(void, _set_vpn, (const char *vpn), {
setVPN(UTF8ToString(vpn));
});
EMSCRIPTEN_KEEPALIVE
extern "C"
void emsocket_set_vpn(const char *vpn) {
char *vpncopy = strdup(vpn);
emsocket_run_on_io_thread(false, [vpncopy]() {
_set_vpn(vpncopy);
free(vpncopy);
});
}
static void io_thread_reenter(void) {
std::vector<std::function<void()> > callbacks;
{
const std::lock_guard<std::mutex> lock(ioMutex);
callbacks = std::move(ioCallbacks);
ioCallbacks.clear();
ioCounter += 1;
}
ioCv.notify_all();
for (const auto &callback : callbacks) {
callback();
}
}
static void *io_thread_main(void *) {
init_proxyjs();
// TODO: emsocket_run_on_io_thread should use a WebWorker
// message to wakeup the I/O thread instead of polling
// every 10ms.
emscripten_set_main_loop(io_thread_reenter, 100, EM_TRUE);
abort(); // unreachable
}
namespace emsocket {
// Returns the id of the callback.
// Use this id in emsocket_remove_io_callback()
void emsocket_run_on_io_thread(bool sync, std::function<void()> && callback) {
std::unique_lock<std::mutex> lock(ioMutex);
ioCallbacks.emplace_back(std::move(callback));
if (sync) {
// Wait for 2 counter clicks, that'll guarantee the callback has run.
uint64_t ioTarget = ioCounter + 2;
ioCv.wait(lock, [&](){ return ioCounter >= ioTarget; });
}
}
} // namespace emsocket

View File

@ -0,0 +1,46 @@
/*
MIT License
Copyright (c) 2022 paradust7
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
void emsocket_init(void);
void emsocket_set_proxy(const char *url);
void emsocket_set_vpn(const char *vpn);
#ifdef __cplusplus
}
#endif
#ifdef EMSOCKET_INTERNAL
#include <functional>
namespace emsocket {
// Will be called exactly once in the I/O thread.
void emsocket_run_on_io_thread(bool sync, std::function<void()> && callback);
} // namespace
#endif

10
src/emsocket/gen.sh Executable file
View File

@ -0,0 +1,10 @@
#!/bin/sh
echo "CALLED WITH $1 $2"
INPUT_FILE="$1"
OUTPUT_FILE="$2"
echo 'EM_JS(void, init_proxyjs, (), {' > "$OUTPUT_FILE"
cat "$INPUT_FILE" >> "$OUTPUT_FILE"
echo "});" >> "$OUTPUT_FILE"

259
src/emsocket/proxy.js Normal file
View File

@ -0,0 +1,259 @@
// This code runs on a WebWorker wrapped inside a function body.
// To export a global symbol, assigned it through 'self'.
self.proxyUrl = "";
function setProxy(url) {
self.proxyUrl = url;
}
self.setProxy = setProxy;
self.vpnCode = "";
function setVPN(code) {
self.vpnCode = code;
}
self.setVPN = setVPN;
function inet_ntop(n) {
const a = (n >> 24) & 0xFF;
const b = (n >> 16) & 0xFF;
const c = (n >> 8) & 0xFF;
const d = (n >> 0) & 0xFF;
return `${a}.${b}.${c}.${d}`;
}
self.inet_ntop = inet_ntop;
function inet_pton(ip) {
const ret = new ArrayBuffer(4);
const v = new DataView(ret);
var [a, b, c, d] = ip.split('.');
v.setUint8(0, parseInt(a));
v.setUint8(1, parseInt(b));
v.setUint8(2, parseInt(c));
v.setUint8(3, parseInt(d));
return ret;
}
self.inet_pton = inet_pton;
self.EP_MAGIC = 0x778B4CF3;
function unencapsulate(data) {
// Data is encapsulated with a 12 byte header.
// Magic - 4 bytes EP_MAGIC
// Dest IP - 4 bytes 0xAABBCCDD for AA.BB.CC.DD
// Dest Port - 2 bytes
// Packet Len - 2 bytes
if (!(data instanceof ArrayBuffer)) {
throw new Error("Received text over encapsulated channel");
}
if (data.byteLength < 12) {
throw new Error("Encapsulated header not present (short message)");
}
const view = new DataView(data);
const magic = view.getUint32(0);
if (magic != EP_MAGIC) {
throw new Error("Encapsulated packet header corrupted");
}
const src_ip = inet_ntop(view.getUint32(4));
const src_port = view.getUint16(8);
const pktlen = view.getUint16(10);
if (data.byteLength != 12 + pktlen) {
throw new Error("Invalid encapsulated packet length");
}
return [src_ip, src_port, data.slice(12)];
}
self.unencapsulate = unencapsulate;
function encapsulate(dest_ip, dest_port, data) {
const edata = new ArrayBuffer(12 + data.byteLength);
const view = new DataView(edata);
view.setUint32(0, EP_MAGIC);
(new Uint8Array(edata, 4, 4)).set(new Uint8Array(inet_pton(dest_ip)));
view.setUint16(8, dest_port);
view.setUint16(10, data.byteLength);
(new Uint8Array(edata, 12)).set(data);
return edata;
}
self.encapsulate = encapsulate;
class ProxyLink {
constructor(bind_port, udp) {
this.bind_port = bind_port;
this.udp = udp;
this.onopen = null;
this.onerror = null;
this.onclose = null;
this.onmessage = null;
this.expectHandshake = null;
this.userEnabled = false;
this.userBuffer = [];
this.index = ProxyLink.links.length;
ProxyLink.links.push(this);
this.ws = null;
this.activated = false;
this.dead = false;
this.encapsulated = false;
this.receive_info = null;
if (this.udp && vpnCode) {
this.encapsulated = true;
this._activate();
}
}
connect(ip, port) {
if (this.udp)
throw new Error('ProxyLink: connect() called on udp socket');
this.connect_info = [ip, port];
this._activate();
}
_activate() {
if (this.activated)
throw new Error('ProxyLink activated twice');
this.activated = true;
const ws = new WebSocket(proxyUrl);
this.ws = ws;
ws.binaryType = "arraybuffer";
ws.onopen = this.handleOpen.bind(this);
ws.onerror = this.handleError.bind(this);
ws.onclose = this.handleClose.bind(this);
ws.onmessage = this.handleMessage.bind(this);
}
handleOpen() {
var req;
// Send proxy request
if (this.encapsulated) {
req = `VPN ${vpnCode} BIND IPV4 UDP ${this.bind_port}`;
this.expectHandshake = 'BIND OK';
} else {
const [ip, port] = this.connect_info;
req = `PROXY IPV4 ${this.udp ? "UDP" : "TCP"} ${ip} ${port}`;
this.expectHandshake = 'PROXY OK';
}
this.ws.send(req);
}
handleError() {
if (this.onerror) {
this.onerror();
}
}
handleClose() {
this._close();
}
handleMessage(e) {
try {
this._handleMessage(e);
} catch (err) {
console.log("ProxyLink javascript exception");
console.log(err);
this._close();
}
}
_handleMessage(e) {
const data = e.data;
if (!this.userEnabled) {
// Waiting for proxy auth message
if (!this.expectHandshake) {
throw new Error("Invalid message before proxy request");
}
if (data != this.expectHandshake) {
throw new Error("Invalid handshake response");
}
this._enable();
this.onopen();
return;
}
if (this.encapsulated) {
const [src_ip, src_port, rdata] = unencapsulate(data);
this.onmessage(rdata, src_ip, src_port);
} else {
const [src_ip, src_port] = this.connect_info;
this.onmessage(data, src_ip, src_port);
}
}
sendto(data, ip, port) {
if (this.dead) return;
if (!this.activated) {
this.connect_info = [ip, port];
this._activate();
}
if (this.encapsulated) {
const edata = encapsulate(ip, port, data);
this._send(edata);
} else {
if (this.connect_info[0] !== ip || this.connect_info[1] !== port) {
throw new Error('ProxyLink: Address mismatch on non-encapsulated link');
}
this._send(data);
}
}
send(data) {
if (this.dead) return;
if (!this.activated) {
throw new Error('ProxyLink: send before connect');
}
if (this.encapsulated) {
throw new Error('ProxyLink: Encapsulated send not supported');
}
this._send(data);
}
_send(data) {
if (typeof data !== 'string') {
// If this copy isn't done, send fails with:
// "The provided ArrayBufferView value must not be shared."
data = new Uint8Array(data);
}
if (this.userEnabled) {
this.ws.send(data);
} else {
this.userBuffer.push(data);
}
}
_enable() {
this.userEnabled = true;
for (const data of this.userBuffer) {
this.ws.send(data);
}
this.userBuffer = null;
}
// Call this internally to dispatch the onclose callback but leave
// this link on the list.
_close() {
this.dead = true;
const ws = this.ws;
if (ws) {
ws.onopen = ws.onerror = ws.onclose = ws.onmessage = null;
ws.close();
this.ws = null;
}
if (this.onclose) {
this.onclose();
}
}
// This should only be called externally, because it does not
// invoke onclose(), and immediately removes this link from the list.
close() {
this.onopen = null;
this.onerror = null;
this.onclose = null;
this.onmessage = null;
this._close();
delete ProxyLink.links[this.index];
}
static get(index) {
return ProxyLink.links[index];
}
}
self.ProxyLink = ProxyLink;
// minify can't handle inline static members, so declare it here.
self.ProxyLink.links = [null]; // 0 considered an invalid index

60
src/emsocket/wrap.py Executable file
View File

@ -0,0 +1,60 @@
#!/usr/bin/env python3
# Emscripten's linker doesn't support --wrap, so instead this script replaces
# all relevant #include directives (see TO_WRAP) with an include of <emsocket.h>.
# This ensure the emsocket_* functions will be used.
#
# Usage:
# ./wrap.py <path_to_source_tree_to_modify>
#
TO_WRAP = [
'sys/socket.h',
'sys/select.h',
'sys/epoll.h',
'poll.h',
'netdb.h',
'unistd.h',
'fcntl.h',
'sys/ioctl.h',
]
import sys
import os
import re
def wrap(path):
for entry in os.scandir(path):
if entry.is_symlink():
continue
elif entry.is_dir(follow_symlinks=False):
wrap(entry.path)
elif entry.is_file(follow_symlinks=False):
if entry.name.endswith(('.c', '.cpp', '.h')):
wrap_file(entry.path)
include_line = re.compile(r'\#\s*include\s+<([^>]+)>(.*)', re.DOTALL)
def wrap_file(path):
contents = open(path, 'rb').read().decode('utf8')
lines = contents.splitlines(True)
out = []
changed = False
for line in lines:
m = include_line.match(line)
if m and m.groups()[0].strip() in TO_WRAP:
out.append('#include <emsocket.h>' + m.groups()[1])
changed = True
else:
out.append(line)
if changed:
print("Edited " + path)
with open(path, 'wb') as f:
f.write(''.join(out).encode('utf8'))
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: {} <path_to_source_tree_to_modify>".format(sys.argv[0]))
sys.exit(1)
path = sys.argv[1]
wrap(path)