diff --git a/CMakeLists.txt b/CMakeLists.txt index 1d825a7..55cfea1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -82,6 +82,7 @@ find_package(Asio) # ============================================================================ set(HEADERS "src/peerconnect.h" + "src/common.h" "src/control.h" "src/controlobserver.h" "src/peer.h" diff --git a/README.md b/README.md index 70b0ab9..875c4ac 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # PeerConnect -A PeerConnect is a peer to peer socket library. +A PeerConnect is a peer-to-peer library for network. -- Network connection by random id or email address +- Network connection by random id - No ip address and port number - Support NAT traversal and WebRTC @@ -10,23 +10,23 @@ A PeerConnect is a peer to peer socket library. ### How to use Peer A (listen) ``` -PeerConnect pc; -pc.OnMessage(function_pc(PeerConnect* pc, string id, PeerConnect::Buffer& data) { +PeerConnect pc1("PEER_A"); +pc1.On("message", function_pc(string peer, PeerConnect::Buffer& data) { std::cout << "A message has been received." << std::endl; }); -pc.SignIn("PEER_A"); +pc1.Open(); ``` Peer B (connect) ``` -PeerConnect pc; -pc.On("signin", function_pc(PeerConnect* pc, string id) { - pc->Connect("PEER_A"); +PeerConnect pc2("PEER_B"); +pc2.On("open", function_pc(string peer) { + pc2.>Connect("PEER_A"); }); -pc.On("connect", function_pc(PeerConnect* pc, string id) { - pc->Send("PEER_A", "Hello"); +pc2.On("connect", function_pc(string peer) { + pc2.Send("PEER_A", "Hello"); }); -pc.SignIn("PEER_B"); +pc2.Open(); ``` ### How it works diff --git a/examples/echo_client/main.cc b/examples/echo_client/main.cc index bdee88a..b9013a1 100644 --- a/examples/echo_client/main.cc +++ b/examples/echo_client/main.cc @@ -22,30 +22,31 @@ int main(int argc, char *argv[]) { return 1; } - string name = argv[1]; + string server = argv[1]; PeerConnect pc; - pc.On("signin", function_pc(PeerConnect* pc, string id) { - pc->Connect(name); + pc.On("open", function_pc(string channel) { + pc.Connect(server); }); - pc.On("connect", function_pc(PeerConnect* pc, string id) { - pc->Send(id, "Hello world"); - std::cout << "Sent 'Hello world' message to " << id << "." << std::endl; + pc.On("connect", function_pc(string channel) { + pc.Send(channel, "Hello world"); + std::cout << "Sent 'Hello world' message to " << channel << "." << std::endl; }); - pc.On("disconnect", function_pc(PeerConnect* pc, string id) { - std::cout << "Peer " << id << " has been disconnected" << std::endl; + pc.On("close", function_pc(string channel, CloseCode code, string desc) { + std::cout << "Peer " << channel << " has been closed" << std::endl; PeerConnect::Stop(); }); - pc.OnMessage(function_pc(PeerConnect* pc, string id, PeerConnect::Buffer& data) { + pc.On("message", function_pc(string channel, PeerConnect::Buffer& data) { std::cout << "Message '" << std::string(data.buf_, data.size_) << "' has been received." << std::endl; + pc.Close(); }); - pc.SignIn(); + pc.Open(); PeerConnect::Run(); return 0; diff --git a/examples/echo_server/main.cc b/examples/echo_server/main.cc index 58c0954..16ec272 100644 --- a/examples/echo_server/main.cc +++ b/examples/echo_server/main.cc @@ -19,25 +19,25 @@ int main(int argc, char *argv[]) { usage(argv[0]); return 1; } - string name = argv[1]; + string server = argv[1]; - PeerConnect pc; + PeerConnect pc(server); - pc.On("connect", function_pc(PeerConnect* pc, string id) { - std::cout << "Peer " << id << " has been connected." << std::endl; + pc.On("connect", function_pc(string peer) { + std::cout << "Peer " << peer << " has been connected." << std::endl; }); - pc.On("disconnect", function_pc(PeerConnect* pc, string id) { - std::cout << "Peer " << id << " has been disconnected." << std::endl; + pc.On("close", function_pc(string peer, CloseCode code, string desc) { + std::cout << "Peer " << peer << " has been closed." << std::endl; }); - pc.OnMessage(function_pc(PeerConnect* pc, string id, PeerConnect::Buffer& data) { + pc.On("message", function_pc(string peer, PeerConnect::Buffer& data) { std::cout << "Message " << std::string(data.buf_, data.size_) << " has been received." << std::endl; - pc->Send(id, data.buf_, data.size_); + pc.Send(peer, data.buf_, data.size_); }); - pc.SignIn(name); + pc.Open(); PeerConnect::Run(); return 0; diff --git a/examples/p2p_netcat/main.cc b/examples/p2p_netcat/main.cc index 2e2ef50..ad67057 100644 --- a/examples/p2p_netcat/main.cc +++ b/examples/p2p_netcat/main.cc @@ -27,9 +27,9 @@ using namespace std; using namespace pc; -bool parse_args(int argc, char* argv[], std::string& alias, std::string& connect_to, bool& server_mode); +bool parse_args(int argc, char* argv[], string& local_channel, string& remote_channel, bool& server_mode); void usage(const char* prg); -void read_stdin(PeerConnect* pc, std::string id); +void read_stdin(PeerConnect* pc, std::string channel); bool write_stdout(const char* buf, int len); void set_mode(PeerConnect* pc); void ctrlc_handler(int s); @@ -39,15 +39,15 @@ static PeerConnect *pc_; int main(int argc, char *argv[]) { - string connec_to; - string alias; + string local_channel; + string remote_channel; bool server_mode; // // Parse arguments // - if (!parse_args(argc, argv, alias, connec_to, server_mode)) { + if (!parse_args(argc, argv, local_channel, remote_channel, server_mode)) { usage(argv[0]); return 1; } @@ -56,43 +56,42 @@ int main(int argc, char *argv[]) { // Set event handlers // - PeerConnect pc; + PeerConnect pc(local_channel); + set_mode(&pc); - pc.On("signin", function_pc(PeerConnect* pc, string id) { + pc.On("open", function_pc( string peer ) { if (server_mode) { - std::cerr << "Listening " << id << std::endl; + std::cerr << "Listening " << peer << std::endl; } else { - std::cerr << "Connecting to " << connec_to << std::endl; - pc->Connect(connec_to); + std::cerr << "Connecting to " << remote_channel << std::endl; + pc.Connect(remote_channel); } }); - pc.On("connect", function_pc(PeerConnect* pc, string id) { + pc.On("connect", function_pc( string peer ) { std::cerr << "Connected" << std::endl; - std::thread(read_stdin, pc, id).detach(); + std::thread(read_stdin, &pc, peer).detach(); }); - pc.On("disconnect", function_pc(PeerConnect* pc, string id) { - if (server_mode) - std::cerr << "Disconnected" << std::endl; - else - pc->SignOut(); + pc.On("close", function_pc( string peer, CloseCode code, string desc ) { + + if ( peer == local_channel ) { + PeerConnect::Stop(); + } + else { + pc.Close(); + } + + if ( !desc.empty() ) { + std::cerr << desc << std::endl; + } }); - pc.On("signout", function_pc(PeerConnect* pc, string id) { - PeerConnect::Stop(); - }); - - pc.On("error", function_pc(PeerConnect* pc, string id){ - std::cerr << pc->GetErrorMessage() << std::endl; - PeerConnect::Stop(); - }); - - pc.OnMessage(function_pc(PeerConnect* pc, string id, PeerConnect::Buffer& data) { + pc.On("message", function_pc(string peer, PeerConnect::Buffer& data) { if (!write_stdout(data.buf_, data.size_)) { - pc->Disconnect(id); + pc.Close(peer); } }); @@ -100,7 +99,7 @@ int main(int argc, char *argv[]) { // Sign in as anonymous user // - pc.SignIn(alias, "anonymous", "nopassword"); + pc.Open(); PeerConnect::Run(); return 0; @@ -120,7 +119,7 @@ int main(int argc, char *argv[]) { #define STDERR_FILENO 2 #endif -void read_stdin(PeerConnect* pc, std::string id) +void read_stdin(PeerConnect* pc, std::string channel) { int nbytes; char buf[32*1024]; @@ -128,11 +127,11 @@ void read_stdin(PeerConnect* pc, std::string id) for (;;) { nbytes = read(STDIN_FILENO, buf, sizeof(buf)); if (nbytes <= 0) { - pc->Disconnect(id); + pc->Close( channel ); return; } - if (!pc->SyncSend(id, buf, nbytes)) { + if (!pc->Send(channel, buf, nbytes, WAITING_ON)) { return; } } @@ -157,7 +156,7 @@ bool write_stdout(const char* buf, int len) void ctrlc_handler(int s) { std::cerr << "Terminating..." << std::endl; - pc_->SignOut(); + pc_->Close(); } void set_mode(PeerConnect* pc) @@ -172,14 +171,15 @@ void set_mode(PeerConnect* pc) #endif } -bool parse_args(int argc, char* argv[], std::string& alias, std::string& connect_to, bool& server_mode) { +bool parse_args(int argc, char* argv[], string& local_channel, string& remote_channel, bool& server_mode) { if (argc == 2) { - connect_to = argv[1]; + remote_channel = argv[1]; + local_channel = PeerConnect::CreateRandomUuid(); server_mode = false; return true; } else if (argc == 3 && std::string(argv[1]) == "-l") { - alias = argv[2]; + local_channel = argv[2]; server_mode = true; return true; } @@ -187,13 +187,13 @@ bool parse_args(int argc, char* argv[], std::string& alias, std::string& connect } void usage(const char* prg) { - std::cerr << "P2P netcat version 0.1 (http://github.com/peerconnect/peerconnect)" << std::endl << std::endl; + std::cerr << "P2P netcat demo (http://github.com/peersio/peerconnect)" << std::endl << std::endl; std::cerr << "Usage: " << prg << " [-l] name" << std::endl << std::endl; std::cerr << " Options:" << std::endl; std::cerr << " -l Listen mode, for inbound connections" << std::endl << std::endl; std::cerr << "Example: " << std::endl; - std::cerr << " > " << prg << " -l random_id : Listen randoom_id" << std::endl; - std::cerr << " > " << prg << " random_id : Connect to random_id" << std::endl; + std::cerr << " > " << prg << " -l my_channel : Listen my_channel" << std::endl; + std::cerr << " > " << prg << " my_channel : Connect to my_channel" << std::endl; } diff --git a/src/common.h b/src/common.h new file mode 100644 index 0000000..b0ede27 --- /dev/null +++ b/src/common.h @@ -0,0 +1,35 @@ +/* + * Copyright 2016 The PeerConnect Project Authors. All rights reserved. + * + * Ryan Lee + */ + + +#ifndef __PEERCONNECT_COMMON_H__ +#define __PEERCONNECT_COMMON_H__ + +namespace pc { + +#define function_pc [&] + +enum CloseCode { + // Success + CLOSE_NORMAL = 0, + + // Failure + CLOSE_GOING_AWAY, + CLOSE_ABNORMAL, + CLOSE_PROTOCOL_ERROR, + CLOSE_SIGNAL_ERROR +}; + + +const bool WAITING_OFF = false; +const bool WAITING_ON = true; + +const bool FORCE_QUEUING_OFF = false; +const bool FORCE_QUEUING_ON = true; + +} // namespace pc + +#endif // __PEERCONNECT_COMMON_H__ \ No newline at end of file diff --git a/src/control.cc b/src/control.cc index c8f8b6a..7c0f5cb 100644 --- a/src/control.cc +++ b/src/control.cc @@ -61,14 +61,14 @@ bool Control::InitializeControl() { webrtc::MediaConstraintsInterface* constraints = NULL; - if (!CreatePeerFactory(constraints)) { + if ( !CreatePeerFactory(constraints) ) { LOGP_F(LERROR) << "CreatePeerFactory failed"; DeleteControl(); return false; } webrtc_thread_ = rtc::Thread::Current(); - ASSERT( webrtc_thread_ != nullptr); + ASSERT( webrtc_thread_ != nullptr ); return true; } @@ -83,43 +83,32 @@ void Control::DeleteControl() { } -void Control::SignIn(const string& user_id, const string& user_password, const string& open_id) { +void Control::Open(const string& user_id, const string& user_password, const string& channel) { + // 1. Connect to signal server - // 2. Send signin command to signal server - // 3. Send createchannel command to signal server (channel name is id or alias) - // Other peers connect to this peer by channel name, that is id or alias - // 4. Generate 'signedin' event to PeerConnect + // 2. Send open command to signal server + // 3. Send createchannel command to signal server. + // A channel name is user-supplied string to listen or random string. + // Other peers connect to this peer by channel name. + // 4. Generate 'open' event to PeerConnect if (signal_.get() == NULL) { - LOGP_F( LERROR ) << "SignIn failed, no signal server"; + LOGP_F( LERROR ) << "Open failed, no signal server"; return; } - open_id_ = open_id; + channel_ = channel; user_id_ = user_id; - // Start by signing in - signal_->SignIn(user_id, user_password); + // Connect to signal server + signal_->Open(user_id, user_password); LOGP_F( INFO ) << "Done"; return; } -void Control::SignOut() { +void Control::Connect(const string channel) { - if (webrtc_thread_ != rtc::Thread::Current()) { - ControlMessageData *data = new ControlMessageData(0, ref_); - webrtc_thread_->Post(this, MSG_SIGNOUT, data); - return; - } - - signal_->SignOut(); - Close(); - - LOGP_F( INFO ) << "Done"; -} - -void Control::Connect(const string id) { // 1. Join channel on signal server // 2. Server(remote) peer createoffer // 3. Client(local) peer answeroffer @@ -130,48 +119,29 @@ void Control::Connect(const string id) { return; } - LOGP_F( INFO ) << "Joining channel " << id; - JoinChannel(id); + LOGP_F( INFO ) << "Joining channel " << channel; + JoinChannel(channel); } -void Control::Close(const string id, bool force_queuing) { +void Control::Close(const CloseCode code, bool force_queuing) { + LOGP_F( INFO ) << "Call"; + // - // Called by - // PeerConnect if user closes the connection. - // PeerControl if remote peer closes a ice connection or data channel + // Verify current thread // if (force_queuing || webrtc_thread_ != rtc::Thread::Current()) { - ControlMessageData *data = new ControlMessageData(id, ref_); - webrtc_thread_->Post(this, MSG_CLOSE_PEER, data); + ControlMessageData *data = new ControlMessageData(code, ref_); + webrtc_thread_->Post(this, MSG_CLOSE, data); + LOGP_F( INFO ) << "Queued"; return; } - // 1. Leave channel on signal server - // 2. Erase peer - // 3. Close peer + // + // Close peers + // - LeaveChannel(id); - - auto peer_found = peers_.find(id); - if ( peer_found == peers_.end() ) { - LOGP_F( WARNING ) << "peer not found, " << id; - return; - } - - Peer peer = peer_found->second; - peers_.erase( peer_found ); - peer->Close(); - - LOGP_F( INFO ) << "Done, id is " << id; -} - -void Control::Close( const string id ) { - Close( id, QUEUEING_ON ); -} - -void Control::Close() { std::vector peer_ids; for (auto peer : peers_) { @@ -182,8 +152,52 @@ void Control::Close() { for (auto id : peer_ids) { LOGP_F( INFO ) << "Try to close peer having id " << id; - Close(id, QUEUEING_ON); + ClosePeer(id, code); } + + // + // Close signal server + // + + if ( pc_ ) { + pc_->OnClose( channel_ ,code ); + } + + LOGP_F( INFO ) << "Done"; +} + +void Control::ClosePeer( const string channel, const CloseCode code, bool force_queuing ) { + + // + // Called by + // PeerConnect if user closes the connection. + // PeerControl if remote peer closes a ice connection or data channel + // + + if (force_queuing || webrtc_thread_ != rtc::Thread::Current()) { + ControlMessageData *data = new ControlMessageData(channel, ref_); + data->data_int32_ = code; + webrtc_thread_->Post(this, MSG_CLOSE_PEER, data); + return; + } + + // 1. Erase peer + // 2. Close peer + + auto peer_found = peers_.find(channel); + if ( peer_found == peers_.end() ) { + LOGP_F( WARNING ) << "peer not found, " << channel; + return; + } + + Peer peer = peer_found->second; + peers_.erase( peer_found ); + peer->Close(code); + + // 3. Leave channel on signal server + LeaveChannel(channel); + + LOGP_F( INFO ) << "Done, channel is " << channel; } // @@ -216,99 +230,73 @@ bool Control::SyncSend(const string to, const char* buffer, const size_t size) { // Send command to other peer by signal server // -void Control::SendCommand(const string& id, const string& command, const Json::Value& data) { - signal_->SendCommand(id, command, data); +void Control::SendCommand(const string& channel, const string& command, const Json::Value& data) { + signal_->SendCommand(channel, command, data); } -void Control::OnConnected(const string id) { - if ( observer_ == nullptr ) { - LOGP_F( WARNING ) << "observer_ is null, id is " << id; +void Control::OnPeerConnect(const string channel) { + if ( pc_ == nullptr ) { + LOGP_F( WARNING ) << "pc_ is null, channel is " << channel; return; } - observer_->OnPeerConnected(id); - LOGP_F( INFO ) << "Done, id is " << id; + pc_->OnConnect(channel); + LOGP_F( INFO ) << "Done, channel is " << channel; } -// -// Ice connection state has been changed to close. -// It means that peer data channel had been closed already. -// -// Implements PeerObserver::OnDisconnected() -// +void Control::OnPeerClose(const string channel, CloseCode code) { -void Control::OnClosed(const string id, const bool force_queuing) { - - if (force_queuing || webrtc_thread_ != rtc::Thread::Current()) { - ControlMessageData *data = new ControlMessageData(id, ref_); + if (webrtc_thread_ != rtc::Thread::Current()) { + ControlMessageData *data = new ControlMessageData(channel, ref_); // Call Control::OnPeerDisconnected() - webrtc_thread_->Post(this, MSG_ON_PEER_CLOSED, data); - LOGP_F( INFO ) << "Queued, id is " << id; + webrtc_thread_->Post(this, MSG_ON_PEER_CLOSE, data); + LOGP_F( INFO ) << "Queued, channel is " << channel; return; } - - if ( observer_ == nullptr ) { - LOGP_F( WARNING ) << "observer_ is null, id is " << id; + LOGP_F( INFO ) << "Enter, channel is " << channel; + + if ( pc_ == nullptr ) { + LOGP_F( WARNING ) << "pc_ is null, channel is " << channel; return; } - LOGP_F( INFO ) << "Calling OnPeerDisconnected, id is " << id; - observer_->OnPeerDisconnected(id); + pc_->OnClose( channel, code ); - if (peers_.size() == 0) { - LOGP_F( INFO ) << "peers_ has been empty. id is " << id; - OnSignedOut(open_id_); - } - - LOGP_F( INFO ) << "Done, id is " << id; + LOGP_F( INFO ) << "Done, channel is " << channel; } -void Control::OnClosed(const string id) { - OnClosed( id, QUEUEING_ON ); -} - - // // Signal receiving data // -void Control::OnMessage(const string& id, const char* buffer, const size_t size) { - if ( observer_ == nullptr ) { - LOGP_F( WARNING ) << "observer_ is null, id is " << id; +void Control::OnPeerMessage(const string& channel, const char* buffer, const size_t size) { + if ( pc_ == nullptr ) { + LOGP_F( WARNING ) << "pc_ is null, channel is " << channel; return; } - observer_->OnPeerMessage(id, buffer, size); + pc_->OnMessage(channel, buffer, size); } -void Control::OnWritable(const string& id) { - if ( observer_ == nullptr ) { - LOGP_F( WARNING ) << "observer_ is null, id is " << id; +void Control::OnPeerWritable(const string& channel) { + if ( pc_ == nullptr ) { + LOGP_F( WARNING ) << "pc_ is null, channel is " << channel; return; } - observer_->OnPeerWritable(id); + pc_->OnWritable(channel); } -void Control::OnError( const string id, const string& reason ) { - if ( observer_ == nullptr ) { - LOGP_F( WARNING ) << "observer_ is null, id is " << id; - return; - } - observer_->OnError( id, reason ); -} - - void Control::RegisterObserver(ControlObserver* observer, std::shared_ptr ref) { ref_ = ref; - observer_ = observer; + pc_ = observer; LOGP_F( INFO ) << "Registered"; } void Control::UnregisterObserver() { - observer_ = nullptr; + pc_ = nullptr; ref_.reset(); LOGP_F( INFO ) << "Unregistered"; @@ -325,23 +313,24 @@ void Control::OnMessage(rtc::Message* msg) { param = static_cast(msg->pdata); OnCommandReceived(param->data_json_); break; + case MSG_CLOSE: + param = static_cast(msg->pdata); + Close((CloseCode)param->data_int32_); + break; case MSG_CLOSE_PEER: param = static_cast(msg->pdata); - Close(param->data_string_, QUEUEING_OFF); + ClosePeer(param->data_string_, (CloseCode) param->data_int32_); break; - case MSG_ON_PEER_CLOSED: + case MSG_ON_PEER_CLOSE: param = static_cast(msg->pdata); - OnClosed(param->data_string_, QUEUEING_OFF); + OnPeerClose(param->data_string_, (CloseCode) param->data_int32_); break; - case MSG_SIGNOUT: + case MSG_ON_SIGLAL_CONNECTION_CLOSE: param = static_cast(msg->pdata); - SignOut(); - break; - case MSG_SIGNAL_SERVER_CLOSED: - param = static_cast(msg->pdata); - OnSignedOut(param->data_string_); + Close((CloseCode)param->data_int32_); break; default: + LOGP_F( WARNING ) << "Unknown message"; break; } @@ -370,17 +359,20 @@ void Control::OnCommandReceived(const Json::Value& message) { peer_id.clear(); } - if (command == "signin") { - OnSignedIn(data); + if (command == "open") { + OnOpen(data); } - else if (command == "channelcreated") { - OnChannelCreated(data); + else if (command == "channelcreate") { + OnChannelCreate(data); } - else if (command == "channeljoined") { - OnChannelJoined(data); + else if (command == "channeljoin") { + OnChannelJoin(data); } - else if (command == "channelleaved") { - OnChannelLeaved(data); + else if (command == "channelleave") { + OnChannelLeave(data); + } + else if (command == "peerclosed") { + OnRemotePeerClose(peer_id, data); } else if (command == "createoffer") { CreateOffer(data); @@ -403,35 +395,14 @@ void Control::OnSignalCommandReceived(const Json::Value& message) { } void Control::OnSignalConnectionClosed(websocketpp::close::status::value code) { - LOGP_F(INFO) << "Calling OnSignalConnectionClosed() with " << code; - if (code == websocketpp::close::status::normal) { - ControlMessageData *data = new ControlMessageData(open_id_, ref_); - webrtc_thread_->Post(this, MSG_SIGNAL_SERVER_CLOSED, data); + LOGP_F(INFO) << "Enter, code is " << code; + if (code != websocketpp::close::status::normal) { + ControlMessageData *data = new ControlMessageData(CLOSE_SIGNAL_ERROR, ref_); + webrtc_thread_->Post(this, MSG_ON_SIGLAL_CONNECTION_CLOSE, data); } LOGP_F( INFO ) << "Done"; } -void Control::OnSignedOut(const string& id) { - LOGP_F( INFO ) << "Calling OnSignedOut() with " << id; - - if ( signal_ == nullptr || signal_->opened() ) { - LOGP_F( WARNING ) << "signal_ is null or not opened"; - return; - } - - if ( peers_.size() != 0 ) { - LOGP_F( WARNING ) << "peers_ is empty"; - return; - } - - if ( observer_ != nullptr ) { - observer_->OnSignedOut( id ); - } - - LOGP_F( INFO ) << "Done"; -} - - // // Commands to signal server // @@ -526,24 +497,24 @@ void Control::AddIceCandidate(const string& peer_id, const Json::Value& data) { // -// 'signin' command +// 'open' command // -void Control::OnSignedIn(const Json::Value& data) { +void Control::OnOpen(const Json::Value& data) { bool result; if (!rtc::GetBoolFromJsonObject(data, "result", &result)) { - LOGP_F(WARNING) << "Unknown signin response"; + LOGP_F(WARNING) << "Unknown open response"; return; } if (!result) { - LOGP_F(LERROR) << "Signin failed"; + LOGP_F(LERROR) << "Open failed"; return; } string session_id; if (!rtc::GetStringFromJsonObject(data, "session_id", &session_id)) { - LOGP_F(LERROR) << "Signin failed - no session_id"; + LOGP_F(LERROR) << "Open failed - no session_id"; return; } @@ -553,61 +524,67 @@ void Control::OnSignedIn(const Json::Value& data) { // Create channel // - CreateChannel(open_id_); + CreateChannel(channel_); LOGP_F( INFO ) << "Done"; } -void Control::OnChannelCreated(const Json::Value& data) { +void Control::OnChannelCreate(const Json::Value& data) { bool result; if (!rtc::GetBoolFromJsonObject(data, "result", &result)) { - LOGP_F(WARNING) << "Unknown signin response"; + LOGP_F(WARNING) << "Unknown open response"; + pc_->OnClose(channel_, CLOSE_SIGNAL_ERROR); return; } string channel; if (!rtc::GetStringFromJsonObject(data, "name", &channel)) { + pc_->OnClose(channel_, CLOSE_SIGNAL_ERROR); LOGP_F(LERROR) << "Create channel failed - no channel name"; return; } if (!result) { LOGP_F(LERROR) << "Create channel failed"; - string reason; - if (!rtc::GetStringFromJsonObject(data, "reason", &reason)) { - reason = "Unknown reason"; + string desc; + if (!rtc::GetStringFromJsonObject(data, "desc", &desc)) { + desc = "Unknown reason"; } - observer_->OnError(channel, reason); + + pc_->OnClose(channel, CLOSE_SIGNAL_ERROR, desc); return; } - observer_->OnSignedIn(channel); + pc_->OnOpen(channel); LOGP_F( INFO ) << "Done"; } -void Control::OnChannelJoined(const Json::Value& data) { +void Control::OnChannelJoin(const Json::Value& data) { bool result; LOGP_F(INFO) << "OnChannelJoined(" << data.toStyledString() << ")"; if (!rtc::GetBoolFromJsonObject(data, "result", &result)) { + pc_->OnClose( "", CLOSE_SIGNAL_ERROR ); LOGP_F(LERROR) << "Unknown channel join response"; return; } string channel; if (!rtc::GetStringFromJsonObject(data, "name", &channel)) { + pc_->OnClose( "", CLOSE_SIGNAL_ERROR ); LOGP_F(LERROR) << "Join channel failed - no channel name"; return; } if (!result) { LOGP_F(LERROR) << "Join channel failed"; - string reason; - if (!rtc::GetStringFromJsonObject(data, "reason", &reason)) { - reason = "Unknown reason"; + string desc; + if (!rtc::GetStringFromJsonObject(data, "desc", &desc)) { + desc = "Unknown reason"; } - observer_->OnError(channel, reason); + + pc_->OnClose( channel, CLOSE_SIGNAL_ERROR, desc ); return; } @@ -619,11 +596,15 @@ void Control::OnChannelJoined(const Json::Value& data) { // 'leave' command // -void Control::OnChannelLeaved(const Json::Value& data) { - // Nothing +void Control::OnChannelLeave(const Json::Value& data) { + // Do nothing } +void Control::OnRemotePeerClose(const string& peer_id, const Json::Value& data) { + ClosePeer( peer_id, CLOSE_NORMAL ); +} + // // 'createoffer' command // @@ -643,10 +624,10 @@ void Control::CreateOffer(const Json::Value& data) { return; } - Peer peer = new rtc::RefCountedObject(open_id_, remote_id, this, peer_connection_factory_); + Peer peer = new rtc::RefCountedObject(channel_, remote_id, this, peer_connection_factory_); if ( !peer->Initialize() ) { LOGP_F( LERROR ) << "Peer initialization failed"; - OnClosed( remote_id ); + OnPeerClose( remote_id, CLOSE_ABNORMAL ); return; } @@ -670,10 +651,10 @@ void Control::ReceiveOfferSdp(const string& peer_id, const Json::Value& data) { return; } - Peer peer = new rtc::RefCountedObject(open_id_, peer_id, this, peer_connection_factory_); + Peer peer = new rtc::RefCountedObject(channel_, peer_id, this, peer_connection_factory_); if ( !peer->Initialize() ) { LOGP_F( LERROR ) << "Peer initialization failed"; - OnClosed( peer_id ); + OnPeerClose( peer_id, CLOSE_ABNORMAL ); return; } diff --git a/src/control.h b/src/control.h index 353e8c1..c94c580 100644 --- a/src/control.h +++ b/src/control.h @@ -46,28 +46,25 @@ public: void Send(const string to, const char* buffer, const size_t size); bool SyncSend(const string to, const char* buffer, const size_t size); - void SignIn(const string& user_id, const string& user_password, const string& open_id); - void SignOut(); - void Connect(const string id); - void Close(); - bool IsWritable(const string id); + void Open(const string& user_id, const string& user_password, const string& channel); + void Close(const CloseCode code, bool force_queueing = FORCE_QUEUING_OFF); + void Connect(const string channel); + bool IsWritable(const string channel); void OnCommandReceived(const Json::Value& message); void OnSignalCommandReceived(const Json::Value& message); void OnSignalConnectionClosed(websocketpp::close::status::value code); - void OnSignedOut(const string& id); // // PeerObserver implementation // - virtual void SendCommand(const string& id, const string& command, const Json::Value& data); - virtual void Close(const string id); - virtual void OnConnected(const string id); - virtual void OnClosed(const string id); - virtual void OnMessage(const string& id, const char* buffer, const size_t size); - virtual void OnWritable(const string& id); - virtual void OnError( const string id, const string& reason ); + virtual void SendCommand(const string& channel, const string& command, const Json::Value& data); + virtual void ClosePeer( const string channel, const CloseCode code, bool force_queueing = FORCE_QUEUING_OFF ); + virtual void OnPeerConnect(const string channel); + virtual void OnPeerClose(const string channel, const CloseCode code); + virtual void OnPeerMessage(const string& channel, const char* buffer, const size_t size); + virtual void OnPeerWritable(const string& channel); // Register/Unregister observer @@ -87,19 +84,17 @@ protected: void ReceiveOfferSdp(const string& peer_id, const Json::Value& data); void ReceiveAnswerSdp(const string& peer_id, const Json::Value& data); - void OnSignedIn(const Json::Value& data); - void OnChannelCreated(const Json::Value& data); - void OnChannelJoined(const Json::Value& data); - void OnChannelLeaved(const Json::Value& data); - - void Close( const string id, const bool force_queuing); - void OnClosed(const string id, const bool force_queuing); + void OnOpen(const Json::Value& data); + void OnChannelCreate(const Json::Value& data); + void OnChannelJoin(const Json::Value& data); + void OnChannelLeave(const Json::Value& data); + void OnRemotePeerClose(const string& peer_id, const Json::Value& data); - // open_id_: other peers can find this peer by open_id_ and it is user_id or alias + // channel_: A name of local channel. Other peers can find this peer by channel_ // user_id_: A user id to sign in signal server (could be 'anonymous' for guest user) // session_id_: A unique id for signal server connection - string open_id_; + string channel_; string user_id_; string session_id_; @@ -114,16 +109,12 @@ protected: private: - const bool QUEUEING_ON = true; - const bool QUEUEING_OFF = false; - - enum { MSG_COMMAND_RECEIVED, // Command has been received from signal server + MSG_CLOSE, // Queue signout request MSG_CLOSE_PEER, // Close peer - MSG_ON_PEER_CLOSED, // Peer has been closed - MSG_SIGNOUT, // Queue signout request - MSG_SIGNAL_SERVER_CLOSED // Connection to signal server has been closed + MSG_ON_PEER_CLOSE, // Peer has been closed + MSG_ON_SIGLAL_CONNECTION_CLOSE // Connection to signal server has been closed }; struct ControlMessageData : public rtc::MessageData { @@ -139,7 +130,7 @@ private: }; rtc::Thread* webrtc_thread_; - ControlObserver* observer_; + ControlObserver* pc_; std::shared_ptr ref_; }; diff --git a/src/controlobserver.h b/src/controlobserver.h index 7f7d22c..29ed701 100644 --- a/src/controlobserver.h +++ b/src/controlobserver.h @@ -7,17 +7,17 @@ #ifndef __PEERCONNECT_CONTROLOBSERVER_H__ #define __PEERCONNECT_CONTROLOBSERVER_H__ +#include "common.h" + namespace pc { class ControlObserver { public: - virtual void OnSignedIn(const std::string id) = 0; - virtual void OnSignedOut(const std::string id) = 0; - virtual void OnPeerConnected(const std::string id) = 0; - virtual void OnPeerDisconnected(const std::string id) = 0; - virtual void OnPeerMessage(const std::string id, const char* buffer, const size_t size) = 0; - virtual void OnPeerWritable(const std::string id) = 0; - virtual void OnError(const std::string id, const std::string& reason) = 0; + virtual void OnOpen(const std::string channel) = 0; + virtual void OnClose(const std::string channel, const pc::CloseCode code, const std::string desc = "") = 0; + virtual void OnConnect(const std::string channel) = 0; + virtual void OnMessage(const std::string channel, const char* buffer, const size_t size) = 0; + virtual void OnWritable(const std::string channel) = 0; }; } // namespace pc diff --git a/src/peer.cc b/src/peer.cc index 77f87b7..ce25c23 100644 --- a/src/peer.cc +++ b/src/peer.cc @@ -89,7 +89,7 @@ bool PeerControl::IsWritable() { return local_data_channel_->IsWritable(); } -void PeerControl::Close() { +void PeerControl::Close(const CloseCode code) { LOGP_F_IF(state_ != pOpen, WARNING) << "Closing peer when it is not opened"; if ( state_ == pClosing || state_ == pClosed ) { @@ -102,12 +102,20 @@ void PeerControl::Close() { LOGP_F( INFO ) << "Close data-channel of remote_id_ " << remote_id_; if ( peer_connection_ ) { - peer_connection_->Close(); - } - else { - LOGP_F( WARNING ) << "peer_connection_ is nullptr "; - state_ = pClosed; + + peer_connection_ = nullptr; + + // As entering here, we can make sure that + // - PeerDataChannelObserver::OnStateChange() had been called with kClosed + // - PeerControl::OnIceConnectionChange() will be ignored, + // ether kIceConnectionClosed and kIceConnectionDisconnected. + // That's because we didn't call peer_connection_->Close(). + } + + state_ = pClosed; + control_->OnPeerClose(remote_id_, code); + } @@ -158,6 +166,12 @@ void PeerControl::OnSignalingChange(webrtc::PeerConnectionInterface::SignalingSt } void PeerControl::OnIceConnectionChange(webrtc::PeerConnectionInterface::IceConnectionState new_state) { + + // + // Closing sequence + // kIceConnectionDisconnected -> kIceConnectionClosed + // + switch (new_state) { case webrtc::PeerConnectionInterface::IceConnectionState::kIceConnectionClosed: // @@ -165,7 +179,7 @@ void PeerControl::OnIceConnectionChange(webrtc::PeerConnectionInterface::IceConn // Notify it to Control so the Control will remove peer in peers_ // LOGP_F( INFO ) << "new_state is " << "kIceConnectionClosed"; - OnPeerClosed(); + OnPeerDisconnected(); break; case webrtc::PeerConnectionInterface::IceConnectionState::kIceConnectionDisconnected: @@ -218,6 +232,11 @@ void PeerControl::OnSuccess(webrtc::SessionDescriptionInterface* desc) { if (!desc->ToString(&sdp)) return; + if ( state_ != pConnecting ) { + LOGP_F( WARNING ) << "Invalid state"; + return; + } + // Set local description SetLocalDescription(desc->type(), sdp); @@ -248,22 +267,13 @@ void PeerControl::OnPeerOpened() { // Fianlly, data-channel has been opened. state_ = pOpen; - control_->OnConnected(remote_id_); - control_->OnWritable(local_id_); + control_->OnPeerConnect(remote_id_); + control_->OnPeerWritable(local_id_); } LOGP_F( INFO ) << "Done"; } -void PeerControl::OnPeerClosed() { - - ASSERT( state_ == pClosing ); - - state_ = pClosed; - control_->OnClosed(remote_id_); - -} - void PeerControl::OnPeerDisconnected() { if ( state_ == pClosed ) { @@ -275,13 +285,19 @@ void PeerControl::OnPeerDisconnected() { return; } - control_->Close( remote_id_ ); + // + // As entering here, we can make sure that the remote peer has been + // disconnected abnormally, because previous state_ is not pClosing. + // It will be in state pClosing if a user calls Close() manually + // + + control_->ClosePeer( remote_id_, CLOSE_GOING_AWAY ); } void PeerControl::OnPeerMessage(const webrtc::DataBuffer& buffer) { string data; - control_->OnMessage(remote_id_, buffer.data.data(), buffer.data.size()); + control_->OnPeerMessage(remote_id_, buffer.data.data(), buffer.data.size()); } void PeerControl::OnBufferedAmountChange(const uint64_t previous_amount) { @@ -289,7 +305,7 @@ void PeerControl::OnBufferedAmountChange(const uint64_t previous_amount) { LOGP_F( LERROR ) << "local_data_channel_ is not writable"; return; } - control_->OnWritable( remote_id_ ); + control_->OnPeerWritable( remote_id_ ); } diff --git a/src/peer.h b/src/peer.h index 4bb29d3..e9682ca 100644 --- a/src/peer.h +++ b/src/peer.h @@ -15,6 +15,7 @@ #include "webrtc/base/scoped_ref_ptr.h" #include "webrtc/api/jsep.h" #include "webrtc/base/json.h" +#include "common.h" namespace pc { @@ -24,14 +25,12 @@ namespace pc { class PeerObserver { public: - virtual void SendCommand(const std::string& id, const std::string& command, const Json::Value& data) = 0; - virtual void Close(const std::string id) = 0; - virtual void OnConnected(const std::string id) = 0; - virtual void OnClosed(const std::string id) = 0; - virtual void OnMessage(const std::string& id, const char* buffer, const size_t size) = 0; - virtual void OnWritable(const std::string& id) = 0; - virtual void OnError( const std::string id, const std::string& reason ) = 0; - + virtual void SendCommand(const std::string& channel, const std::string& command, const Json::Value& data) = 0; + virtual void ClosePeer(const std::string channel, const pc::CloseCode code, bool force_queuing = FORCE_QUEUING_OFF ) = 0; + virtual void OnPeerConnect(const std::string channel) = 0; + virtual void OnPeerClose(const std::string channel, const pc::CloseCode code) = 0; + virtual void OnPeerMessage(const std::string& channel, const char* buffer, const size_t size) = 0; + virtual void OnPeerWritable(const std::string& channel) = 0; }; class PeerDataChannelObserver; @@ -77,7 +76,7 @@ public: bool Send(const char* buffer, const size_t size); bool SyncSend(const char* buffer, const size_t size); bool IsWritable(); - void Close(); + void Close(const CloseCode code); // // PeerConnection @@ -116,7 +115,6 @@ public: // void OnPeerOpened(); - void OnPeerClosed(); void OnPeerDisconnected(); void OnPeerMessage(const webrtc::DataBuffer& buffer); void OnBufferedAmountChange(const uint64_t previous_amount); diff --git a/src/peerconnect.cc b/src/peerconnect.cc index acfa54b..4cb4596 100644 --- a/src/peerconnect.cc +++ b/src/peerconnect.cc @@ -13,32 +13,27 @@ namespace pc { - -PeerConnect::PeerConnect() - : PeerConnect( "" ) { -} - -PeerConnect::PeerConnect( const string options ) { +PeerConnect::PeerConnect( const string channel ) { // Log level #if DEBUG || _DEBUG rtc::LogMessage::LogToDebug( rtc::LS_NONE ); - pc::LogMessage::LogToDebug( pc::LS_VERBOSE ); + pc::LogMessage::LogToDebug( pc::WARNING ); #else rtc::LogMessage::LogToDebug( rtc::LS_NONE ); pc::LogMessage::LogToDebug( pc::LS_NONE ); #endif - // parse settings - if ( !options.empty() ) { - ParseOptions( options ); + string local_channel; + + if ( channel.empty() ) { + local_channel = rtc::CreateRandomUuid(); + } + else { + local_channel = channel; } - signout_ = false; - - // create signal client - if ( signal_ == nullptr ) { - signal_ = std::make_shared( setting_.signal_uri_ ); - } + channel_ = local_channel; + close_once_ = false; LOGP_F( INFO ) << "Done"; } @@ -58,17 +53,21 @@ void PeerConnect::Stop() { } -void PeerConnect::SignIn( const string alias, const string id, const string password ) { - - // - // Check if already signed in - // +void PeerConnect::Open() { if ( control_.get() != nullptr ) { - LOGP_F( WARNING ) << "Already signined in."; + LOGP_F( WARNING ) << "Already open."; return; } + // + // create signal client + // + + if ( signal_ == nullptr ) { + signal_ = std::make_shared( setting_.signal_uri_ ); + } + // // Initialize control // @@ -91,78 +90,76 @@ void PeerConnect::SignIn( const string alias, const string id, const string pass return; } - // - // Set user_id and open_id - // - - string user_id; - string open_id; - - user_id = tolower( id ); - if ( user_id == "anonymous" ) user_id = ""; - - open_id = tolower( alias ); - if ( open_id.empty() ) open_id = tolower( rtc::CreateRandomUuid() ); - // // Connect to signal server // - control_->SignIn( user_id, password, open_id ); + control_->Open( setting_.signal_id_, setting_.signal_password_, channel_ ); LOGP_F( INFO ) << "Done"; return; } -void PeerConnect::SignOut() { - signout_ = true; - control_->SignOut(); +void PeerConnect::Close( const string channel ) { + + if ( channel.empty() || channel == channel_ ) { + control_->Close( CLOSE_NORMAL, FORCE_QUEUING_ON ); + signal_->SyncClose(); + } + else { + control_->ClosePeer( channel, CLOSE_NORMAL, FORCE_QUEUING_ON ); + } LOGP_F( INFO ) << "Done"; } -void PeerConnect::Connect( const string id ) { - control_->Connect( id ); - LOGP_F( INFO ) << "Done, id is " << id; +void PeerConnect::Connect( const string channel ) { + control_->Connect( channel ); + LOGP_F( INFO ) << "Done, channel is " << channel; return; } -void PeerConnect::Disconnect( const string id ) { - control_->Close( id ); - LOGP_F( INFO ) << "Done, id is " << id; - return; -} - - // // Send message to destination peer session id // -void PeerConnect::Send( const string& id, const char* buffer, const size_t size ) { - control_->Send( id, buffer, size ); +bool PeerConnect::Send( const string& channel, const char* buffer, const size_t size, const bool wait ) { + if ( wait ) { + + // + // Synchronous send returns true or false + // and a timeout is 60*1000 ms by default. + // + + return control_->SyncSend( channel, buffer, size ); + } + else { + control_->Send( channel, buffer, size ); + + // + // Asyncronous send always returns true and + // trigger 'close' event with CloseCode if failed + // + + return true; + } } -void PeerConnect::Send( const string& id, const char* message ) { - Send( id, message, strlen( message ) ); +bool PeerConnect::Send( const string& channel, const char* message, const bool wait ) { + return Send( channel, message, strlen( message ), wait ); } -void PeerConnect::Send( const string& id, const string& message ) { - Send( id, message.c_str(), message.size() ); +bool PeerConnect::Send( const string& channel, const string& message, const bool wait ) { + return Send( channel, message.c_str(), message.size(), wait ); } -bool PeerConnect::SyncSend( const string& id, const char* buffer, const size_t size ) { - return control_->SyncSend( id, buffer, size ); +bool PeerConnect::SetOptions( const string options ) { + + // parse settings + if ( !options.empty() ) { + return ParseOptions( options ); + } + return true; } -bool PeerConnect::SyncSend( const string& id, const char* message ) { - return SyncSend( id, message, strlen( message ) ); -} - -bool PeerConnect::SyncSend( const string& id, const string& message ) { - return SyncSend( id, message.c_str(), message.size() ); -} - - - - std::string PeerConnect::CreateRandomUuid() { return rtc::CreateRandomUuid(); @@ -171,24 +168,58 @@ std::string PeerConnect::CreateRandomUuid() { // // Register Event handler // -PeerConnect& PeerConnect::On( string event_id, std::function handler ) { +PeerConnect& PeerConnect::On( string event_id, std::function handler ) { if ( event_id.empty() ) return *this; - std::unique_ptr f( new EventHandler_2( handler ) ); - event_handler_.insert( Events::value_type( event_id, std::move( f ) ) ); - - LOGP_F( INFO ) << "An event handler '" << event_id << "' has been inserted"; + if ( event_id == "open" || event_id == "connect" || event_id == "writable" ) { + std::unique_ptr f( new EventHandler_2( handler ) ); + event_handler_.insert( Events::value_type( event_id, std::move( f ) ) ); + LOGP_F( INFO ) << "An event handler '" << event_id << "' has been inserted"; + } + else { + LOGP_F( LERROR ) << "Unsupported event type: " << event_id; + } return *this; } -// -// Register Message handler -// +PeerConnect& PeerConnect::On( string event_id, std::function handler ) { + + if ( event_id.empty() ) return *this; + + LOGP_F( LERROR ) << "Unsupported event type: " << event_id; + return *this; +} + +PeerConnect& PeerConnect::On( string event_id, std::function handler ) { + if ( event_id.empty() ) return *this; + + if ( event_id == "close" ) { + std::unique_ptr f( new EventHandler_Close( handler ) ); + event_handler_.insert( Events::value_type( event_id, std::move( f ) ) ); + + LOGP_F( INFO ) << "An event handler '" << event_id << "' has been inserted"; + } + else { + LOGP_F( LERROR ) << "Unsupported event type: " << event_id; + } + + return *this; +} + +PeerConnect& PeerConnect::On( string event_id, std::function handler ) { + if ( event_id.empty() ) return *this; + + if ( event_id == "message" ) { + std::unique_ptr f( new EventHandler_Message( handler ) ); + event_handler_.insert( Events::value_type( event_id, std::move( f ) ) ); + + LOGP_F( INFO ) << "An event handler '" << event_id << "' has been inserted"; + } + else { + LOGP_F( LERROR ) << "Unsupported event type: " << event_id; + } -PeerConnect& PeerConnect::OnMessage( std::function handler ) { - message_handler_ = handler; - LOGP_F( INFO ) << "A message handler has been inserted"; return *this; } @@ -196,75 +227,68 @@ PeerConnect& PeerConnect::OnMessage( std::functionUnregisterObserver(); + control_.reset(); + } + // Remote peer has been closed + else { + if ( event_handler_.find( "close" ) != event_handler_.end() ) { + CallEventHandler( "close", channel, code, desc ); + } } - if ( event_handler_.find( "signout" ) == event_handler_.end() ) { - return; - } - - CallEventHandler( "signout", this, id ); - - control_->UnregisterObserver(); - control_.reset(); - LOGP_F( INFO ) << "Done, id is " << id; + LOGP_F( INFO ) << "Done, channel is " << channel; } -void PeerConnect::OnPeerConnected( const string id ) { - if ( event_handler_.find( "connect" ) == event_handler_.end() ) { - return; +void PeerConnect::OnConnect( const string channel ) { + if ( event_handler_.find( "connect" ) != event_handler_.end() ) { + CallEventHandler( "connect", channel ); } - CallEventHandler( "connect", this, id ); - LOGP_F( INFO ) << "Done, id is " << id; + LOGP_F( INFO ) << "Done, channel is " << channel; } -void PeerConnect::OnPeerDisconnected( const string id ) { - if ( event_handler_.find( "disconnect" ) == event_handler_.end() ) { - return; - } - - CallEventHandler( "disconnect", this, id ); - LOGP_F( INFO ) << "Done, id is " << id; -} - -void PeerConnect::OnPeerMessage( const string id, const char* buffer, const size_t size ) { +void PeerConnect::OnMessage( const string channel, const char* buffer, const size_t size ) { Buffer buf( buffer, size ); - message_handler_( this, id, buf ); + + if ( event_handler_.find( "message" ) != event_handler_.end() ) { + CallEventHandler( "message", channel, buf ); + } } -void PeerConnect::OnPeerWritable( const string id ) { - if ( event_handler_.find( "writable" ) == event_handler_.end() ) { - return; +void PeerConnect::OnWritable( const string channel ) { + if ( event_handler_.find( "writable" ) != event_handler_.end() ) { + CallEventHandler( "writable", channel ); } - CallEventHandler( "writable", this, id ); - LOGP_F( INFO ) << "Done, id is " << id; + LOGP_F( INFO ) << "Done, channel is " << channel; } -void PeerConnect::OnError( const string id, const string& reason ) { - if ( event_handler_.find( "error" ) == event_handler_.end() ) { - return; - } - - error_reason_ = reason; - CallEventHandler( "error", this, id ); - LOGP_F( INFO ) << "Done, id is " << id << " and reason is " << reason; -} template void PeerConnect::CallEventHandler( string msg_id, A&& ... args ) @@ -303,14 +327,4 @@ bool PeerConnect::ParseOptions( const string& options ) { return true; } -std::string PeerConnect::tolower( const string& str ) { - std::locale loc; - string lower_str; - for ( auto elem : str ) { - lower_str += std::tolower( elem, loc ); - } - return lower_str; -} - - } // namespace pc \ No newline at end of file diff --git a/src/peerconnect.h b/src/peerconnect.h index c021ed2..380bac8 100644 --- a/src/peerconnect.h +++ b/src/peerconnect.h @@ -12,10 +12,9 @@ #include #include +#include "common.h" #include "controlobserver.h" -#define function_pc [&] - namespace pc { class Control; @@ -52,32 +51,28 @@ public: static void Run(); static void Stop(); - void SignIn( const string alias = "", const string id = "", const string password = "" ); - void SignOut(); - void Connect( const string id ); - void Disconnect( const string id ); - void Send( const string& id, const char* buffer, const size_t size ); - void Send( const string& id, const char* buffer ); - void Send( const string& id, const string& message ); - bool SyncSend( const string& id, const char* buffer, const size_t size ); - bool SyncSend( const string& id, const char* buffer ); - bool SyncSend( const string& id, const string& message ); - string GetErrorMessage() { return error_reason_; } - - static std::string CreateRandomUuid(); - - PeerConnect& On( string event_id, std::function ); - PeerConnect& OnMessage( std::function ); + void Open(); + void Close( const string channel = "" ); + void Connect( const string channel ); + bool Send( const string& channel, const char* buffer, const size_t size, const bool wait = WAITING_OFF ); + bool Send( const string& channel, const char* buffer, const bool wait = WAITING_OFF ); + bool Send( const string& channel, const string& message, const bool wait = WAITING_OFF ); + bool SetOptions( const string options ); + PeerConnect& On( string event_id, std::function ); + PeerConnect& On( string event_id, std::function ); + PeerConnect& On( string event_id, std::function ); + PeerConnect& On( string event_id, std::function ); // // Member functions // - explicit PeerConnect(); - explicit PeerConnect( string options ); + explicit PeerConnect( const string channel = "" ); ~PeerConnect(); + static std::string CreateRandomUuid(); + protected: // The base type that is stored in the collection. @@ -96,9 +91,11 @@ protected: template void CallEventHandler( string msg_id, A&& ... args ); - using EventHandler_1 = EventHandler_t; - using EventHandler_2 = EventHandler_t; - using EventHandler_3 = EventHandler_t; + using EventHandler_1 = EventHandler_t<>; + using EventHandler_2 = EventHandler_t; + using EventHandler_3 = EventHandler_t; + using EventHandler_Close = EventHandler_t; + using EventHandler_Message = EventHandler_t; using Events = std::map>; using MessageHandler = std::function; @@ -106,27 +103,22 @@ protected: // ControlObserver implementation // - void OnSignedIn( const string id ); - void OnSignedOut( const string id ); - void OnPeerConnected( const string id ); - void OnPeerDisconnected( const string id ); - void OnPeerMessage( const string id, const char* buffer, const size_t size ); - void OnPeerWritable( const string id ); - void OnError( const string id, const string& reason ); - + void OnOpen( const string channel ); + void OnClose( const string channel, const pc::CloseCode code, const string desc = "" ); + void OnConnect( const string channel ); + void OnMessage( const string channel, const char* buffer, const size_t size ); + void OnWritable( const string channel ); bool ParseOptions( const string& options ); - std::string tolower( const string& str ); - bool signout_; + bool close_once_; Setting setting_; Events event_handler_; - MessageHandler message_handler_; std::shared_ptr control_; std::shared_ptr signal_; - string error_reason_; + string channel_; }; diff --git a/src/signalconnection.cc b/src/signalconnection.cc index 935bf5b..4551c86 100644 --- a/src/signalconnection.cc +++ b/src/signalconnection.cc @@ -4,31 +4,6 @@ * Ryan Lee */ -/* - Reference: socket.io-client-cpp - - Copyright (c) 2015, Melo Yao - All rights reserved. - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to all conditions. - - The above copyright notice and this permission notice shall be included in all - copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - SOFTWARE. -*/ - #if defined(WEBRTC_WIN) #pragma warning(disable:4503) #endif @@ -43,7 +18,7 @@ namespace pc { Signal::Signal(const string url) : con_state_(con_closed), network_thread_(), - reconn_attempts_(0xFFFFFFFF), + reconn_attempts_(3), reconn_made_(0), reconn_delay_(5000), reconn_delay_max_(25000), @@ -59,7 +34,7 @@ Signal::Signal(const string url) : // Default settings if (url_.empty()) { - url_ = "wss://signal.throughnet.com/hello"; + url_ = "wss://signal.peers.io/hello"; } // Initialize ASIO @@ -80,19 +55,11 @@ Signal::Signal(const string url) : } Signal::~Signal() { -#if _DEBUG || DEBUG - client_.clear_access_channels(websocketpp::log::alevel::all); - client_.set_access_channels(websocketpp::log::alevel::fail); -#else - client_.clear_access_channels(websocketpp::log::elevel::all); - client_.clear_access_channels(websocketpp::log::alevel::all); -#endif - Teardown(); LOGP_F( INFO ) << "Done"; } -void Signal::SignIn(const string& id, const string& password) { +void Signal::Open(const string& id, const string& password) { user_id_ = id; user_password_ = password; Connect(); @@ -100,11 +67,45 @@ void Signal::SignIn(const string& id, const string& password) { LOGP_F( INFO ) << "Done"; } -void Signal::SignOut() { - if (opened()) Close(); - LOGP_F( INFO ) << "Done"; +void Signal::Close() { + + if ( !opened() ) { + LOGP_F( WARNING ) << "It is not opened"; + return; + } + + con_state_ = con_closing; + client_.get_io_service().dispatch(websocketpp::lib::bind(&Signal::CloseInternal, + this, + websocketpp::close::status::normal, + "End by user")); + LOGP_F( INFO ) << "Done"; } + +void Signal::SyncClose() +{ + if ( !opened() ) { + LOGP_F( WARNING ) << "It is not opened"; + return; + } + + con_state_ = con_closing; + client_.get_io_service().dispatch(websocketpp::lib::bind(&Signal::CloseInternal, + this, + websocketpp::close::status::normal, + "End by user")); + + if (network_thread_ && network_thread_->joinable() ) + { + network_thread_->join(); + network_thread_.reset(); + } + + LOGP_F( INFO ) << "Done"; +} + + void Signal::SendCommand(const string channel, const string commandname, const Json::Value& data) { @@ -148,8 +149,6 @@ void Signal::SendGlobalCommand(const string commandname, SendCommand("", commandname, data); } - - void Signal::Connect() { if (reconn_timer_) @@ -184,36 +183,13 @@ void Signal::Connect() LOGP_F( INFO ) << "Done"; } - -void Signal::Close() -{ - con_state_ = con_closing; - client_.get_io_service().dispatch(websocketpp::lib::bind(&Signal::CloseInternal, - this, - websocketpp::close::status::normal, - "End by user")); - LOGP_F( INFO ) << "Done"; -} - -void Signal::SyncClose() -{ - con_state_ = con_closing; - client_.get_io_service().dispatch(websocketpp::lib::bind(&Signal::CloseInternal, - this, - websocketpp::close::status::normal, - "End by user")); - if (network_thread_) - { - network_thread_->join(); - network_thread_.reset(); - } - LOGP_F( INFO ) << "Done"; -} void Signal::Teardown() { - // TODO: Asyncronous close with PeerConnect::Stop() - SyncClose(); + if ( network_thread_ && network_thread_->joinable() ) { + network_thread_->detach(); + network_thread_.reset(); + } LOGP_F( INFO ) << "Done"; } @@ -225,13 +201,13 @@ asio::io_service& Signal::GetIoService() -void Signal::SendSignInCommand() { +void Signal::SendOpenCommand() { Json::Value data; data["user_id"] = user_id_; data["user_password"] = user_password_; - SendGlobalCommand("signin", data); + SendGlobalCommand("open", data); } void Signal::OnCommandReceived(Json::Value& message) { @@ -263,9 +239,9 @@ void Signal::ConnectInternal() -void Signal::CloseInternal(websocketpp::close::status::value const& code, string const& reason) +void Signal::CloseInternal(websocketpp::close::status::value const& code, string const& desc) { - LOGP_F(WARNING) << "Close by reason:" << reason; + LOGP_F(WARNING) << "Close by reason:" << desc; if (reconn_timer_) { @@ -279,7 +255,7 @@ void Signal::CloseInternal(websocketpp::close::status::value const& code, string else { websocketpp::lib::error_code ec; - client_.close(con_hdl_, code, reason, ec); + client_.close(con_hdl_, code, desc, ec); } } @@ -308,10 +284,86 @@ unsigned Signal::NextDelay() const } +void Signal::OnOpen(websocketpp::connection_hdl con) +{ + LOGP_F(WARNING) << "Connected."; + con_state_ = con_opened; + con_hdl_ = con; + reconn_made_ = 0; + + SendOpenCommand(); +} + + +void Signal::OnClose(websocketpp::connection_hdl con) +{ + // + // This routine will be called if a connection disconnected. + // This routine will not be called when attempt to connection failed. + // + + con_state_ = con_closed; + websocketpp::lib::error_code ec; + websocketpp::close::status::value code = websocketpp::close::status::normal; + client_type::connection_ptr conn_ptr = client_.get_con_from_hdl(con, ec); + if (ec) { + LOGP_F(LERROR) << "get conn failed" << ec; + } + else { + code = conn_ptr->get_local_close_code(); + } + + con_hdl_.reset(); + + if (code == websocketpp::close::status::normal) + { + // NOTHING + } + else + { + // + // TODO: Implement seamless signal reconnection. Don't close existing ice connection. + // Reconstruction of (creating and joining) channel has to be implemented. + // + + //if (reconn_made_NextDelay(); + // reconn_timer_.reset(new websocketpp::lib::asio::steady_timer(client_.get_io_service())); + // websocketpp::lib::asio::error_code ec; + // reconn_timer_->expires_from_now(websocketpp::lib::asio::milliseconds(delay), ec); + // reconn_timer_->async_wait(websocketpp::lib::bind(&Signal::TimeoutReconnect, this, websocketpp::lib::placeholders::_1)); + // return; + //} + + SignalOnClosed_(code); + } + + LOGP_F( INFO ) << "Done"; + +} + void Signal::OnFail(websocketpp::connection_hdl con) { + // + // This routine will be called when attempt to connection failed. + // This routine will not be called if a connection abnormally disconnected. + // + + websocketpp::lib::error_code ec; + websocketpp::close::status::value code = websocketpp::close::status::abnormal_close; + client_type::connection_ptr conn_ptr = client_.get_con_from_hdl(con, ec); + if (ec) { + LOGP_F(LERROR) << "get conn failed" << ec; + } + else { + code = conn_ptr->get_local_close_code(); + } + con_hdl_.reset(); con_state_ = con_closed; + LOGP_F(LERROR) << "Connection failed."; if (reconn_made_expires_from_now(websocketpp::lib::asio::milliseconds(delay), ec); reconn_timer_->async_wait(websocketpp::lib::bind(&Signal::TimeoutReconnect, this, websocketpp::lib::placeholders::_1)); } + else { + SignalOnClosed_(code); + } } -void Signal::OnOpen(websocketpp::connection_hdl con) -{ - LOGP_F(WARNING) << "Connected."; - con_state_ = con_opened; - con_hdl_ = con; - reconn_made_ = 0; - - SendSignInCommand(); -} - - -void Signal::OnClose(websocketpp::connection_hdl con) -{ - con_state_ = con_closed; - websocketpp::lib::error_code ec; - websocketpp::close::status::value code = websocketpp::close::status::normal; - client_type::connection_ptr conn_ptr = client_.get_con_from_hdl(con, ec); - if (ec) { - LOGP_F(LERROR) << "OnClose get conn failed" << ec; - } - else - { - code = conn_ptr->get_local_close_code(); - } - - con_hdl_.reset(); - - SignalOnClosed_(code); - - if (code == websocketpp::close::status::normal) - { - // NOTHING - } - else - { - if (reconn_made_NextDelay(); - reconn_timer_.reset(new websocketpp::lib::asio::steady_timer(client_.get_io_service())); - websocketpp::lib::asio::error_code ec; - reconn_timer_->expires_from_now(websocketpp::lib::asio::milliseconds(delay), ec); - reconn_timer_->async_wait(websocketpp::lib::bind(&Signal::TimeoutReconnect, this, websocketpp::lib::placeholders::_1)); - return; - } - } - - LOGP_F( INFO ) << "Done"; - -} - void Signal::OnMessage(websocketpp::connection_hdl con, client_type::message_ptr msg) { diff --git a/src/signalconnection.h b/src/signalconnection.h index 59fbff6..6fb8fd4 100644 --- a/src/signalconnection.h +++ b/src/signalconnection.h @@ -52,8 +52,8 @@ namespace pc { class SignalInterface { public: - virtual void SignIn(const std::string& id, const std::string& password) = 0; - virtual void SignOut() = 0; + virtual void Open(const std::string& id, const std::string& password) = 0; + virtual void Close() = 0; virtual void SendCommand(const std::string id, const std::string commandname, @@ -95,9 +95,9 @@ public: Signal(const string url); ~Signal(); - void SignIn(const string& id, const string& password); - void SignOut(); - + void Open(const string& id, const string& password); + void Close(); + void SyncClose(); void SendCommand(const string channel, const string commandname, @@ -115,17 +115,15 @@ public: protected: void Connect(); - void Close(); - void SyncClose(); asio::io_service& GetIoService(); private: - void SendSignInCommand(); + void SendOpenCommand(); void OnCommandReceived(Json::Value& message); void RunLoop(); void ConnectInternal(); - void CloseInternal(websocketpp::close::status::value const& code, string const& reason); + void CloseInternal(websocketpp::close::status::value const& code, string const& desc); void TimeoutReconnect(websocketpp::lib::asio::error_code const& ec); unsigned NextDelay() const; @@ -153,7 +151,7 @@ private: unsigned reconn_attempts_; unsigned reconn_made_; - // Signin + // Signal server string url_; string user_id_; string user_password_; diff --git a/src/test/test_main.cc b/src/test/test_main.cc index 36d7db9..df3c230 100644 --- a/src/test/test_main.cc +++ b/src/test/test_main.cc @@ -33,71 +33,71 @@ void test_normal() { std::string server = PeerConnect::CreateRandomUuid(); std::string client = PeerConnect::CreateRandomUuid(); - PeerConnect pc1; - PeerConnect pc2; + PeerConnect pc1(server); + PeerConnect pc2(client); - pc1.On("signin", function_pc(PeerConnect* pc, string id) { - assert(id == server); - std::cout << "pc1: signedin" << std::endl; - pc2.SignIn(client); + pc1.On("open", function_pc( string peer ) { + assert(peer == server); + std::cout << "pc1: open" << std::endl; + pc2.Open(); }); - pc1.On("connect", function_pc(PeerConnect* pc, string id) { - assert(id == client); - std::cout << "pc1: pc2(" << id << ") connected" << std::endl; + pc1.On("connect", function_pc( string peer ) { + assert(peer == client); + std::cout << "pc1: pc2(" << peer << ") connected" << std::endl; }); - pc1.On("disconnect", function_pc(PeerConnect* pc, string id) { - assert(id == client); - std::cout << "pc1: pc2 disconnected" << std::endl; + pc1.On("close", function_pc( string peer, CloseCode code, string desc ) { + assert(peer == client || peer == server); + if ( peer == client ) { + std::cout << "pc1: pc2 disconnected" << std::endl; + } + else if ( peer == server ) { + std::cout << "pc1: close out" << std::endl; + pc2.Close(); + } }); - pc1.On("signout", function_pc(PeerConnect* pc, string id) { - assert(id == server); - std::cout << "pc1: signed out" << std::endl; - pc2.SignOut(); - }); - - pc1.OnMessage(function_pc(PeerConnect* pc, string id, PeerConnect::Buffer& data) { + pc1.On("message", function_pc( string peer, PeerConnect::Buffer& data ) { assert(std::string(data.buf_, data.size_) == "Ping"); - assert(id == client); + assert(peer == client); std::cout << "pc1: a message has been received" << std::endl; - pc->Send(client, "Pong"); + pc1.Send(client, "Pong"); }); - pc2.On("signin", function_pc(PeerConnect* pc, string id) { - assert(id == client); - std::cout << "pc2: signedin" << std::endl; - pc->Connect(server); + pc2.On("open", function_pc( string peer ) { + assert(peer == client); + std::cout << "pc2: open" << std::endl; + pc2.Connect(server); }); - pc2.On("connect", function_pc(PeerConnect* pc, string id) { - assert(id == server); - std::cout << "pc2: pc1(" << id << ") connected" << std::endl; - pc->Send(server, "Ping"); + pc2.On("connect", function_pc( string peer ) { + assert(peer == server); + std::cout << "pc2: pc1(" << peer << ") connected" << std::endl; + pc2.Send(server, "Ping"); }); - pc2.On("disconnect", function_pc(PeerConnect* pc, string id) { - assert(id == server); - std::cout << "pc2: pc1 disconnected" << std::endl; - pc1.SignOut(); + pc2.On("close", function_pc( string peer, CloseCode code, string desc ) { + assert( peer == server || peer == client ); + if ( peer == server ) { + std::cout << "pc2: pc1 disconnected" << std::endl; + pc1.Close(); + } + else if ( peer == client ) { + std::cout << "pc2: close out" << std::endl; + PeerConnect::Stop(); + } }); - pc2.On("signout", function_pc(PeerConnect* pc, string id){ - assert(id == client); - std::cout << "pc2: signed out" << std::endl; - PeerConnect::Stop(); - }); - - pc2.OnMessage(function_pc(PeerConnect* pc, string id, PeerConnect::Buffer& data) { + pc2.On("message", function_pc( string peer, PeerConnect::Buffer& data ) { assert(std::string(data.buf_, data.size_) == "Pong"); - assert(id == server); + assert(peer == server); std::cout << "pc2 has received message" << std::endl; - pc->Disconnect(server); + pc2.Close(server); }); - pc1.SignIn(server); + pc1.Open(); PeerConnect::Run(); } @@ -107,81 +107,83 @@ void test_writable() { std::string server = PeerConnect::CreateRandomUuid(); std::string client = PeerConnect::CreateRandomUuid(); - PeerConnect pc1; - PeerConnect pc2; + PeerConnect pc1(server); + PeerConnect pc2(client); - pc1.On("signin", function_pc(PeerConnect* pc, string id) { - assert(id == server); - std::cout << "pc1: signedin" << std::endl; - pc2.SignIn(client); + pc1.On("open", function_pc( string peer ) { + assert(peer == server); + std::cout << "pc1: open" << std::endl; + pc2.Open(); }); - pc1.On("connect", function_pc(PeerConnect* pc, string id) { - assert(id == client); - std::cout << "pc1: pc2(" << id << ") connected" << std::endl; + pc1.On("connect", function_pc( string peer ) { + assert(peer == client); + std::cout << "pc1: pc2(" << peer << ") connected" << std::endl; }); - pc1.On("disconnect", function_pc(PeerConnect* pc, string id) { - assert(id == client); - std::cout << "pc1: pc2 disconnected" << std::endl; + pc1.On("close", function_pc( string peer, CloseCode code, string desc ) { + assert( peer == client || peer == server ); + if ( peer == client ) { + std::cout << "pc1: pc2 disconnected" << std::endl; + } + else if ( peer == server ) { + std::cout << "pc1: close" << std::endl; + pc2.Close(); + } }); - pc1.On("writable", function_pc(PeerConnect* pc, string id){ - assert(id == server); + pc1.On("writable", function_pc( string peer ){ + assert(peer == server); std::cout << "pc1: writable" << std::endl; }); - pc1.On("signout", function_pc(PeerConnect* pc, string id) { - assert(id == server); - std::cout << "pc1: signed out" << std::endl; - pc2.SignOut(); - }); - - pc1.OnMessage(function_pc(PeerConnect* pc, string id, PeerConnect::Buffer& data) { + pc1.On("message", function_pc( string peer, PeerConnect::Buffer& data ) { assert(std::string(data.buf_, data.size_) == "Ping"); - assert(id == client); + assert(peer == client); std::cout << "pc1: a message has been received" << std::endl; - pc->Send(client, "Pong"); + pc1.Send(client, "Pong"); }); - pc2.On("signin", function_pc(PeerConnect* pc, string id) { - assert(id == client); - std::cout << "pc2: signedin" << std::endl; - pc->Connect(server); + pc2.On("open", function_pc( string peer ) { + assert(peer == client); + std::cout << "pc2: open" << std::endl; + pc2.Connect(server); }); - pc2.On("connect", function_pc(PeerConnect* pc, string id) { - assert(id == server); - std::cout << "pc2: pc1(" << id << ") connected" << std::endl; + pc2.On("connect", function_pc( string peer ) { + assert( peer == server ); + std::cout << "pc2: pc1(" << peer << ") connected" << std::endl; }); - pc2.On("disconnect", function_pc(PeerConnect* pc, string id) { - assert(id == server); - std::cout << "pc2: pc1 disconnected" << std::endl; - pc1.SignOut(); + pc2.On("close", function_pc( string peer, CloseCode code, string desc ) { + assert( peer == server || peer == client); + if ( peer == server ) { + std::cout << "pc2: pc1 disconnected" << std::endl; + pc1.Close(); + } + else if ( peer == client ) { + std::cout << "pc2: close" << std::endl; + PeerConnect::Stop(); + } }); - pc2.On("writable", function_pc(PeerConnect* pc, string id){ - assert(id == client); + pc2.On("writable", function_pc( string peer ){ + assert(peer == client); std::cout << "pc2: writable" << std::endl; - pc->Send(server, "Ping"); + pc2.Send(server, "Ping"); }); - pc2.On("signout", function_pc(PeerConnect* pc, string id){ - assert(id == client); - std::cout << "pc2: signed out" << std::endl; - PeerConnect::Stop(); - }); - - pc2.OnMessage(function_pc(PeerConnect* pc, string id, PeerConnect::Buffer& data) { + pc2.On("message", function_pc( string peer, PeerConnect::Buffer& data ) { assert(std::string(data.buf_, data.size_) == "Pong"); - assert(id == server); + assert(peer == server); std::cout << "pc2 has received message" << std::endl; - pc->Disconnect(server); + pc2.Close(server); }); - pc1.SignIn(server); + pc1.Open(); PeerConnect::Run(); -} \ No newline at end of file +} + +