Redesign APIs

master
Ryan Lee 2016-09-06 15:42:38 +09:00
parent 990d19886a
commit 101fa25cfd
16 changed files with 744 additions and 711 deletions

View File

@ -82,6 +82,7 @@ find_package(Asio)
# ============================================================================
set(HEADERS
"src/peerconnect.h"
"src/common.h"
"src/control.h"
"src/controlobserver.h"
"src/peer.h"

View File

@ -1,8 +1,8 @@
# PeerConnect
A PeerConnect is a peer to peer socket library.
A PeerConnect is a peer-to-peer library for network.
- Network connection by random id or email address
- Network connection by random id
- No ip address and port number
- Support NAT traversal and WebRTC
@ -10,23 +10,23 @@ A PeerConnect is a peer to peer socket library.
### How to use
Peer A (listen)
```
PeerConnect pc;
pc.OnMessage(function_pc(PeerConnect* pc, string id, PeerConnect::Buffer& data) {
PeerConnect pc1("PEER_A");
pc1.On("message", function_pc(string peer, PeerConnect::Buffer& data) {
std::cout << "A message has been received." << std::endl;
});
pc.SignIn("PEER_A");
pc1.Open();
```
Peer B (connect)
```
PeerConnect pc;
pc.On("signin", function_pc(PeerConnect* pc, string id) {
pc->Connect("PEER_A");
PeerConnect pc2("PEER_B");
pc2.On("open", function_pc(string peer) {
pc2.>Connect("PEER_A");
});
pc.On("connect", function_pc(PeerConnect* pc, string id) {
pc->Send("PEER_A", "Hello");
pc2.On("connect", function_pc(string peer) {
pc2.Send("PEER_A", "Hello");
});
pc.SignIn("PEER_B");
pc2.Open();
```
### How it works

View File

@ -22,30 +22,31 @@ int main(int argc, char *argv[]) {
return 1;
}
string name = argv[1];
string server = argv[1];
PeerConnect pc;
pc.On("signin", function_pc(PeerConnect* pc, string id) {
pc->Connect(name);
pc.On("open", function_pc(string channel) {
pc.Connect(server);
});
pc.On("connect", function_pc(PeerConnect* pc, string id) {
pc->Send(id, "Hello world");
std::cout << "Sent 'Hello world' message to " << id << "." << std::endl;
pc.On("connect", function_pc(string channel) {
pc.Send(channel, "Hello world");
std::cout << "Sent 'Hello world' message to " << channel << "." << std::endl;
});
pc.On("disconnect", function_pc(PeerConnect* pc, string id) {
std::cout << "Peer " << id << " has been disconnected" << std::endl;
pc.On("close", function_pc(string channel, CloseCode code, string desc) {
std::cout << "Peer " << channel << " has been closed" << std::endl;
PeerConnect::Stop();
});
pc.OnMessage(function_pc(PeerConnect* pc, string id, PeerConnect::Buffer& data) {
pc.On("message", function_pc(string channel, PeerConnect::Buffer& data) {
std::cout << "Message '" << std::string(data.buf_, data.size_) <<
"' has been received." << std::endl;
pc.Close();
});
pc.SignIn();
pc.Open();
PeerConnect::Run();
return 0;

View File

@ -19,25 +19,25 @@ int main(int argc, char *argv[]) {
usage(argv[0]);
return 1;
}
string name = argv[1];
string server = argv[1];
PeerConnect pc;
PeerConnect pc(server);
pc.On("connect", function_pc(PeerConnect* pc, string id) {
std::cout << "Peer " << id << " has been connected." << std::endl;
pc.On("connect", function_pc(string peer) {
std::cout << "Peer " << peer << " has been connected." << std::endl;
});
pc.On("disconnect", function_pc(PeerConnect* pc, string id) {
std::cout << "Peer " << id << " has been disconnected." << std::endl;
pc.On("close", function_pc(string peer, CloseCode code, string desc) {
std::cout << "Peer " << peer << " has been closed." << std::endl;
});
pc.OnMessage(function_pc(PeerConnect* pc, string id, PeerConnect::Buffer& data) {
pc.On("message", function_pc(string peer, PeerConnect::Buffer& data) {
std::cout << "Message " << std::string(data.buf_, data.size_) <<
" has been received." << std::endl;
pc->Send(id, data.buf_, data.size_);
pc.Send(peer, data.buf_, data.size_);
});
pc.SignIn(name);
pc.Open();
PeerConnect::Run();
return 0;

View File

@ -27,9 +27,9 @@
using namespace std;
using namespace pc;
bool parse_args(int argc, char* argv[], std::string& alias, std::string& connect_to, bool& server_mode);
bool parse_args(int argc, char* argv[], string& local_channel, string& remote_channel, bool& server_mode);
void usage(const char* prg);
void read_stdin(PeerConnect* pc, std::string id);
void read_stdin(PeerConnect* pc, std::string channel);
bool write_stdout(const char* buf, int len);
void set_mode(PeerConnect* pc);
void ctrlc_handler(int s);
@ -39,15 +39,15 @@ static PeerConnect *pc_;
int main(int argc, char *argv[]) {
string connec_to;
string alias;
string local_channel;
string remote_channel;
bool server_mode;
//
// Parse arguments
//
if (!parse_args(argc, argv, alias, connec_to, server_mode)) {
if (!parse_args(argc, argv, local_channel, remote_channel, server_mode)) {
usage(argv[0]);
return 1;
}
@ -56,43 +56,42 @@ int main(int argc, char *argv[]) {
// Set event handlers
//
PeerConnect pc;
PeerConnect pc(local_channel);
set_mode(&pc);
pc.On("signin", function_pc(PeerConnect* pc, string id) {
pc.On("open", function_pc( string peer ) {
if (server_mode) {
std::cerr << "Listening " << id << std::endl;
std::cerr << "Listening " << peer << std::endl;
}
else {
std::cerr << "Connecting to " << connec_to << std::endl;
pc->Connect(connec_to);
std::cerr << "Connecting to " << remote_channel << std::endl;
pc.Connect(remote_channel);
}
});
pc.On("connect", function_pc(PeerConnect* pc, string id) {
pc.On("connect", function_pc( string peer ) {
std::cerr << "Connected" << std::endl;
std::thread(read_stdin, pc, id).detach();
std::thread(read_stdin, &pc, peer).detach();
});
pc.On("disconnect", function_pc(PeerConnect* pc, string id) {
if (server_mode)
std::cerr << "Disconnected" << std::endl;
else
pc->SignOut();
pc.On("close", function_pc( string peer, CloseCode code, string desc ) {
if ( peer == local_channel ) {
PeerConnect::Stop();
}
else {
pc.Close();
}
if ( !desc.empty() ) {
std::cerr << desc << std::endl;
}
});
pc.On("signout", function_pc(PeerConnect* pc, string id) {
PeerConnect::Stop();
});
pc.On("error", function_pc(PeerConnect* pc, string id){
std::cerr << pc->GetErrorMessage() << std::endl;
PeerConnect::Stop();
});
pc.OnMessage(function_pc(PeerConnect* pc, string id, PeerConnect::Buffer& data) {
pc.On("message", function_pc(string peer, PeerConnect::Buffer& data) {
if (!write_stdout(data.buf_, data.size_)) {
pc->Disconnect(id);
pc.Close(peer);
}
});
@ -100,7 +99,7 @@ int main(int argc, char *argv[]) {
// Sign in as anonymous user
//
pc.SignIn(alias, "anonymous", "nopassword");
pc.Open();
PeerConnect::Run();
return 0;
@ -120,7 +119,7 @@ int main(int argc, char *argv[]) {
#define STDERR_FILENO 2
#endif
void read_stdin(PeerConnect* pc, std::string id)
void read_stdin(PeerConnect* pc, std::string channel)
{
int nbytes;
char buf[32*1024];
@ -128,11 +127,11 @@ void read_stdin(PeerConnect* pc, std::string id)
for (;;) {
nbytes = read(STDIN_FILENO, buf, sizeof(buf));
if (nbytes <= 0) {
pc->Disconnect(id);
pc->Close( channel );
return;
}
if (!pc->SyncSend(id, buf, nbytes)) {
if (!pc->Send(channel, buf, nbytes, WAITING_ON)) {
return;
}
}
@ -157,7 +156,7 @@ bool write_stdout(const char* buf, int len)
void ctrlc_handler(int s) {
std::cerr << "Terminating..." << std::endl;
pc_->SignOut();
pc_->Close();
}
void set_mode(PeerConnect* pc)
@ -172,14 +171,15 @@ void set_mode(PeerConnect* pc)
#endif
}
bool parse_args(int argc, char* argv[], std::string& alias, std::string& connect_to, bool& server_mode) {
bool parse_args(int argc, char* argv[], string& local_channel, string& remote_channel, bool& server_mode) {
if (argc == 2) {
connect_to = argv[1];
remote_channel = argv[1];
local_channel = PeerConnect::CreateRandomUuid();
server_mode = false;
return true;
}
else if (argc == 3 && std::string(argv[1]) == "-l") {
alias = argv[2];
local_channel = argv[2];
server_mode = true;
return true;
}
@ -187,13 +187,13 @@ bool parse_args(int argc, char* argv[], std::string& alias, std::string& connect
}
void usage(const char* prg) {
std::cerr << "P2P netcat version 0.1 (http://github.com/peerconnect/peerconnect)" << std::endl << std::endl;
std::cerr << "P2P netcat demo (http://github.com/peersio/peerconnect)" << std::endl << std::endl;
std::cerr << "Usage: " << prg << " [-l] name" << std::endl << std::endl;
std::cerr << " Options:" << std::endl;
std::cerr << " -l Listen mode, for inbound connections" << std::endl << std::endl;
std::cerr << "Example: " << std::endl;
std::cerr << " > " << prg << " -l random_id : Listen randoom_id" << std::endl;
std::cerr << " > " << prg << " random_id : Connect to random_id" << std::endl;
std::cerr << " > " << prg << " -l my_channel : Listen my_channel" << std::endl;
std::cerr << " > " << prg << " my_channel : Connect to my_channel" << std::endl;
}

35
src/common.h Normal file
View File

@ -0,0 +1,35 @@
/*
* Copyright 2016 The PeerConnect Project Authors. All rights reserved.
*
* Ryan Lee
*/
#ifndef __PEERCONNECT_COMMON_H__
#define __PEERCONNECT_COMMON_H__
namespace pc {
#define function_pc [&]
enum CloseCode {
// Success
CLOSE_NORMAL = 0,
// Failure
CLOSE_GOING_AWAY,
CLOSE_ABNORMAL,
CLOSE_PROTOCOL_ERROR,
CLOSE_SIGNAL_ERROR
};
const bool WAITING_OFF = false;
const bool WAITING_ON = true;
const bool FORCE_QUEUING_OFF = false;
const bool FORCE_QUEUING_ON = true;
} // namespace pc
#endif // __PEERCONNECT_COMMON_H__

View File

@ -61,14 +61,14 @@ bool Control::InitializeControl() {
webrtc::MediaConstraintsInterface* constraints = NULL;
if (!CreatePeerFactory(constraints)) {
if ( !CreatePeerFactory(constraints) ) {
LOGP_F(LERROR) << "CreatePeerFactory failed";
DeleteControl();
return false;
}
webrtc_thread_ = rtc::Thread::Current();
ASSERT( webrtc_thread_ != nullptr);
ASSERT( webrtc_thread_ != nullptr );
return true;
}
@ -83,43 +83,32 @@ void Control::DeleteControl() {
}
void Control::SignIn(const string& user_id, const string& user_password, const string& open_id) {
void Control::Open(const string& user_id, const string& user_password, const string& channel) {
// 1. Connect to signal server
// 2. Send signin command to signal server
// 3. Send createchannel command to signal server (channel name is id or alias)
// Other peers connect to this peer by channel name, that is id or alias
// 4. Generate 'signedin' event to PeerConnect
// 2. Send open command to signal server
// 3. Send createchannel command to signal server.
// A channel name is user-supplied string to listen or random string.
// Other peers connect to this peer by channel name.
// 4. Generate 'open' event to PeerConnect
if (signal_.get() == NULL) {
LOGP_F( LERROR ) << "SignIn failed, no signal server";
LOGP_F( LERROR ) << "Open failed, no signal server";
return;
}
open_id_ = open_id;
channel_ = channel;
user_id_ = user_id;
// Start by signing in
signal_->SignIn(user_id, user_password);
// Connect to signal server
signal_->Open(user_id, user_password);
LOGP_F( INFO ) << "Done";
return;
}
void Control::SignOut() {
void Control::Connect(const string channel) {
if (webrtc_thread_ != rtc::Thread::Current()) {
ControlMessageData *data = new ControlMessageData(0, ref_);
webrtc_thread_->Post(this, MSG_SIGNOUT, data);
return;
}
signal_->SignOut();
Close();
LOGP_F( INFO ) << "Done";
}
void Control::Connect(const string id) {
// 1. Join channel on signal server
// 2. Server(remote) peer createoffer
// 3. Client(local) peer answeroffer
@ -130,48 +119,29 @@ void Control::Connect(const string id) {
return;
}
LOGP_F( INFO ) << "Joining channel " << id;
JoinChannel(id);
LOGP_F( INFO ) << "Joining channel " << channel;
JoinChannel(channel);
}
void Control::Close(const string id, bool force_queuing) {
void Control::Close(const CloseCode code, bool force_queuing) {
LOGP_F( INFO ) << "Call";
//
// Called by
// PeerConnect if user closes the connection.
// PeerControl if remote peer closes a ice connection or data channel
// Verify current thread
//
if (force_queuing || webrtc_thread_ != rtc::Thread::Current()) {
ControlMessageData *data = new ControlMessageData(id, ref_);
webrtc_thread_->Post(this, MSG_CLOSE_PEER, data);
ControlMessageData *data = new ControlMessageData(code, ref_);
webrtc_thread_->Post(this, MSG_CLOSE, data);
LOGP_F( INFO ) << "Queued";
return;
}
// 1. Leave channel on signal server
// 2. Erase peer
// 3. Close peer
//
// Close peers
//
LeaveChannel(id);
auto peer_found = peers_.find(id);
if ( peer_found == peers_.end() ) {
LOGP_F( WARNING ) << "peer not found, " << id;
return;
}
Peer peer = peer_found->second;
peers_.erase( peer_found );
peer->Close();
LOGP_F( INFO ) << "Done, id is " << id;
}
void Control::Close( const string id ) {
Close( id, QUEUEING_ON );
}
void Control::Close() {
std::vector<string> peer_ids;
for (auto peer : peers_) {
@ -182,8 +152,52 @@ void Control::Close() {
for (auto id : peer_ids) {
LOGP_F( INFO ) << "Try to close peer having id " << id;
Close(id, QUEUEING_ON);
ClosePeer(id, code);
}
//
// Close signal server
//
if ( pc_ ) {
pc_->OnClose( channel_ ,code );
}
LOGP_F( INFO ) << "Done";
}
void Control::ClosePeer( const string channel, const CloseCode code, bool force_queuing ) {
//
// Called by
// PeerConnect if user closes the connection.
// PeerControl if remote peer closes a ice connection or data channel
//
if (force_queuing || webrtc_thread_ != rtc::Thread::Current()) {
ControlMessageData *data = new ControlMessageData(channel, ref_);
data->data_int32_ = code;
webrtc_thread_->Post(this, MSG_CLOSE_PEER, data);
return;
}
// 1. Erase peer
// 2. Close peer
auto peer_found = peers_.find(channel);
if ( peer_found == peers_.end() ) {
LOGP_F( WARNING ) << "peer not found, " << channel;
return;
}
Peer peer = peer_found->second;
peers_.erase( peer_found );
peer->Close(code);
// 3. Leave channel on signal server
LeaveChannel(channel);
LOGP_F( INFO ) << "Done, channel is " << channel;
}
//
@ -216,99 +230,73 @@ bool Control::SyncSend(const string to, const char* buffer, const size_t size) {
// Send command to other peer by signal server
//
void Control::SendCommand(const string& id, const string& command, const Json::Value& data) {
signal_->SendCommand(id, command, data);
void Control::SendCommand(const string& channel, const string& command, const Json::Value& data) {
signal_->SendCommand(channel, command, data);
}
void Control::OnConnected(const string id) {
if ( observer_ == nullptr ) {
LOGP_F( WARNING ) << "observer_ is null, id is " << id;
void Control::OnPeerConnect(const string channel) {
if ( pc_ == nullptr ) {
LOGP_F( WARNING ) << "pc_ is null, channel is " << channel;
return;
}
observer_->OnPeerConnected(id);
LOGP_F( INFO ) << "Done, id is " << id;
pc_->OnConnect(channel);
LOGP_F( INFO ) << "Done, channel is " << channel;
}
//
// Ice connection state has been changed to close.
// It means that peer data channel had been closed already.
//
// Implements PeerObserver::OnDisconnected()
//
void Control::OnPeerClose(const string channel, CloseCode code) {
void Control::OnClosed(const string id, const bool force_queuing) {
if (force_queuing || webrtc_thread_ != rtc::Thread::Current()) {
ControlMessageData *data = new ControlMessageData(id, ref_);
if (webrtc_thread_ != rtc::Thread::Current()) {
ControlMessageData *data = new ControlMessageData(channel, ref_);
// Call Control::OnPeerDisconnected()
webrtc_thread_->Post(this, MSG_ON_PEER_CLOSED, data);
LOGP_F( INFO ) << "Queued, id is " << id;
webrtc_thread_->Post(this, MSG_ON_PEER_CLOSE, data);
LOGP_F( INFO ) << "Queued, channel is " << channel;
return;
}
if ( observer_ == nullptr ) {
LOGP_F( WARNING ) << "observer_ is null, id is " << id;
LOGP_F( INFO ) << "Enter, channel is " << channel;
if ( pc_ == nullptr ) {
LOGP_F( WARNING ) << "pc_ is null, channel is " << channel;
return;
}
LOGP_F( INFO ) << "Calling OnPeerDisconnected, id is " << id;
observer_->OnPeerDisconnected(id);
pc_->OnClose( channel, code );
if (peers_.size() == 0) {
LOGP_F( INFO ) << "peers_ has been empty. id is " << id;
OnSignedOut(open_id_);
}
LOGP_F( INFO ) << "Done, id is " << id;
LOGP_F( INFO ) << "Done, channel is " << channel;
}
void Control::OnClosed(const string id) {
OnClosed( id, QUEUEING_ON );
}
//
// Signal receiving data
//
void Control::OnMessage(const string& id, const char* buffer, const size_t size) {
if ( observer_ == nullptr ) {
LOGP_F( WARNING ) << "observer_ is null, id is " << id;
void Control::OnPeerMessage(const string& channel, const char* buffer, const size_t size) {
if ( pc_ == nullptr ) {
LOGP_F( WARNING ) << "pc_ is null, channel is " << channel;
return;
}
observer_->OnPeerMessage(id, buffer, size);
pc_->OnMessage(channel, buffer, size);
}
void Control::OnWritable(const string& id) {
if ( observer_ == nullptr ) {
LOGP_F( WARNING ) << "observer_ is null, id is " << id;
void Control::OnPeerWritable(const string& channel) {
if ( pc_ == nullptr ) {
LOGP_F( WARNING ) << "pc_ is null, channel is " << channel;
return;
}
observer_->OnPeerWritable(id);
pc_->OnWritable(channel);
}
void Control::OnError( const string id, const string& reason ) {
if ( observer_ == nullptr ) {
LOGP_F( WARNING ) << "observer_ is null, id is " << id;
return;
}
observer_->OnError( id, reason );
}
void Control::RegisterObserver(ControlObserver* observer, std::shared_ptr<Control> ref) {
ref_ = ref;
observer_ = observer;
pc_ = observer;
LOGP_F( INFO ) << "Registered";
}
void Control::UnregisterObserver() {
observer_ = nullptr;
pc_ = nullptr;
ref_.reset();
LOGP_F( INFO ) << "Unregistered";
@ -325,23 +313,24 @@ void Control::OnMessage(rtc::Message* msg) {
param = static_cast<ControlMessageData*>(msg->pdata);
OnCommandReceived(param->data_json_);
break;
case MSG_CLOSE:
param = static_cast<ControlMessageData*>(msg->pdata);
Close((CloseCode)param->data_int32_);
break;
case MSG_CLOSE_PEER:
param = static_cast<ControlMessageData*>(msg->pdata);
Close(param->data_string_, QUEUEING_OFF);
ClosePeer(param->data_string_, (CloseCode) param->data_int32_);
break;
case MSG_ON_PEER_CLOSED:
case MSG_ON_PEER_CLOSE:
param = static_cast<ControlMessageData*>(msg->pdata);
OnClosed(param->data_string_, QUEUEING_OFF);
OnPeerClose(param->data_string_, (CloseCode) param->data_int32_);
break;
case MSG_SIGNOUT:
case MSG_ON_SIGLAL_CONNECTION_CLOSE:
param = static_cast<ControlMessageData*>(msg->pdata);
SignOut();
break;
case MSG_SIGNAL_SERVER_CLOSED:
param = static_cast<ControlMessageData*>(msg->pdata);
OnSignedOut(param->data_string_);
Close((CloseCode)param->data_int32_);
break;
default:
LOGP_F( WARNING ) << "Unknown message";
break;
}
@ -370,17 +359,20 @@ void Control::OnCommandReceived(const Json::Value& message) {
peer_id.clear();
}
if (command == "signin") {
OnSignedIn(data);
if (command == "open") {
OnOpen(data);
}
else if (command == "channelcreated") {
OnChannelCreated(data);
else if (command == "channelcreate") {
OnChannelCreate(data);
}
else if (command == "channeljoined") {
OnChannelJoined(data);
else if (command == "channeljoin") {
OnChannelJoin(data);
}
else if (command == "channelleaved") {
OnChannelLeaved(data);
else if (command == "channelleave") {
OnChannelLeave(data);
}
else if (command == "peerclosed") {
OnRemotePeerClose(peer_id, data);
}
else if (command == "createoffer") {
CreateOffer(data);
@ -403,35 +395,14 @@ void Control::OnSignalCommandReceived(const Json::Value& message) {
}
void Control::OnSignalConnectionClosed(websocketpp::close::status::value code) {
LOGP_F(INFO) << "Calling OnSignalConnectionClosed() with " << code;
if (code == websocketpp::close::status::normal) {
ControlMessageData *data = new ControlMessageData(open_id_, ref_);
webrtc_thread_->Post(this, MSG_SIGNAL_SERVER_CLOSED, data);
LOGP_F(INFO) << "Enter, code is " << code;
if (code != websocketpp::close::status::normal) {
ControlMessageData *data = new ControlMessageData(CLOSE_SIGNAL_ERROR, ref_);
webrtc_thread_->Post(this, MSG_ON_SIGLAL_CONNECTION_CLOSE, data);
}
LOGP_F( INFO ) << "Done";
}
void Control::OnSignedOut(const string& id) {
LOGP_F( INFO ) << "Calling OnSignedOut() with " << id;
if ( signal_ == nullptr || signal_->opened() ) {
LOGP_F( WARNING ) << "signal_ is null or not opened";
return;
}
if ( peers_.size() != 0 ) {
LOGP_F( WARNING ) << "peers_ is empty";
return;
}
if ( observer_ != nullptr ) {
observer_->OnSignedOut( id );
}
LOGP_F( INFO ) << "Done";
}
//
// Commands to signal server
//
@ -526,24 +497,24 @@ void Control::AddIceCandidate(const string& peer_id, const Json::Value& data) {
//
// 'signin' command
// 'open' command
//
void Control::OnSignedIn(const Json::Value& data) {
void Control::OnOpen(const Json::Value& data) {
bool result;
if (!rtc::GetBoolFromJsonObject(data, "result", &result)) {
LOGP_F(WARNING) << "Unknown signin response";
LOGP_F(WARNING) << "Unknown open response";
return;
}
if (!result) {
LOGP_F(LERROR) << "Signin failed";
LOGP_F(LERROR) << "Open failed";
return;
}
string session_id;
if (!rtc::GetStringFromJsonObject(data, "session_id", &session_id)) {
LOGP_F(LERROR) << "Signin failed - no session_id";
LOGP_F(LERROR) << "Open failed - no session_id";
return;
}
@ -553,61 +524,67 @@ void Control::OnSignedIn(const Json::Value& data) {
// Create channel
//
CreateChannel(open_id_);
CreateChannel(channel_);
LOGP_F( INFO ) << "Done";
}
void Control::OnChannelCreated(const Json::Value& data) {
void Control::OnChannelCreate(const Json::Value& data) {
bool result;
if (!rtc::GetBoolFromJsonObject(data, "result", &result)) {
LOGP_F(WARNING) << "Unknown signin response";
LOGP_F(WARNING) << "Unknown open response";
pc_->OnClose(channel_, CLOSE_SIGNAL_ERROR);
return;
}
string channel;
if (!rtc::GetStringFromJsonObject(data, "name", &channel)) {
pc_->OnClose(channel_, CLOSE_SIGNAL_ERROR);
LOGP_F(LERROR) << "Create channel failed - no channel name";
return;
}
if (!result) {
LOGP_F(LERROR) << "Create channel failed";
string reason;
if (!rtc::GetStringFromJsonObject(data, "reason", &reason)) {
reason = "Unknown reason";
string desc;
if (!rtc::GetStringFromJsonObject(data, "desc", &desc)) {
desc = "Unknown reason";
}
observer_->OnError(channel, reason);
pc_->OnClose(channel, CLOSE_SIGNAL_ERROR, desc);
return;
}
observer_->OnSignedIn(channel);
pc_->OnOpen(channel);
LOGP_F( INFO ) << "Done";
}
void Control::OnChannelJoined(const Json::Value& data) {
void Control::OnChannelJoin(const Json::Value& data) {
bool result;
LOGP_F(INFO) << "OnChannelJoined(" << data.toStyledString() << ")";
if (!rtc::GetBoolFromJsonObject(data, "result", &result)) {
pc_->OnClose( "", CLOSE_SIGNAL_ERROR );
LOGP_F(LERROR) << "Unknown channel join response";
return;
}
string channel;
if (!rtc::GetStringFromJsonObject(data, "name", &channel)) {
pc_->OnClose( "", CLOSE_SIGNAL_ERROR );
LOGP_F(LERROR) << "Join channel failed - no channel name";
return;
}
if (!result) {
LOGP_F(LERROR) << "Join channel failed";
string reason;
if (!rtc::GetStringFromJsonObject(data, "reason", &reason)) {
reason = "Unknown reason";
string desc;
if (!rtc::GetStringFromJsonObject(data, "desc", &desc)) {
desc = "Unknown reason";
}
observer_->OnError(channel, reason);
pc_->OnClose( channel, CLOSE_SIGNAL_ERROR, desc );
return;
}
@ -619,11 +596,15 @@ void Control::OnChannelJoined(const Json::Value& data) {
// 'leave' command
//
void Control::OnChannelLeaved(const Json::Value& data) {
// Nothing
void Control::OnChannelLeave(const Json::Value& data) {
// Do nothing
}
void Control::OnRemotePeerClose(const string& peer_id, const Json::Value& data) {
ClosePeer( peer_id, CLOSE_NORMAL );
}
//
// 'createoffer' command
//
@ -643,10 +624,10 @@ void Control::CreateOffer(const Json::Value& data) {
return;
}
Peer peer = new rtc::RefCountedObject<PeerControl>(open_id_, remote_id, this, peer_connection_factory_);
Peer peer = new rtc::RefCountedObject<PeerControl>(channel_, remote_id, this, peer_connection_factory_);
if ( !peer->Initialize() ) {
LOGP_F( LERROR ) << "Peer initialization failed";
OnClosed( remote_id );
OnPeerClose( remote_id, CLOSE_ABNORMAL );
return;
}
@ -670,10 +651,10 @@ void Control::ReceiveOfferSdp(const string& peer_id, const Json::Value& data) {
return;
}
Peer peer = new rtc::RefCountedObject<PeerControl>(open_id_, peer_id, this, peer_connection_factory_);
Peer peer = new rtc::RefCountedObject<PeerControl>(channel_, peer_id, this, peer_connection_factory_);
if ( !peer->Initialize() ) {
LOGP_F( LERROR ) << "Peer initialization failed";
OnClosed( peer_id );
OnPeerClose( peer_id, CLOSE_ABNORMAL );
return;
}

View File

@ -46,28 +46,25 @@ public:
void Send(const string to, const char* buffer, const size_t size);
bool SyncSend(const string to, const char* buffer, const size_t size);
void SignIn(const string& user_id, const string& user_password, const string& open_id);
void SignOut();
void Connect(const string id);
void Close();
bool IsWritable(const string id);
void Open(const string& user_id, const string& user_password, const string& channel);
void Close(const CloseCode code, bool force_queueing = FORCE_QUEUING_OFF);
void Connect(const string channel);
bool IsWritable(const string channel);
void OnCommandReceived(const Json::Value& message);
void OnSignalCommandReceived(const Json::Value& message);
void OnSignalConnectionClosed(websocketpp::close::status::value code);
void OnSignedOut(const string& id);
//
// PeerObserver implementation
//
virtual void SendCommand(const string& id, const string& command, const Json::Value& data);
virtual void Close(const string id);
virtual void OnConnected(const string id);
virtual void OnClosed(const string id);
virtual void OnMessage(const string& id, const char* buffer, const size_t size);
virtual void OnWritable(const string& id);
virtual void OnError( const string id, const string& reason );
virtual void SendCommand(const string& channel, const string& command, const Json::Value& data);
virtual void ClosePeer( const string channel, const CloseCode code, bool force_queueing = FORCE_QUEUING_OFF );
virtual void OnPeerConnect(const string channel);
virtual void OnPeerClose(const string channel, const CloseCode code);
virtual void OnPeerMessage(const string& channel, const char* buffer, const size_t size);
virtual void OnPeerWritable(const string& channel);
// Register/Unregister observer
@ -87,19 +84,17 @@ protected:
void ReceiveOfferSdp(const string& peer_id, const Json::Value& data);
void ReceiveAnswerSdp(const string& peer_id, const Json::Value& data);
void OnSignedIn(const Json::Value& data);
void OnChannelCreated(const Json::Value& data);
void OnChannelJoined(const Json::Value& data);
void OnChannelLeaved(const Json::Value& data);
void Close( const string id, const bool force_queuing);
void OnClosed(const string id, const bool force_queuing);
void OnOpen(const Json::Value& data);
void OnChannelCreate(const Json::Value& data);
void OnChannelJoin(const Json::Value& data);
void OnChannelLeave(const Json::Value& data);
void OnRemotePeerClose(const string& peer_id, const Json::Value& data);
// open_id_: other peers can find this peer by open_id_ and it is user_id or alias
// channel_: A name of local channel. Other peers can find this peer by channel_
// user_id_: A user id to sign in signal server (could be 'anonymous' for guest user)
// session_id_: A unique id for signal server connection
string open_id_;
string channel_;
string user_id_;
string session_id_;
@ -114,16 +109,12 @@ protected:
private:
const bool QUEUEING_ON = true;
const bool QUEUEING_OFF = false;
enum {
MSG_COMMAND_RECEIVED, // Command has been received from signal server
MSG_CLOSE, // Queue signout request
MSG_CLOSE_PEER, // Close peer
MSG_ON_PEER_CLOSED, // Peer has been closed
MSG_SIGNOUT, // Queue signout request
MSG_SIGNAL_SERVER_CLOSED // Connection to signal server has been closed
MSG_ON_PEER_CLOSE, // Peer has been closed
MSG_ON_SIGLAL_CONNECTION_CLOSE // Connection to signal server has been closed
};
struct ControlMessageData : public rtc::MessageData {
@ -139,7 +130,7 @@ private:
};
rtc::Thread* webrtc_thread_;
ControlObserver* observer_;
ControlObserver* pc_;
std::shared_ptr<Control> ref_;
};

View File

@ -7,17 +7,17 @@
#ifndef __PEERCONNECT_CONTROLOBSERVER_H__
#define __PEERCONNECT_CONTROLOBSERVER_H__
#include "common.h"
namespace pc {
class ControlObserver {
public:
virtual void OnSignedIn(const std::string id) = 0;
virtual void OnSignedOut(const std::string id) = 0;
virtual void OnPeerConnected(const std::string id) = 0;
virtual void OnPeerDisconnected(const std::string id) = 0;
virtual void OnPeerMessage(const std::string id, const char* buffer, const size_t size) = 0;
virtual void OnPeerWritable(const std::string id) = 0;
virtual void OnError(const std::string id, const std::string& reason) = 0;
virtual void OnOpen(const std::string channel) = 0;
virtual void OnClose(const std::string channel, const pc::CloseCode code, const std::string desc = "") = 0;
virtual void OnConnect(const std::string channel) = 0;
virtual void OnMessage(const std::string channel, const char* buffer, const size_t size) = 0;
virtual void OnWritable(const std::string channel) = 0;
};
} // namespace pc

View File

@ -89,7 +89,7 @@ bool PeerControl::IsWritable() {
return local_data_channel_->IsWritable();
}
void PeerControl::Close() {
void PeerControl::Close(const CloseCode code) {
LOGP_F_IF(state_ != pOpen, WARNING) << "Closing peer when it is not opened";
if ( state_ == pClosing || state_ == pClosed ) {
@ -102,12 +102,20 @@ void PeerControl::Close() {
LOGP_F( INFO ) << "Close data-channel of remote_id_ " << remote_id_;
if ( peer_connection_ ) {
peer_connection_->Close();
}
else {
LOGP_F( WARNING ) << "peer_connection_ is nullptr ";
state_ = pClosed;
peer_connection_ = nullptr;
// As entering here, we can make sure that
// - PeerDataChannelObserver::OnStateChange() had been called with kClosed
// - PeerControl::OnIceConnectionChange() will be ignored,
// ether kIceConnectionClosed and kIceConnectionDisconnected.
// That's because we didn't call peer_connection_->Close().
}
state_ = pClosed;
control_->OnPeerClose(remote_id_, code);
}
@ -158,6 +166,12 @@ void PeerControl::OnSignalingChange(webrtc::PeerConnectionInterface::SignalingSt
}
void PeerControl::OnIceConnectionChange(webrtc::PeerConnectionInterface::IceConnectionState new_state) {
//
// Closing sequence
// kIceConnectionDisconnected -> kIceConnectionClosed
//
switch (new_state) {
case webrtc::PeerConnectionInterface::IceConnectionState::kIceConnectionClosed:
//
@ -165,7 +179,7 @@ void PeerControl::OnIceConnectionChange(webrtc::PeerConnectionInterface::IceConn
// Notify it to Control so the Control will remove peer in peers_
//
LOGP_F( INFO ) << "new_state is " << "kIceConnectionClosed";
OnPeerClosed();
OnPeerDisconnected();
break;
case webrtc::PeerConnectionInterface::IceConnectionState::kIceConnectionDisconnected:
@ -218,6 +232,11 @@ void PeerControl::OnSuccess(webrtc::SessionDescriptionInterface* desc) {
if (!desc->ToString(&sdp)) return;
if ( state_ != pConnecting ) {
LOGP_F( WARNING ) << "Invalid state";
return;
}
// Set local description
SetLocalDescription(desc->type(), sdp);
@ -248,22 +267,13 @@ void PeerControl::OnPeerOpened() {
// Fianlly, data-channel has been opened.
state_ = pOpen;
control_->OnConnected(remote_id_);
control_->OnWritable(local_id_);
control_->OnPeerConnect(remote_id_);
control_->OnPeerWritable(local_id_);
}
LOGP_F( INFO ) << "Done";
}
void PeerControl::OnPeerClosed() {
ASSERT( state_ == pClosing );
state_ = pClosed;
control_->OnClosed(remote_id_);
}
void PeerControl::OnPeerDisconnected() {
if ( state_ == pClosed ) {
@ -275,13 +285,19 @@ void PeerControl::OnPeerDisconnected() {
return;
}
control_->Close( remote_id_ );
//
// As entering here, we can make sure that the remote peer has been
// disconnected abnormally, because previous state_ is not pClosing.
// It will be in state pClosing if a user calls Close() manually
//
control_->ClosePeer( remote_id_, CLOSE_GOING_AWAY );
}
void PeerControl::OnPeerMessage(const webrtc::DataBuffer& buffer) {
string data;
control_->OnMessage(remote_id_, buffer.data.data<char>(), buffer.data.size());
control_->OnPeerMessage(remote_id_, buffer.data.data<char>(), buffer.data.size());
}
void PeerControl::OnBufferedAmountChange(const uint64_t previous_amount) {
@ -289,7 +305,7 @@ void PeerControl::OnBufferedAmountChange(const uint64_t previous_amount) {
LOGP_F( LERROR ) << "local_data_channel_ is not writable";
return;
}
control_->OnWritable( remote_id_ );
control_->OnPeerWritable( remote_id_ );
}

View File

@ -15,6 +15,7 @@
#include "webrtc/base/scoped_ref_ptr.h"
#include "webrtc/api/jsep.h"
#include "webrtc/base/json.h"
#include "common.h"
namespace pc {
@ -24,14 +25,12 @@ namespace pc {
class PeerObserver {
public:
virtual void SendCommand(const std::string& id, const std::string& command, const Json::Value& data) = 0;
virtual void Close(const std::string id) = 0;
virtual void OnConnected(const std::string id) = 0;
virtual void OnClosed(const std::string id) = 0;
virtual void OnMessage(const std::string& id, const char* buffer, const size_t size) = 0;
virtual void OnWritable(const std::string& id) = 0;
virtual void OnError( const std::string id, const std::string& reason ) = 0;
virtual void SendCommand(const std::string& channel, const std::string& command, const Json::Value& data) = 0;
virtual void ClosePeer(const std::string channel, const pc::CloseCode code, bool force_queuing = FORCE_QUEUING_OFF ) = 0;
virtual void OnPeerConnect(const std::string channel) = 0;
virtual void OnPeerClose(const std::string channel, const pc::CloseCode code) = 0;
virtual void OnPeerMessage(const std::string& channel, const char* buffer, const size_t size) = 0;
virtual void OnPeerWritable(const std::string& channel) = 0;
};
class PeerDataChannelObserver;
@ -77,7 +76,7 @@ public:
bool Send(const char* buffer, const size_t size);
bool SyncSend(const char* buffer, const size_t size);
bool IsWritable();
void Close();
void Close(const CloseCode code);
//
// PeerConnection
@ -116,7 +115,6 @@ public:
//
void OnPeerOpened();
void OnPeerClosed();
void OnPeerDisconnected();
void OnPeerMessage(const webrtc::DataBuffer& buffer);
void OnBufferedAmountChange(const uint64_t previous_amount);

View File

@ -13,32 +13,27 @@
namespace pc {
PeerConnect::PeerConnect()
: PeerConnect( "" ) {
}
PeerConnect::PeerConnect( const string options ) {
PeerConnect::PeerConnect( const string channel ) {
// Log level
#if DEBUG || _DEBUG
rtc::LogMessage::LogToDebug( rtc::LS_NONE );
pc::LogMessage::LogToDebug( pc::LS_VERBOSE );
pc::LogMessage::LogToDebug( pc::WARNING );
#else
rtc::LogMessage::LogToDebug( rtc::LS_NONE );
pc::LogMessage::LogToDebug( pc::LS_NONE );
#endif
// parse settings
if ( !options.empty() ) {
ParseOptions( options );
string local_channel;
if ( channel.empty() ) {
local_channel = rtc::CreateRandomUuid();
}
else {
local_channel = channel;
}
signout_ = false;
// create signal client
if ( signal_ == nullptr ) {
signal_ = std::make_shared<pc::Signal>( setting_.signal_uri_ );
}
channel_ = local_channel;
close_once_ = false;
LOGP_F( INFO ) << "Done";
}
@ -58,17 +53,21 @@ void PeerConnect::Stop() {
}
void PeerConnect::SignIn( const string alias, const string id, const string password ) {
//
// Check if already signed in
//
void PeerConnect::Open() {
if ( control_.get() != nullptr ) {
LOGP_F( WARNING ) << "Already signined in.";
LOGP_F( WARNING ) << "Already open.";
return;
}
//
// create signal client
//
if ( signal_ == nullptr ) {
signal_ = std::make_shared<pc::Signal>( setting_.signal_uri_ );
}
//
// Initialize control
//
@ -91,78 +90,76 @@ void PeerConnect::SignIn( const string alias, const string id, const string pass
return;
}
//
// Set user_id and open_id
//
string user_id;
string open_id;
user_id = tolower( id );
if ( user_id == "anonymous" ) user_id = "";
open_id = tolower( alias );
if ( open_id.empty() ) open_id = tolower( rtc::CreateRandomUuid() );
//
// Connect to signal server
//
control_->SignIn( user_id, password, open_id );
control_->Open( setting_.signal_id_, setting_.signal_password_, channel_ );
LOGP_F( INFO ) << "Done";
return;
}
void PeerConnect::SignOut() {
signout_ = true;
control_->SignOut();
void PeerConnect::Close( const string channel ) {
if ( channel.empty() || channel == channel_ ) {
control_->Close( CLOSE_NORMAL, FORCE_QUEUING_ON );
signal_->SyncClose();
}
else {
control_->ClosePeer( channel, CLOSE_NORMAL, FORCE_QUEUING_ON );
}
LOGP_F( INFO ) << "Done";
}
void PeerConnect::Connect( const string id ) {
control_->Connect( id );
LOGP_F( INFO ) << "Done, id is " << id;
void PeerConnect::Connect( const string channel ) {
control_->Connect( channel );
LOGP_F( INFO ) << "Done, channel is " << channel;
return;
}
void PeerConnect::Disconnect( const string id ) {
control_->Close( id );
LOGP_F( INFO ) << "Done, id is " << id;
return;
}
//
// Send message to destination peer session id
//
void PeerConnect::Send( const string& id, const char* buffer, const size_t size ) {
control_->Send( id, buffer, size );
bool PeerConnect::Send( const string& channel, const char* buffer, const size_t size, const bool wait ) {
if ( wait ) {
//
// Synchronous send returns true or false
// and a timeout is 60*1000 ms by default.
//
return control_->SyncSend( channel, buffer, size );
}
else {
control_->Send( channel, buffer, size );
//
// Asyncronous send always returns true and
// trigger 'close' event with CloseCode if failed
//
return true;
}
}
void PeerConnect::Send( const string& id, const char* message ) {
Send( id, message, strlen( message ) );
bool PeerConnect::Send( const string& channel, const char* message, const bool wait ) {
return Send( channel, message, strlen( message ), wait );
}
void PeerConnect::Send( const string& id, const string& message ) {
Send( id, message.c_str(), message.size() );
bool PeerConnect::Send( const string& channel, const string& message, const bool wait ) {
return Send( channel, message.c_str(), message.size(), wait );
}
bool PeerConnect::SyncSend( const string& id, const char* buffer, const size_t size ) {
return control_->SyncSend( id, buffer, size );
bool PeerConnect::SetOptions( const string options ) {
// parse settings
if ( !options.empty() ) {
return ParseOptions( options );
}
return true;
}
bool PeerConnect::SyncSend( const string& id, const char* message ) {
return SyncSend( id, message, strlen( message ) );
}
bool PeerConnect::SyncSend( const string& id, const string& message ) {
return SyncSend( id, message.c_str(), message.size() );
}
std::string PeerConnect::CreateRandomUuid() {
return rtc::CreateRandomUuid();
@ -171,24 +168,58 @@ std::string PeerConnect::CreateRandomUuid() {
//
// Register Event handler
//
PeerConnect& PeerConnect::On( string event_id, std::function<void( PeerConnect*, string )> handler ) {
PeerConnect& PeerConnect::On( string event_id, std::function<void( string )> handler ) {
if ( event_id.empty() ) return *this;
std::unique_ptr<EventHandler_2> f( new EventHandler_2( handler ) );
event_handler_.insert( Events::value_type( event_id, std::move( f ) ) );
LOGP_F( INFO ) << "An event handler '" << event_id << "' has been inserted";
if ( event_id == "open" || event_id == "connect" || event_id == "writable" ) {
std::unique_ptr<EventHandler_2> f( new EventHandler_2( handler ) );
event_handler_.insert( Events::value_type( event_id, std::move( f ) ) );
LOGP_F( INFO ) << "An event handler '" << event_id << "' has been inserted";
}
else {
LOGP_F( LERROR ) << "Unsupported event type: " << event_id;
}
return *this;
}
//
// Register Message handler
//
PeerConnect& PeerConnect::On( string event_id, std::function<void( string, string )> handler ) {
if ( event_id.empty() ) return *this;
LOGP_F( LERROR ) << "Unsupported event type: " << event_id;
return *this;
}
PeerConnect& PeerConnect::On( string event_id, std::function<void( string, pc::CloseCode, string )> handler ) {
if ( event_id.empty() ) return *this;
if ( event_id == "close" ) {
std::unique_ptr<EventHandler_Close> f( new EventHandler_Close( handler ) );
event_handler_.insert( Events::value_type( event_id, std::move( f ) ) );
LOGP_F( INFO ) << "An event handler '" << event_id << "' has been inserted";
}
else {
LOGP_F( LERROR ) << "Unsupported event type: " << event_id;
}
return *this;
}
PeerConnect& PeerConnect::On( string event_id, std::function<void( string, Buffer& )> handler ) {
if ( event_id.empty() ) return *this;
if ( event_id == "message" ) {
std::unique_ptr<EventHandler_Message> f( new EventHandler_Message( handler ) );
event_handler_.insert( Events::value_type( event_id, std::move( f ) ) );
LOGP_F( INFO ) << "An event handler '" << event_id << "' has been inserted";
}
else {
LOGP_F( LERROR ) << "Unsupported event type: " << event_id;
}
PeerConnect& PeerConnect::OnMessage( std::function<void( PeerConnect*, string, Buffer& )> handler ) {
message_handler_ = handler;
LOGP_F( INFO ) << "A message handler has been inserted";
return *this;
}
@ -196,75 +227,68 @@ PeerConnect& PeerConnect::OnMessage( std::function<void( PeerConnect*, string, B
// Signal event handler
//
void PeerConnect::OnSignedIn( const string id ) {
signout_ = false;
void PeerConnect::OnOpen( const string channel ) {
close_once_ = false;
if ( event_handler_.find( "signin" ) == event_handler_.end() ) {
return;
if ( event_handler_.find( "open" ) != event_handler_.end() ) {
CallEventHandler( "open", channel );
}
CallEventHandler( "signin", this, id );
LOGP_F( INFO ) << "Done";
}
void PeerConnect::OnSignedOut( const string id ) {
if ( !signout_ ) {
LOGP_F( WARNING ) << "signout_ is false, id is " << id;
return;
void PeerConnect::OnClose( const string channel, const CloseCode code, const string desc ) {
// This instance of PeerConnect and local channel is going to be closed
if ( channel == channel_ ) {
if ( close_once_ ) {
LOGP_F( WARNING ) << "close_ is false, channel is " << channel;
return;
}
close_once_ = true;
if ( event_handler_.find( "close" ) != event_handler_.end() ) {
CallEventHandler( "close", channel, code, desc );
}
control_->UnregisterObserver();
control_.reset();
}
// Remote peer has been closed
else {
if ( event_handler_.find( "close" ) != event_handler_.end() ) {
CallEventHandler( "close", channel, code, desc );
}
}
if ( event_handler_.find( "signout" ) == event_handler_.end() ) {
return;
}
CallEventHandler( "signout", this, id );
control_->UnregisterObserver();
control_.reset();
LOGP_F( INFO ) << "Done, id is " << id;
LOGP_F( INFO ) << "Done, channel is " << channel;
}
void PeerConnect::OnPeerConnected( const string id ) {
if ( event_handler_.find( "connect" ) == event_handler_.end() ) {
return;
void PeerConnect::OnConnect( const string channel ) {
if ( event_handler_.find( "connect" ) != event_handler_.end() ) {
CallEventHandler( "connect", channel );
}
CallEventHandler( "connect", this, id );
LOGP_F( INFO ) << "Done, id is " << id;
LOGP_F( INFO ) << "Done, channel is " << channel;
}
void PeerConnect::OnPeerDisconnected( const string id ) {
if ( event_handler_.find( "disconnect" ) == event_handler_.end() ) {
return;
}
CallEventHandler( "disconnect", this, id );
LOGP_F( INFO ) << "Done, id is " << id;
}
void PeerConnect::OnPeerMessage( const string id, const char* buffer, const size_t size ) {
void PeerConnect::OnMessage( const string channel, const char* buffer, const size_t size ) {
Buffer buf( buffer, size );
message_handler_( this, id, buf );
if ( event_handler_.find( "message" ) != event_handler_.end() ) {
CallEventHandler( "message", channel, buf );
}
}
void PeerConnect::OnPeerWritable( const string id ) {
if ( event_handler_.find( "writable" ) == event_handler_.end() ) {
return;
void PeerConnect::OnWritable( const string channel ) {
if ( event_handler_.find( "writable" ) != event_handler_.end() ) {
CallEventHandler( "writable", channel );
}
CallEventHandler( "writable", this, id );
LOGP_F( INFO ) << "Done, id is " << id;
LOGP_F( INFO ) << "Done, channel is " << channel;
}
void PeerConnect::OnError( const string id, const string& reason ) {
if ( event_handler_.find( "error" ) == event_handler_.end() ) {
return;
}
error_reason_ = reason;
CallEventHandler( "error", this, id );
LOGP_F( INFO ) << "Done, id is " << id << " and reason is " << reason;
}
template<typename ...A>
void PeerConnect::CallEventHandler( string msg_id, A&& ... args )
@ -303,14 +327,4 @@ bool PeerConnect::ParseOptions( const string& options ) {
return true;
}
std::string PeerConnect::tolower( const string& str ) {
std::locale loc;
string lower_str;
for ( auto elem : str ) {
lower_str += std::tolower( elem, loc );
}
return lower_str;
}
} // namespace pc

View File

@ -12,10 +12,9 @@
#include <memory>
#include <functional>
#include "common.h"
#include "controlobserver.h"
#define function_pc [&]
namespace pc {
class Control;
@ -52,32 +51,28 @@ public:
static void Run();
static void Stop();
void SignIn( const string alias = "", const string id = "", const string password = "" );
void SignOut();
void Connect( const string id );
void Disconnect( const string id );
void Send( const string& id, const char* buffer, const size_t size );
void Send( const string& id, const char* buffer );
void Send( const string& id, const string& message );
bool SyncSend( const string& id, const char* buffer, const size_t size );
bool SyncSend( const string& id, const char* buffer );
bool SyncSend( const string& id, const string& message );
string GetErrorMessage() { return error_reason_; }
static std::string CreateRandomUuid();
PeerConnect& On( string event_id, std::function<void( PeerConnect*, string )> );
PeerConnect& OnMessage( std::function<void( PeerConnect*, string, Buffer& )> );
void Open();
void Close( const string channel = "" );
void Connect( const string channel );
bool Send( const string& channel, const char* buffer, const size_t size, const bool wait = WAITING_OFF );
bool Send( const string& channel, const char* buffer, const bool wait = WAITING_OFF );
bool Send( const string& channel, const string& message, const bool wait = WAITING_OFF );
bool SetOptions( const string options );
PeerConnect& On( string event_id, std::function<void( string )> );
PeerConnect& On( string event_id, std::function<void( string, string )> );
PeerConnect& On( string event_id, std::function<void( string, pc::CloseCode, string )> );
PeerConnect& On( string event_id, std::function<void( string, Buffer& )> );
//
// Member functions
//
explicit PeerConnect();
explicit PeerConnect( string options );
explicit PeerConnect( const string channel = "" );
~PeerConnect();
static std::string CreateRandomUuid();
protected:
// The base type that is stored in the collection.
@ -96,9 +91,11 @@ protected:
template<typename ...A>
void CallEventHandler( string msg_id, A&& ... args );
using EventHandler_1 = EventHandler_t<PeerConnect*>;
using EventHandler_2 = EventHandler_t<PeerConnect*, string>;
using EventHandler_3 = EventHandler_t<PeerConnect*, string, Data&>;
using EventHandler_1 = EventHandler_t<>;
using EventHandler_2 = EventHandler_t<string>;
using EventHandler_3 = EventHandler_t<string, Data&>;
using EventHandler_Close = EventHandler_t<string, pc::CloseCode, string>;
using EventHandler_Message = EventHandler_t<string, Buffer>;
using Events = std::map<string, std::unique_ptr<Handler_t>>;
using MessageHandler = std::function<void( PeerConnect*, string, Buffer& )>;
@ -106,27 +103,22 @@ protected:
// ControlObserver implementation
//
void OnSignedIn( const string id );
void OnSignedOut( const string id );
void OnPeerConnected( const string id );
void OnPeerDisconnected( const string id );
void OnPeerMessage( const string id, const char* buffer, const size_t size );
void OnPeerWritable( const string id );
void OnError( const string id, const string& reason );
void OnOpen( const string channel );
void OnClose( const string channel, const pc::CloseCode code, const string desc = "" );
void OnConnect( const string channel );
void OnMessage( const string channel, const char* buffer, const size_t size );
void OnWritable( const string channel );
bool ParseOptions( const string& options );
std::string tolower( const string& str );
bool signout_;
bool close_once_;
Setting setting_;
Events event_handler_;
MessageHandler message_handler_;
std::shared_ptr<Control> control_;
std::shared_ptr<Signal> signal_;
string error_reason_;
string channel_;
};

View File

@ -4,31 +4,6 @@
* Ryan Lee
*/
/*
Reference: socket.io-client-cpp
Copyright (c) 2015, Melo Yao
All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to all conditions.
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#if defined(WEBRTC_WIN)
#pragma warning(disable:4503)
#endif
@ -43,7 +18,7 @@ namespace pc {
Signal::Signal(const string url) :
con_state_(con_closed),
network_thread_(),
reconn_attempts_(0xFFFFFFFF),
reconn_attempts_(3),
reconn_made_(0),
reconn_delay_(5000),
reconn_delay_max_(25000),
@ -59,7 +34,7 @@ Signal::Signal(const string url) :
// Default settings
if (url_.empty()) {
url_ = "wss://signal.throughnet.com/hello";
url_ = "wss://signal.peers.io/hello";
}
// Initialize ASIO
@ -80,19 +55,11 @@ Signal::Signal(const string url) :
}
Signal::~Signal() {
#if _DEBUG || DEBUG
client_.clear_access_channels(websocketpp::log::alevel::all);
client_.set_access_channels(websocketpp::log::alevel::fail);
#else
client_.clear_access_channels(websocketpp::log::elevel::all);
client_.clear_access_channels(websocketpp::log::alevel::all);
#endif
Teardown();
LOGP_F( INFO ) << "Done";
}
void Signal::SignIn(const string& id, const string& password) {
void Signal::Open(const string& id, const string& password) {
user_id_ = id;
user_password_ = password;
Connect();
@ -100,11 +67,45 @@ void Signal::SignIn(const string& id, const string& password) {
LOGP_F( INFO ) << "Done";
}
void Signal::SignOut() {
if (opened()) Close();
LOGP_F( INFO ) << "Done";
void Signal::Close() {
if ( !opened() ) {
LOGP_F( WARNING ) << "It is not opened";
return;
}
con_state_ = con_closing;
client_.get_io_service().dispatch(websocketpp::lib::bind(&Signal::CloseInternal,
this,
websocketpp::close::status::normal,
"End by user"));
LOGP_F( INFO ) << "Done";
}
void Signal::SyncClose()
{
if ( !opened() ) {
LOGP_F( WARNING ) << "It is not opened";
return;
}
con_state_ = con_closing;
client_.get_io_service().dispatch(websocketpp::lib::bind(&Signal::CloseInternal,
this,
websocketpp::close::status::normal,
"End by user"));
if (network_thread_ && network_thread_->joinable() )
{
network_thread_->join();
network_thread_.reset();
}
LOGP_F( INFO ) << "Done";
}
void Signal::SendCommand(const string channel,
const string commandname,
const Json::Value& data) {
@ -148,8 +149,6 @@ void Signal::SendGlobalCommand(const string commandname,
SendCommand("", commandname, data);
}
void Signal::Connect()
{
if (reconn_timer_)
@ -184,36 +183,13 @@ void Signal::Connect()
LOGP_F( INFO ) << "Done";
}
void Signal::Close()
{
con_state_ = con_closing;
client_.get_io_service().dispatch(websocketpp::lib::bind(&Signal::CloseInternal,
this,
websocketpp::close::status::normal,
"End by user"));
LOGP_F( INFO ) << "Done";
}
void Signal::SyncClose()
{
con_state_ = con_closing;
client_.get_io_service().dispatch(websocketpp::lib::bind(&Signal::CloseInternal,
this,
websocketpp::close::status::normal,
"End by user"));
if (network_thread_)
{
network_thread_->join();
network_thread_.reset();
}
LOGP_F( INFO ) << "Done";
}
void Signal::Teardown()
{
// TODO: Asyncronous close with PeerConnect::Stop()
SyncClose();
if ( network_thread_ && network_thread_->joinable() ) {
network_thread_->detach();
network_thread_.reset();
}
LOGP_F( INFO ) << "Done";
}
@ -225,13 +201,13 @@ asio::io_service& Signal::GetIoService()
void Signal::SendSignInCommand() {
void Signal::SendOpenCommand() {
Json::Value data;
data["user_id"] = user_id_;
data["user_password"] = user_password_;
SendGlobalCommand("signin", data);
SendGlobalCommand("open", data);
}
void Signal::OnCommandReceived(Json::Value& message) {
@ -263,9 +239,9 @@ void Signal::ConnectInternal()
void Signal::CloseInternal(websocketpp::close::status::value const& code, string const& reason)
void Signal::CloseInternal(websocketpp::close::status::value const& code, string const& desc)
{
LOGP_F(WARNING) << "Close by reason:" << reason;
LOGP_F(WARNING) << "Close by reason:" << desc;
if (reconn_timer_)
{
@ -279,7 +255,7 @@ void Signal::CloseInternal(websocketpp::close::status::value const& code, string
else
{
websocketpp::lib::error_code ec;
client_.close(con_hdl_, code, reason, ec);
client_.close(con_hdl_, code, desc, ec);
}
}
@ -308,10 +284,86 @@ unsigned Signal::NextDelay() const
}
void Signal::OnOpen(websocketpp::connection_hdl con)
{
LOGP_F(WARNING) << "Connected.";
con_state_ = con_opened;
con_hdl_ = con;
reconn_made_ = 0;
SendOpenCommand();
}
void Signal::OnClose(websocketpp::connection_hdl con)
{
//
// This routine will be called if a connection disconnected.
// This routine will not be called when attempt to connection failed.
//
con_state_ = con_closed;
websocketpp::lib::error_code ec;
websocketpp::close::status::value code = websocketpp::close::status::normal;
client_type::connection_ptr conn_ptr = client_.get_con_from_hdl(con, ec);
if (ec) {
LOGP_F(LERROR) << "get conn failed" << ec;
}
else {
code = conn_ptr->get_local_close_code();
}
con_hdl_.reset();
if (code == websocketpp::close::status::normal)
{
// NOTHING
}
else
{
//
// TODO: Implement seamless signal reconnection. Don't close existing ice connection.
// Reconstruction of (creating and joining) channel has to be implemented.
//
//if (reconn_made_<reconn_attempts_)
//{
// LOGP_F(LS_WARNING) << "Reconnect for attempt:" << reconn_made_;
// unsigned delay = this->NextDelay();
// reconn_timer_.reset(new websocketpp::lib::asio::steady_timer(client_.get_io_service()));
// websocketpp::lib::asio::error_code ec;
// reconn_timer_->expires_from_now(websocketpp::lib::asio::milliseconds(delay), ec);
// reconn_timer_->async_wait(websocketpp::lib::bind(&Signal::TimeoutReconnect, this, websocketpp::lib::placeholders::_1));
// return;
//}
SignalOnClosed_(code);
}
LOGP_F( INFO ) << "Done";
}
void Signal::OnFail(websocketpp::connection_hdl con)
{
//
// This routine will be called when attempt to connection failed.
// This routine will not be called if a connection abnormally disconnected.
//
websocketpp::lib::error_code ec;
websocketpp::close::status::value code = websocketpp::close::status::abnormal_close;
client_type::connection_ptr conn_ptr = client_.get_con_from_hdl(con, ec);
if (ec) {
LOGP_F(LERROR) << "get conn failed" << ec;
}
else {
code = conn_ptr->get_local_close_code();
}
con_hdl_.reset();
con_state_ = con_closed;
LOGP_F(LERROR) << "Connection failed.";
if (reconn_made_<reconn_attempts_)
@ -323,59 +375,11 @@ void Signal::OnFail(websocketpp::connection_hdl con)
reconn_timer_->expires_from_now(websocketpp::lib::asio::milliseconds(delay), ec);
reconn_timer_->async_wait(websocketpp::lib::bind(&Signal::TimeoutReconnect, this, websocketpp::lib::placeholders::_1));
}
else {
SignalOnClosed_(code);
}
}
void Signal::OnOpen(websocketpp::connection_hdl con)
{
LOGP_F(WARNING) << "Connected.";
con_state_ = con_opened;
con_hdl_ = con;
reconn_made_ = 0;
SendSignInCommand();
}
void Signal::OnClose(websocketpp::connection_hdl con)
{
con_state_ = con_closed;
websocketpp::lib::error_code ec;
websocketpp::close::status::value code = websocketpp::close::status::normal;
client_type::connection_ptr conn_ptr = client_.get_con_from_hdl(con, ec);
if (ec) {
LOGP_F(LERROR) << "OnClose get conn failed" << ec;
}
else
{
code = conn_ptr->get_local_close_code();
}
con_hdl_.reset();
SignalOnClosed_(code);
if (code == websocketpp::close::status::normal)
{
// NOTHING
}
else
{
if (reconn_made_<reconn_attempts_)
{
LOGP_F(LS_WARNING) << "Reconnect for attempt:" << reconn_made_;
unsigned delay = this->NextDelay();
reconn_timer_.reset(new websocketpp::lib::asio::steady_timer(client_.get_io_service()));
websocketpp::lib::asio::error_code ec;
reconn_timer_->expires_from_now(websocketpp::lib::asio::milliseconds(delay), ec);
reconn_timer_->async_wait(websocketpp::lib::bind(&Signal::TimeoutReconnect, this, websocketpp::lib::placeholders::_1));
return;
}
}
LOGP_F( INFO ) << "Done";
}
void Signal::OnMessage(websocketpp::connection_hdl con, client_type::message_ptr msg)
{

View File

@ -52,8 +52,8 @@ namespace pc {
class SignalInterface {
public:
virtual void SignIn(const std::string& id, const std::string& password) = 0;
virtual void SignOut() = 0;
virtual void Open(const std::string& id, const std::string& password) = 0;
virtual void Close() = 0;
virtual void SendCommand(const std::string id,
const std::string commandname,
@ -95,9 +95,9 @@ public:
Signal(const string url);
~Signal();
void SignIn(const string& id, const string& password);
void SignOut();
void Open(const string& id, const string& password);
void Close();
void SyncClose();
void SendCommand(const string channel,
const string commandname,
@ -115,17 +115,15 @@ public:
protected:
void Connect();
void Close();
void SyncClose();
asio::io_service& GetIoService();
private:
void SendSignInCommand();
void SendOpenCommand();
void OnCommandReceived(Json::Value& message);
void RunLoop();
void ConnectInternal();
void CloseInternal(websocketpp::close::status::value const& code, string const& reason);
void CloseInternal(websocketpp::close::status::value const& code, string const& desc);
void TimeoutReconnect(websocketpp::lib::asio::error_code const& ec);
unsigned NextDelay() const;
@ -153,7 +151,7 @@ private:
unsigned reconn_attempts_;
unsigned reconn_made_;
// Signin
// Signal server
string url_;
string user_id_;
string user_password_;

View File

@ -33,71 +33,71 @@ void test_normal() {
std::string server = PeerConnect::CreateRandomUuid();
std::string client = PeerConnect::CreateRandomUuid();
PeerConnect pc1;
PeerConnect pc2;
PeerConnect pc1(server);
PeerConnect pc2(client);
pc1.On("signin", function_pc(PeerConnect* pc, string id) {
assert(id == server);
std::cout << "pc1: signedin" << std::endl;
pc2.SignIn(client);
pc1.On("open", function_pc( string peer ) {
assert(peer == server);
std::cout << "pc1: open" << std::endl;
pc2.Open();
});
pc1.On("connect", function_pc(PeerConnect* pc, string id) {
assert(id == client);
std::cout << "pc1: pc2(" << id << ") connected" << std::endl;
pc1.On("connect", function_pc( string peer ) {
assert(peer == client);
std::cout << "pc1: pc2(" << peer << ") connected" << std::endl;
});
pc1.On("disconnect", function_pc(PeerConnect* pc, string id) {
assert(id == client);
std::cout << "pc1: pc2 disconnected" << std::endl;
pc1.On("close", function_pc( string peer, CloseCode code, string desc ) {
assert(peer == client || peer == server);
if ( peer == client ) {
std::cout << "pc1: pc2 disconnected" << std::endl;
}
else if ( peer == server ) {
std::cout << "pc1: close out" << std::endl;
pc2.Close();
}
});
pc1.On("signout", function_pc(PeerConnect* pc, string id) {
assert(id == server);
std::cout << "pc1: signed out" << std::endl;
pc2.SignOut();
});
pc1.OnMessage(function_pc(PeerConnect* pc, string id, PeerConnect::Buffer& data) {
pc1.On("message", function_pc( string peer, PeerConnect::Buffer& data ) {
assert(std::string(data.buf_, data.size_) == "Ping");
assert(id == client);
assert(peer == client);
std::cout << "pc1: a message has been received" << std::endl;
pc->Send(client, "Pong");
pc1.Send(client, "Pong");
});
pc2.On("signin", function_pc(PeerConnect* pc, string id) {
assert(id == client);
std::cout << "pc2: signedin" << std::endl;
pc->Connect(server);
pc2.On("open", function_pc( string peer ) {
assert(peer == client);
std::cout << "pc2: open" << std::endl;
pc2.Connect(server);
});
pc2.On("connect", function_pc(PeerConnect* pc, string id) {
assert(id == server);
std::cout << "pc2: pc1(" << id << ") connected" << std::endl;
pc->Send(server, "Ping");
pc2.On("connect", function_pc( string peer ) {
assert(peer == server);
std::cout << "pc2: pc1(" << peer << ") connected" << std::endl;
pc2.Send(server, "Ping");
});
pc2.On("disconnect", function_pc(PeerConnect* pc, string id) {
assert(id == server);
std::cout << "pc2: pc1 disconnected" << std::endl;
pc1.SignOut();
pc2.On("close", function_pc( string peer, CloseCode code, string desc ) {
assert( peer == server || peer == client );
if ( peer == server ) {
std::cout << "pc2: pc1 disconnected" << std::endl;
pc1.Close();
}
else if ( peer == client ) {
std::cout << "pc2: close out" << std::endl;
PeerConnect::Stop();
}
});
pc2.On("signout", function_pc(PeerConnect* pc, string id){
assert(id == client);
std::cout << "pc2: signed out" << std::endl;
PeerConnect::Stop();
});
pc2.OnMessage(function_pc(PeerConnect* pc, string id, PeerConnect::Buffer& data) {
pc2.On("message", function_pc( string peer, PeerConnect::Buffer& data ) {
assert(std::string(data.buf_, data.size_) == "Pong");
assert(id == server);
assert(peer == server);
std::cout << "pc2 has received message" << std::endl;
pc->Disconnect(server);
pc2.Close(server);
});
pc1.SignIn(server);
pc1.Open();
PeerConnect::Run();
}
@ -107,81 +107,83 @@ void test_writable() {
std::string server = PeerConnect::CreateRandomUuid();
std::string client = PeerConnect::CreateRandomUuid();
PeerConnect pc1;
PeerConnect pc2;
PeerConnect pc1(server);
PeerConnect pc2(client);
pc1.On("signin", function_pc(PeerConnect* pc, string id) {
assert(id == server);
std::cout << "pc1: signedin" << std::endl;
pc2.SignIn(client);
pc1.On("open", function_pc( string peer ) {
assert(peer == server);
std::cout << "pc1: open" << std::endl;
pc2.Open();
});
pc1.On("connect", function_pc(PeerConnect* pc, string id) {
assert(id == client);
std::cout << "pc1: pc2(" << id << ") connected" << std::endl;
pc1.On("connect", function_pc( string peer ) {
assert(peer == client);
std::cout << "pc1: pc2(" << peer << ") connected" << std::endl;
});
pc1.On("disconnect", function_pc(PeerConnect* pc, string id) {
assert(id == client);
std::cout << "pc1: pc2 disconnected" << std::endl;
pc1.On("close", function_pc( string peer, CloseCode code, string desc ) {
assert( peer == client || peer == server );
if ( peer == client ) {
std::cout << "pc1: pc2 disconnected" << std::endl;
}
else if ( peer == server ) {
std::cout << "pc1: close" << std::endl;
pc2.Close();
}
});
pc1.On("writable", function_pc(PeerConnect* pc, string id){
assert(id == server);
pc1.On("writable", function_pc( string peer ){
assert(peer == server);
std::cout << "pc1: writable" << std::endl;
});
pc1.On("signout", function_pc(PeerConnect* pc, string id) {
assert(id == server);
std::cout << "pc1: signed out" << std::endl;
pc2.SignOut();
});
pc1.OnMessage(function_pc(PeerConnect* pc, string id, PeerConnect::Buffer& data) {
pc1.On("message", function_pc( string peer, PeerConnect::Buffer& data ) {
assert(std::string(data.buf_, data.size_) == "Ping");
assert(id == client);
assert(peer == client);
std::cout << "pc1: a message has been received" << std::endl;
pc->Send(client, "Pong");
pc1.Send(client, "Pong");
});
pc2.On("signin", function_pc(PeerConnect* pc, string id) {
assert(id == client);
std::cout << "pc2: signedin" << std::endl;
pc->Connect(server);
pc2.On("open", function_pc( string peer ) {
assert(peer == client);
std::cout << "pc2: open" << std::endl;
pc2.Connect(server);
});
pc2.On("connect", function_pc(PeerConnect* pc, string id) {
assert(id == server);
std::cout << "pc2: pc1(" << id << ") connected" << std::endl;
pc2.On("connect", function_pc( string peer ) {
assert( peer == server );
std::cout << "pc2: pc1(" << peer << ") connected" << std::endl;
});
pc2.On("disconnect", function_pc(PeerConnect* pc, string id) {
assert(id == server);
std::cout << "pc2: pc1 disconnected" << std::endl;
pc1.SignOut();
pc2.On("close", function_pc( string peer, CloseCode code, string desc ) {
assert( peer == server || peer == client);
if ( peer == server ) {
std::cout << "pc2: pc1 disconnected" << std::endl;
pc1.Close();
}
else if ( peer == client ) {
std::cout << "pc2: close" << std::endl;
PeerConnect::Stop();
}
});
pc2.On("writable", function_pc(PeerConnect* pc, string id){
assert(id == client);
pc2.On("writable", function_pc( string peer ){
assert(peer == client);
std::cout << "pc2: writable" << std::endl;
pc->Send(server, "Ping");
pc2.Send(server, "Ping");
});
pc2.On("signout", function_pc(PeerConnect* pc, string id){
assert(id == client);
std::cout << "pc2: signed out" << std::endl;
PeerConnect::Stop();
});
pc2.OnMessage(function_pc(PeerConnect* pc, string id, PeerConnect::Buffer& data) {
pc2.On("message", function_pc( string peer, PeerConnect::Buffer& data ) {
assert(std::string(data.buf_, data.size_) == "Pong");
assert(id == server);
assert(peer == server);
std::cout << "pc2 has received message" << std::endl;
pc->Disconnect(server);
pc2.Close(server);
});
pc1.SignIn(server);
pc1.Open();
PeerConnect::Run();
}
}