diff options
author | toma <toma@283d02a7-25f6-0310-bc7c-ecb5cbfe19da> | 2009-11-25 17:56:58 +0000 |
---|---|---|
committer | toma <toma@283d02a7-25f6-0310-bc7c-ecb5cbfe19da> | 2009-11-25 17:56:58 +0000 |
commit | bcb704366cb5e333a626c18c308c7e0448a8e69f (patch) | |
tree | f0d6ab7d78ecdd9207cf46536376b44b91a1ca71 /kopete/protocols/jabber/jingle/libjingle/talk/p2p/base | |
download | tdenetwork-bcb704366cb5e333a626c18c308c7e0448a8e69f.tar.gz tdenetwork-bcb704366cb5e333a626c18c308c7e0448a8e69f.zip |
Copy the KDE 3.5 branch to branches/trinity for new KDE 3.5 features.
BUG:215923
git-svn-id: svn://anonsvn.kde.org/home/kde/branches/trinity/kdenetwork@1054174 283d02a7-25f6-0310-bc7c-ecb5cbfe19da
Diffstat (limited to 'kopete/protocols/jabber/jingle/libjingle/talk/p2p/base')
38 files changed, 8282 insertions, 0 deletions
diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/Makefile.am b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/Makefile.am new file mode 100644 index 00000000..6cc30f15 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/Makefile.am @@ -0,0 +1,47 @@ +## Does not compile with final +KDE_OPTIONS = nofinal + +libcricketp2pbase_la_SOURCES = stun.cc \ + port.cc \ + udpport.cc \ + tcpport.cc \ + helpers.cc \ + sessionmanager.cc \ + session.cc \ + p2psocket.cc \ + relayport.cc \ + stunrequest.cc \ + stunport.cc \ + socketmanager.cc + +noinst_HEADERS = candidate.h \ + portallocator.h \ + relayport.h \ + session.h \ + sessionmessage.h \ + stunport.h \ + tcpport.h \ + helpers.h \ + port.h \ + sessionid.h \ + socketmanager.h \ + stunrequest.h \ + udpport.h \ + p2psocket.h \ + sessiondescription.h \ + sessionmanager.h \ + stun.h \ + relayserver.h \ + stunserver.h + +AM_CPPFLAGS = -DPOSIX $(all_includes) -I$(srcdir)/../../.. + +bin_PROGRAMS = relayserver stunserver +relayserver_SOURCES = relayserver.cc relayserver_main.cc +relayserver_LDADD = ../../base/libcricketbase.la libcricketp2pbase.la -lpthread +stunserver_SOURCES = stunserver.cc stunserver_main.cc +stunserver_LDADD = ../../base/libcricketbase.la libcricketp2pbase.la -lpthread + +noinst_LTLIBRARIES = libcricketp2pbase.la + +DEFAULT_INCLUDES = -I$(srcdir)/../../.. diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/candidate.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/candidate.h new file mode 100644 index 00000000..c2f974b8 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/candidate.h @@ -0,0 +1,118 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _CANDIDATE_H_ +#define _CANDIDATE_H_ + +#include <string> +#include <sstream> +#include "talk/base/socketaddress.h" + +namespace cricket { + +// Candidate for ICE based connection discovery. + +class Candidate { +public: + + const std::string & name() const { return name_; } + void set_name(const std::string & name) { name_ = name; } + + const std::string & protocol() const { return protocol_; } + void set_protocol(const std::string & protocol) { protocol_ = protocol; } + + const SocketAddress & address() const { return address_; } + void set_address(const SocketAddress & address) { address_ = address; } + + const float preference() const { return preference_; } + void set_preference(const float preference) { preference_ = preference; } + const std::string preference_str() const { + std::ostringstream ost; + ost << preference_; + return ost.str(); + } + void set_preference_str(const std::string & preference) { + std::istringstream ist(preference); + ist >> preference_; + } + + const std::string & username() const { return username_; } + void set_username(const std::string & username) { username_ = username; } + + const std::string & password() const { return password_; } + void set_password(const std::string & password) { password_ = password; } + + const std::string & type() const { return type_; } + void set_type(const std::string & type) { type_ = type; } + + const std::string & network_name() const { return network_name_; } + void set_network_name(const std::string & network_name) { + network_name_ = network_name; + } + + // Candidates in a new generation replace those in the old generation. + uint32 generation() const { return generation_; } + void set_generation(uint32 generation) { generation_ = generation; } + const std::string generation_str() const { + std::ostringstream ost; + ost << generation_; + return ost.str(); + } + void set_generation_str(const std::string& str) { + std::istringstream ist(str); + ist >> generation_; + } + + // Determines whether this candidate is equivalent to the given one. + bool IsEquivalent(const Candidate& c) const { + // We ignore the network name, since that is just debug information, and + // the preference, since that should be the same if the rest is (and it's + // a float so equality checking is always worrisome). + return (name_ == c.name_) && + (protocol_ == c.protocol_) && + (address_ == c.address_) && + (username_ == c.username_) && + (password_ == c.password_) && + (type_ == c.type_) && + (generation_ == c.generation_); + } + +private: + std::string name_; + std::string protocol_; + SocketAddress address_; + float preference_; + std::string username_; + std::string password_; + std::string type_; + std::string network_name_; + uint32 generation_; +}; + +} // namespace cricket + +#endif // _CANDIDATE_H_ diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/helpers.cc b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/helpers.cc new file mode 100644 index 00000000..83e02a27 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/helpers.cc @@ -0,0 +1,129 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "talk/p2p/base/helpers.h" +#include "talk/base/jtime.h" +#include <cstdlib> +#include <cassert> + +// TODO: Change this implementation to use OpenSSL's RAND_bytes. That will +// give cryptographically random values on all platforms. + +#ifdef WIN32 +#include <time.h> +#include <windows.h> +#endif + +namespace cricket { + +static long g_seed = 1L; + +int GetRandom() { + return ((g_seed = g_seed * 214013L + 2531011L) >> 16) & 0x7fff; +} + +void SetRandomSeed(unsigned long seed) +{ + g_seed = (long)seed; +} + +static bool s_initrandom; + +void InitRandom(const char *client_unique, size_t len) { + s_initrandom = true; + + // Hash this string - unique per client + + uint32 hash = 0; + if (client_unique != NULL) { + for (int i = 0; i < (int)len; i++) + hash = ((hash << 2) + hash) + client_unique[i]; + } + + // Now initialize the seed against a high resolution + // counter + +#ifdef WIN32 + LARGE_INTEGER big; + QueryPerformanceCounter(&big); + SetRandomSeed(big.LowPart ^ hash); +#else + SetRandomSeed(Time() ^ hash); +#endif +} + +const char BASE64[64] = { + 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', + 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', + 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', + 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '+', '/' +}; + +// Generates a random string of the given length. We generate base64 values so +// that they will be printable, though that's not necessary. + +std::string CreateRandomString(int len) { + // Random number generator should of been initialized! + assert(s_initrandom); + if (!s_initrandom) + InitRandom(0, 0); + + std::string str; + for (int i = 0; i < len; i++) +#if defined(_MSC_VER) && _MSC_VER < 1300 + str.insert(str.end(), BASE64[GetRandom() & 63]); +#else + str.push_back(BASE64[GetRandom() & 63]); +#endif + return str; +} + +uint32 CreateRandomId() { + uint8 b1 = (uint8)(GetRandom() & 255); + uint8 b2 = (uint8)(GetRandom() & 255); + uint8 b3 = (uint8)(GetRandom() & 255); + uint8 b4 = (uint8)(GetRandom() & 255); + return b1 | (b2 << 8) | (b3 << 16) | (b4 << 24); +} + +bool IsBase64Char(char ch) { + return (('A' <= ch) && (ch <= 'Z')) || + (('a' <= ch) && (ch <= 'z')) || + (('0' <= ch) && (ch <= '9')) || + (ch == '+') || (ch == '/'); +} + +bool IsBase64Encoded(const std::string& str) { + for (size_t i = 0; i < str.size(); ++i) { + if (!IsBase64Char(str.at(i))) + return false; + } + return true; +} + +} // namespace cricket diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/helpers.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/helpers.h new file mode 100644 index 00000000..1c8dfa7f --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/helpers.h @@ -0,0 +1,51 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __HELPERS_H__ +#define __HELPERS_H__ + +#include "talk/base/basictypes.h" +#include <string> + +namespace cricket { + +// srand initializer +void InitRandom(const char *client_unique, size_t len); + +// Generates a (cryptographically) random string of the given length. +std::string CreateRandomString(int length); + +// Generates a random id +uint32 CreateRandomId(); + +// Determines whether the given string consists entirely of valid base64 +// encoded characters. +bool IsBase64Encoded(const std::string& str); + +} // namespace cricket + +#endif // __HELPERS_H__ diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/p2psocket.cc b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/p2psocket.cc new file mode 100644 index 00000000..eb53efeb --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/p2psocket.cc @@ -0,0 +1,910 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +// Description of the P2PSocket class in P2PSocket.h +// +#if defined(_MSC_VER) && _MSC_VER < 1300 +#pragma warning(disable:4786) +#endif +#include <iostream> +#include <cassert> +#include "talk/base/logging.h" +#include "talk/p2p/base/p2psocket.h" +#include <errno.h> +namespace { + +// messages for queuing up work for ourselves +const uint32 MSG_SORT = 1; +const uint32 MSG_PING = 2; +const uint32 MSG_ALLOCATE = 3; + +// When the socket is unwritable, we will use 10 Kbps (ignoring IP+UDP headers) +// for pinging. When the socket is writable, we will use only 1 Kbps because +// we don't want to degrade the quality on a modem. These numbers should work +// well on a 28.8K modem, which is the slowest connection on which the voice +// quality is reasonable at all. +static const uint32 PING_PACKET_SIZE = 60 * 8; +static const uint32 WRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 1000; // 480ms +static const uint32 UNWRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 10000;// 50ms + +// If there is a current writable connection, then we will also try hard to +// make sure it is pinged at this rate. +static const uint32 MAX_CURRENT_WRITABLE_DELAY = 900; // 2*WRITABLE_DELAY - bit + +// The minimum improvement in MOS that justifies a switch. +static const double kMinImprovement = 10; + +// Amount of time that we wait when *losing* writability before we try doing +// another allocation. +static const int kAllocateDelay = 1 * 1000; // 1 second + +// We will try creating a new allocator from scratch after a delay of this +// length without becoming writable (or timing out). +static const int kAllocatePeriod = 20 * 1000; // 20 seconds + +cricket::Port::CandidateOrigin GetOrigin(cricket::Port* port, + cricket::Port* origin_port) { + if (!origin_port) + return cricket::Port::ORIGIN_MESSAGE; + else if (port == origin_port) + return cricket::Port::ORIGIN_THIS_PORT; + else + return cricket::Port::ORIGIN_OTHER_PORT; +} + +// Compares two connections based only on static information about them. +int CompareConnectionCandidates(cricket::Connection* a, + cricket::Connection* b) { + // Combine local and remote preferences + assert(a->local_candidate().preference() == a->port()->preference()); + assert(b->local_candidate().preference() == b->port()->preference()); + double a_pref = a->local_candidate().preference() + * a->remote_candidate().preference(); + double b_pref = b->local_candidate().preference() + * b->remote_candidate().preference(); + + // Now check combined preferences. Lower values get sorted last. + if (a_pref > b_pref) + return 1; + if (a_pref < b_pref) + return -1; + + return 0; +} + +// Compare two connections based on their writability and static preferences. +int CompareConnections(cricket::Connection *a, cricket::Connection *b) { + // Sort based on write-state. Better states have lower values. + if (a->write_state() < b->write_state()) + return 1; + if (a->write_state() > b->write_state()) + return -1; + + // Compare the candidate information. + return CompareConnectionCandidates(a, b); +} + +// Wraps the comparison connection into a less than operator that puts higher +// priority writable connections first. +class ConnectionCompare { +public: + bool operator()(const cricket::Connection *ca, + const cricket::Connection *cb) { + cricket::Connection* a = const_cast<cricket::Connection*>(ca); + cricket::Connection* b = const_cast<cricket::Connection*>(cb); + + // Compare first on writability and static preferences. + int cmp = CompareConnections(a, b); + if (cmp > 0) + return true; + if (cmp < 0) + return false; + + // Otherwise, sort based on latency estimate. + return a->rtt() < b->rtt(); + + // Should we bother checking for the last connection that last received + // data? It would help rendezvous on the connection that is also receiving + // packets. + // + // TODO: Yes we should definitely do this. The TCP protocol gains + // efficiency by being used bidirectionally, as opposed to two separate + // unidirectional streams. This test should probably occur before + // comparison of local prefs (assuming combined prefs are the same). We + // need to be careful though, not to bounce back and forth with both sides + // trying to rendevous with the other. + } +}; + +// Determines whether we should switch between two connections, based first on +// static preferences and then (if those are equal) on latency estimates. +bool ShouldSwitch(cricket::Connection* a_conn, cricket::Connection* b_conn) { + if (a_conn == b_conn) + return false; + + if ((a_conn == NULL) || (b_conn == NULL)) // don't think the latter should happen + return true; + + int prefs_cmp = CompareConnections(a_conn, b_conn); + if (prefs_cmp < 0) + return true; + if (prefs_cmp > 0) + return false; + + return b_conn->rtt() <= a_conn->rtt() + kMinImprovement; +} + +} // unnamed namespace + +namespace cricket { + +P2PSocket::P2PSocket(const std::string &name, PortAllocator *allocator) +: worker_thread_(Thread::Current()), name_(name), allocator_(allocator), + error_(0), state_(STATE_CONNECTING), waiting_for_signaling_(false), + best_connection_(NULL), pinging_started_(false), sort_dirty_(false), + was_writable_(false), was_timed_out_(true) { +} + +P2PSocket::~P2PSocket() { + assert(worker_thread_ == Thread::Current()); + + thread()->Clear(this); + + for (uint32 i = 0; i < allocator_sessions_.size(); ++i) + delete allocator_sessions_[i]; +} + +// Add the allocator session to our list so that we know which sessions +// are still active. +void P2PSocket::AddAllocatorSession(PortAllocatorSession* session) { + session->set_generation(static_cast<uint32>(allocator_sessions_.size())); + allocator_sessions_.push_back(session); + + // We now only want to apply new candidates that we receive to the ports + // created by this new session because these are replacing those of the + // previous sessions. + ports_.clear(); + + session->SignalPortReady.connect(this, &P2PSocket::OnPortReady); + session->SignalCandidatesReady.connect(this, &P2PSocket::OnCandidatesReady); + session->GetInitialPorts(); + if (pinging_started_) + session->StartGetAllPorts(); +} + +// Go into the state of processing candidates, and running in general +void P2PSocket::StartProcessingCandidates() { + assert(worker_thread_ == Thread::Current()); + + // Kick off an allocator session + OnAllocate(); + + // Start pinging as the ports come in. + thread()->Post(this, MSG_PING); +} + +// Reset the socket, clear up any previous allocations and start over +void P2PSocket::Reset() { + assert(worker_thread_ == Thread::Current()); + + // Get rid of all the old allocators. This should clean up everything. + for (uint32 i = 0; i < allocator_sessions_.size(); ++i) + delete allocator_sessions_[i]; + + allocator_sessions_.clear(); + ports_.clear(); + connections_.clear(); + best_connection_ = NULL; + + // Forget about all of the candidates we got before. + remote_candidates_.clear(); + + // Revert to the connecting state. + set_state(STATE_CONNECTING); + + // Reinitialize the rest of our state. + waiting_for_signaling_ = false; + pinging_started_ = false; + sort_dirty_ = false; + was_writable_ = false; + was_timed_out_ = true; + + // Start a new allocator. + OnAllocate(); + + // Start pinging as the ports come in. + thread()->Clear(this); + thread()->Post(this, MSG_PING); +} + +// A new port is available, attempt to make connections for it +void P2PSocket::OnPortReady(PortAllocatorSession *session, Port* port) { + assert(worker_thread_ == Thread::Current()); + + // Set in-effect options on the new port + for (OptionMap::const_iterator it = options_.begin(); it != options_.end(); ++it) { + int val = port->SetOption(it->first, it->second); + if (val < 0) { + LOG(WARNING) << "SetOption(" << it->first << ", " << it->second << ") failed: " << port->GetError(); + } + } + + // Remember the ports and candidates, and signal that candidates are ready. + // The session will handle this, and send an initiate/accept/modify message + // if one is pending. + + ports_.push_back(port); + port->SignalUnknownAddress.connect(this, &P2PSocket::OnUnknownAddress); + port->SignalDestroyed.connect(this, &P2PSocket::OnPortDestroyed); + + // Attempt to create a connection from this new port to all of the remote + // candidates that we were given so far. + + std::vector<RemoteCandidate>::iterator iter; + for (iter = remote_candidates_.begin(); iter != remote_candidates_.end(); + ++iter) + CreateConnection(port, *iter, iter->origin_port(), false); + + SortConnections(); +} + +// A new candidate is available, let listeners know +void P2PSocket::OnCandidatesReady(PortAllocatorSession *session, + const std::vector<Candidate>& candidates) { + SignalCandidatesReady(this, candidates); +} + +// Handle stun packets +void P2PSocket::OnUnknownAddress(Port *port, + const SocketAddress &address, + StunMessage *stun_msg, + const std::string &remote_username) { + assert(worker_thread_ == Thread::Current()); + + // Port has received a valid stun packet from an address that no Connection + // is currently available for. See if the remote user name is in the remote + // candidate list. If it isn't return error to the stun request. + + const Candidate *candidate = NULL; + std::vector<RemoteCandidate>::iterator it; + for (it = remote_candidates_.begin(); it != remote_candidates_.end(); ++it) { + if ((*it).username() == remote_username) { + candidate = &(*it); + break; + } + } + if (candidate == NULL) { + // Don't know about this username, the request is bogus + // This sometimes happens if a binding response comes in before the ACCEPT + // message. It is totally valid; the retry state machine will try again. + + port->SendBindingErrorResponse(stun_msg, address, + STUN_ERROR_STALE_CREDENTIALS, STUN_ERROR_REASON_STALE_CREDENTIALS); + delete stun_msg; + return; + } + + // Check for connectivity to this address. Create connections + // to this address across all local ports. First, add this as a new remote + // address + + Candidate new_remote_candidate = *candidate; + new_remote_candidate.set_address(address); + //new_remote_candidate.set_protocol(port->protocol()); + + // This remote username exists. Now create connections using this candidate, + // and resort + + if (CreateConnections(new_remote_candidate, port, true)) { + // Send the pinger a successful stun response. + port->SendBindingResponse(stun_msg, address); + + // Update the list of connections since we just added another. We do this + // after sending the response since it could (in principle) delete the + // connection in question. + SortConnections(); + } else { + // Hopefully this won't occur, because changing a destination address + // shouldn't cause a new connection to fail + assert(false); + port->SendBindingErrorResponse(stun_msg, address, STUN_ERROR_SERVER_ERROR, + STUN_ERROR_REASON_SERVER_ERROR); + } + + delete stun_msg; +} + +// We received a candidate from the other side, make connections so we +// can try to use these remote candidates with our local candidates. +void P2PSocket::AddRemoteCandidates( + const std::vector<Candidate> &remote_candidates) { + assert(worker_thread_ == Thread::Current()); + + // The remote candidates have come in. Remember them and start to establish + // connections + + std::vector<Candidate>::const_iterator it; + for (it = remote_candidates.begin(); it != remote_candidates.end(); ++it) + CreateConnections(*it, NULL, false); + + // Resort the connections + + SortConnections(); +} + +// Creates connections from all of the ports that we care about to the given +// remote candidate. The return value is true iff we created a connection from +// the origin port. +bool P2PSocket::CreateConnections(const Candidate &remote_candidate, + Port* origin_port, + bool readable) { + assert(worker_thread_ == Thread::Current()); + + // Add a new connection for this candidate to every port that allows such a + // connection (i.e., if they have compatible protocols) and that does not + // already have a connection to an equivalent candidate. We must be careful + // to make sure that the origin port is included, even if it was pruned, + // since that may be the only port that can create this connection. + + bool created = false; + + std::vector<Port *>::reverse_iterator it; + for (it = ports_.rbegin(); it != ports_.rend(); ++it) { + if (CreateConnection(*it, remote_candidate, origin_port, readable)) { + if (*it == origin_port) + created = true; + } + } + + if ((origin_port != NULL) && + find(ports_.begin(), ports_.end(), origin_port) == ports_.end()) { + if (CreateConnection(origin_port, remote_candidate, origin_port, readable)) + created = true; + } + + // Remember this remote candidate so that we can add it to future ports. + RememberRemoteCandidate(remote_candidate, origin_port); + + return created; +} + +// Setup a connection object for the local and remote candidate combination. +// And then listen to connection object for changes. +bool P2PSocket::CreateConnection(Port* port, + const Candidate& remote_candidate, + Port* origin_port, + bool readable) { + // Look for an existing connection with this remote address. If one is not + // found, then we can create a new connection for this address. + Connection* connection = port->GetConnection(remote_candidate.address()); + if (connection != NULL) { + // It is not legal to try to change any of the parameters of an existing + // connection; however, the other side can send a duplicate candidate. + if (!remote_candidate.IsEquivalent(connection->remote_candidate())) { + LOG(INFO) << "Attempt to change a remote candidate"; + return false; + } + } else { + Port::CandidateOrigin origin = GetOrigin(port, origin_port); + connection = port->CreateConnection(remote_candidate, origin); + if (!connection) + return false; + + connections_.push_back(connection); + connection->SignalReadPacket.connect(this, &P2PSocket::OnReadPacket); + connection->SignalStateChange.connect(this, &P2PSocket::OnConnectionStateChange); + connection->SignalDestroyed.connect(this, &P2PSocket::OnConnectionDestroyed); + } + + // If we are readable, it is because we are creating this in response to a + // ping from the other side. This will cause the state to become readable. + if (readable) + connection->ReceivedPing(); + + return true; +} + +// Maintain our remote candidate list, adding this new remote one. +void P2PSocket::RememberRemoteCandidate(const Candidate& remote_candidate, + Port* origin_port) { + // Remove any candidates whose generation is older than this one. The + // presence of a new generation indicates that the old ones are not useful. + uint32 i = 0; + while (i < remote_candidates_.size()) { + if (remote_candidates_[i].generation() < remote_candidate.generation()) { + remote_candidates_.erase(remote_candidates_.begin() + i); + LOG(INFO) << "Pruning candidate from old generation: " + << remote_candidates_[i].address().ToString(); + + } else { + i += 1; + } + } + + // Make sure this candidate is not a duplicate. + for (uint32 i = 0; i < remote_candidates_.size(); ++i) { + if (remote_candidates_[i].IsEquivalent(remote_candidate)) { + LOG(INFO) << "Duplicate candidate: " + << remote_candidate.address().ToString(); + return; + } + } + + // Try this candidate for all future ports. + remote_candidates_.push_back(RemoteCandidate(remote_candidate, origin_port)); + + // We have some candidates from the other side, we are now serious about + // this connection. Let's do the StartGetAllPorts thing. + if (!pinging_started_) { + pinging_started_ = true; + for (size_t i = 0; i < allocator_sessions_.size(); ++i) { + if (!allocator_sessions_[i]->IsGettingAllPorts()) + allocator_sessions_[i]->StartGetAllPorts(); + } + } +} + +// Send data to the other side, using our best connection +int P2PSocket::Send(const char *data, size_t len) { + // This can get called on any thread that is convenient to write from! + if (best_connection_ == NULL) { + error_ = EWOULDBLOCK; + return SOCKET_ERROR; + } + int sent = best_connection_->Send(data, len); + if (sent <= 0) { + assert(sent < 0); + error_ = best_connection_->GetError(); + } + return sent; +} + +// Monitor connection states +void P2PSocket::UpdateConnectionStates() { + uint32 now = Time(); + + // We need to copy the list of connections since some may delete themselves + // when we call UpdateState. + for (uint32 i = 0; i < connections_.size(); ++i) + connections_[i]->UpdateState(now); +} + +// Prepare for best candidate sorting +void P2PSocket::RequestSort() { + if (!sort_dirty_) { + worker_thread_->Post(this, MSG_SORT); + sort_dirty_ = true; + } +} + +// Sort the available connections to find the best one. We also monitor +// the number of available connections and the current state so that we +// can possibly kick off more allocators (for more connections). +void P2PSocket::SortConnections() { + assert(worker_thread_ == Thread::Current()); + + // Make sure the connection states are up-to-date since this affects how they + // will be sorted. + UpdateConnectionStates(); + + // Any changes after this point will require a re-sort. + sort_dirty_ = false; + + // Get a list of the networks that we are using. + std::set<Network*> networks; + for (uint32 i = 0; i < connections_.size(); ++i) + networks.insert(connections_[i]->port()->network()); + + // Find the best alternative connection by sorting. It is important to note + // that amongst equal preference, writable connections, this will choose the + // one whose estimated latency is lowest. So it is the only one that we + // need to consider switching to. + + ConnectionCompare cmp; + std::stable_sort(connections_.begin(), connections_.end(), cmp); + Connection* top_connection = NULL; + if (connections_.size() > 0) + top_connection = connections_[0]; + + // If necessary, switch to the new choice. + if (ShouldSwitch(best_connection_, top_connection)) + SwitchBestConnectionTo(top_connection); + + // We can prune any connection for which there is a writable connection on + // the same network with better or equal prefences. We leave those with + // better preference just in case they become writable later (at which point, + // we would prune out the current best connection). We leave connections on + // other networks because they may not be using the same resources and they + // may represent very distinct paths over which we can switch. + std::set<Network*>::iterator network; + for (network = networks.begin(); network != networks.end(); ++network) { + Connection* primier = GetBestConnectionOnNetwork(*network); + if (!primier || (primier->write_state() != Connection::STATE_WRITABLE)) + continue; + + for (uint32 i = 0; i < connections_.size(); ++i) { + if ((connections_[i] != primier) && + (connections_[i]->port()->network() == *network) && + (CompareConnectionCandidates(primier, connections_[i]) >= 0)) { + connections_[i]->Prune(); + } + } + } + + // Count the number of connections in the various states. + + int writable = 0; + int write_connect = 0; + int write_timeout = 0; + + for (uint32 i = 0; i < connections_.size(); ++i) { + switch (connections_[i]->write_state()) { + case Connection::STATE_WRITABLE: + ++writable; + break; + case Connection::STATE_WRITE_CONNECT: + ++write_connect; + break; + case Connection::STATE_WRITE_TIMEOUT: + ++write_timeout; + break; + default: + assert(false); + } + } + + if (writable > 0) { + HandleWritable(); + } else if (write_connect > 0) { + HandleNotWritable(); + } else { + HandleAllTimedOut(); + } + + // Notify of connection state change + SignalConnectionMonitor(this); +} + +// Track the best connection, and let listeners know +void P2PSocket::SwitchBestConnectionTo(Connection* conn) { + best_connection_ = conn; + if (best_connection_) + SignalConnectionChanged(this, + best_connection_->remote_candidate().address()); +} + +// We checked the status of our connections and we had at least one that +// was writable, go into the writable state. +void P2PSocket::HandleWritable() { + // + // One or more connections writable! + // + if (state_ != STATE_WRITABLE) { + for (uint32 i = 0; i < allocator_sessions_.size(); ++i) { + if (allocator_sessions_[i]->IsGettingAllPorts()) { + allocator_sessions_[i]->StopGetAllPorts(); + } + } + + // Stop further allocations. + thread()->Clear(this, MSG_ALLOCATE); + } + + // We're writable, obviously we aren't timed out + was_writable_ = true; + was_timed_out_ = false; + set_state(STATE_WRITABLE); +} + +// We checked the status of our connections and we didn't have any that +// were writable, go into the connecting state (kick off a new allocator +// session). +void P2PSocket::HandleNotWritable() { + // + // No connections are writable but not timed out! + // + if (was_writable_) { + // If we were writable, let's kick off an allocator session immediately + was_writable_ = false; + OnAllocate(); + } + + // We were connecting, obviously not ALL timed out. + was_timed_out_ = false; + set_state(STATE_CONNECTING); +} + +// We checked the status of our connections and not only weren't they writable +// but they were also timed out, we really need a new allocator. +void P2PSocket::HandleAllTimedOut() { + // + // No connections... all are timed out! + // + if (!was_timed_out_) { + // We weren't timed out before, so kick off an allocator now (we'll still + // be in the fully timed out state until the allocator actually gives back + // new ports) + OnAllocate(); + } + + // NOTE: we start was_timed_out_ in the true state so that we don't get + // another allocator created WHILE we are in the process of building up + // our first allocator. + was_timed_out_ = true; + was_writable_ = false; + set_state(STATE_CONNECTING); +} + +// If we have a best connection, return it, otherwise return top one in the +// list (later we will mark it best). +Connection* P2PSocket::GetBestConnectionOnNetwork(Network* network) { + // If the best connection is on this network, then it wins. + if (best_connection_ && (best_connection_->port()->network() == network)) + return best_connection_; + + // Otherwise, we return the top-most in sorted order. + for (uint32 i = 0; i < connections_.size(); ++i) { + if (connections_[i]->port()->network() == network) + return connections_[i]; + } + + return NULL; +} + +// Handle any queued up requests +void P2PSocket::OnMessage(Message *pmsg) { + if (pmsg->message_id == MSG_SORT) + OnSort(); + else if (pmsg->message_id == MSG_PING) + OnPing(); + else if (pmsg->message_id == MSG_ALLOCATE) + OnAllocate(); + else + assert(false); +} + +// Handle queued up sort request +void P2PSocket::OnSort() { + // Resort the connections based on the new statistics. + SortConnections(); +} + +// Handle queued up ping request +void P2PSocket::OnPing() { + // Make sure the states of the connections are up-to-date (since this affects + // which ones are pingable). + UpdateConnectionStates(); + + // Find the oldest pingable connection and have it do a ping. + Connection* conn = FindNextPingableConnection(); + if (conn) + conn->Ping(Time()); + + // Post ourselves a message to perform the next ping. + uint32 delay = (state_ == STATE_WRITABLE) ? WRITABLE_DELAY : UNWRITABLE_DELAY; + thread()->PostDelayed(delay, this, MSG_PING); +} + +// Is the connection in a state for us to even consider pinging the other side? +bool P2PSocket::IsPingable(Connection* conn) { + // An unconnected connection cannot be written to at all, so pinging is out + // of the question. + if (!conn->connected()) + return false; + + if (state_ == STATE_WRITABLE) { + // If we are writable, then we only want to ping connections that could be + // better than this one, i.e., the ones that were not pruned. + return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT); + } else { + // If we are not writable, then we need to try everything that might work. + // This includes both connections that do not have write timeout as well as + // ones that do not have read timeout. A connection could be readable but + // be in write-timeout if we pruned it before. Since the other side is + // still pinging it, it very well might still work. + return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT) || + (conn->read_state() != Connection::STATE_READ_TIMEOUT); + } +} + +// Returns the next pingable connection to ping. This will be the oldest +// pingable connection unless we have a writable connection that is past the +// maximum acceptable ping delay. +Connection* P2PSocket::FindNextPingableConnection() { + uint32 now = Time(); + if (best_connection_ && + (best_connection_->write_state() == Connection::STATE_WRITABLE) && + (best_connection_->last_ping_sent() + + MAX_CURRENT_WRITABLE_DELAY <= now)) { + return best_connection_; + } + + Connection* oldest_conn = NULL; + uint32 oldest_time = 0xFFFFFFFF; + for (uint32 i = 0; i < connections_.size(); ++i) { + if (IsPingable(connections_[i])) { + if (connections_[i]->last_ping_sent() < oldest_time) { + oldest_time = connections_[i]->last_ping_sent(); + oldest_conn = connections_[i]; + } + } + } + return oldest_conn; +} + +// return the number of "pingable" connections +uint32 P2PSocket::NumPingableConnections() { + uint32 count = 0; + for (uint32 i = 0; i < connections_.size(); ++i) { + if (IsPingable(connections_[i])) + count += 1; + } + return count; +} + +// When a connection's state changes, we need to figure out who to use as +// the best connection again. It could have become usable, or become unusable. +void P2PSocket::OnConnectionStateChange(Connection *connection) { + assert(worker_thread_ == Thread::Current()); + + // We have to unroll the stack before doing this because we may be changing + // the state of connections while sorting. + RequestSort(); +} + +// When a connection is removed, edit it out, and then update our best +// connection. +void P2PSocket::OnConnectionDestroyed(Connection *connection) { + assert(worker_thread_ == Thread::Current()); + + // Remove this connection from the list. + std::vector<Connection*>::iterator iter = + find(connections_.begin(), connections_.end(), connection); + assert(iter != connections_.end()); + connections_.erase(iter); + + LOG(INFO) << "Removed connection from p2p socket: " + << static_cast<int>(connections_.size()) << " remaining"; + + // If this is currently the best connection, then we need to pick a new one. + // The call to SortConnections will pick a new one. It looks at the current + // best connection in order to avoid switching between fairly similar ones. + // Since this connection is no longer an option, we can just set best to NULL + // and re-choose a best assuming that there was no best connection. + if (best_connection_ == connection) { + SwitchBestConnectionTo(NULL); + RequestSort(); + } +} + +// When a port is destroyed remove it from our list of ports to use for +// connection attempts. +void P2PSocket::OnPortDestroyed(Port* port) { + assert(worker_thread_ == Thread::Current()); + + // Remove this port from the list (if we didn't drop it already). + std::vector<Port*>::iterator iter = find(ports_.begin(), ports_.end(), port); + if (iter != ports_.end()) + ports_.erase(iter); + + LOG(INFO) << "Removed port from p2p socket: " + << static_cast<int>(ports_.size()) << " remaining"; +} + +// We data is available, let listeners know +void P2PSocket::OnReadPacket(Connection *connection, + const char *data, size_t len) { + assert(worker_thread_ == Thread::Current()); + + // Let the client know of an incoming packet + + SignalReadPacket(this, data, len); +} + +// return socket name +const std::string &P2PSocket::name() const { + return name_; +} + +// return socket error value +int P2PSocket::GetError() { + return error_; +} + +// return a reference to the list of connections +const std::vector<Connection *>& P2PSocket::connections() { + return connections_; +} + +// Set options on ourselves is simply setting options on all of our available +// port objects. +int P2PSocket::SetOption(Socket::Option opt, int value) { + OptionMap::iterator it = options_.find(opt); + if (it == options_.end()) { + options_.insert(std::make_pair(opt, value)); + } else if (it->second == value) { + return 0; + } else { + it->second = value; + } + + for (uint32 i = 0; i < ports_.size(); ++i) { + int val = ports_[i]->SetOption(opt, value); + if (val < 0) { + // Because this also occurs deferred, probably no point in reporting an error + LOG(WARNING) << "SetOption(" << opt << ", " << value << ") failed: " << ports_[i]->GetError(); + } + } + return 0; +} + +// returns the current state +P2PSocket::State P2PSocket::state() { + return state_; +} + +// Set the current state, and let listeners know when it changes +void P2PSocket::set_state(P2PSocket::State state) { + assert(worker_thread_ == Thread::Current()); + if (state != state_) { + state_ = state; + SignalState(this, state); + } +} + +// Time for a new allocator, lets make sure we have a signalling channel +// to communicate candidates through first. +void P2PSocket::OnAllocate() { + // Allocation timer went off + waiting_for_signaling_ = true; + SignalRequestSignaling(); +} + +// When the signalling channel is ready, we can really kick off the allocator +void P2PSocket::OnSignalingReady() { + if (waiting_for_signaling_) { + waiting_for_signaling_ = false; + AddAllocatorSession(allocator_->CreateSession(name_)); + thread()->PostDelayed(kAllocatePeriod, this, MSG_ALLOCATE); + } +} + +// return the current best connection writable state. +bool P2PSocket::writable() { + assert(worker_thread_ == Thread::Current()); + + if (best_connection_ == NULL) + return false; + return best_connection_->write_state() == Connection::STATE_WRITABLE; +} + +// return the worker thread +Thread *P2PSocket::thread() { + return worker_thread_; +} + +} // namespace cricket diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/p2psocket.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/p2psocket.h new file mode 100644 index 00000000..32eb1f6d --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/p2psocket.h @@ -0,0 +1,164 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +// P2PSocket wraps up the state management of the connection between two +// P2P clients. Clients have candidate ports for connecting, and connections +// which are combinations of candidates from each end (Alice and Bob each +// have candidates, one candidate from Alice and one candidate from Bob are +// used to make a connection, repeat to make many connections). +// +// When all of the available connections become invalid (non-writable), we +// kick off a process of determining more candidates and more connections. +// +#ifndef _CRICKET_P2P_BASE_P2PSOCKET_H_ +#define _CRICKET_P2P_BASE_P2PSOCKET_H_ + +#include <vector> +#include <string> +#include "talk/base/sigslot.h" +#include "talk/p2p/base/candidate.h" +#include "talk/p2p/base/port.h" +#include "talk/p2p/base/portallocator.h" + +namespace cricket { + +// Adds the port on which the candidate originated. +class RemoteCandidate: public Candidate { + public: + RemoteCandidate(const Candidate& c, Port* origin_port) + : Candidate(c), origin_port_(origin_port) {} + + Port* origin_port() { return origin_port_; } + + private: + Port* origin_port_; +}; + +// P2PSocket manages the candidates and connection process to keep two P2P +// clients connected to each other. +class P2PSocket : public MessageHandler, public sigslot::has_slots<> { + public: + enum State { + STATE_CONNECTING = 0, // establishing writability + STATE_WRITABLE, // connected - ready for writing + }; + + P2PSocket(const std::string &name, PortAllocator *allocator); + virtual ~P2PSocket(); + + // Typically SocketManager calls these + + const std::string &name() const; + void StartProcessingCandidates(); + void AddRemoteCandidates(const std::vector<Candidate> &remote_candidates); + void OnSignalingReady(); + void Reset(); + + // Typically the Session Client calls these + + int Send(const char *data, size_t len); + int SetOption(Socket::Option opt, int value); + int GetError(); + + State state(); + bool writable(); + const std::vector<Connection *>& connections(); + Connection* best_connection() { return best_connection_; } + Thread *thread(); + + sigslot::signal2<P2PSocket *, State> SignalState; + sigslot::signal0<> SignalRequestSignaling; + sigslot::signal3<P2PSocket *, const char *, size_t> SignalReadPacket; + sigslot::signal2<P2PSocket *, const SocketAddress &> SignalConnectionChanged; + sigslot::signal2<P2PSocket *, const std::vector<Candidate>&> + SignalCandidatesReady; + sigslot::signal1<P2PSocket *> SignalConnectionMonitor; + + // Handler for internal messages. + virtual void OnMessage(Message *pmsg); + + private: + void set_state(State state); + void UpdateConnectionStates(); + void RequestSort(); + void SortConnections(); + void SwitchBestConnectionTo(Connection* conn); + void HandleWritable(); + void HandleNotWritable(); + void HandleAllTimedOut(); + Connection* GetBestConnectionOnNetwork(Network* network); + bool CreateConnections(const Candidate &remote_candidate, Port* origin_port, + bool readable); + bool CreateConnection(Port* port, const Candidate& remote_candidate, + Port* origin_port, bool readable); + void RememberRemoteCandidate(const Candidate& remote_candidate, + Port* origin_port); + void OnUnknownAddress(Port *port, const SocketAddress &addr, + StunMessage *stun_msg, + const std::string &remote_username); + void OnPortReady(PortAllocatorSession *session, Port* port); + void OnCandidatesReady(PortAllocatorSession *session, + const std::vector<Candidate>& candidates); + void OnConnectionStateChange(Connection *connection); + void OnConnectionDestroyed(Connection *connection); + void OnPortDestroyed(Port* port); + void OnAllocate(); + void OnReadPacket(Connection *connection, const char *data, size_t len); + void OnSort(); + void OnPing(); + bool IsPingable(Connection* conn); + Connection* FindNextPingableConnection(); + uint32 NumPingableConnections(); + PortAllocatorSession* allocator_session() { + return allocator_sessions_.back(); + } + void AddAllocatorSession(PortAllocatorSession* session); + + Thread *worker_thread_; + State state_; + bool waiting_for_signaling_; + int error_; + std::string name_; + PortAllocator *allocator_; + std::vector<PortAllocatorSession*> allocator_sessions_; + std::vector<Port *> ports_; + std::vector<Connection *> connections_; + Connection *best_connection_; + std::vector<RemoteCandidate> remote_candidates_; + bool pinging_started_; // indicates whether StartGetAllCandidates has been called + bool sort_dirty_; // indicates whether another sort is needed right now + bool was_writable_; + bool was_timed_out_; + typedef std::map<Socket::Option, int> OptionMap; + OptionMap options_; + + DISALLOW_EVIL_CONSTRUCTORS(P2PSocket); +}; + +} // namespace cricket + +#endif // _CRICKET_P2P_BASE_P2PSOCKET_H_ diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/port.cc b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/port.cc new file mode 100644 index 00000000..14549b5b --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/port.cc @@ -0,0 +1,869 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#if defined(_MSC_VER) && _MSC_VER < 1300 +#pragma warning(disable:4786) +#endif +#include "talk/base/logging.h" +#include "talk/base/asyncudpsocket.h" +#include "talk/base/asynctcpsocket.h" +#include "talk/base/socketadapters.h" +#include "talk/p2p/base/port.h" +#include "talk/p2p/base/helpers.h" +#include "talk/base/scoped_ptr.h" +#include <errno.h> +#include <algorithm> +#include <iostream> +#include <cassert> +#include <vector> + +#if defined(_MSC_VER) && _MSC_VER < 1300 +namespace std { + using ::memcmp; +} +#endif + +namespace { + +// The length of time we wait before timing out readability on a connection. +const uint32 CONNECTION_READ_TIMEOUT = 30 * 1000; // 30 seconds + +// The length of time we wait before timing out writability on a connection. +const uint32 CONNECTION_WRITE_TIMEOUT = 15 * 1000; // 15 seconds + +// The length of time we wait before we become unwritable. +const uint32 CONNECTION_WRITE_CONNECT_TIMEOUT = 5 * 1000; // 5 seconds + +// The number of pings that must fail to respond before we become unwritable. +const uint32 CONNECTION_WRITE_CONNECT_FAILURES = 5; + +// This is the length of time that we wait for a ping response to come back. +const int CONNECTION_RESPONSE_TIMEOUT = 5 * 1000; // 5 seconds + +// Determines whether we have seen at least the given maximum number of +// pings fail to have a response. +inline bool TooManyFailures( + const std::vector<uint32>& pings_since_last_response, + uint32 maximum_failures, + uint32 rtt_estimate, + uint32 now) { + + // If we haven't sent that many pings, then we can't have failed that many. + if (pings_since_last_response.size() < maximum_failures) + return false; + + // Check if the window in which we would expect a response to the ping has + // already elapsed. + return pings_since_last_response[maximum_failures - 1] + rtt_estimate < now; +} + +// Determines whether we have gone too long without seeing any response. +inline bool TooLongWithoutResponse( + const std::vector<uint32>& pings_since_last_response, + uint32 maximum_time, + uint32 now) { + + if (pings_since_last_response.size() == 0) + return false; + + return pings_since_last_response[0] + maximum_time < now; +} + +// We will restrict RTT estimates (when used for determining state) to be +// within a reasonable range. +const uint32 MINIMUM_RTT = 100; // 0.1 seconds +const uint32 MAXIMUM_RTT = 3000; // 3 seconds + +// When we don't have any RTT data, we have to pick something reasonable. We +// use a large value just in case the connection is really slow. +const uint32 DEFAULT_RTT = MAXIMUM_RTT; + +// Computes our estimate of the RTT given the current estimate and the number +// of data points on which it is based. +inline uint32 ConservativeRTTEstimate(uint32 rtt, uint32 rtt_data_points) { + if (rtt_data_points == 0) + return DEFAULT_RTT; + else + return cricket::_max(MINIMUM_RTT, cricket::_min(MAXIMUM_RTT, 2 * rtt)); +} + +// Weighting of the old rtt value to new data. +const int RTT_RATIO = 3; // 3 : 1 + +// The delay before we begin checking if this port is useless. +const int kPortTimeoutDelay = 30 * 1000; // 30 seconds + +const uint32 MSG_CHECKTIMEOUT = 1; +const uint32 MSG_DELETE = 1; + +} + +namespace cricket { + +static const char * const PROTO_NAMES[PROTO_LAST+1] = { "udp", "tcp", "ssltcp" }; + +const char * ProtoToString(ProtocolType proto) { + return PROTO_NAMES[proto]; +} + +bool StringToProto(const char * value, ProtocolType& proto) { + for (size_t i=0; i<=PROTO_LAST; ++i) { + if (strcmp(PROTO_NAMES[i], value) == 0) { + proto = static_cast<ProtocolType>(i); + return true; + } + } + return false; +} + +ProxyInfo Port::proxy_; + +Port::Port(Thread* thread, const std::string& type, SocketFactory* factory, + Network* network) + : thread_(thread), factory_(factory), type_(type), network_(network), + preference_(-1), lifetime_(LT_PRESTART) { + + if (factory_ == NULL) + factory_ = thread_->socketserver(); + + set_username_fragment(CreateRandomString(16)); + set_password(CreateRandomString(16)); +} + +Port::~Port() { + // Delete all of the remaining connections. We copy the list up front + // because each deletion will cause it to be modified. + + std::vector<Connection*> list; + + AddressMap::iterator iter = connections_.begin(); + while (iter != connections_.end()) { + list.push_back(iter->second); + ++iter; + } + + for (uint32 i = 0; i < list.size(); i++) + delete list[i]; +} + +Connection* Port::GetConnection(const SocketAddress& remote_addr) { + AddressMap::const_iterator iter = connections_.find(remote_addr); + if (iter != connections_.end()) + return iter->second; + else + return NULL; +} + +void Port::set_username_fragment(const std::string& username_fragment) { + username_frag_ = username_fragment; +} + +void Port::set_password(const std::string& password) { + password_ = password; +} + +void Port::add_address(const SocketAddress& address, const std::string& protocol, bool final) { + Candidate c; + c.set_name(name_); + c.set_type(type_); + c.set_protocol(protocol); + c.set_address(address); + c.set_preference(preference_); + c.set_username(username_frag_); + c.set_password(password_); + c.set_network_name(network_->name()); + c.set_generation(generation_); + candidates_.push_back(c); + + if (final) + SignalAddressReady(this); +} + +void Port::AddConnection(Connection* conn) { + connections_[conn->remote_candidate().address()] = conn; + conn->SignalDestroyed.connect(this, &Port::OnConnectionDestroyed); + SignalConnectionCreated(this, conn); +} + +void Port::OnReadPacket( + const char* data, size_t size, const SocketAddress& addr) { + + // If this is an authenticated STUN request, then signal unknown address and + // send back a proper binding response. + StunMessage* msg; + std::string remote_username; + if (!GetStunMessage(data, size, addr, msg, remote_username)) { + LOG(LERROR) << "Received non-STUN packet from unknown address: " + << addr.ToString(); + } else if (!msg) { + // STUN message handled already + } else if (msg->type() == STUN_BINDING_REQUEST) { + SignalUnknownAddress(this, addr, msg, remote_username); + } else { + LOG(LERROR) << "Received unexpected STUN message type (" << msg->type() + << ") from unknown address: " << addr.ToString(); + delete msg; + } +} + +void Port::SendBindingRequest(Connection* conn) { + + // Construct the request message. + + StunMessage request; + request.SetType(STUN_BINDING_REQUEST); + request.SetTransactionID(CreateRandomString(16)); + + StunByteStringAttribute* username_attr = + StunAttribute::CreateByteString(STUN_ATTR_USERNAME); + std::string username = conn->remote_candidate().username(); + username.append(username_frag_); + username_attr->CopyBytes(username.c_str(), (uint16)username.size()); + request.AddAttribute(username_attr); + + // Send the request message. + // NOTE: If we wanted to, this is where we would add the HMAC. + ByteBuffer buf; + request.Write(&buf); + SendTo(buf.Data(), buf.Length(), conn->remote_candidate().address(), false); +} + +bool Port::GetStunMessage(const char* data, size_t size, + const SocketAddress& addr, StunMessage *& msg, + std::string& remote_username) { + // NOTE: This could clearly be optimized to avoid allocating any memory. + // However, at the data rates we'll be looking at on the client side, + // this probably isn't worth worrying about. + + msg = 0; + + // Parse the request message. If the packet is not a complete and correct + // STUN message, then ignore it. + buzz::scoped_ptr<StunMessage> stun_msg(new StunMessage()); + ByteBuffer buf(data, size); + if (!stun_msg->Read(&buf) || (buf.Length() > 0)) { + return false; + } + + // The packet must include a username that either begins or ends with our + // fragment. It should begin with our fragment if it is a request and it + // should end with our fragment if it is a response. + const StunByteStringAttribute* username_attr = + stun_msg->GetByteString(STUN_ATTR_USERNAME); + + int remote_frag_len = (username_attr ? username_attr->length() : 0); + remote_frag_len -= static_cast<int>(username_frag_.size()); + + if (stun_msg->type() == STUN_BINDING_REQUEST) { + if ((remote_frag_len < 0) + || (std::memcmp(username_attr->bytes(), + username_frag_.c_str(), username_frag_.size()) != 0)) { + LOG(LERROR) << "Received STUN request with bad username"; + SendBindingErrorResponse(stun_msg.get(), addr, STUN_ERROR_BAD_REQUEST, + STUN_ERROR_REASON_BAD_REQUEST); + return true; + } + + remote_username.assign(username_attr->bytes() + username_frag_.size(), + username_attr->bytes() + username_attr->length()); + } else if ((stun_msg->type() == STUN_BINDING_RESPONSE) + || (stun_msg->type() == STUN_BINDING_ERROR_RESPONSE)) { + if ((remote_frag_len < 0) + || (std::memcmp(username_attr->bytes() + remote_frag_len, + username_frag_.c_str(), username_frag_.size()) != 0)) { + LOG(LERROR) << "Received STUN response with bad username"; + // Do not send error response to a response + return true; + } + + remote_username.assign(username_attr->bytes(), + username_attr->bytes() + remote_frag_len); + + if (stun_msg->type() == STUN_BINDING_ERROR_RESPONSE) { + if (const StunErrorCodeAttribute* error_code = stun_msg->GetErrorCode()) { + LOG(LERROR) << "Received STUN binding error:" + << " class=" << error_code->error_class() + << " number=" << error_code->number() + << " reason='" << error_code->reason() << "'"; + // Return message to allow error-specific processing + } else { + LOG(LERROR) << "Received STUN error response with no error code"; + // Drop corrupt message + return true; + } + } + } else { + LOG(LERROR) << "Received STUN packet with invalid type: " + << stun_msg->type(); + return true; + } + + // Return the STUN message found. + msg = stun_msg.release(); + return true; +} + +void Port::SendBindingResponse( + StunMessage* request, const SocketAddress& addr) { + + assert(request->type() == STUN_BINDING_REQUEST); + + // Retrieve the username from the request. + const StunByteStringAttribute* username_attr = + request->GetByteString(STUN_ATTR_USERNAME); + assert(username_attr); + + // Fill in the response message. + + StunMessage response; + response.SetType(STUN_BINDING_RESPONSE); + response.SetTransactionID(request->transaction_id()); + + StunByteStringAttribute* username2_attr = + StunAttribute::CreateByteString(STUN_ATTR_USERNAME); + username2_attr->CopyBytes(username_attr->bytes(), username_attr->length()); + response.AddAttribute(username2_attr); + + StunAddressAttribute* addr_attr = + StunAttribute::CreateAddress(STUN_ATTR_MAPPED_ADDRESS); + addr_attr->SetFamily(1); + addr_attr->SetPort(addr.port()); + addr_attr->SetIP(addr.ip()); + response.AddAttribute(addr_attr); + + // Send the response message. + // NOTE: If we wanted to, this is where we would add the HMAC. + ByteBuffer buf; + response.Write(&buf); + SendTo(buf.Data(), buf.Length(), addr, false); + + // The fact that we received a successful request means that this connection + // (if one exists) should now be readable. + Connection* conn = GetConnection(addr); + assert(conn); + if (conn) + conn->ReceivedPing(); +} + +void Port::SendBindingErrorResponse( + StunMessage* request, const SocketAddress& addr, int error_code, + const std::string& reason) { + + assert(request->type() == STUN_BINDING_REQUEST); + + // Retrieve the username from the request. If it didn't have one, we + // shouldn't be responding at all. + const StunByteStringAttribute* username_attr = + request->GetByteString(STUN_ATTR_USERNAME); + assert(username_attr); + + // Fill in the response message. + + StunMessage response; + response.SetType(STUN_BINDING_ERROR_RESPONSE); + response.SetTransactionID(request->transaction_id()); + + StunByteStringAttribute* username2_attr = + StunAttribute::CreateByteString(STUN_ATTR_USERNAME); + username2_attr->CopyBytes(username_attr->bytes(), username_attr->length()); + response.AddAttribute(username2_attr); + + StunErrorCodeAttribute* error_attr = StunAttribute::CreateErrorCode(); + error_attr->SetErrorCode(error_code); + error_attr->SetReason(reason); + response.AddAttribute(error_attr); + + // Send the response message. + // NOTE: If we wanted to, this is where we would add the HMAC. + ByteBuffer buf; + response.Write(&buf); + SendTo(buf.Data(), buf.Length(), addr, false); +} + +AsyncPacketSocket * Port::CreatePacketSocket(ProtocolType proto) { + if (proto == PROTO_UDP) { + return new AsyncUDPSocket(factory_->CreateAsyncSocket(SOCK_DGRAM)); + } else if ((proto == PROTO_TCP) || (proto == PROTO_SSLTCP)) { + AsyncSocket * socket = factory_->CreateAsyncSocket(SOCK_STREAM); + switch (proxy().type) { + case PROXY_NONE: + break; + case PROXY_SOCKS5: + socket = new AsyncSocksProxySocket(socket, proxy().address, proxy().username, proxy().password); + break; + case PROXY_HTTPS: + default: + socket = new AsyncHttpsProxySocket(socket, proxy().address, proxy().username, proxy().password); + break; + } + if (proto == PROTO_SSLTCP) { + socket = new AsyncSSLSocket(socket); + } + return new AsyncTCPSocket(socket); + } else { + LOG(INFO) << "Unknown protocol: " << proto; + return 0; + } +} + +void Port::OnMessage(Message *pmsg) { + assert(pmsg->message_id == MSG_CHECKTIMEOUT); + assert(lifetime_ == LT_PRETIMEOUT); + lifetime_ = LT_POSTTIMEOUT; + CheckTimeout(); +} + +void Port::Start() { + // The port sticks around for a minimum lifetime, after which + // we destroy it when it drops to zero connections. + if (lifetime_ == LT_PRESTART) { + lifetime_ = LT_PRETIMEOUT; + thread_->PostDelayed(kPortTimeoutDelay, this, MSG_CHECKTIMEOUT); + } else { + LOG(WARNING) << "Port restart attempted"; + } +} + +void Port::OnConnectionDestroyed(Connection* conn) { + AddressMap::iterator iter = connections_.find(conn->remote_candidate().address()); + assert(iter != connections_.end()); + connections_.erase(iter); + + CheckTimeout(); +} + +void Port::CheckTimeout() { + // If this port has no connections, then there's no reason to keep it around. + // When the connections time out (both read and write), they will delete + // themselves, so if we have any connections, they are either readable or + // writable (or still connecting). + if ((lifetime_ == LT_POSTTIMEOUT) && connections_.empty()) { + LOG(INFO) << "Destroying port: " << name_ << "-" << type_; + SignalDestroyed(this); + delete this; + } +} + +// A ConnectionRequest is a simple STUN ping used to determine writability. +class ConnectionRequest : public StunRequest { +public: + ConnectionRequest(Connection* connection) : connection_(connection) { + } + + virtual ~ConnectionRequest() { + } + + virtual void Prepare(StunMessage* request) { + request->SetType(STUN_BINDING_REQUEST); + StunByteStringAttribute* username_attr = + StunAttribute::CreateByteString(STUN_ATTR_USERNAME); + std::string username = connection_->remote_candidate().username(); + username.append(connection_->port()->username_fragment()); + username_attr->CopyBytes(username.c_str(), (uint16)username.size()); + request->AddAttribute(username_attr); + } + + virtual void OnResponse(StunMessage* response) { + connection_->OnConnectionRequestResponse(response, Elapsed()); + } + + virtual void OnErrorResponse(StunMessage* response) { + connection_->OnConnectionRequestErrorResponse(response, Elapsed()); + } + + virtual void OnTimeout() { + } + + virtual int GetNextDelay() { + // Each request is sent only once. After a single delay , the request will + // time out. + timeout_ = true; + return CONNECTION_RESPONSE_TIMEOUT; + } + +private: + Connection* connection_; +}; + +// +// Connection +// + +Connection::Connection(Port* port, size_t index, const Candidate& remote_candidate) + : requests_(port->thread()), port_(port), local_candidate_index_(index), + remote_candidate_(remote_candidate), read_state_(STATE_READ_TIMEOUT), + write_state_(STATE_WRITE_CONNECT), connected_(true), pruned_(false), + rtt_(0), rtt_data_points_(0), last_ping_sent_(0), last_ping_received_(0), + recv_total_bytes_(0), recv_bytes_second_(0), + last_recv_bytes_second_time_((uint32)-1), last_recv_bytes_second_calc_(0), + sent_total_bytes_(0), sent_bytes_second_(0), + last_sent_bytes_second_time_((uint32)-1), last_sent_bytes_second_calc_(0) { + + // Wire up to send stun packets + requests_.SignalSendPacket.connect(this, &Connection::OnSendStunPacket); +} + +Connection::~Connection() { +} + +const Candidate& Connection::local_candidate() const { + if (local_candidate_index_ < port_->candidates().size()) + return port_->candidates()[local_candidate_index_]; + assert(false); + static Candidate foo; + return foo; +} + +void Connection::set_read_state(ReadState value) { + ReadState old_value = read_state_; + read_state_ = value; + if (value != old_value) { + SignalStateChange(this); + CheckTimeout(); + } +} + +void Connection::set_write_state(WriteState value) { + WriteState old_value = write_state_; + write_state_ = value; + if (value != old_value) { + SignalStateChange(this); + CheckTimeout(); + } +} + +void Connection::set_connected(bool value) { + bool old_value = connected_; + connected_ = value; + + // When connectedness is turned off, this connection is done. + if (old_value && !value) + set_write_state(STATE_WRITE_TIMEOUT); +} + +void Connection::OnSendStunPacket(const void* data, size_t size) { + port_->SendTo(data, size, remote_candidate_.address(), false); +} + +void Connection::OnReadPacket(const char* data, size_t size) { + StunMessage* msg; + std::string remote_username; + const SocketAddress& addr(remote_candidate_.address()); + if (!port_->GetStunMessage(data, size, addr, msg, remote_username)) { + // The packet did not parse as a valid STUN message + + // If this connection is readable, then pass along the packet. + if (read_state_ == STATE_READABLE) { + // readable means data from this address is acceptable + // Send it on! + + recv_total_bytes_ += size; + SignalReadPacket(this, data, size); + + // If timed out sending writability checks, start up again + if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT)) + set_write_state(STATE_WRITE_CONNECT); + } else { + // Not readable means the remote address hasn't send a valid + // binding request yet. + + LOG(WARNING) << "Received non-STUN packet from an unreadable connection."; + } + } else if (!msg) { + // The packet was STUN, but was already handled + } else if (remote_username != remote_candidate_.username()) { + // Not destined this connection + LOG(LERROR) << "Received STUN packet on wrong address."; + if (msg->type() == STUN_BINDING_REQUEST) { + port_->SendBindingErrorResponse(msg, addr, STUN_ERROR_BAD_REQUEST, + STUN_ERROR_REASON_BAD_REQUEST); + } + delete msg; + } else { + // The packet is STUN, with the current username + // If this is a STUN request, then update the readable bit and respond. + // If this is a STUN response, then update the writable bit. + + switch (msg->type()) { + case STUN_BINDING_REQUEST: + // Incoming, validated stun request from remote peer. + // This call will also set the connection readable. + + port_->SendBindingResponse(msg, addr); + + // If timed out sending writability checks, start up again + if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT)) + set_write_state(STATE_WRITE_CONNECT); + break; + + case STUN_BINDING_RESPONSE: + case STUN_BINDING_ERROR_RESPONSE: + // Response from remote peer. Does it match request sent? + // This doesn't just check, it makes callbacks if transaction + // id's match + requests_.CheckResponse(msg); + break; + + default: + assert(false); + break; + } + + // Done with the message; delete + + delete msg; + } +} + +void Connection::Prune() { + pruned_ = true; + requests_.Clear(); + set_write_state(STATE_WRITE_TIMEOUT); +} + +void Connection::Destroy() { + set_read_state(STATE_READ_TIMEOUT); + set_write_state(STATE_WRITE_TIMEOUT); +} + +void Connection::UpdateState(uint32 now) { + // Check the readable state. + // + // Since we don't know how many pings the other side has attempted, the best + // test we can do is a simple window. + + if ((read_state_ == STATE_READABLE) && + (last_ping_received_ + CONNECTION_READ_TIMEOUT <= now)) { + set_read_state(STATE_READ_TIMEOUT); + } + + // Check the writable state. (The order of these checks is important.) + // + // Before becoming unwritable, we allow for a fixed number of pings to fail + // (i.e., receive no response). We also have to give the response time to + // get back, so we include a conservative estimate of this. + // + // Before timing out writability, we give a fixed amount of time. This is to + // allow for changes in network conditions. + + uint32 rtt = ConservativeRTTEstimate(rtt_, rtt_data_points_); + + if ((write_state_ == STATE_WRITABLE) && + TooManyFailures(pings_since_last_response_, + CONNECTION_WRITE_CONNECT_FAILURES, + rtt, + now) && + TooLongWithoutResponse(pings_since_last_response_, + CONNECTION_WRITE_CONNECT_TIMEOUT, + now)) { + set_write_state(STATE_WRITE_CONNECT); + } + + if ((write_state_ == STATE_WRITE_CONNECT) && + TooLongWithoutResponse(pings_since_last_response_, + CONNECTION_WRITE_TIMEOUT, + now)) { + set_write_state(STATE_WRITE_TIMEOUT); + } +} + +void Connection::Ping(uint32 now) { + assert(connected_); + last_ping_sent_ = now; + pings_since_last_response_.push_back(now); + requests_.Send(new ConnectionRequest(this)); +} + +void Connection::ReceivedPing() { + last_ping_received_ = Time(); + set_read_state(STATE_READABLE); +} + +void Connection::OnConnectionRequestResponse(StunMessage *response, uint32 rtt) { + // We have a potentially valid reply from the remote address. + // The packet must include a username that ends with our fragment, + // since it is a response. + + // Check exact message type + bool valid = true; + if (response->type() != STUN_BINDING_RESPONSE) + valid = false; + + // Must have username attribute + const StunByteStringAttribute* username_attr = + response->GetByteString(STUN_ATTR_USERNAME); + if (valid) { + if (!username_attr) { + LOG(LERROR) << "Received likely STUN packet with no username"; + valid = false; + } + } + + // Length must be at least the size of our fragment (actually, should + // be bigger since our fragment is at the end!) + if (valid) { + if (username_attr->length() <= port_->username_fragment().size()) { + LOG(LERROR) << "Received likely STUN packet with short username"; + valid = false; + } + } + + // Compare our fragment with the end of the username - must be exact match + if (valid) { + std::string username_fragment = port_->username_fragment(); + int offset = (int)(username_attr->length() - username_fragment.size()); + if (std::memcmp(username_attr->bytes() + offset, + username_fragment.c_str(), username_fragment.size()) != 0) { + LOG(LERROR) << "Received STUN response with bad username"; + valid = false; + } + } + + if (valid) { + // Valid response. If we're not already, become writable. We may be + // bringing a pruned connection back to life, but if we don't really want + // it, we can always prune it again. + set_write_state(STATE_WRITABLE); + + pings_since_last_response_.clear(); + rtt_ = (RTT_RATIO * rtt_ + rtt) / (RTT_RATIO + 1); + rtt_data_points_ += 1; + } +} + +void Connection::OnConnectionRequestErrorResponse(StunMessage *response, uint32 rtt) { + const StunErrorCodeAttribute* error = response->GetErrorCode(); + uint32 error_code = error ? error->error_code() : STUN_ERROR_GLOBAL_FAILURE; + + if ((error_code == STUN_ERROR_UNKNOWN_ATTRIBUTE) + || (error_code == STUN_ERROR_SERVER_ERROR) + || (error_code == STUN_ERROR_UNAUTHORIZED)) { + // Recoverable error, retry + } else if (error_code == STUN_ERROR_STALE_CREDENTIALS) { + // Race failure, retry + } else { + // This is not a valid connection. + set_connected(false); + } +} + +void Connection::CheckTimeout() { + // If both read and write have timed out, then this connection can contribute + // no more to p2p socket unless at some later date readability were to come + // back. However, we gave readability a long time to timeout, so at this + // point, it seems fair to get rid of this connectoin. + if ((read_state_ == STATE_READ_TIMEOUT) && + (write_state_ == STATE_WRITE_TIMEOUT)) { + port_->thread()->Post(this, MSG_DELETE); + } +} + +void Connection::OnMessage(Message *pmsg) { + assert(pmsg->message_id == MSG_DELETE); + + LOG(INFO) << "Destroying connection: from " + << local_candidate().address().ToString() + << " to " << remote_candidate_.address().ToString(); + + SignalDestroyed(this); + delete this; +} + +size_t Connection::recv_bytes_second() { + // Snapshot bytes / second calculator + + uint32 current_time = Time(); + if (last_recv_bytes_second_time_ != (uint32)-1) { + int delta = TimeDiff(current_time, last_recv_bytes_second_time_); + if (delta >= 1000) { + int fraction_time = delta % 1000; + int seconds_time = delta - fraction_time; + int fraction_bytes = (int)(recv_total_bytes_ - last_recv_bytes_second_calc_) * fraction_time / delta; + recv_bytes_second_ = (recv_total_bytes_ - last_recv_bytes_second_calc_ - fraction_bytes) * seconds_time / delta; + last_recv_bytes_second_time_ = current_time - fraction_time; + last_recv_bytes_second_calc_ = recv_total_bytes_ - fraction_bytes; + } + } + if (last_recv_bytes_second_time_ == (uint32)-1) { + last_recv_bytes_second_time_ = current_time; + last_recv_bytes_second_calc_ = recv_total_bytes_; + } + + return recv_bytes_second_; +} + +size_t Connection::recv_total_bytes() { + return recv_total_bytes_; +} + +size_t Connection::sent_bytes_second() { + // Snapshot bytes / second calculator + + uint32 current_time = Time(); + if (last_sent_bytes_second_time_ != (uint32)-1) { + int delta = TimeDiff(current_time, last_sent_bytes_second_time_); + if (delta >= 1000) { + int fraction_time = delta % 1000; + int seconds_time = delta - fraction_time; + int fraction_bytes = (int)(sent_total_bytes_ - last_sent_bytes_second_calc_) * fraction_time / delta; + sent_bytes_second_ = (sent_total_bytes_ - last_sent_bytes_second_calc_ - fraction_bytes) * seconds_time / delta; + last_sent_bytes_second_time_ = current_time - fraction_time; + last_sent_bytes_second_calc_ = sent_total_bytes_ - fraction_bytes; + } + } + if (last_sent_bytes_second_time_ == (uint32)-1) { + last_sent_bytes_second_time_ = current_time; + last_sent_bytes_second_calc_ = sent_total_bytes_; + } + + return sent_bytes_second_; +} + +size_t Connection::sent_total_bytes() { + return sent_total_bytes_; +} + +ProxyConnection::ProxyConnection(Port* port, size_t index, const Candidate& candidate) + : Connection(port, index, candidate), error_(0) { +} + +int ProxyConnection::Send(const void* data, size_t size) { + if (write_state() != STATE_WRITABLE) { + error_ = EWOULDBLOCK; + return SOCKET_ERROR; + } + int sent = port_->SendTo(data, size, remote_candidate_.address(), true); + if (sent <= 0) { + assert(sent < 0); + error_ = port_->GetError(); + } else { + sent_total_bytes_ += sent; + } + return sent; +} + +} // namespace cricket diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/port.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/port.h new file mode 100644 index 00000000..c22fad28 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/port.h @@ -0,0 +1,367 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __PORT_H__ +#define __PORT_H__ + +#include "talk/base/network.h" +#include "talk/base/socketaddress.h" +#include "talk/base/proxyinfo.h" +#include "talk/base/sigslot.h" +#include "talk/base/thread.h" +#include "talk/p2p/base/candidate.h" +#include "talk/p2p/base/stun.h" +#include "talk/p2p/base/stunrequest.h" + +#include <string> +#include <vector> +#include <map> + +namespace cricket { + +class Connection; +class AsyncPacketSocket; + +enum ProtocolType { PROTO_UDP, PROTO_TCP, PROTO_SSLTCP, PROTO_LAST = PROTO_SSLTCP }; +const char * ProtoToString(ProtocolType proto); +bool StringToProto(const char * value, ProtocolType& proto); + +struct ProtocolAddress { + SocketAddress address; + ProtocolType proto; + + ProtocolAddress(const SocketAddress& a, ProtocolType p) : address(a), proto(p) { } +}; + +// Represents a local communication mechanism that can be used to create +// connections to similar mechanisms of the other client. Subclasses of this +// one add support for specific mechanisms like local UDP ports. +class Port: public MessageHandler, public sigslot::has_slots<> { +public: + Port(Thread* thread, const std::string &type, SocketFactory* factory, + Network* network); + virtual ~Port(); + + // The thread on which this port performs its I/O. + Thread* thread() { return thread_; } + + // The factory used to create the sockets of this port. + SocketFactory* socket_factory() const { return factory_; } + void set_socket_factory(SocketFactory* factory) { factory_ = factory; } + + // Each port is identified by a name (for debugging purposes). + const std::string& name() const { return name_; } + void set_name(const std::string& name) { name_ = name; } + + // In order to establish a connection to this Port (so that real data can be + // sent through), the other side must send us a STUN binding request that is + // authenticated with this username and password. + const std::string& username_fragment() const { return username_frag_; } + const std::string& password() const { return password_; } + + // A value in [0,1] that indicates the preference for this port versus other + // ports on this client. (Larger indicates more preference.) + float preference() const { return preference_; } + void set_preference(float preference) { preference_ = preference; } + + // Identifies the port type. + //const std::string& protocol() const { return proto_; } + const std::string& type() const { return type_; } + + // Identifies network that this port was allocated on. + Network* network() { return network_; } + + // Identifies the generation that this port was created in. + uint32 generation() { return generation_; } + void set_generation(uint32 generation) { generation_ = generation; } + + // PrepareAddress will attempt to get an address for this port that other + // clients can send to. It may take some time before the address is read. + // Once it is ready, we will send SignalAddressReady. + virtual void PrepareAddress() = 0; + sigslot::signal1<Port*> SignalAddressReady; + //const SocketAddress& address() const { return address_; } + + // Provides all of the above information in one handy object. + const std::vector<Candidate>& candidates() const { return candidates_; } + + // Returns a map containing all of the connections of this port, keyed by the + // remote address. + typedef std::map<SocketAddress, Connection*> AddressMap; + const AddressMap& connections() { return connections_; } + + // Returns the connection to the given address or NULL if none exists. + Connection* GetConnection(const SocketAddress& remote_addr); + + // Creates a new connection to the given address. + enum CandidateOrigin { ORIGIN_THIS_PORT, ORIGIN_OTHER_PORT, ORIGIN_MESSAGE }; + virtual Connection* CreateConnection(const Candidate& remote_candidate, CandidateOrigin origin) = 0; + + // Called each time a connection is created. + sigslot::signal2<Port*, Connection*> SignalConnectionCreated; + + // Sends the given packet to the given address, provided that the address is + // that of a connection or an address that has sent to us already. + virtual int SendTo( + const void* data, size_t size, const SocketAddress& addr, bool payload) = 0; + + // Indicates that we received a successful STUN binding request from an + // address that doesn't correspond to any current connection. To turn this + // into a real connection, call CreateConnection. + sigslot::signal4<Port*, const SocketAddress&, StunMessage*, const std::string&> SignalUnknownAddress; + + // Sends a response message (normal or error) to the given request. One of + // these methods should be called as a response to SignalUnknownAddress. + // NOTE: You MUST call CreateConnection BEFORE SendBindingResponse. + void SendBindingResponse(StunMessage* request, const SocketAddress& addr); + void SendBindingErrorResponse( + StunMessage* request, const SocketAddress& addr, int error_code, + const std::string& reason); + + // Indicates that errors occurred when performing I/O. + sigslot::signal2<Port*, int> SignalReadError; + sigslot::signal2<Port*, int> SignalWriteError; + + // Functions on the underlying socket(s). + virtual int SetOption(Socket::Option opt, int value) = 0; + virtual int GetError() = 0; + + static void set_proxy(const ProxyInfo& proxy) { proxy_ = proxy; } + static const ProxyInfo& proxy() { return proxy_; } + + AsyncPacketSocket * CreatePacketSocket(ProtocolType proto); + + virtual void OnMessage(Message *pmsg); + + // Indicates to the port that its official use has now begun. This will + // start the timer that checks to see if the port is being used. + void Start(); + + // Signaled when this port decides to delete itself because it no longer has + // any usefulness. + sigslot::signal1<Port*> SignalDestroyed; + +protected: + Thread* thread_; + SocketFactory* factory_; + std::string type_; + Network* network_; + uint32 generation_; + std::string name_; + std::string username_frag_; + std::string password_; + float preference_; + std::vector<Candidate> candidates_; + AddressMap connections_; + enum Lifetime { LT_PRESTART, LT_PRETIMEOUT, LT_POSTTIMEOUT } lifetime_; + + // Fills in the username fragment and password. These will be initially set + // in the constructor to random values. Subclasses can override, though. + void set_username_fragment(const std::string& username_fragment); + void set_password(const std::string& password); + + // Fills in the local address of the port. + void add_address(const SocketAddress& address, const std::string& protocol, bool final = true); + + // Adds the given connection to the list. (Deleting removes them.) + void AddConnection(Connection* conn); + + // Called when a packet is received from an unknown address that is not + // currently a connection. If this is an authenticated STUN binding request, + // then we will signal the client. + void OnReadPacket(const char* data, size_t size, const SocketAddress& addr); + + // Constructs a STUN binding request for the given connection and sends it. + void SendBindingRequest(Connection* conn); + + // If the given data comprises a complete and correct STUN message then the + // return value is true, otherwise false. If the message username corresponds + // with this port's username fragment, msg will contain the parsed STUN + // message. Otherwise, the function may send a STUN response internally. + // remote_username contains the remote fragment of the STUN username. + bool GetStunMessage(const char* data, size_t size, const SocketAddress& addr, + StunMessage *& msg, std::string& remote_username); + + friend class Connection; + +private: + // Called when one of our connections deletes itself. + void OnConnectionDestroyed(Connection* conn); + + // Checks if this port is useless, and hence, should be destroyed. + void CheckTimeout(); + + static ProxyInfo proxy_; +}; + +// Represents a communication link between a port on the local client and a +// port on the remote client. +class Connection : public MessageHandler, public sigslot::has_slots<> { +public: + virtual ~Connection(); + + // The local port where this connection sends and receives packets. + Port* port() { return port_; } + const Port* port() const { return port_; } + + // Returns the description of the local port + virtual const Candidate& local_candidate() const; + + // Returns the description of the remote port to which we communicate. + const Candidate& remote_candidate() const { return remote_candidate_; } + + enum ReadState { + STATE_READABLE = 0, // we have received pings recently + STATE_READ_TIMEOUT = 1 // we haven't received pings in a while + }; + + ReadState read_state() const { return read_state_; } + + enum WriteState { + STATE_WRITABLE = 0, // we have received ping responses recently + STATE_WRITE_CONNECT = 1, // we have had a few ping failures + STATE_WRITE_TIMEOUT = 2 // we have had a large number of ping failures + }; + + WriteState write_state() const { return write_state_; } + + // Determines whether the connection has finished connecting. This can only + // be false for TCP connections. + bool connected() const { return connected_; } + + // Estimate of the round-trip time over this connection. + uint32 rtt() const { return rtt_; } + + size_t sent_total_bytes(); + size_t sent_bytes_second(); + size_t recv_total_bytes(); + size_t recv_bytes_second(); + sigslot::signal1<Connection*> SignalStateChange; + + // Sent when the connection has decided that it is no longer of value. It + // will delete itself immediately after this call. + sigslot::signal1<Connection*> SignalDestroyed; + + // The connection can send and receive packets asynchronously. This matches + // the interface of AsyncPacketSocket, which may use UDP or TCP under the covers. + virtual int Send(const void* data, size_t size) = 0; + + // Error if Send() returns < 0 + virtual int GetError() = 0; + + sigslot::signal3<Connection*, const char*, size_t> SignalReadPacket; + + // Called when a packet is received on this connection. + void OnReadPacket(const char* data, size_t size); + + // Called when a connection is determined to be no longer useful to us. We + // still keep it around in case the other side wants to use it. But we can + // safely stop pinging on it and we can allow it to time out if the other + // side stops using it as well. + bool pruned() { return pruned_; } + void Prune(); + + // Makes the connection go away. + void Destroy(); + + // Checks that the state of this connection is up-to-date. The argument is + // the current time, which is compared against various timeouts. + void UpdateState(uint32 now); + + // Called when this connection should try checking writability again. + uint32 last_ping_sent() { return last_ping_sent_; } + void Ping(uint32 now); + + // Called whenever a valid ping is received on this connection. This is + // public because the connection intercepts the first ping for us. + void ReceivedPing(); + +protected: + Port* port_; + size_t local_candidate_index_; + Candidate remote_candidate_; + ReadState read_state_; + WriteState write_state_; + bool connected_; + bool pruned_; + StunRequestManager requests_; + uint32 rtt_; + uint32 rtt_data_points_; + uint32 last_ping_sent_; // last time we sent a ping to the other side + uint32 last_ping_received_; // last time we received a ping from the other side + std::vector<uint32> pings_since_last_response_; + + size_t recv_total_bytes_; + size_t recv_bytes_second_; + uint32 last_recv_bytes_second_time_; + size_t last_recv_bytes_second_calc_; + + size_t sent_total_bytes_; + size_t sent_bytes_second_; + uint32 last_sent_bytes_second_time_; + size_t last_sent_bytes_second_calc_; + + // Callbacks from ConnectionRequest + void OnConnectionRequestResponse(StunMessage *response, uint32 rtt); + void OnConnectionRequestErrorResponse(StunMessage *response, uint32 rtt); + + // Called back when StunRequestManager has a stun packet to send + void OnSendStunPacket(const void* data, size_t size); + + // Constructs a new connection to the given remote port. + Connection(Port* port, size_t index, const Candidate& candidate); + + // Changes the state and signals if necessary. + void set_read_state(ReadState value); + void set_write_state(WriteState value); + void set_connected(bool value); + + // Checks if this connection is useless, and hence, should be destroyed. + void CheckTimeout(); + + void OnMessage(Message *pmsg); + + friend class Port; + friend class ConnectionRequest; +}; + +// ProxyConnection defers all the interesting work to the port + +class ProxyConnection : public Connection { +public: + ProxyConnection(Port* port, size_t index, const Candidate& candidate); + + virtual int Send(const void* data, size_t size); + virtual int GetError() { return error_; } + +private: + int error_; +}; + +} // namespace cricket + +#endif // __PORT_H__ diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/portallocator.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/portallocator.h new file mode 100644 index 00000000..3246f29f --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/portallocator.h @@ -0,0 +1,91 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _PORTALLOCATOR_H_ +#define _PORTALLOCATOR_H_ + +#include "talk/base/sigslot.h" +#include "talk/p2p/base/port.h" +#include <string> +#undef SetPort + +namespace cricket { + +// PortAllocator is responsible for allocating Port types for a given +// P2PSocket. It also handles port freeing. +// +// Clients can override this class to control port allocation, including +// what kinds of ports are allocated. + +class PortAllocatorSession : public sigslot::has_slots<> { +public: + // Prepares an initial set of ports to try. + virtual void GetInitialPorts() = 0; + + // Starts and stops the flow of additional ports to try. + virtual void StartGetAllPorts() = 0; + virtual void StopGetAllPorts() = 0; + virtual bool IsGettingAllPorts() = 0; + + sigslot::signal2<PortAllocatorSession*, Port*> SignalPortReady; + sigslot::signal2<PortAllocatorSession*, const std::vector<Candidate>&> SignalCandidatesReady; + + uint32 generation() { return generation_; } + void set_generation(uint32 generation) { generation_ = generation; } + +private: + uint32 generation_; +}; + +const uint32 PORTALLOCATOR_DISABLE_UDP = 0x01; +const uint32 PORTALLOCATOR_DISABLE_STUN = 0x02; +const uint32 PORTALLOCATOR_DISABLE_RELAY = 0x04; +const uint32 PORTALLOCATOR_DISABLE_TCP = 0x08; +const uint32 PORTALLOCATOR_ENABLE_SHAKER = 0x10; + +const uint32 kDefaultPortAllocatorFlags = 0; + +class PortAllocator { +public: + PortAllocator() : flags_(kDefaultPortAllocatorFlags) {} + + virtual PortAllocatorSession *CreateSession(const std::string &name) = 0; + + uint32 flags() const { return flags_; } + void set_flags(uint32 flags) { flags_ = flags; } + + const ProxyInfo& proxy() const { return proxy_; } + void set_proxy(const ProxyInfo& proxy) { proxy_ = proxy; } + +protected: + uint32 flags_; + ProxyInfo proxy_; +}; + +} // namespace cricket + +#endif // _PORTALLOCATOR_H_ diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayport.cc b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayport.cc new file mode 100644 index 00000000..4ba12be3 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayport.cc @@ -0,0 +1,640 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#if defined(_MSC_VER) && _MSC_VER < 1300 +#pragma warning(disable:4786) +#endif +#include "talk/base/logging.h" +#include "talk/base/asynctcpsocket.h" +#include "talk/p2p/base/relayport.h" +#include "talk/p2p/base/helpers.h" +#include <iostream> +#include <cassert> +#ifdef OSX +#include <errno.h> +#endif + +#if defined(_MSC_VER) && _MSC_VER < 1300 +namespace std { + using ::strerror; +} +#endif + +#ifdef POSIX +extern "C" { +#include <errno.h> +} +#endif // POSIX + +namespace cricket { + +const int KEEPALIVE_DELAY = 10 * 60 * 1000; +const int RETRY_DELAY = 50; // 50ms, from ICE spec +const uint32 RETRY_TIMEOUT = 50 * 1000; // ICE says 50 secs + +const uint32 MSG_DISPOSE_SOCKET = 100; // needs to be more than ID used by Port +typedef TypedMessageData<AsyncPacketSocket *> DisposeSocketData; + +class AsyncTCPSocket; + +// Manages a single connection to the relayserver. We aim to use each +// connection for only a specific destination address so that we can avoid +// wrapping every packet in a STUN send / data indication. +class RelayEntry : public sigslot::has_slots<> { +public: + RelayEntry(RelayPort* port, const SocketAddress& ext_addr, const SocketAddress& local_addr); + ~RelayEntry(); + + RelayPort* port() { return port_; } + + const SocketAddress& address() { return ext_addr_; } + void set_address(const SocketAddress& addr) { ext_addr_ = addr; } + + AsyncPacketSocket* socket() { return socket_; } + + bool connected() { return connected_; } + void set_connected(bool connected) { connected_ = connected; } + + bool locked() { return locked_; } + + // Returns the last error on the socket of this entry. + int GetError() { return socket_->GetError(); } + + // Sends the STUN requests to the server to initiate this connection. + void Connect(); + + // Called when this entry becomes connected. The address given is the one + // exposed to the outside world on the relay server. + void OnConnect(const SocketAddress& mapped_addr); + + // Sends a packet to the given destination address using the socket of this + // entry. This will wrap the packet in STUN if necessary. + int SendTo(const void* data, size_t size, const SocketAddress& addr); + + // Schedules a keep-alive allocate request. + void ScheduleKeepAlive(); + + void SetServerIndex(size_t sindex) { server_index_ = sindex; } + size_t ServerIndex() const { return server_index_; } + + // Try a different server address + void HandleConnectFailure(); + +private: + RelayPort* port_; + SocketAddress ext_addr_, local_addr_; + size_t server_index_; + AsyncPacketSocket* socket_; + bool connected_; + bool locked_; + StunRequestManager requests_; + + // Called when a TCP connection is established or fails + void OnSocketConnect(AsyncTCPSocket* socket); + void OnSocketClose(AsyncTCPSocket* socket, int error); + + // Called when a packet is received on this socket. + void OnReadPacket( + const char* data, size_t size, const SocketAddress& remote_addr, + AsyncPacketSocket* socket); + + // Called on behalf of a StunRequest to write data to the socket. This is + // already STUN intended for the server, so no wrapping is necessary. + void OnSendPacket(const void* data, size_t size); + + // Sends the given data on the socket to the server with no wrapping. This + // returns the number of bytes written or -1 if an error occurred. + int SendPacket(const void* data, size_t size); +}; + +// Handles an allocate request for a particular RelayEntry. +class AllocateRequest : public StunRequest { +public: + AllocateRequest(RelayEntry* entry); + virtual ~AllocateRequest() {} + + virtual void Prepare(StunMessage* request); + + virtual int GetNextDelay(); + + virtual void OnResponse(StunMessage* response); + virtual void OnErrorResponse(StunMessage* response); + virtual void OnTimeout(); + +private: + RelayEntry* entry_; + uint32 start_time_; +}; + +const std::string RELAY_PORT_TYPE("relay"); + +RelayPort::RelayPort( + Thread* thread, SocketFactory* factory, Network* network, + const SocketAddress& local_addr, const std::string& username, + const std::string& password, const std::string& magic_cookie) + : Port(thread, RELAY_PORT_TYPE, factory, network), local_addr_(local_addr), + ready_(false), magic_cookie_(magic_cookie), error_(0) { + + entries_.push_back(new RelayEntry(this, SocketAddress(), local_addr_)); + + set_username_fragment(username); + set_password(password); + + if (magic_cookie_.size() == 0) + magic_cookie_.append(STUN_MAGIC_COOKIE_VALUE, 4); +} + +RelayPort::~RelayPort() { + for (unsigned i = 0; i < entries_.size(); ++i) + delete entries_[i]; + thread_->Clear(this); +} + +void RelayPort::AddServerAddress(const ProtocolAddress& addr) { + // Since HTTP proxies usually only allow 443, let's up the priority on PROTO_SSLTCP + if ((addr.proto == PROTO_SSLTCP) + && ((proxy().type == PROXY_HTTPS) || (proxy().type == PROXY_UNKNOWN))) { + server_addr_.push_front(addr); + } else { + server_addr_.push_back(addr); + } +} + +void RelayPort::AddExternalAddress(const ProtocolAddress& addr) { + std::string proto_name = ProtoToString(addr.proto); + for (std::vector<Candidate>::const_iterator it = candidates().begin(); it != candidates().end(); ++it) { + if ((it->address() == addr.address) && (it->protocol() == proto_name)) { + LOG(INFO) << "Redundant relay address: " << proto_name << " @ " << addr.address.ToString(); + return; + } + } + add_address(addr.address, proto_name, false); +} + +void RelayPort::SetReady() { + if (!ready_) { + ready_ = true; + SignalAddressReady(this); + } +} + +const ProtocolAddress * RelayPort::ServerAddress(size_t index) const { + if ((index >= 0) && (index < server_addr_.size())) + return &server_addr_[index]; + return 0; +} + +bool RelayPort::HasMagicCookie(const char* data, size_t size) { + if (size < 24 + magic_cookie_.size()) { + return false; + } else { + return 0 == std::memcmp(data + 24, + magic_cookie_.c_str(), + magic_cookie_.size()); + } +} + +void RelayPort::PrepareAddress() { + // We initiate a connect on the first entry. If this completes, it will fill + // in the server address as the address of this port. + assert(entries_.size() == 1); + entries_[0]->Connect(); + ready_ = false; +} + +Connection* RelayPort::CreateConnection(const Candidate& address, CandidateOrigin origin) { + // We only create connections to non-udp sockets if they are incoming on this port + if ((address.protocol() != "udp") && (origin != ORIGIN_THIS_PORT)) + return 0; + + // We don't support loopback on relays + if (address.type() == type()) + return 0; + + size_t index = 0; + for (size_t i = 0; i < candidates().size(); ++i) { + const Candidate& local = candidates()[i]; + if (local.protocol() == address.protocol()) { + index = i; + break; + } + } + + Connection * conn = new ProxyConnection(this, index, address); + AddConnection(conn); + return conn; +} + +int RelayPort::SendTo(const void* data, + size_t size, + const SocketAddress& addr, bool payload) { + + // Try to find an entry for this specific address. Note that the first entry + // created was not given an address initially, so it can be set to the first + // address that comes along. + + RelayEntry* entry = 0; + + for (unsigned i = 0; i < entries_.size(); ++i) { + if (entries_[i]->address().IsAny() && payload) { + entry = entries_[i]; + entry->set_address(addr); + break; + } else if (entries_[i]->address() == addr) { + entry = entries_[i]; + break; + } + } + + // If we did not find one, then we make a new one. This will not be useable + // until it becomes connected, however. + if (!entry && payload) { + entry = new RelayEntry(this, addr, local_addr_); + if (!entries_.empty()) { + // Use the same port to connect to relay server + entry->SetServerIndex(entries_[0]->ServerIndex()); + } + entry->Connect(); + entries_.push_back(entry); + } + + // If the entry is connected, then we can send on it (though wrapping may + // still be necessary). Otherwise, we can't yet use this connection, so we + // default to the first one. + if (!entry || !entry->connected()) { + assert(!entries_.empty()); + entry = entries_[0]; + if (!entry->connected()) { + error_ = EWOULDBLOCK; + return SOCKET_ERROR; + } + } + + // Send the actual contents to the server using the usual mechanism. + int sent = entry->SendTo(data, size, addr); + if (sent <= 0) { + assert(sent < 0); + error_ = entry->GetError(); + return SOCKET_ERROR; + } + + // The caller of the function is expecting the number of user data bytes, + // rather than the size of the packet. + return (int)size; +} + +void RelayPort::OnMessage(Message *pmsg) { + switch (pmsg->message_id) { + case MSG_DISPOSE_SOCKET: { + DisposeSocketData * data = static_cast<DisposeSocketData *>(pmsg->pdata); + delete data->data(); + delete data; + break; } + default: + Port::OnMessage(pmsg); + } +} + +int RelayPort::SetOption(Socket::Option opt, int value) { + int result = 0; + for (unsigned i = 0; i < entries_.size(); ++i) { + if (entries_[i]->socket()->SetOption(opt, value) < 0) { + result = -1; + error_ = entries_[i]->socket()->GetError(); + } + } + options_.push_back(OptionValue(opt, value)); + return result; +} + +int RelayPort::GetError() { + return error_; +} + +void RelayPort::OnReadPacket( + const char* data, size_t size, const SocketAddress& remote_addr) { + if (Connection* conn = GetConnection(remote_addr)) { + conn->OnReadPacket(data, size); + } else { + Port::OnReadPacket(data, size, remote_addr); + } +} + +void RelayPort::DisposeSocket(AsyncPacketSocket * socket) { + thread_->Post(this, MSG_DISPOSE_SOCKET, new DisposeSocketData(socket)); +} + +RelayEntry::RelayEntry(RelayPort* port, const SocketAddress& ext_addr, + const SocketAddress& local_addr) + : port_(port), ext_addr_(ext_addr), local_addr_(local_addr), server_index_(0), + socket_(0), connected_(false), locked_(false), requests_(port->thread()) { + + requests_.SignalSendPacket.connect(this, &RelayEntry::OnSendPacket); +} + +RelayEntry::~RelayEntry() { + delete socket_; +} + +void RelayEntry::Connect() { + assert(socket_ == 0); + const ProtocolAddress * ra = port()->ServerAddress(server_index_); + if (!ra) { + LOG(INFO) << "Out of relay server connections"; + return; + } + + LOG(INFO) << "Connecting to relay via " << ProtoToString(ra->proto) << " @ " << ra->address.ToString(); + + socket_ = port_->CreatePacketSocket(ra->proto); + assert(socket_ != 0); + + socket_->SignalReadPacket.connect(this, &RelayEntry::OnReadPacket); + if (socket_->Bind(local_addr_) < 0) + LOG(INFO) << "bind: " << std::strerror(socket_->GetError()); + + for (unsigned i = 0; i < port_->options().size(); ++i) + socket_->SetOption(port_->options()[i].first, port_->options()[i].second); + + if ((ra->proto == PROTO_TCP) || (ra->proto == PROTO_SSLTCP)) { + AsyncTCPSocket * tcp = static_cast<AsyncTCPSocket *>(socket_); + tcp->SignalClose.connect(this, &RelayEntry::OnSocketClose); + tcp->SignalConnect.connect(this, &RelayEntry::OnSocketConnect); + tcp->Connect(ra->address); + } else { + requests_.Send(new AllocateRequest(this)); + } +} + +void RelayEntry::OnConnect(const SocketAddress& mapped_addr) { + ProtocolType proto = PROTO_UDP; + LOG(INFO) << "Relay allocate succeeded: " << ProtoToString(proto) << " @ " << mapped_addr.ToString(); + connected_ = true; + + port_->AddExternalAddress(ProtocolAddress(mapped_addr, proto)); + port_->SetReady(); +} + +int RelayEntry::SendTo(const void* data, + size_t size, + const SocketAddress& addr) { + + // If this connection is locked to the address given, then we can send the + // packet with no wrapper. + if (locked_ && (ext_addr_ == addr)) + return SendPacket(data, size); + + // Otherwise, we must wrap the given data in a STUN SEND request so that we + // can communicate the destination address to the server. + // + // Note that we do not use a StunRequest here. This is because there is + // likely no reason to resend this packet. If it is late, we just drop it. + // The next send to this address will try again. + + StunMessage request; + request.SetType(STUN_SEND_REQUEST); + request.SetTransactionID(CreateRandomString(16)); + + StunByteStringAttribute* magic_cookie_attr = + StunAttribute::CreateByteString(STUN_ATTR_MAGIC_COOKIE); + magic_cookie_attr->CopyBytes(port_->magic_cookie().c_str(), + (uint16)port_->magic_cookie().size()); + request.AddAttribute(magic_cookie_attr); + + StunByteStringAttribute* username_attr = + StunAttribute::CreateByteString(STUN_ATTR_USERNAME); + username_attr->CopyBytes(port_->username_fragment().c_str(), + (uint16)port_->username_fragment().size()); + request.AddAttribute(username_attr); + + StunAddressAttribute* addr_attr = + StunAttribute::CreateAddress(STUN_ATTR_DESTINATION_ADDRESS); + addr_attr->SetFamily(1); + addr_attr->SetIP(addr.ip()); + addr_attr->SetPort(addr.port()); + request.AddAttribute(addr_attr); + + // Attempt to lock + if (ext_addr_ == addr) { + StunUInt32Attribute* options_attr = + StunAttribute::CreateUInt32(STUN_ATTR_OPTIONS); + options_attr->SetValue(0x1); + request.AddAttribute(options_attr); + } + + StunByteStringAttribute* data_attr = + StunAttribute::CreateByteString(STUN_ATTR_DATA); + data_attr->CopyBytes(data, (uint16)size); + request.AddAttribute(data_attr); + + // TODO: compute the HMAC. + + ByteBuffer buf; + request.Write(&buf); + + return SendPacket(buf.Data(), buf.Length()); +} + +void RelayEntry::ScheduleKeepAlive() { + requests_.SendDelayed(new AllocateRequest(this), KEEPALIVE_DELAY); +} + +void RelayEntry::HandleConnectFailure() { + //if (GetMillisecondCount() - start_time_ > RETRY_TIMEOUT) + // return; + //ScheduleKeepAlive(); + + connected_ = false; + port()->DisposeSocket(socket_); + socket_ = 0; + server_index_ += 1; + Connect(); +} + +void RelayEntry::OnSocketConnect(AsyncTCPSocket* socket) { + assert(socket == socket_); + LOG(INFO) << "relay tcp connected to " << socket->GetRemoteAddress().ToString(); + requests_.Send(new AllocateRequest(this)); +} + +void RelayEntry::OnSocketClose(AsyncTCPSocket* socket, int error) { + assert(socket == socket_); + PLOG(LERROR, error) << "relay tcp connect failed"; + HandleConnectFailure(); +} + +void RelayEntry::OnReadPacket(const char* data, + size_t size, + const SocketAddress& remote_addr, + AsyncPacketSocket* socket) { + assert(socket == socket_); + //assert(remote_addr == port_->server_addr()); TODO: are we worried about this? + + // If the magic cookie is not present, then this is an unwrapped packet sent + // by the server, The actual remote address is the one we recorded. + if (!port_->HasMagicCookie(data, size)) { + if (locked_) { + port_->OnReadPacket(data, size, ext_addr_); + } else { + LOG(WARNING) << "Dropping packet: entry not locked"; + } + return; + } + + ByteBuffer buf(data, size); + StunMessage msg; + if (!msg.Read(&buf)) { + LOG(INFO) << "Incoming packet was not STUN"; + return; + } + + // The incoming packet should be a STUN ALLOCATE response, SEND response, or + // DATA indication. + if (requests_.CheckResponse(&msg)) { + return; + } else if (msg.type() == STUN_SEND_RESPONSE) { + if (const StunUInt32Attribute* options_attr = msg.GetUInt32(STUN_ATTR_OPTIONS)) { + if (options_attr->value() & 0x1) { + locked_ = true; + } + } + return; + } else if (msg.type() != STUN_DATA_INDICATION) { + LOG(INFO) << "Received BAD stun type from server: " << msg.type() + ; + return; + } + + // This must be a data indication. + + const StunAddressAttribute* addr_attr = + msg.GetAddress(STUN_ATTR_SOURCE_ADDRESS2); + if (!addr_attr) { + LOG(INFO) << "Data indication has no source address"; + return; + } else if (addr_attr->family() != 1) { + LOG(INFO) << "Source address has bad family"; + return; + } + + SocketAddress remote_addr2(addr_attr->ip(), addr_attr->port()); + + const StunByteStringAttribute* data_attr = msg.GetByteString(STUN_ATTR_DATA); + if (!data_attr) { + LOG(INFO) << "Data indication has no data"; + return; + } + + // Process the actual data and remote address in the normal manner. + port_->OnReadPacket(data_attr->bytes(), data_attr->length(), remote_addr2); +} + +void RelayEntry::OnSendPacket(const void* data, size_t size) { + SendPacket(data, size); +} + +int RelayEntry::SendPacket(const void* data, size_t size) { + const ProtocolAddress * ra = port_->ServerAddress(server_index_); + if (!ra) { + socket_->SetError(ENOTCONN); + return SOCKET_ERROR; + } + int sent = socket_->SendTo(data, size, ra->address); + if (sent <= 0) { + LOG(LS_VERBOSE) << "sendto: " << std::strerror(socket_->GetError()); + assert(sent < 0); + } + return sent; +} + +AllocateRequest::AllocateRequest(RelayEntry* entry) : entry_(entry) { + start_time_ = GetMillisecondCount(); +} + +void AllocateRequest::Prepare(StunMessage* request) { + request->SetType(STUN_ALLOCATE_REQUEST); + + StunByteStringAttribute* magic_cookie_attr = + StunAttribute::CreateByteString(STUN_ATTR_MAGIC_COOKIE); + magic_cookie_attr->CopyBytes( + entry_->port()->magic_cookie().c_str(), + (uint16)entry_->port()->magic_cookie().size()); + request->AddAttribute(magic_cookie_attr); + + StunByteStringAttribute* username_attr = + StunAttribute::CreateByteString(STUN_ATTR_USERNAME); + username_attr->CopyBytes( + entry_->port()->username_fragment().c_str(), + (uint16)entry_->port()->username_fragment().size()); + request->AddAttribute(username_attr); +} + +int AllocateRequest::GetNextDelay() { + int delay = 100 * _max(1 << count_, 2); + count_ += 1; + if (count_ == 5) + timeout_ = true; + return delay; +} + +void AllocateRequest::OnResponse(StunMessage* response) { + const StunAddressAttribute* addr_attr = + response->GetAddress(STUN_ATTR_MAPPED_ADDRESS); + if (!addr_attr) { + LOG(INFO) << "Allocate response missing mapped address."; + } else if (addr_attr->family() != 1) { + LOG(INFO) << "Mapped address has bad family"; + } else { + SocketAddress addr(addr_attr->ip(), addr_attr->port()); + entry_->OnConnect(addr); + } + + // We will do a keep-alive regardless of whether this request suceeds. + // This should have almost no impact on network usage. + entry_->ScheduleKeepAlive(); +} + +void AllocateRequest::OnErrorResponse(StunMessage* response) { + const StunErrorCodeAttribute* attr = response->GetErrorCode(); + if (!attr) { + LOG(INFO) << "Bad allocate response error code"; + } else { + LOG(INFO) << "Allocate error response:" + << " code=" << static_cast<int>(attr->error_code()) + << " reason='" << attr->reason() << "'"; + } + + if (GetMillisecondCount() - start_time_ <= RETRY_TIMEOUT) + entry_->ScheduleKeepAlive(); +} + +void AllocateRequest::OnTimeout() { + LOG(INFO) << "Allocate request timed out"; + entry_->HandleConnectFailure(); +} + +} // namespace cricket diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayport.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayport.h new file mode 100644 index 00000000..7cfdc015 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayport.h @@ -0,0 +1,93 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __RELAYPORT_H__ +#define __RELAYPORT_H__ + +#include "talk/p2p/base/port.h" +#include "talk/p2p/base/stunrequest.h" +#include <vector> + +namespace cricket { + +extern const std::string RELAY_PORT_TYPE; +class RelayEntry; + +// Communicates using an allocated port on the relay server. +class RelayPort: public Port { +public: + RelayPort( + Thread* thread, SocketFactory* factory, Network*, + const SocketAddress& local_addr, + const std::string& username, const std::string& password, + const std::string& magic_cookie); + virtual ~RelayPort(); + + void AddServerAddress(const ProtocolAddress& addr); + void AddExternalAddress(const ProtocolAddress& addr); + + typedef std::pair<Socket::Option, int> OptionValue; + const std::vector<OptionValue>& options() const { return options_; } + + const std::string& magic_cookie() const { return magic_cookie_; } + bool HasMagicCookie(const char* data, size_t size); + + virtual void PrepareAddress(); + virtual Connection* CreateConnection(const Candidate& address, CandidateOrigin origin); + + virtual int SetOption(Socket::Option opt, int value); + virtual int GetError(); + + const ProtocolAddress * ServerAddress(size_t index) const; + + void DisposeSocket(AsyncPacketSocket * socket); + +protected: + void SetReady(); + + virtual int SendTo(const void* data, size_t size, const SocketAddress& addr, bool payload); + virtual void OnMessage(Message *pmsg); + + // Dispatches the given packet to the port or connection as appropriate. + void OnReadPacket( + const char* data, size_t size, const SocketAddress& remote_addr); + +private: + friend class RelayEntry; + + SocketAddress local_addr_; + std::deque<ProtocolAddress> server_addr_; + bool ready_; + std::vector<RelayEntry*> entries_; + std::vector<OptionValue> options_; + std::string magic_cookie_; + int error_; +}; + +} // namespace cricket + +#endif // __RELAYPORT_H__ diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayserver.cc b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayserver.cc new file mode 100644 index 00000000..bb52a1d5 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayserver.cc @@ -0,0 +1,657 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "talk/p2p/base/relayserver.h" +#include "talk/p2p/base/helpers.h" +#include <algorithm> +#include <cassert> +#include <cstring> +#include <iostream> + +#ifdef POSIX +extern "C" { +#include <errno.h> +} +#endif // POSIX + +namespace cricket { + +// By default, we require a ping every 90 seconds. +const int MAX_LIFETIME = 15 * 60 * 1000; + +// The number of bytes in each of the usernames we use. +const uint32 USERNAME_LENGTH = 16; + +// Calls SendTo on the given socket and logs any bad results. +void Send(AsyncPacketSocket* socket, const char* bytes, size_t size, + const SocketAddress& addr) { + int result = socket->SendTo(bytes, size, addr); + if (result < int(size)) { + std::cerr << "SendTo wrote only " << result << " of " << int(size) + << " bytes" << std::endl; + } else if (result < 0) { + std::cerr << "SendTo: " << std::strerror(errno) << std::endl; + } +} + +// Sends the given STUN message on the given socket. +void SendStun(const StunMessage& msg, + AsyncPacketSocket* socket, + const SocketAddress& addr) { + ByteBuffer buf; + msg.Write(&buf); + Send(socket, buf.Data(), buf.Length(), addr); +} + +// Constructs a STUN error response and sends it on the given socket. +void SendStunError(const StunMessage& msg, AsyncPacketSocket* socket, + const SocketAddress& remote_addr, int error_code, + const char* error_desc, const std::string& magic_cookie) { + + StunMessage err_msg; + err_msg.SetType(GetStunErrorResponseType(msg.type())); + err_msg.SetTransactionID(msg.transaction_id()); + + StunByteStringAttribute* magic_cookie_attr = + StunAttribute::CreateByteString(cricket::STUN_ATTR_MAGIC_COOKIE); + if (magic_cookie.size() == 0) + magic_cookie_attr->CopyBytes(cricket::STUN_MAGIC_COOKIE_VALUE, 4); + else + magic_cookie_attr->CopyBytes(magic_cookie.c_str(), magic_cookie.size()); + err_msg.AddAttribute(magic_cookie_attr); + + StunErrorCodeAttribute* err_code = StunAttribute::CreateErrorCode(); + err_code->SetErrorClass(error_code / 100); + err_code->SetNumber(error_code % 100); + err_code->SetReason(error_desc); + err_msg.AddAttribute(err_code); + + SendStun(err_msg, socket, remote_addr); +} + +RelayServer::RelayServer(Thread* thread) : thread_(thread) { +} + +RelayServer::~RelayServer() { + for (unsigned i = 0; i < internal_sockets_.size(); i++) + delete internal_sockets_[i]; + for (unsigned i = 0; i < external_sockets_.size(); i++) + delete external_sockets_[i]; +} + +void RelayServer::AddInternalSocket(AsyncPacketSocket* socket) { + assert(internal_sockets_.end() == + std::find(internal_sockets_.begin(), internal_sockets_.end(), socket)); + internal_sockets_.push_back(socket); + socket->SignalReadPacket.connect(this, &RelayServer::OnInternalPacket); +} + +void RelayServer::RemoveInternalSocket(AsyncPacketSocket* socket) { + SocketList::iterator iter = + std::find(internal_sockets_.begin(), internal_sockets_.end(), socket); + assert(iter != internal_sockets_.end()); + internal_sockets_.erase(iter); + socket->SignalReadPacket.disconnect(this); +} + +void RelayServer::AddExternalSocket(AsyncPacketSocket* socket) { + assert(external_sockets_.end() == + std::find(external_sockets_.begin(), external_sockets_.end(), socket)); + external_sockets_.push_back(socket); + socket->SignalReadPacket.connect(this, &RelayServer::OnExternalPacket); +} + +void RelayServer::RemoveExternalSocket(AsyncPacketSocket* socket) { + SocketList::iterator iter = + std::find(external_sockets_.begin(), external_sockets_.end(), socket); + assert(iter != external_sockets_.end()); + external_sockets_.erase(iter); + socket->SignalReadPacket.disconnect(this); +} + +void RelayServer::OnInternalPacket( + const char* bytes, size_t size, const SocketAddress& remote_addr, + AsyncPacketSocket* socket) { + + // Get the address of the connection we just received on. + SocketAddressPair ap(remote_addr, socket->GetLocalAddress()); + assert(!ap.destination().IsAny()); + + // If this did not come from an existing connection, it should be a STUN + // allocate request. + ConnectionMap::iterator piter = connections_.find(ap); + if (piter == connections_.end()) { + HandleStunAllocate(bytes, size, ap, socket); + return; + } + + RelayServerConnection* int_conn = piter->second; + + // Handle STUN requests to the server itself. + if (int_conn->binding()->HasMagicCookie(bytes, size)) { + HandleStun(int_conn, bytes, size); + return; + } + + // Otherwise, this is a non-wrapped packet that we are to forward. Make sure + // that this connection has been locked. (Otherwise, we would not know what + // address to forward to.) + if (!int_conn->locked()) { + std::cerr << "Dropping packet: connection not locked" << std::endl; + return; + } + + // Forward this to the destination address into the connection. + RelayServerConnection* ext_conn = int_conn->binding()->GetExternalConnection( + int_conn->default_destination()); + if (ext_conn) { + // TODO: Check the HMAC. + ext_conn->Send(bytes, size); + } else { + // This happens very often and is not an error. + //std::cerr << "Dropping packet: no external connection" << std::endl; + } +} + +void RelayServer::OnExternalPacket( + const char* bytes, size_t size, const SocketAddress& remote_addr, + AsyncPacketSocket* socket) { + + // Get the address of the connection we just received on. + SocketAddressPair ap(remote_addr, socket->GetLocalAddress()); + assert(!ap.destination().IsAny()); + + // If this connection already exists, then forward the traffic. + ConnectionMap::iterator piter = connections_.find(ap); + if (piter != connections_.end()) { + // TODO: Check the HMAC. + RelayServerConnection* ext_conn = piter->second; + RelayServerConnection* int_conn = + ext_conn->binding()->GetInternalConnection( + ext_conn->addr_pair().source()); + assert(int_conn); + int_conn->Send(bytes, size, ext_conn->addr_pair().source()); + return; + } + + // The first packet should always be a STUN / TURN packet. If it isn't, then + // we should just ignore this packet. + StunMessage msg; + ByteBuffer buf = ByteBuffer(bytes, size); + if (!msg.Read(&buf)) { + std::cerr << "Dropping packet: first packet not STUN" << std::endl; + return; + } + + // The initial packet should have a username (which identifies the binding). + const StunByteStringAttribute* username_attr = + msg.GetByteString(STUN_ATTR_USERNAME); + if (!username_attr) { + std::cerr << "Dropping packet: no username" << std::endl; + return; + } + + uint32 length = _min(uint32(username_attr->length()), USERNAME_LENGTH); + std::string username(username_attr->bytes(), length); + // TODO: Check the HMAC. + + // The binding should already be present. + BindingMap::iterator biter = bindings_.find(username); + if (biter == bindings_.end()) { + // TODO: Turn this back on. This is the sign of a client bug. + //std::cerr << "Dropping packet: no binding with username" << std::endl; + return; + } + + // Add this authenticted connection to the binding. + RelayServerConnection* ext_conn = + new RelayServerConnection(biter->second, ap, socket); + ext_conn->binding()->AddExternalConnection(ext_conn); + AddConnection(ext_conn); + + // We always know where external packets should be forwarded, so we can lock + // them from the beginning. + ext_conn->Lock(); + + // Send this message on the appropriate internal connection. + RelayServerConnection* int_conn = ext_conn->binding()->GetInternalConnection( + ext_conn->addr_pair().source()); + assert(int_conn); + int_conn->Send(bytes, size, ext_conn->addr_pair().source()); +} + +bool RelayServer::HandleStun( + const char* bytes, size_t size, const SocketAddress& remote_addr, + AsyncPacketSocket* socket, std::string* username, StunMessage* msg) { + + // Parse this into a stun message. + ByteBuffer buf = ByteBuffer(bytes, size); + if (!msg->Read(&buf)) { + SendStunError(*msg, socket, remote_addr, 400, "Bad Request", ""); + return false; + } + + // The initial packet should have a username (which identifies the binding). + const StunByteStringAttribute* username_attr = + msg->GetByteString(STUN_ATTR_USERNAME); + if (!username_attr) { + SendStunError(*msg, socket, remote_addr, 432, "Missing Username", ""); + return false; + } + + // Record the username if requested. + if (username) + username->append(username_attr->bytes(), username_attr->length()); + + // TODO: Check for unknown attributes (<= 0x7fff) + + return true; +} + +void RelayServer::HandleStunAllocate( + const char* bytes, size_t size, const SocketAddressPair& ap, + AsyncPacketSocket* socket) { + + // Make sure this is a valid STUN request. + StunMessage request; + std::string username; + if (!HandleStun(bytes, size, ap.source(), socket, &username, &request)) + return; + + // Make sure this is a an allocate request. + if (request.type() != STUN_ALLOCATE_REQUEST) { + SendStunError(request, + socket, + ap.source(), + 600, + "Operation Not Supported", + ""); + return; + } + + // TODO: Check the HMAC. + + // Find or create the binding for this username. + + RelayServerBinding* binding; + + BindingMap::iterator biter = bindings_.find(username); + if (biter != bindings_.end()) { + + binding = biter->second; + + } else { + + // NOTE: In the future, bindings will be created by the bot only. This + // else-branch will then disappear. + + // Compute the appropriate lifetime for this binding. + uint32 lifetime = MAX_LIFETIME; + const StunUInt32Attribute* lifetime_attr = + request.GetUInt32(STUN_ATTR_LIFETIME); + if (lifetime_attr) + lifetime = _min(lifetime, lifetime_attr->value() * 1000); + + binding = new RelayServerBinding(this, username, "0", lifetime); + binding->SignalTimeout.connect(this, &RelayServer::OnTimeout); + bindings_[username] = binding; + + std::cout << "Added new binding: " << bindings_.size() << " total" << std::endl; + } + + // Add this connection to the binding. It starts out unlocked. + RelayServerConnection* int_conn = + new RelayServerConnection(binding, ap, socket); + binding->AddInternalConnection(int_conn); + AddConnection(int_conn); + + // Now that we have a connection, this other method takes over. + HandleStunAllocate(int_conn, request); +} + +void RelayServer::HandleStun( + RelayServerConnection* int_conn, const char* bytes, size_t size) { + + // Make sure this is a valid STUN request. + StunMessage request; + std::string username; + if (!HandleStun(bytes, size, int_conn->addr_pair().source(), + int_conn->socket(), &username, &request)) + return; + + // Make sure the username is the one were were expecting. + if (username != int_conn->binding()->username()) { + int_conn->SendStunError(request, 430, "Stale Credentials"); + return; + } + + // TODO: Check the HMAC. + + // Send this request to the appropriate handler. + if (request.type() == STUN_SEND_REQUEST) + HandleStunSend(int_conn, request); + else if (request.type() == STUN_ALLOCATE_REQUEST) + HandleStunAllocate(int_conn, request); + else + int_conn->SendStunError(request, 600, "Operation Not Supported"); +} + +void RelayServer::HandleStunAllocate( + RelayServerConnection* int_conn, const StunMessage& request) { + + // Create a response message that includes an address with which external + // clients can communicate. + + StunMessage response; + response.SetType(STUN_ALLOCATE_RESPONSE); + response.SetTransactionID(request.transaction_id()); + + StunByteStringAttribute* magic_cookie_attr = + StunAttribute::CreateByteString(cricket::STUN_ATTR_MAGIC_COOKIE); + magic_cookie_attr->CopyBytes(int_conn->binding()->magic_cookie().c_str(), + int_conn->binding()->magic_cookie().size()); + response.AddAttribute(magic_cookie_attr); + + size_t index = rand() % external_sockets_.size(); + SocketAddress ext_addr = external_sockets_[index]->GetLocalAddress(); + + StunAddressAttribute* addr_attr = + StunAttribute::CreateAddress(STUN_ATTR_MAPPED_ADDRESS); + addr_attr->SetFamily(1); + addr_attr->SetIP(ext_addr.ip()); + addr_attr->SetPort(ext_addr.port()); + response.AddAttribute(addr_attr); + + StunUInt32Attribute* res_lifetime_attr = + StunAttribute::CreateUInt32(STUN_ATTR_LIFETIME); + res_lifetime_attr->SetValue(int_conn->binding()->lifetime() / 1000); + response.AddAttribute(res_lifetime_attr); + + // TODO: Support transport-prefs (preallocate RTCP port). + // TODO: Support bandwidth restrictions. + // TODO: Add message integrity check. + + // Send a response to the caller. + int_conn->SendStun(response); +} + +void RelayServer::HandleStunSend( + RelayServerConnection* int_conn, const StunMessage& request) { + + const StunAddressAttribute* addr_attr = + request.GetAddress(STUN_ATTR_DESTINATION_ADDRESS); + if (!addr_attr) { + int_conn->SendStunError(request, 400, "Bad Request"); + return; + } + + const StunByteStringAttribute* data_attr = + request.GetByteString(STUN_ATTR_DATA); + if (!data_attr) { + int_conn->SendStunError(request, 400, "Bad Request"); + return; + } + + SocketAddress ext_addr(addr_attr->ip(), addr_attr->port()); + RelayServerConnection* ext_conn = + int_conn->binding()->GetExternalConnection(ext_addr); + if (!ext_conn) { + // This happens very often and is not an error. + //std::cerr << "Dropping packet: no external connection" << std::endl; + return; + } + + ext_conn->Send(data_attr->bytes(), data_attr->length()); + + const StunUInt32Attribute* options_attr = + request.GetUInt32(STUN_ATTR_OPTIONS); + if (options_attr && (options_attr->value() & 0x01 != 0)) { + int_conn->set_default_destination(ext_addr); + int_conn->Lock(); + + StunMessage response; + response.SetType(STUN_SEND_RESPONSE); + response.SetTransactionID(request.transaction_id()); + + StunByteStringAttribute* magic_cookie_attr = + StunAttribute::CreateByteString(cricket::STUN_ATTR_MAGIC_COOKIE); + magic_cookie_attr->CopyBytes(int_conn->binding()->magic_cookie().c_str(), + int_conn->binding()->magic_cookie().size()); + response.AddAttribute(magic_cookie_attr); + + StunUInt32Attribute* options2_attr = + StunAttribute::CreateUInt32(cricket::STUN_ATTR_OPTIONS); + options2_attr->SetValue(0x01); + response.AddAttribute(options2_attr); + + int_conn->SendStun(response); + } +} + +void RelayServer::AddConnection(RelayServerConnection* conn) { + assert(connections_.find(conn->addr_pair()) == connections_.end()); + connections_[conn->addr_pair()] = conn; +} + +void RelayServer::RemoveConnection(RelayServerConnection* conn) { + ConnectionMap::iterator iter = connections_.find(conn->addr_pair()); + assert(iter != connections_.end()); + connections_.erase(iter); +} + +void RelayServer::RemoveBinding(RelayServerBinding* binding) { + BindingMap::iterator iter = bindings_.find(binding->username()); + assert(iter != bindings_.end()); + bindings_.erase(iter); + + std::cout << "Removed a binding: " << bindings_.size() << " remaining" << std::endl; +} + +void RelayServer::OnTimeout(RelayServerBinding* binding) { + // This call will result in all of the necessary clean-up. + delete binding; +} + +RelayServerConnection::RelayServerConnection( + RelayServerBinding* binding, const SocketAddressPair& addrs, + AsyncPacketSocket* socket) + : binding_(binding), addr_pair_(addrs), socket_(socket), locked_(false) { + + // The creation of a new connection constitutes a use of the binding. + binding_->NoteUsed(); +} + +RelayServerConnection::~RelayServerConnection() { + // Remove this connection from the server's map (if it exists there). + binding_->server()->RemoveConnection(this); +} + +void RelayServerConnection::Send(const char* data, size_t size) { + // Note that the binding has been used again. + binding_->NoteUsed(); + + cricket::Send(socket_, data, size, addr_pair_.source()); +} + +void RelayServerConnection::Send( + const char* data, size_t size, const SocketAddress& from_addr) { + // If the from address is known to the client, we don't need to send it. + if (locked() && (from_addr == default_dest_)) { + Send(data, size); + return; + } + + // Wrap the given data in a data-indication packet. + + StunMessage msg; + msg.SetType(STUN_DATA_INDICATION); + msg.SetTransactionID("0000000000000000"); + + StunByteStringAttribute* magic_cookie_attr = + StunAttribute::CreateByteString(cricket::STUN_ATTR_MAGIC_COOKIE); + magic_cookie_attr->CopyBytes(binding_->magic_cookie().c_str(), + binding_->magic_cookie().size()); + msg.AddAttribute(magic_cookie_attr); + + StunAddressAttribute* addr_attr = + StunAttribute::CreateAddress(STUN_ATTR_SOURCE_ADDRESS2); + addr_attr->SetFamily(1); + addr_attr->SetIP(from_addr.ip()); + addr_attr->SetPort(from_addr.port()); + msg.AddAttribute(addr_attr); + + StunByteStringAttribute* data_attr = + StunAttribute::CreateByteString(STUN_ATTR_DATA); + assert(size <= 65536); + data_attr->CopyBytes(data, uint16(size)); + msg.AddAttribute(data_attr); + + SendStun(msg); +} + +void RelayServerConnection::SendStun(const StunMessage& msg) { + // Note that the binding has been used again. + binding_->NoteUsed(); + + cricket::SendStun(msg, socket_, addr_pair_.source()); +} + +void RelayServerConnection::SendStunError( + const StunMessage& request, int error_code, const char* error_desc) { + // An error does not indicate use. If no legitimate use off the binding + // occurs, we want it to be cleaned up even if errors are still occuring. + + cricket::SendStunError( + request, socket_, addr_pair_.source(), error_code, error_desc, + binding_->magic_cookie()); +} + +void RelayServerConnection::Lock() { + locked_ = true; +} + +void RelayServerConnection::Unlock() { + locked_ = false; +} + +// IDs used for posted messages: +const uint32 MSG_LIFETIME_TIMER = 1; + +RelayServerBinding::RelayServerBinding( + RelayServer* server, const std::string& username, + const std::string& password, uint32 lifetime) + : server_(server), username_(username), password_(password), + lifetime_(lifetime) { + + // For now, every connection uses the standard magic cookie value. + magic_cookie_.append( + reinterpret_cast<const char*>(STUN_MAGIC_COOKIE_VALUE), 4); + + // Initialize the last-used time to now. + NoteUsed(); + + // Set the first timeout check. + server_->thread()->PostDelayed(lifetime_, this, MSG_LIFETIME_TIMER); +} + +RelayServerBinding::~RelayServerBinding() { + // Clear the outstanding timeout check. + server_->thread()->Clear(this); + + // Clean up all of the connections. + for (size_t i = 0; i < internal_connections_.size(); ++i) + delete internal_connections_[i]; + for (size_t i = 0; i < external_connections_.size(); ++i) + delete external_connections_[i]; + + // Remove this binding from the server's map. + server_->RemoveBinding(this); +} + +void RelayServerBinding::AddInternalConnection(RelayServerConnection* conn) { + internal_connections_.push_back(conn); +} + +void RelayServerBinding::AddExternalConnection(RelayServerConnection* conn) { + external_connections_.push_back(conn); +} + +void RelayServerBinding::NoteUsed() { + last_used_ = GetMillisecondCount(); +} + +bool RelayServerBinding::HasMagicCookie(const char* bytes, size_t size) const { + if (size < 24 + magic_cookie_.size()) { + return false; + } else { + return 0 == std::memcmp( + bytes + 24, magic_cookie_.c_str(), magic_cookie_.size()); + } +} + +RelayServerConnection* RelayServerBinding::GetInternalConnection( + const SocketAddress& ext_addr) { + + // Look for an internal connection that is locked to this address. + for (size_t i = 0; i < internal_connections_.size(); ++i) { + if (internal_connections_[i]->locked() && + (ext_addr == internal_connections_[i]->default_destination())) + return internal_connections_[i]; + } + + // If one was not found, we send to the first connection. + assert(internal_connections_.size() > 0); + return internal_connections_[0]; +} + +RelayServerConnection* RelayServerBinding::GetExternalConnection( + const SocketAddress& ext_addr) { + for (size_t i = 0; i < external_connections_.size(); ++i) { + if (ext_addr == external_connections_[i]->addr_pair().source()) + return external_connections_[i]; + } + return 0; +} + +void RelayServerBinding::OnMessage(Message *pmsg) { + if (pmsg->message_id == MSG_LIFETIME_TIMER) { + assert(!pmsg->pdata); + + // If the lifetime timeout has been exceeded, then send a signal. + // Otherwise, just keep waiting. + if (GetMillisecondCount() >= last_used_ + lifetime_) { + SignalTimeout(this); + } else { + server_->thread()->PostDelayed(lifetime_, this, MSG_LIFETIME_TIMER); + } + + } else { + assert(false); + } +} + +} // namespace cricket diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayserver.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayserver.h new file mode 100644 index 00000000..01dc3678 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayserver.h @@ -0,0 +1,210 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __RELAYSERVER_H__ +#define __RELAYSERVER_H__ + +#include "talk/base/asyncudpsocket.h" +#include "talk/base/socketaddresspair.h" +#include "talk/base/thread.h" +#include "talk/base/jtime.h" +#include "talk/p2p/base/stun.h" + +#include <string> +#include <vector> +#include <map> + +namespace cricket { + +class RelayServerBinding; +class RelayServerConnection; + +// Relays traffic between connections to the server that are "bound" together. +// All connections created with the same username/password are bound together. +class RelayServer : public sigslot::has_slots<> { +public: + // Creates a server, which will use this thread to post messages to itself. + RelayServer(Thread* thread); + ~RelayServer(); + + Thread* thread() { return thread_; } + + // Updates the set of sockets that the server uses to talk to "internal" + // clients. These are clients that do the "port allocations". + void AddInternalSocket(AsyncPacketSocket* socket); + void RemoveInternalSocket(AsyncPacketSocket* socket); + + // Updates the set of sockets that the server uses to talk to "external" + // clients. These are the clients that do not do allocations. They do not + // know that these addresses represent a relay server. + void AddExternalSocket(AsyncPacketSocket* socket); + void RemoveExternalSocket(AsyncPacketSocket* socket); + +private: + typedef std::vector<AsyncPacketSocket*> SocketList; + typedef std::map<std::string,RelayServerBinding*> BindingMap; + typedef std::map<SocketAddressPair,RelayServerConnection*> ConnectionMap; + + Thread* thread_; + SocketList internal_sockets_; + SocketList external_sockets_; + BindingMap bindings_; + ConnectionMap connections_; + + // Called when a packet is received by the server on one of its sockets. + void OnInternalPacket( + const char* bytes, size_t size, const SocketAddress& remote_addr, + AsyncPacketSocket* socket); + void OnExternalPacket( + const char* bytes, size_t size, const SocketAddress& remote_addr, + AsyncPacketSocket* socket); + + // Processes the relevant STUN request types from the client. + bool HandleStun(const char* bytes, size_t size, + const SocketAddress& remote_addr, AsyncPacketSocket* socket, + std::string* username, StunMessage* msg); + void HandleStunAllocate(const char* bytes, size_t size, + const SocketAddressPair& ap, + AsyncPacketSocket* socket); + void HandleStun(RelayServerConnection* int_conn, const char* bytes, + size_t size); + void HandleStunAllocate(RelayServerConnection* int_conn, + const StunMessage& msg); + void HandleStunSend(RelayServerConnection* int_conn, const StunMessage& msg); + + // Adds/Removes the a connection or binding. + void AddConnection(RelayServerConnection* conn); + void RemoveConnection(RelayServerConnection* conn); + void RemoveBinding(RelayServerBinding* binding); + + // Called when the timer for checking lifetime times out. + void OnTimeout(RelayServerBinding* binding); + + friend class RelayServerConnection; + friend class RelayServerBinding; +}; + +// Maintains information about a connection to the server. Each connection is +// part of one and only one binding. +class RelayServerConnection { +public: + RelayServerConnection(RelayServerBinding* binding, + const SocketAddressPair& addrs, + AsyncPacketSocket* socket); + ~RelayServerConnection(); + + RelayServerBinding* binding() { return binding_; } + AsyncPacketSocket* socket() { return socket_; } + + // Returns a pair where the source is the remote address and the destination + // is the local address. + const SocketAddressPair& addr_pair() { return addr_pair_; } + + // Sends a packet to the connected client. If an address is provided, then + // we make sure the internal client receives it, wrapping if necessary. + void Send(const char* data, size_t size); + void Send(const char* data, size_t size, const SocketAddress& ext_addr); + + // Sends a STUN message to the connected client with no wrapping. + void SendStun(const StunMessage& msg); + void SendStunError(const StunMessage& request, int code, const char* desc); + + // A locked connection is one for which we know the intended destination of + // any raw packet received. + bool locked() const { return locked_; } + void Lock(); + void Unlock(); + + // Records the address that raw packets should be forwarded to (for internal + // packets only; for external, we already know where they go). + const SocketAddress& default_destination() const { return default_dest_; } + void set_default_destination(const SocketAddress& addr) { + default_dest_ = addr; + } + +private: + RelayServerBinding* binding_; + SocketAddressPair addr_pair_; + AsyncPacketSocket* socket_; + bool locked_; + SocketAddress default_dest_; +}; + +// Records a set of internal and external connections that we relay between, +// or in other words, that are "bound" together. +class RelayServerBinding : public MessageHandler { +public: + RelayServerBinding( + RelayServer* server, const std::string& username, + const std::string& password, uint32 lifetime); + virtual ~RelayServerBinding(); + + RelayServer* server() { return server_; } + uint32 lifetime() { return lifetime_; } + const std::string& username() { return username_; } + const std::string& password() { return password_; } + const std::string& magic_cookie() { return magic_cookie_; } + + // Adds/Removes a connection into the binding. + void AddInternalConnection(RelayServerConnection* conn); + void AddExternalConnection(RelayServerConnection* conn); + + // We keep track of the use of each binding. If we detect that it was not + // used for longer than the lifetime, then we send a signal. + void NoteUsed(); + sigslot::signal1<RelayServerBinding*> SignalTimeout; + + // Determines whether the given packet has the magic cookie present (in the + // right place). + bool HasMagicCookie(const char* bytes, size_t size) const; + + // Determines the connection to use to send packets to or from the given + // external address. + RelayServerConnection* GetInternalConnection(const SocketAddress& ext_addr); + RelayServerConnection* GetExternalConnection(const SocketAddress& ext_addr); + + // MessageHandler: + void OnMessage(Message *pmsg); + +private: + RelayServer* server_; + + std::string username_; + std::string password_; + std::string magic_cookie_; + + std::vector<RelayServerConnection*> internal_connections_; + std::vector<RelayServerConnection*> external_connections_; + + uint32 lifetime_; + uint32 last_used_; + // TODO: bandwidth +}; + +} // namespace cricket + +#endif // __RELAYSERVER_H__ diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayserver.pro b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayserver.pro new file mode 100644 index 00000000..41bc6b63 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayserver.pro @@ -0,0 +1,14 @@ +TEMPLATE = app +INCLUDEPATH = ../../.. +DEFINES += POSIX + +include(../../../../../conf.pri) + +# Input +SOURCES += \ + relayserver.cc \ + relayserver_main.cc \ + ../../base/host.cc \ + ../../base/socketaddresspair.cc + +LIBS += ../../../liblibjingle.a diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayserver_main.cc b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayserver_main.cc new file mode 100644 index 00000000..5f624f37 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayserver_main.cc @@ -0,0 +1,75 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "talk/base/host.h" +#include "talk/base/thread.h" +#include "talk/p2p/base/relayserver.h" +#include <iostream> +#include <assert.h> + +#ifdef POSIX +extern "C" { +#include <errno.h> +} +#endif // POSIX + +using namespace cricket; + +int main(int argc, char **argv) { + if (argc != 1) { + std::cerr << "usage: relayserver" << std::endl; + return 1; + } + + assert(LocalHost().networks().size() >= 2); + SocketAddress int_addr(LocalHost().networks()[1]->ip(), 5000); + SocketAddress ext_addr(LocalHost().networks()[1]->ip(), 5001); + + Thread *pthMain = Thread::Current(); + + AsyncUDPSocket* int_socket = CreateAsyncUDPSocket(pthMain->socketserver()); + if (int_socket->Bind(int_addr) < 0) { + std::cerr << "bind: " << std::strerror(errno) << std::endl; + return 1; + } + + AsyncUDPSocket* ext_socket = CreateAsyncUDPSocket(pthMain->socketserver()); + if (ext_socket->Bind(ext_addr) < 0) { + std::cerr << "bind: " << std::strerror(errno) << std::endl; + return 1; + } + + RelayServer server(pthMain); + server.AddInternalSocket(int_socket); + server.AddExternalSocket(ext_socket); + + std::cout << "Listening internally at " << int_addr.ToString() << std::endl; + std::cout << "Listening externally at " << ext_addr.ToString() << std::endl; + + pthMain->Loop(); + return 0; +} diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/session.cc b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/session.cc new file mode 100644 index 00000000..73873338 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/session.cc @@ -0,0 +1,421 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "talk/base/common.h" +#include "talk/base/logging.h" +#include "talk/p2p/base/helpers.h" +#include "talk/p2p/base/session.h" + +namespace cricket { + +const uint32 MSG_TIMEOUT = 1; +const uint32 MSG_ERROR = 2; +const uint32 MSG_STATE = 3; + +Session::Session(SessionManager *session_manager, const std::string &name, + const SessionID& id) { + session_manager_ = session_manager; + ASSERT(session_manager_->signaling_thread()->IsCurrent()); + name_ = name; + id_ = id; + error_ = ERROR_NONE; + state_ = STATE_INIT; + initiator_ = false; + description_ = NULL; + remote_description_ = NULL; + socket_manager_ = new SocketManager(session_manager_); + socket_manager_->SignalCandidatesReady.connect(this, &Session::OnCandidatesReady); + socket_manager_->SignalNetworkError.connect(this, &Session::OnNetworkError); + socket_manager_->SignalState.connect(this, &Session::OnSocketState); + socket_manager_->SignalRequestSignaling.connect(this, &Session::OnRequestSignaling); +} + +Session::~Session() { + ASSERT(session_manager_->signaling_thread()->IsCurrent()); + delete description_; + delete remote_description_; + delete socket_manager_; + session_manager_->signaling_thread()->Clear(this); +} + +P2PSocket *Session::CreateSocket(const std::string &name) { + return socket_manager_->CreateSocket(name); +} + +void Session::DestroySocket(P2PSocket *socket) { + socket_manager_->DestroySocket(socket); +} + +void Session::OnCandidatesReady(const std::vector<Candidate>& candidates) { + SendSessionMessage(SessionMessage::TYPE_CANDIDATES, NULL, &candidates, NULL); +} + +void Session::OnNetworkError() { + // Socket manager is experiencing a network error trying to allocate + // network resources (usually port allocation) + + set_error(ERROR_NETWORK); +} + +void Session::OnSocketState() { + // If the call is not in progress, then we don't care about writability. + // We have separate timers for making sure we transition back to the in- + // progress state in time. + if (state_ != STATE_INPROGRESS) + return; + + // Put the timer into the write state. This is called when the state changes, + // so we will restart the timer each time we lose writability. + if (socket_manager_->writable()) { + session_manager_->signaling_thread()->Clear(this, MSG_TIMEOUT); + } else { + session_manager_->signaling_thread()->PostDelayed( + session_manager_->session_timeout() * 1000, this, MSG_TIMEOUT); + } +} + +void Session::OnRequestSignaling() { + SignalRequestSignaling(); +} + +void Session::OnSignalingReady() { + socket_manager_->OnSignalingReady(); +} + +void Session::SendSessionMessage(SessionMessage::Type type, + const SessionDescription* description, + const std::vector<Candidate>* candidates, + SessionMessage::Cookie* redirect_cookie) { + SessionMessage m; + m.set_type(type); + m.set_to(remote_address_); + m.set_name(name_); + m.set_description(description); + m.set_session_id(id_); + if (candidates) + m.set_candidates(*candidates); + m.set_redirect_target(redirect_target_); + m.set_redirect_cookie(redirect_cookie); + SignalOutgoingMessage(this, m); +} + +bool Session::Initiate(const std::string &to, const SessionDescription *description) { + ASSERT(session_manager_->signaling_thread()->IsCurrent()); + + // Only from STATE_INIT + if (state_ != STATE_INIT) + return false; + + // Setup for signaling. Initiate is asynchronous. It occurs once the address + // candidates are ready. + initiator_ = true; + remote_address_ = to; + description_ = description; + SendSessionMessage(SessionMessage::TYPE_INITIATE, description, NULL, NULL); + set_state(Session::STATE_SENTINITIATE); + + // Let the socket manager know we now want the candidates + socket_manager_->StartProcessingCandidates(); + + // Start the session timeout + session_manager_->signaling_thread()->Clear(this, MSG_TIMEOUT); + session_manager_->signaling_thread()->PostDelayed(session_manager_->session_timeout() * 1000, this, MSG_TIMEOUT); + return true; +} + +bool Session::Accept(const SessionDescription *description) { + ASSERT(session_manager_->signaling_thread()->IsCurrent()); + + // Only if just received initiate + if (state_ != STATE_RECEIVEDINITIATE) + return false; + + // Setup for signaling. Accept is asynchronous. It occurs once the address + // candidates are ready. + initiator_ = false; + description_ = description; + SendSessionMessage(SessionMessage::TYPE_ACCEPT, description, NULL, NULL); + set_state(Session::STATE_SENTACCEPT); + + return true; +} + +bool Session::Modify(const SessionDescription *description) { + ASSERT(session_manager_->signaling_thread()->IsCurrent()); + + // Only if session already STATE_INPROGRESS + if (state_ != STATE_INPROGRESS) + return false; + + // Modify is asynchronous. It occurs once the address candidates are ready. + // Either side can send a modify. It is only valid in an already accepted + // session. + description_ = description; + SendSessionMessage(SessionMessage::TYPE_MODIFY, description, NULL, NULL); + set_state(Session::STATE_SENTMODIFY); + + // Start the session timeout + session_manager_->signaling_thread()->Clear(this, MSG_TIMEOUT); + session_manager_->signaling_thread()->PostDelayed(session_manager_->session_timeout() * 1000, this, MSG_TIMEOUT); + return true; +} + +bool Session::Redirect(const std::string& target) { + ASSERT(session_manager_->signaling_thread()->IsCurrent()); + + // Redirect is sent in response to an initiate or modify, to redirect the + // request + if (state_ != STATE_RECEIVEDINITIATE) + return false; + + initiator_ = false; + redirect_target_ = target; + SendSessionMessage(SessionMessage::TYPE_REDIRECT, NULL, NULL, NULL); + + // A redirect puts us in the same state as reject. It just sends a different + // kind of reject message, if you like. + set_state(STATE_SENTREDIRECT); + + return true; +} + +bool Session::Reject() { + ASSERT(session_manager_->signaling_thread()->IsCurrent()); + + // Reject is sent in response to an initiate or modify, to reject the + // request + if (state_ != STATE_RECEIVEDINITIATE && state_ != STATE_RECEIVEDMODIFY) + return false; + + initiator_ = false; + SendSessionMessage(SessionMessage::TYPE_REJECT, NULL, NULL, NULL); + set_state(STATE_SENTREJECT); + + return true; +} + +bool Session::Terminate() { + ASSERT(session_manager_->signaling_thread()->IsCurrent()); + + // Either side can terminate, at any time. + if (state_ == STATE_SENTTERMINATE && state_ != STATE_RECEIVEDTERMINATE) + return false; + + // But we don't need to terminate if we already rejected. The other client + // already knows that we're done with this session. + if (state_ != STATE_SENTREDIRECT) + SendSessionMessage(SessionMessage::TYPE_TERMINATE, NULL, NULL, NULL); + + set_state(STATE_SENTTERMINATE); + + return true; +} + +void Session::OnIncomingError(const SessionMessage &m) { + ASSERT(session_manager_->signaling_thread()->IsCurrent()); + + // If a candidate message errors out or gets dropped for some reason we + // ignore the error. + if (m.type() != SessionMessage::TYPE_CANDIDATES) { + set_error(ERROR_RESPONSE); + } +} + +void Session::OnIncomingMessage(const SessionMessage &m) { + ASSERT(session_manager_->signaling_thread()->IsCurrent()); + + switch (m.type()) { + case SessionMessage::TYPE_INITIATE: + remote_description_ = m.description(); + remote_address_ = m.from(); + name_ = m.name(); + initiator_ = false; + set_state(STATE_RECEIVEDINITIATE); + + // Let the socket manager know we now want the initial candidates + socket_manager_->StartProcessingCandidates(); + break; + + case SessionMessage::TYPE_ACCEPT: + remote_description_ = m.description(); + set_state(STATE_RECEIVEDACCEPT); + break; + + case SessionMessage::TYPE_MODIFY: + remote_description_ = m.description(); + set_state(STATE_RECEIVEDMODIFY); + break; + + case SessionMessage::TYPE_CANDIDATES: + socket_manager_->AddRemoteCandidates(m.candidates()); + break; + + case SessionMessage::TYPE_REJECT: + set_state(STATE_RECEIVEDREJECT); + break; + + case SessionMessage::TYPE_REDIRECT: + OnRedirectMessage(m); + break; + + case SessionMessage::TYPE_TERMINATE: + set_state(STATE_RECEIVEDTERMINATE); + break; + } +} + +void Session::OnRedirectMessage(const SessionMessage &m) { + ASSERT(state_ == STATE_SENTINITIATE); + if (state_ != STATE_SENTINITIATE) + return; + + ASSERT(m.redirect_target().size() != 0); + remote_address_ = m.redirect_target(); + + SendSessionMessage(SessionMessage::TYPE_INITIATE, description_, NULL, + m.redirect_cookie()->Copy()); + + // Restart the session timeout. + session_manager_->signaling_thread()->Clear(this, MSG_TIMEOUT); + session_manager_->signaling_thread()->PostDelayed(session_manager_->session_timeout() * 1000, this, MSG_TIMEOUT); + + // Reset all of the sockets back into the initial state. + socket_manager_->ResetSockets(); +} + +Session::State Session::state() { + return state_; +} + +void Session::set_state(State state) { + ASSERT(session_manager_->signaling_thread()->IsCurrent()); + if (state != state_) { + state_ = state; + SignalState(this, state); + session_manager_->signaling_thread()->Post(this, MSG_STATE); + } +} + +Session::Error Session::error() { + return error_; +} + +void Session::set_error(Error error) { + ASSERT(session_manager_->signaling_thread()->IsCurrent()); + if (error != error_) { + error_ = error; + SignalError(this, error); + session_manager_->signaling_thread()->Post(this, MSG_ERROR); + } +} + +const std::string &Session::name() { + return name_; +} + +const std::string &Session::remote_address() { + return remote_address_; +} + +bool Session::initiator() { + return initiator_; +} + +const SessionID& Session::id() { + return id_; +} + +const SessionDescription *Session::description() { + return description_; +} + +const SessionDescription *Session::remote_description() { + return remote_description_; +} + +SessionManager *Session::session_manager() { + return session_manager_; +} + +void Session::OnMessage(Message *pmsg) { + switch(pmsg->message_id) { + case MSG_TIMEOUT: + // Session timeout has occured. Check to see if the session is still trying + // to signal. If so, the session has timed out. + // The Sockets have their own timeout for connectivity. + set_error(ERROR_TIME); + break; + + case MSG_ERROR: + switch (error_) { + case ERROR_RESPONSE: + // This state could be reached if we get an error in response to an IQ + // or if the network is so slow we time out on an individual IQ exchange. + // In either case, Terminate (send more messages) and ignore the likely + // cascade of more errors. + + // fall through + case ERROR_NETWORK: + case ERROR_TIME: + // Time ran out - no response + Terminate(); + break; + + default: + break; + } + break; + + case MSG_STATE: + switch (state_) { + case STATE_SENTACCEPT: + case STATE_RECEIVEDACCEPT: + set_state(STATE_INPROGRESS); + session_manager_->signaling_thread()->Clear(this, MSG_TIMEOUT); + OnSocketState(); // Update the writability timeout state. + break; + + case STATE_SENTREJECT: + case STATE_SENTREDIRECT: + case STATE_RECEIVEDREJECT: + Terminate(); + break; + + case STATE_SENTTERMINATE: + case STATE_RECEIVEDTERMINATE: + session_manager_->DestroySession(this); + break; + + default: + // explicitly ignoring some states here + break; + } + break; + } +} + +} // namespace cricket diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/session.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/session.h new file mode 100644 index 00000000..1414a375 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/session.h @@ -0,0 +1,140 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _SESSION_H_ +#define _SESSION_H_ + +#include "talk/base/socketaddress.h" +#include "talk/p2p/base/sessiondescription.h" +#include "talk/p2p/base/sessionmanager.h" +#include "talk/p2p/base/sessionmessage.h" +#include "talk/p2p/base/socketmanager.h" +#include "talk/p2p/base/p2psocket.h" +#include "talk/p2p/base/port.h" +#include <string> + +namespace cricket { + +class SessionManager; +class SocketManager; + +// A specific Session created by the SessionManager +// A Session manages signaling for session setup and tear down, and connectivity +// with P2PSockets + +class Session : public MessageHandler, public sigslot::has_slots<> { +public: + enum State { + STATE_INIT = 0, + STATE_SENTINITIATE, // sent initiate, waiting for Accept or Reject + STATE_RECEIVEDINITIATE, // received an initiate. Call Accept or Reject + STATE_SENTACCEPT, // sent accept. begin connectivity establishment + STATE_RECEIVEDACCEPT, // received accept. begin connectivity establishment + STATE_SENTMODIFY, // sent modify, waiting for Accept or Reject + STATE_RECEIVEDMODIFY, // received modify, call Accept or Reject + STATE_SENTREJECT, // sent reject after receiving initiate + STATE_RECEIVEDREJECT, // received reject after sending initiate + STATE_SENTREDIRECT, // sent direct after receiving initiate + STATE_SENTTERMINATE, // sent terminate (any time / either side) + STATE_RECEIVEDTERMINATE, // received terminate (any time / either side) + STATE_INPROGRESS, // session accepted and in progress + }; + + enum Error { + ERROR_NONE = 0, // no error + ERROR_TIME, // no response to signaling + ERROR_RESPONSE, // error during signaling + ERROR_NETWORK, // network error, could not allocate network resources + }; + + Session(SessionManager *session_manager, const std::string &name, const SessionID& id); + ~Session(); + + // From MessageHandler + void OnMessage(Message *pmsg); + + P2PSocket *CreateSocket(const std::string & name); + void DestroySocket(P2PSocket *socket); + + bool Initiate(const std::string &to, const SessionDescription *description); + bool Accept(const SessionDescription *description); + bool Modify(const SessionDescription *description); + bool Reject(); + bool Redirect(const std::string& target); + bool Terminate(); + + SessionManager *session_manager(); + const std::string &name(); + const std::string &remote_address(); + bool initiator(); + const SessionID& id(); + const SessionDescription *description(); + const SessionDescription *remote_description(); + + State state(); + Error error(); + + void OnSignalingReady(); + void OnIncomingMessage(const SessionMessage &m); + void OnIncomingError(const SessionMessage &m); + + sigslot::signal2<Session *, State> SignalState; + sigslot::signal2<Session *, Error> SignalError; + sigslot::signal2<Session *, const SessionMessage &> SignalOutgoingMessage; + sigslot::signal0<> SignalRequestSignaling; + +private: + void SendSessionMessage(SessionMessage::Type type, + const SessionDescription* description, + const std::vector<Candidate>* candidates, + SessionMessage::Cookie* redirect_cookie); + void OnCandidatesReady(const std::vector<Candidate>& candidates); + void OnNetworkError(); + void OnSocketState(); + void OnRequestSignaling(); + void OnRedirectMessage(const SessionMessage &m); + + void set_state(State state); + void set_error(Error error); + + bool initiator_; + SessionManager *session_manager_; + SessionID id_; + SocketManager *socket_manager_; + std::string name_; + std::string remote_address_; + const SessionDescription *description_; + const SessionDescription *remote_description_; + std::string redirect_target_; + State state_; + Error error_; + CriticalSection crit_; +}; + +} // namespace cricket + +#endif // _SESSION_H_ diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/sessiondescription.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/sessiondescription.h new file mode 100644 index 00000000..28b70845 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/sessiondescription.h @@ -0,0 +1,42 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _SESSIONDESCRIPTION_H_ +#define _SESSIONDESCRIPTION_H_ + +namespace cricket { + +// The client overrides this with whatever + +class SessionDescription { +public: + virtual ~SessionDescription() {} +}; + +} // namespace cricket + +#endif // _SESSIONDESCRIPTION_H_ diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/sessionid.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/sessionid.h new file mode 100644 index 00000000..a12535c0 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/sessionid.h @@ -0,0 +1,94 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _SESSIONID_H_ +#define _SESSIONID_H_ + +#include "talk/base/basictypes.h" +#include <string> +#include <sstream> + +namespace cricket { + +// Each session is identified by a pair (from,id), where id is only +// assumed to be unique to the machine identified by from. +class SessionID { +public: + SessionID() : id_str_("0") { + } + SessionID(const std::string& initiator, uint32 id) + : initiator_(initiator) { + set_id(id); + } + SessionID(const SessionID& sid) + : id_str_(sid.id_str_), initiator_(sid.initiator_) { + } + + void set_id(uint32 id) { + std::stringstream st; + st << id; + st >> id_str_; + } + const std::string id_str() const { + return id_str_; + } + void set_id_str(const std::string &id_str) { + id_str_ = id_str; + } + + const std::string &initiator() const { + return initiator_; + } + void set_initiator(const std::string &initiator) { + initiator_ = initiator; + } + + bool operator <(const SessionID& sid) const { + int r = initiator_.compare(sid.initiator_); + if (r == 0) + r = id_str_.compare(sid.id_str_); + return r < 0; + } + + bool operator ==(const SessionID& sid) const { + return (id_str_ == sid.id_str_) && (initiator_ == sid.initiator_); + } + + SessionID& operator =(const SessionID& sid) { + id_str_ = sid.id_str_; + initiator_ = sid.initiator_; + return *this; + } + +private: + std::string id_str_; + std::string initiator_; +}; + +} // namespace cricket + +#endif // _SESSIONID_H_ diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/sessionmanager.cc b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/sessionmanager.cc new file mode 100644 index 00000000..4c1c09d9 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/sessionmanager.cc @@ -0,0 +1,173 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "talk/base/common.h" +#include "talk/p2p/base/helpers.h" +#include "sessionmanager.h" + +namespace cricket { + +SessionManager::SessionManager(PortAllocator *allocator, Thread *worker) { + allocator_ = allocator; + signaling_thread_ = Thread::Current(); + if (worker == NULL) { + worker_thread_ = Thread::Current(); + } else { + worker_thread_ = worker; + } + timeout_ = 50; +} + +SessionManager::~SessionManager() { + // Note: Session::Terminate occurs asynchronously, so it's too late to + // delete them now. They better be all gone. + ASSERT(session_map_.empty()); + //TerminateAll(); +} + +Session *SessionManager::CreateSession(const std::string &name, const std::string& initiator) { + return CreateSession(name, SessionID(initiator, CreateRandomId()), false); +} + +Session *SessionManager::CreateSession(const std::string &name, const SessionID& id, bool received_initiate) { + Session *session = new Session(this, name, id); + session_map_[session->id()] = session; + session->SignalRequestSignaling.connect(this, &SessionManager::OnRequestSignaling); + SignalSessionCreate(session, received_initiate); + return session; +} + +void SessionManager::DestroySession(Session *session) { + if (session != NULL) { + std::map<SessionID, Session *>::iterator it = session_map_.find(session->id()); + if (it != session_map_.end()) { + SignalSessionDestroy(session); + session_map_.erase(it); + delete session; + } + } +} + +Session *SessionManager::GetSession(const SessionID& id) { + // If the id isn't present, the [] operator will make a NULL entry + std::map<SessionID, Session *>::iterator it = session_map_.find(id); + if (it != session_map_.end()) + return (*it).second; + return NULL; +} + +void SessionManager::TerminateAll() { + while (session_map_.begin() != session_map_.end()) { + Session *session = session_map_.begin()->second; + session->Terminate(); + } +} + +void SessionManager::OnIncomingError(const SessionMessage &m) { + // Incoming signaling error. This means, as the result of trying + // to send message m, and error was generated. In all cases, a + // session should already exist + + Session *session; + switch (m.type()) { + case SessionMessage::TYPE_INITIATE: + case SessionMessage::TYPE_ACCEPT: + case SessionMessage::TYPE_MODIFY: + case SessionMessage::TYPE_CANDIDATES: + case SessionMessage::TYPE_REJECT: + case SessionMessage::TYPE_TERMINATE: + session = GetSession(m.session_id()); + break; + + default: + return; + } + + if (session != NULL) + session->OnIncomingError(m); + +} + +void SessionManager::OnIncomingMessage(const SessionMessage &m) { + // In the case of an incoming initiate, there is no session yet, and one needs to be created. + // The other cases have sessions already. + + Session *session; + switch (m.type()) { + case SessionMessage::TYPE_INITIATE: + session = CreateSession(m.name(), m.session_id(), true); + break; + + case SessionMessage::TYPE_ACCEPT: + case SessionMessage::TYPE_MODIFY: + case SessionMessage::TYPE_CANDIDATES: + case SessionMessage::TYPE_REJECT: + case SessionMessage::TYPE_REDIRECT: + case SessionMessage::TYPE_TERMINATE: + session = GetSession(m.session_id()); + break; + + default: + return; + } + + if (session != NULL) + session->OnIncomingMessage(m); +} + +void SessionManager::OnSignalingReady() { + for (std::map<SessionID, Session *>::iterator it = session_map_.begin(); + it != session_map_.end(); ++it) { + it->second->OnSignalingReady(); + } +} + +void SessionManager::OnRequestSignaling() { + SignalRequestSignaling(); +} + +PortAllocator *SessionManager::port_allocator() const { + return allocator_; +} + +Thread *SessionManager::worker_thread() const { + return worker_thread_; +} + +Thread *SessionManager::signaling_thread() const { + return signaling_thread_; +} + +int SessionManager::session_timeout() { + return timeout_; +} + +void SessionManager::set_session_timeout(int timeout) { + timeout_ = timeout; +} + +} // namespace cricket diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/sessionmanager.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/sessionmanager.h new file mode 100644 index 00000000..5ce0e4c5 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/sessionmanager.h @@ -0,0 +1,86 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _SESSIONMANAGER_H_ +#define _SESSIONMANAGER_H_ + +#include "talk/base/thread.h" +#include "talk/p2p/base/portallocator.h" +#include "talk/p2p/base/session.h" +#include "talk/p2p/base/sessionmessage.h" +#include "talk/base/sigslot.h" + +#include <string> +#include <utility> +#include <map> + +namespace cricket { + +class Session; + +// SessionManager manages session instances + +class SessionManager : public sigslot::has_slots<> { +public: + SessionManager(PortAllocator *allocator, Thread *worker_thread = NULL); + virtual ~SessionManager(); + + Session *CreateSession(const std::string &name, const std::string& initiator); + void DestroySession(Session *session); + Session *GetSession(const SessionID& id); + void TerminateAll(); + void OnIncomingMessage(const SessionMessage &m); + void OnIncomingError(const SessionMessage &m); + void OnSignalingReady(); + + PortAllocator *port_allocator() const; + Thread *worker_thread() const; + Thread *signaling_thread() const; + int session_timeout(); + void set_session_timeout(int timeout); + + sigslot::signal2<Session *, bool> SignalSessionCreate; + sigslot::signal1<Session *> SignalSessionDestroy; + + // Note: you can connect this directly to OnSignalingReady(), if a signalling + // check is not required. + sigslot::signal0<> SignalRequestSignaling; + +private: + Session *CreateSession(const std::string &name, const SessionID& id, bool received_initiate); + void OnRequestSignaling(); + + int timeout_; + Thread *worker_thread_; + Thread *signaling_thread_; + PortAllocator *allocator_; + std::map<SessionID, Session *> session_map_; +}; + +} // namespace cricket + +#endif // _SESSIONMANAGER_H_ diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/sessionmessage.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/sessionmessage.h new file mode 100644 index 00000000..fc1b0323 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/sessionmessage.h @@ -0,0 +1,133 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _SESSIONMESSAGE_H_ +#define _SESSIONMESSAGE_H_ + +#include "talk/p2p/base/candidate.h" +#include "talk/p2p/base/sessiondescription.h" +#include "talk/p2p/base/sessionid.h" +#include "talk/base/basictypes.h" +#include <string> +#include <vector> +#include <sstream> + +namespace cricket { + +class SessionMessage { +public: + enum Type { + TYPE_INITIATE = 0, // Initiate message + TYPE_ACCEPT, // Accept message + TYPE_MODIFY, // Modify message + TYPE_CANDIDATES, // Candidates message + TYPE_REJECT, // Reject message + TYPE_REDIRECT, // Reject message + TYPE_TERMINATE, // Terminate message + }; + + class Cookie { + public: + virtual ~Cookie() {} + + // Returns a copy of this cookie. + virtual Cookie* Copy() = 0; + }; + + Type type() const { + return type_; + } + void set_type(Type type) { + type_ = type; + } + const SessionID& session_id() const { + return id_; + } + SessionID& session_id() { + return id_; + } + void set_session_id(const SessionID& id) { + id_ = id; + } + const std::string &from() const { + return from_; + } + void set_from(const std::string &from) { + from_ = from; + } + const std::string &to() const { + return to_; + } + void set_to(const std::string &to) { + to_ = to; + } + const std::string &name() const { + return name_; + } + void set_name(const std::string &name) { + name_ = name; + } + const std::string &redirect_target() const { + return redirect_target_; + } + void set_redirect_target(const std::string &redirect_target) { + redirect_target_ = redirect_target; + } + Cookie *redirect_cookie() const { + return redirect_cookie_; + } + void set_redirect_cookie(Cookie* redirect_cookie) { + redirect_cookie_ = redirect_cookie; + } + const SessionDescription *description() const { + return description_; + } + void set_description(const SessionDescription *description) { + description_ = description; + } + const std::vector<Candidate> &candidates() const { + return candidates_; + } + void set_candidates(const std::vector<Candidate> &candidates) { + candidates_ = candidates; + } + +private: + Type type_; + SessionID id_; + std::string from_; + std::string to_; + std::string name_; + const SessionDescription *description_; + std::vector<Candidate> candidates_; + std::string redirect_target_; + Cookie* redirect_cookie_; +}; + +} // namespace cricket + +#endif // _SESSIONMESSAGE_H_ diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/socketmanager.cc b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/socketmanager.cc new file mode 100644 index 00000000..2f0d67b8 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/socketmanager.cc @@ -0,0 +1,273 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#if defined(_MSC_VER) && _MSC_VER < 1300 +#pragma warning(disable:4786) +#endif +#include "socketmanager.h" +#include <cassert> + +namespace cricket { + +const uint32 MSG_CREATESOCKET = 1; +const uint32 MSG_DESTROYSOCKET = 2; +const uint32 MSG_ONSIGNALINGREADY = 3; +const uint32 MSG_CANDIDATESREADY = 4; +const uint32 MSG_ADDREMOTECANDIDATES = 5; +const uint32 MSG_ONREQUESTSIGNALING = 6; +const uint32 MSG_RESETSOCKETS = 7; + +struct CreateParams { + CreateParams() {} + P2PSocket *socket; + std::string name; +}; + +SocketManager::SocketManager(SessionManager *session_manager) { + session_manager_ = session_manager; + candidates_requested_ = false; + writable_ = false; +} + +SocketManager::~SocketManager() { + assert(Thread::Current() == session_manager_->signaling_thread()); + + // Are the sockets destroyed? If not, destroy them + + critSM_.Enter(); + while (sockets_.size() != 0) { + P2PSocket *socket = sockets_[0]; + critSM_.Leave(); + DestroySocket(socket); + critSM_.Enter(); + } + critSM_.Leave(); + + // Clear queues + + session_manager_->signaling_thread()->Clear(this); + session_manager_->worker_thread()->Clear(this); +} + +P2PSocket *SocketManager::CreateSocket(const std::string &name) { + // Can occur on any thread + CreateParams params; + params.name = name; + params.socket = NULL; + TypedMessageData<CreateParams *> data(¶ms); + session_manager_->worker_thread()->Send(this, MSG_CREATESOCKET, &data); + return data.data()->socket; +} + +P2PSocket *SocketManager::CreateSocket_w(const std::string &name) { + // Only on worker thread + assert(Thread::Current() == session_manager_->worker_thread()); + CritScope cs(&critSM_); + P2PSocket *socket = new P2PSocket(name, session_manager_->port_allocator()); + socket->SignalCandidatesReady.connect(this, &SocketManager::OnCandidatesReady); + socket->SignalState.connect(this, &SocketManager::OnSocketState); + socket->SignalRequestSignaling.connect(this, &SocketManager::OnRequestSignaling); + sockets_.push_back(socket); + socket->StartProcessingCandidates(); + return socket; +} + +void SocketManager::DestroySocket(P2PSocket *socket) { + // Can occur on any thread + TypedMessageData<P2PSocket *> data(socket); + session_manager_->worker_thread()->Send(this, MSG_DESTROYSOCKET, &data); +} + +void SocketManager::DestroySocket_w(P2PSocket *socket) { + // Only on worker thread + assert(Thread::Current() == session_manager_->worker_thread()); + + // Only if socket exists + CritScope cs(&critSM_); + std::vector<P2PSocket *>::iterator it; + it = std::find(sockets_.begin(), sockets_.end(), socket); + if (it == sockets_.end()) + return; + sockets_.erase(it); + delete socket; +} + +void SocketManager::StartProcessingCandidates() { + // Only on signaling thread + assert(Thread::Current() == session_manager_->signaling_thread()); + + // When sockets are created, their candidates are requested. + // When the candidates are ready, the client is signaled + // on the signaling thread + candidates_requested_ = true; + session_manager_->signaling_thread()->Post(this, MSG_CANDIDATESREADY); +} + +void SocketManager::OnSignalingReady() { + session_manager_->worker_thread()->Post(this, MSG_ONSIGNALINGREADY); +} + +void SocketManager::OnSignalingReady_w() { + // Only on worker thread + assert(Thread::Current() == session_manager_->worker_thread()); + for (uint32 i = 0; i < sockets_.size(); ++i) { + sockets_[i]->OnSignalingReady(); + } +} + +void SocketManager::OnCandidatesReady( + P2PSocket *socket, const std::vector<Candidate>& candidates) { + // Only on worker thread + assert(Thread::Current() == session_manager_->worker_thread()); + + // Remember candidates + CritScope cs(&critSM_); + std::vector<Candidate>::const_iterator it; + for (it = candidates.begin(); it != candidates.end(); it++) + candidates_.push_back(*it); + + // If candidates requested, tell signaling thread + if (candidates_requested_) + session_manager_->signaling_thread()->Post(this, MSG_CANDIDATESREADY); +} + +void SocketManager::ResetSockets() { + assert(Thread::Current() == session_manager_->signaling_thread()); + session_manager_->worker_thread()->Post(this, MSG_RESETSOCKETS); +} + +void SocketManager::ResetSockets_w() { + assert(Thread::Current() == session_manager_->worker_thread()); + + for (size_t i = 0; i < sockets_.size(); ++i) + sockets_[i]->Reset(); +} + +void SocketManager::OnSocketState(P2PSocket* socket, P2PSocket::State state) { + assert(Thread::Current() == session_manager_->worker_thread()); + + bool writable = false; + for (uint32 i = 0; i < sockets_.size(); ++i) + if (sockets_[i]->writable()) + writable = true; + + if (writable_ != writable) { + writable_ = writable; + SignalState(); + } +} + +void SocketManager::OnRequestSignaling() { + assert(Thread::Current() == session_manager_->worker_thread()); + session_manager_->signaling_thread()->Post(this, MSG_ONREQUESTSIGNALING); +} + + +void SocketManager::AddRemoteCandidates(const std::vector<Candidate> &remote_candidates) { + assert(Thread::Current() == session_manager_->signaling_thread()); + TypedMessageData<std::vector<Candidate> > *data = new TypedMessageData<std::vector<Candidate> >(remote_candidates); + session_manager_->worker_thread()->Post(this, MSG_ADDREMOTECANDIDATES, data); +} + +void SocketManager::AddRemoteCandidates_w(const std::vector<Candidate> &remote_candidates) { + assert(Thread::Current() == session_manager_->worker_thread()); + + // Local and remote candidates now exist, so connectivity checking can + // commence. Tell the P2PSockets about the remote candidates. + // Group candidates by socket name + + CritScope cs(&critSM_); + std::vector<P2PSocket *>::iterator it_socket; + for (it_socket = sockets_.begin(); it_socket != sockets_.end(); it_socket++) { + // Create a vector of remote candidates for each socket + std::string name = (*it_socket)->name(); + std::vector<Candidate> candidate_bundle; + std::vector<Candidate>::const_iterator it_candidate; + for (it_candidate = remote_candidates.begin(); it_candidate != remote_candidates.end(); it_candidate++) { + if ((*it_candidate).name() == name) + candidate_bundle.push_back(*it_candidate); + } + if (candidate_bundle.size() != 0) + (*it_socket)->AddRemoteCandidates(candidate_bundle); + } +} + +void SocketManager::OnMessage(Message *message) { + switch (message->message_id) { + case MSG_CREATESOCKET: + { + assert(Thread::Current() == session_manager_->worker_thread()); + TypedMessageData<CreateParams *> *params = static_cast<TypedMessageData<CreateParams *> *>(message->pdata); + params->data()->socket = CreateSocket_w(params->data()->name); + } + break; + + case MSG_DESTROYSOCKET: + { + assert(Thread::Current() == session_manager_->worker_thread()); + TypedMessageData<P2PSocket *> *data = static_cast<TypedMessageData<P2PSocket *> *>(message->pdata); + DestroySocket_w(data->data()); + } + break; + + case MSG_ONSIGNALINGREADY: + assert(Thread::Current() == session_manager_->worker_thread()); + OnSignalingReady_w(); + break; + + case MSG_ONREQUESTSIGNALING: + assert(Thread::Current() == session_manager_->signaling_thread()); + SignalRequestSignaling(); + break; + + case MSG_CANDIDATESREADY: + assert(Thread::Current() == session_manager_->signaling_thread()); + if (candidates_requested_) { + CritScope cs(&critSM_); + if (candidates_.size() > 0) { + SignalCandidatesReady(candidates_); + candidates_.clear(); + } + } + break; + + case MSG_ADDREMOTECANDIDATES: + { + assert(Thread::Current() == session_manager_->worker_thread()); + TypedMessageData<const std::vector<Candidate> > *data = static_cast<TypedMessageData<const std::vector<Candidate> > *>(message->pdata); + AddRemoteCandidates_w(data->data()); + delete data; + } + break; + + case MSG_RESETSOCKETS: + ResetSockets_w(); + break; + } +} + +} diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/socketmanager.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/socketmanager.h new file mode 100644 index 00000000..3ca1cf74 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/socketmanager.h @@ -0,0 +1,101 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _SOCKETMANAGER_H_ +#define _SOCKETMANAGER_H_ + +#include "talk/base/criticalsection.h" +#include "talk/base/messagequeue.h" +#include "talk/base/sigslot.h" +#include "talk/p2p/base/sessionmanager.h" +#include "talk/p2p/base/candidate.h" +#include "talk/p2p/base/p2psocket.h" +#include "talk/p2p/base/socketmanager.h" + +#include <string> +#include <vector> + +namespace cricket { + +class SessionManager; + +// Manages P2PSocket creation/destruction/readiness. +// Provides thread separation between session and sockets. +// This allows session to execute on the signaling thread, +// and sockets to execute on the worker thread, if desired, +// which is good for some media types (audio/video for example). + +class SocketManager : public MessageHandler, public sigslot::has_slots<> { +public: + SocketManager(SessionManager *session_manager); + virtual ~SocketManager(); + + // Determines whether any of the created sockets are currently writable. + bool writable() { return writable_; } + + P2PSocket *CreateSocket(const std::string & name); + void DestroySocket(P2PSocket *socket); + + // Start discovering local candidates + void StartProcessingCandidates(); + + // Adds the given candidates that were sent by the remote side. + void AddRemoteCandidates(const std::vector<Candidate>& candidates); + + // signaling channel is up, ready to transmit candidates as they are discovered + void OnSignalingReady(); + + // Put all of the sockets back into the initial state. + void ResetSockets(); + + sigslot::signal1<const std::vector<Candidate>&> SignalCandidatesReady; + sigslot::signal0<> SignalNetworkError; + sigslot::signal0<> SignalState; + sigslot::signal0<> SignalRequestSignaling; + +private: + P2PSocket *CreateSocket_w(const std::string &name); + void DestroySocket_w(P2PSocket *socket); + void OnSignalingReady_w(); + void AddRemoteCandidates_w(const std::vector<Candidate> &candidates); + virtual void OnMessage(Message *message); + void OnCandidatesReady(P2PSocket *socket, const std::vector<Candidate>&); + void OnSocketState(P2PSocket* socket, P2PSocket::State state); + void OnRequestSignaling(void); + void ResetSockets_w(); + + SessionManager *session_manager_; + std::vector<Candidate> candidates_; + CriticalSection critSM_; + std::vector<P2PSocket *> sockets_; + bool candidates_requested_; + bool writable_; +}; + +} + +#endif // _SOCKETMANAGER_H_ diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stun.cc b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stun.cc new file mode 100644 index 00000000..6a22b238 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stun.cc @@ -0,0 +1,576 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "talk/base/logging.h" +#include "talk/p2p/base/stun.h" +#include <iostream> +#include <cassert> + +#if defined(_MSC_VER) && _MSC_VER < 1300 +namespace std { + using ::memcpy; +} +#endif + +namespace cricket { + +const std::string STUN_ERROR_REASON_BAD_REQUEST = "BAD REQUEST"; +const std::string STUN_ERROR_REASON_UNAUTHORIZED = "UNAUTHORIZED"; +const std::string STUN_ERROR_REASON_UNKNOWN_ATTRIBUTE = "UNKNOWN ATTRIBUTE"; +const std::string STUN_ERROR_REASON_STALE_CREDENTIALS = "STALE CREDENTIALS"; +const std::string STUN_ERROR_REASON_INTEGRITY_CHECK_FAILURE = "INTEGRITY CHECK FAILURE"; +const std::string STUN_ERROR_REASON_MISSING_USERNAME = "MISSING USERNAME"; +const std::string STUN_ERROR_REASON_USE_TLS = "USE TLS"; +const std::string STUN_ERROR_REASON_SERVER_ERROR = "SERVER ERROR"; +const std::string STUN_ERROR_REASON_GLOBAL_FAILURE = "GLOBAL FAILURE"; + +StunMessage::StunMessage() : type_(0), length_(0), + transaction_id_("0000000000000000") { + assert(transaction_id_.size() == 16); + attrs_ = new std::vector<StunAttribute*>(); +} + +StunMessage::~StunMessage() { + for (unsigned i = 0; i < attrs_->size(); i++) + delete (*attrs_)[i]; + delete attrs_; +} + +void StunMessage::SetTransactionID(const std::string& str) { + assert(str.size() == 16); + transaction_id_ = str; +} + +void StunMessage::AddAttribute(StunAttribute* attr) { + attrs_->push_back(attr); + length_ += attr->length() + 4; +} + +const StunAddressAttribute* +StunMessage::GetAddress(StunAttributeType type) const { + switch (type) { + case STUN_ATTR_MAPPED_ADDRESS: + case STUN_ATTR_RESPONSE_ADDRESS: + case STUN_ATTR_SOURCE_ADDRESS: + case STUN_ATTR_CHANGED_ADDRESS: + case STUN_ATTR_REFLECTED_FROM: + case STUN_ATTR_ALTERNATE_SERVER: + case STUN_ATTR_DESTINATION_ADDRESS: + case STUN_ATTR_SOURCE_ADDRESS2: + return reinterpret_cast<const StunAddressAttribute*>(GetAttribute(type)); + + default: + assert(0); + return 0; + } +} + +const StunUInt32Attribute* +StunMessage::GetUInt32(StunAttributeType type) const { + switch (type) { + case STUN_ATTR_CHANGE_REQUEST: + case STUN_ATTR_LIFETIME: + case STUN_ATTR_BANDWIDTH: + case STUN_ATTR_OPTIONS: + return reinterpret_cast<const StunUInt32Attribute*>(GetAttribute(type)); + + default: + assert(0); + return 0; + } +} + +const StunByteStringAttribute* +StunMessage::GetByteString(StunAttributeType type) const { + switch (type) { + case STUN_ATTR_USERNAME: + case STUN_ATTR_PASSWORD: + case STUN_ATTR_MESSAGE_INTEGRITY: + case STUN_ATTR_DATA: + case STUN_ATTR_MAGIC_COOKIE: + return reinterpret_cast<const StunByteStringAttribute*>(GetAttribute(type)); + + default: + assert(0); + return 0; + } +} + +const StunErrorCodeAttribute* StunMessage::GetErrorCode() const { + return reinterpret_cast<const StunErrorCodeAttribute*>( + GetAttribute(STUN_ATTR_ERROR_CODE)); +} + +const StunUInt16ListAttribute* StunMessage::GetUnknownAttributes() const { + return reinterpret_cast<const StunUInt16ListAttribute*>( + GetAttribute(STUN_ATTR_UNKNOWN_ATTRIBUTES)); +} + +const StunTransportPrefsAttribute* StunMessage::GetTransportPrefs() const { + return reinterpret_cast<const StunTransportPrefsAttribute*>( + GetAttribute(STUN_ATTR_TRANSPORT_PREFERENCES)); +} + +const StunAttribute* StunMessage::GetAttribute(StunAttributeType type) const { + for (unsigned i = 0; i < attrs_->size(); i++) { + if ((*attrs_)[i]->type() == type) + return (*attrs_)[i]; + } + return 0; +} + +bool StunMessage::Read(ByteBuffer* buf) { + if (!buf->ReadUInt16(type_)) + return false; + + if (!buf->ReadUInt16(length_)) + return false; + + std::string transaction_id; + if (!buf->ReadString(transaction_id, 16)) + return false; + assert(transaction_id.size() == 16); + transaction_id_ = transaction_id; + + if (length_ > buf->Length()) + return false; + + attrs_->resize(0); + + size_t rest = buf->Length() - length_; + while (buf->Length() > rest) { + uint16 attr_type, attr_length; + if (!buf->ReadUInt16(attr_type)) + return false; + if (!buf->ReadUInt16(attr_length)) + return false; + + StunAttribute* attr = StunAttribute::Create(attr_type, attr_length); + if (!attr || !attr->Read(buf)) + return false; + + attrs_->push_back(attr); + } + + if (buf->Length() != rest) { + // fixme: shouldn't be doing this + LOG(LERROR) << "wrong message length" + << " (" << (int)rest << " != " << (int)buf->Length() << ")"; + return false; + } + + return true; +} + +void StunMessage::Write(ByteBuffer* buf) const { + buf->WriteUInt16(type_); + buf->WriteUInt16(length_); + buf->WriteString(transaction_id_); + + for (unsigned i = 0; i < attrs_->size(); i++) { + buf->WriteUInt16((*attrs_)[i]->type()); + buf->WriteUInt16((*attrs_)[i]->length()); + (*attrs_)[i]->Write(buf); + } +} + +StunAttribute::StunAttribute(uint16 type, uint16 length) + : type_(type), length_(length) { +} + +StunAttribute* StunAttribute::Create(uint16 type, uint16 length) { + switch (type) { + case STUN_ATTR_MAPPED_ADDRESS: + case STUN_ATTR_RESPONSE_ADDRESS: + case STUN_ATTR_SOURCE_ADDRESS: + case STUN_ATTR_CHANGED_ADDRESS: + case STUN_ATTR_REFLECTED_FROM: + case STUN_ATTR_ALTERNATE_SERVER: + case STUN_ATTR_DESTINATION_ADDRESS: + case STUN_ATTR_SOURCE_ADDRESS2: + if (length != StunAddressAttribute::SIZE) + return 0; + return new StunAddressAttribute(type); + + case STUN_ATTR_CHANGE_REQUEST: + case STUN_ATTR_LIFETIME: + case STUN_ATTR_BANDWIDTH: + case STUN_ATTR_OPTIONS: + if (length != StunUInt32Attribute::SIZE) + return 0; + return new StunUInt32Attribute(type); + + case STUN_ATTR_USERNAME: + case STUN_ATTR_PASSWORD: + case STUN_ATTR_MAGIC_COOKIE: + return (length % 4 == 0) ? new StunByteStringAttribute(type, length) : 0; + + case STUN_ATTR_MESSAGE_INTEGRITY: + return (length == 20) ? new StunByteStringAttribute(type, length) : 0; + + case STUN_ATTR_DATA: + return new StunByteStringAttribute(type, length); + + case STUN_ATTR_ERROR_CODE: + if (length < StunErrorCodeAttribute::MIN_SIZE) + return 0; + return new StunErrorCodeAttribute(type, length); + + case STUN_ATTR_UNKNOWN_ATTRIBUTES: + return (length % 2 == 0) ? new StunUInt16ListAttribute(type, length) : 0; + + case STUN_ATTR_TRANSPORT_PREFERENCES: + if ((length != StunTransportPrefsAttribute::SIZE1) && + (length != StunTransportPrefsAttribute::SIZE2)) + return 0; + return new StunTransportPrefsAttribute(type, length); + + default: + return 0; + } +} + +StunAddressAttribute* StunAttribute::CreateAddress(uint16 type) { + switch (type) { + case STUN_ATTR_MAPPED_ADDRESS: + case STUN_ATTR_RESPONSE_ADDRESS: + case STUN_ATTR_SOURCE_ADDRESS: + case STUN_ATTR_CHANGED_ADDRESS: + case STUN_ATTR_REFLECTED_FROM: + case STUN_ATTR_ALTERNATE_SERVER: + case STUN_ATTR_DESTINATION_ADDRESS: + case STUN_ATTR_SOURCE_ADDRESS2: + return new StunAddressAttribute(type); + + default: + assert(false); + return 0; + } +} + +StunUInt32Attribute* StunAttribute::CreateUInt32(uint16 type) { + switch (type) { + case STUN_ATTR_CHANGE_REQUEST: + case STUN_ATTR_LIFETIME: + case STUN_ATTR_BANDWIDTH: + case STUN_ATTR_OPTIONS: + return new StunUInt32Attribute(type); + + default: + assert(false); + return 0; + } +} + +StunByteStringAttribute* StunAttribute::CreateByteString(uint16 type) { + switch (type) { + case STUN_ATTR_USERNAME: + case STUN_ATTR_PASSWORD: + case STUN_ATTR_MESSAGE_INTEGRITY: + case STUN_ATTR_DATA: + case STUN_ATTR_MAGIC_COOKIE: + return new StunByteStringAttribute(type, 0); + + default: + assert(false); + return 0; + } +} + +StunErrorCodeAttribute* StunAttribute::CreateErrorCode() { + return new StunErrorCodeAttribute( + STUN_ATTR_ERROR_CODE, StunErrorCodeAttribute::MIN_SIZE); +} + +StunUInt16ListAttribute* StunAttribute::CreateUnknownAttributes() { + return new StunUInt16ListAttribute(STUN_ATTR_UNKNOWN_ATTRIBUTES, 0); +} + +StunTransportPrefsAttribute* StunAttribute::CreateTransportPrefs() { + return new StunTransportPrefsAttribute( + STUN_ATTR_TRANSPORT_PREFERENCES, StunTransportPrefsAttribute::SIZE1); +} + +StunAddressAttribute::StunAddressAttribute(uint16 type) + : StunAttribute(type, SIZE), family_(0), port_(0), ip_(0) { +} + +bool StunAddressAttribute::Read(ByteBuffer* buf) { + uint8 dummy; + if (!buf->ReadUInt8(dummy)) + return false; + if (!buf->ReadUInt8(family_)) + return false; + if (!buf->ReadUInt16(port_)) + return false; + if (!buf->ReadUInt32(ip_)) + return false; + return true; +} + +void StunAddressAttribute::Write(ByteBuffer* buf) const { + buf->WriteUInt8(0); + buf->WriteUInt8(family_); + buf->WriteUInt16(port_); + buf->WriteUInt32(ip_); +} + +StunUInt32Attribute::StunUInt32Attribute(uint16 type) + : StunAttribute(type, SIZE), bits_(0) { +} + +bool StunUInt32Attribute::GetBit(int index) const { + assert((0 <= index) && (index < 32)); + return static_cast<bool>((bits_ >> index) & 0x1); +} + +void StunUInt32Attribute::SetBit(int index, bool value) { + assert((0 <= index) && (index < 32)); + bits_ &= ~(1 << index); + bits_ |= value ? (1 << index) : 0; +} + +bool StunUInt32Attribute::Read(ByteBuffer* buf) { + if (!buf->ReadUInt32(bits_)) + return false; + return true; +} + +void StunUInt32Attribute::Write(ByteBuffer* buf) const { + buf->WriteUInt32(bits_); +} + +StunByteStringAttribute::StunByteStringAttribute(uint16 type, uint16 length) + : StunAttribute(type, length), bytes_(0) { +} + +StunByteStringAttribute::~StunByteStringAttribute() { + delete [] bytes_; +} + +void StunByteStringAttribute::SetBytes(char* bytes, uint16 length) { + delete [] bytes_; + bytes_ = bytes; + SetLength(length); +} + +void StunByteStringAttribute::CopyBytes(const char* bytes) { + CopyBytes(bytes, (uint16)strlen(bytes)); +} + +void StunByteStringAttribute::CopyBytes(const void* bytes, uint16 length) { + char* new_bytes = new char[length]; + std::memcpy(new_bytes, bytes, length); + SetBytes(new_bytes, length); +} + +uint8 StunByteStringAttribute::GetByte(int index) const { + assert(bytes_); + assert((0 <= index) && (index < length())); + return static_cast<uint8>(bytes_[index]); +} + +void StunByteStringAttribute::SetByte(int index, uint8 value) { + assert(bytes_); + assert((0 <= index) && (index < length())); + bytes_[index] = value; +} + +bool StunByteStringAttribute::Read(ByteBuffer* buf) { + bytes_ = new char[length()]; + if (!buf->ReadBytes(bytes_, length())) + return false; + return true; +} + +void StunByteStringAttribute::Write(ByteBuffer* buf) const { + buf->WriteBytes(bytes_, length()); +} + +StunErrorCodeAttribute::StunErrorCodeAttribute(uint16 type, uint16 length) + : StunAttribute(type, length), class_(0), number_(0) { +} + +StunErrorCodeAttribute::~StunErrorCodeAttribute() { +} + +void StunErrorCodeAttribute::SetErrorCode(uint32 code) { + class_ = (uint8)((code >> 8) & 0x7); + number_ = (uint8)(code & 0xff); +} + +void StunErrorCodeAttribute::SetReason(const std::string& reason) { + SetLength(MIN_SIZE + (uint16)reason.size()); + reason_ = reason; +} + +bool StunErrorCodeAttribute::Read(ByteBuffer* buf) { + uint32 val; + if (!buf->ReadUInt32(val)) + return false; + + if ((val >> 11) != 0) + LOG(LERROR) << "error-code bits not zero"; + + SetErrorCode(val); + + if (!buf->ReadString(reason_, length() - 4)) + return false; + + return true; +} + +void StunErrorCodeAttribute::Write(ByteBuffer* buf) const { + buf->WriteUInt32(error_code()); + buf->WriteString(reason_); +} + +StunUInt16ListAttribute::StunUInt16ListAttribute(uint16 type, uint16 length) + : StunAttribute(type, length) { + attr_types_ = new std::vector<uint16>(); +} + +StunUInt16ListAttribute::~StunUInt16ListAttribute() { + delete attr_types_; +} + +size_t StunUInt16ListAttribute::Size() const { + return attr_types_->size(); +} + +uint16 StunUInt16ListAttribute::GetType(int index) const { + return (*attr_types_)[index]; +} + +void StunUInt16ListAttribute::SetType(int index, uint16 value) { + (*attr_types_)[index] = value; +} + +void StunUInt16ListAttribute::AddType(uint16 value) { + attr_types_->push_back(value); + SetLength((uint16)attr_types_->size() * 2); +} + +bool StunUInt16ListAttribute::Read(ByteBuffer* buf) { + for (int i = 0; i < length() / 2; i++) { + uint16 attr; + if (!buf->ReadUInt16(attr)) + return false; + attr_types_->push_back(attr); + } + return true; +} + +void StunUInt16ListAttribute::Write(ByteBuffer* buf) const { + for (unsigned i = 0; i < attr_types_->size(); i++) + buf->WriteUInt16((*attr_types_)[i]); +} + +StunTransportPrefsAttribute::StunTransportPrefsAttribute( + uint16 type, uint16 length) + : StunAttribute(type, length), preallocate_(false), prefs_(0), addr_(0) { +} + +StunTransportPrefsAttribute::~StunTransportPrefsAttribute() { + delete addr_; +} + +void StunTransportPrefsAttribute::SetPreallocateAddress( + StunAddressAttribute* addr) { + if (!addr) { + preallocate_ = false; + addr_ = 0; + SetLength(SIZE1); + } else { + preallocate_ = true; + addr_ = addr; + SetLength(SIZE2); + } +} + +bool StunTransportPrefsAttribute::Read(ByteBuffer* buf) { + uint32 val; + if (!buf->ReadUInt32(val)) + return false; + + if ((val >> 3) != 0) + LOG(LERROR) << "transport-preferences bits not zero"; + + preallocate_ = static_cast<bool>((val >> 2) & 0x1); + prefs_ = (uint8)(val & 0x3); + + if (preallocate_ && (prefs_ == 3)) + LOG(LERROR) << "transport-preferences imcompatible P and Typ"; + + if (!preallocate_) { + if (length() != StunUInt32Attribute::SIZE) + return false; + } else { + if (length() != StunUInt32Attribute::SIZE + StunAddressAttribute::SIZE) + return false; + + addr_ = new StunAddressAttribute(STUN_ATTR_SOURCE_ADDRESS); + addr_->Read(buf); + } + + return true; +} + +void StunTransportPrefsAttribute::Write(ByteBuffer* buf) const { + buf->WriteUInt32((preallocate_ ? 4 : 0) | prefs_); + + if (preallocate_) + addr_->Write(buf); +} + +StunMessageType GetStunResponseType(StunMessageType request_type) { + switch (request_type) { + case STUN_SHARED_SECRET_REQUEST: + return STUN_SHARED_SECRET_RESPONSE; + case STUN_ALLOCATE_REQUEST: + return STUN_ALLOCATE_RESPONSE; + case STUN_SEND_REQUEST: + return STUN_SEND_RESPONSE; + default: + return STUN_BINDING_RESPONSE; + } +} + +StunMessageType GetStunErrorResponseType(StunMessageType request_type) { + switch (request_type) { + case STUN_SHARED_SECRET_REQUEST: + return STUN_SHARED_SECRET_ERROR_RESPONSE; + case STUN_ALLOCATE_REQUEST: + return STUN_ALLOCATE_ERROR_RESPONSE; + case STUN_SEND_REQUEST: + return STUN_SEND_ERROR_RESPONSE; + default: + return STUN_BINDING_ERROR_RESPONSE; + } +} + +} // namespace cricket diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stun.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stun.h new file mode 100644 index 00000000..27a8e4be --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stun.h @@ -0,0 +1,364 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __STUN_H__ +#define __STUN_H__ + +// This file contains classes for dealing with the STUN and TURN protocols. +// Both protocols use the same wire format. + +#include "talk/base/basictypes.h" +#include "talk/base/bytebuffer.h" +#include <string> +#include <vector> + +namespace cricket { + +// These are the types of STUN & TURN messages as of last check. +enum StunMessageType { + STUN_BINDING_REQUEST = 0x0001, + STUN_BINDING_RESPONSE = 0x0101, + STUN_BINDING_ERROR_RESPONSE = 0x0111, + STUN_SHARED_SECRET_REQUEST = 0x0002, + STUN_SHARED_SECRET_RESPONSE = 0x0102, + STUN_SHARED_SECRET_ERROR_RESPONSE = 0x0112, + STUN_ALLOCATE_REQUEST = 0x0003, + STUN_ALLOCATE_RESPONSE = 0x0103, + STUN_ALLOCATE_ERROR_RESPONSE = 0x0113, + STUN_SEND_REQUEST = 0x0004, + STUN_SEND_RESPONSE = 0x0104, + STUN_SEND_ERROR_RESPONSE = 0x0114, + STUN_DATA_INDICATION = 0x0115 +}; + +// These are the types of attributes defined in STUN & TURN. Next to each is +// the name of the class (T is StunTAttribute) that implements that type. +enum StunAttributeType { + STUN_ATTR_MAPPED_ADDRESS = 0x0001, // Address + STUN_ATTR_RESPONSE_ADDRESS = 0x0002, // Address + STUN_ATTR_CHANGE_REQUEST = 0x0003, // UInt32 + STUN_ATTR_SOURCE_ADDRESS = 0x0004, // Address + STUN_ATTR_CHANGED_ADDRESS = 0x0005, // Address + STUN_ATTR_USERNAME = 0x0006, // ByteString, multiple of 4 bytes + STUN_ATTR_PASSWORD = 0x0007, // ByteString, multiple of 4 bytes + STUN_ATTR_MESSAGE_INTEGRITY = 0x0008, // ByteString, 20 bytes + STUN_ATTR_ERROR_CODE = 0x0009, // ErrorCode + STUN_ATTR_UNKNOWN_ATTRIBUTES = 0x000a, // UInt16List + STUN_ATTR_REFLECTED_FROM = 0x000b, // Address + STUN_ATTR_TRANSPORT_PREFERENCES = 0x000c, // TransportPrefs + STUN_ATTR_LIFETIME = 0x000d, // UInt32 + STUN_ATTR_ALTERNATE_SERVER = 0x000e, // Address + STUN_ATTR_MAGIC_COOKIE = 0x000f, // ByteString, 4 bytes + STUN_ATTR_BANDWIDTH = 0x0010, // UInt32 + STUN_ATTR_DESTINATION_ADDRESS = 0x0011, // Address + STUN_ATTR_SOURCE_ADDRESS2 = 0x0012, // Address + STUN_ATTR_DATA = 0x0013, // ByteString + STUN_ATTR_OPTIONS = 0x8001 // UInt32 +}; + +enum StunErrorCodes { + STUN_ERROR_BAD_REQUEST = 400, + STUN_ERROR_UNAUTHORIZED = 401, + STUN_ERROR_UNKNOWN_ATTRIBUTE = 420, + STUN_ERROR_STALE_CREDENTIALS = 430, + STUN_ERROR_INTEGRITY_CHECK_FAILURE = 431, + STUN_ERROR_MISSING_USERNAME = 432, + STUN_ERROR_USE_TLS = 433, + STUN_ERROR_SERVER_ERROR = 500, + STUN_ERROR_GLOBAL_FAILURE = 600 +}; + +extern const std::string STUN_ERROR_REASON_BAD_REQUEST; +extern const std::string STUN_ERROR_REASON_UNAUTHORIZED; +extern const std::string STUN_ERROR_REASON_UNKNOWN_ATTRIBUTE; +extern const std::string STUN_ERROR_REASON_STALE_CREDENTIALS; +extern const std::string STUN_ERROR_REASON_INTEGRITY_CHECK_FAILURE; +extern const std::string STUN_ERROR_REASON_MISSING_USERNAME; +extern const std::string STUN_ERROR_REASON_USE_TLS; +extern const std::string STUN_ERROR_REASON_SERVER_ERROR; +extern const std::string STUN_ERROR_REASON_GLOBAL_FAILURE; + +class StunAttribute; +class StunAddressAttribute; +class StunUInt32Attribute; +class StunByteStringAttribute; +class StunErrorCodeAttribute; +class StunUInt16ListAttribute; +class StunTransportPrefsAttribute; + +// Records a complete STUN/TURN message. Each message consists of a type and +// any number of attributes. Each attribute is parsed into an instance of an +// appropriate class (see above). The Get* methods will return instances of +// that attribute class. +class StunMessage { +public: + StunMessage(); + ~StunMessage(); + + StunMessageType type() const { return static_cast<StunMessageType>(type_); } + uint16 length() const { return length_; } + const std::string& transaction_id() const { return transaction_id_; } + + void SetType(StunMessageType type) { type_ = type; } + void SetTransactionID(const std::string& str); + + const StunAddressAttribute* GetAddress(StunAttributeType type) const; + const StunUInt32Attribute* GetUInt32(StunAttributeType type) const; + const StunByteStringAttribute* GetByteString(StunAttributeType type) const; + const StunErrorCodeAttribute* GetErrorCode() const; + const StunUInt16ListAttribute* GetUnknownAttributes() const; + const StunTransportPrefsAttribute* GetTransportPrefs() const; + + void AddAttribute(StunAttribute* attr); + + // Parses the STUN/TURN packet in the given buffer and records it here. The + // return value indicates whether this was successful. + bool Read(ByteBuffer* buf); + + // Writes this object into a STUN/TURN packet. Return value is true if + // successful. + void Write(ByteBuffer* buf) const; + +private: + uint16 type_; + uint16 length_; + std::string transaction_id_; + std::vector<StunAttribute*>* attrs_; + + const StunAttribute* GetAttribute(StunAttributeType type) const; +}; + +// Base class for all STUN/TURN attributes. +class StunAttribute { +public: + virtual ~StunAttribute() {} + + StunAttributeType type() const { + return static_cast<StunAttributeType>(type_); + } + uint16 length() const { return length_; } + + // Reads the body (not the type or length) for this type of attribute from + // the given buffer. Return value is true if successful. + virtual bool Read(ByteBuffer* buf) = 0; + + // Writes the body (not the type or length) to the given buffer. Return + // value is true if successful. + virtual void Write(ByteBuffer* buf) const = 0; + + // Creates an attribute object with the given type and len. + static StunAttribute* Create(uint16 type, uint16 length); + + // Creates an attribute object with the given type and smallest length. + static StunAddressAttribute* CreateAddress(uint16 type); + static StunUInt32Attribute* CreateUInt32(uint16 type); + static StunByteStringAttribute* CreateByteString(uint16 type); + static StunErrorCodeAttribute* CreateErrorCode(); + static StunUInt16ListAttribute* CreateUnknownAttributes(); + static StunTransportPrefsAttribute* CreateTransportPrefs(); + +protected: + StunAttribute(uint16 type, uint16 length); + + void SetLength(uint16 length) { length_ = length; } + +private: + uint16 type_; + uint16 length_; +}; + +// Implements STUN/TURN attributes that record an Internet address. +class StunAddressAttribute : public StunAttribute { +public: + StunAddressAttribute(uint16 type); + +#if (_MSC_VER < 1300) + enum { SIZE = 8 }; +#else + static const uint16 SIZE = 8; +#endif + + uint8 family() const { return family_; } + uint16 port() const { return port_; } + uint32 ip() const { return ip_; } + + void SetFamily(uint8 family) { family_ = family; } + void SetIP(uint32 ip) { ip_ = ip; } + void SetPort(uint16 port) { port_ = port; } + + bool Read(ByteBuffer* buf); + void Write(ByteBuffer* buf) const; + +private: + uint8 family_; + uint16 port_; + uint32 ip_; +}; + +// Implements STUN/TURN attributs that record a 32-bit integer. +class StunUInt32Attribute : public StunAttribute { +public: + StunUInt32Attribute(uint16 type); + +#if (_MSC_VER < 1300) + enum { SIZE = 4 }; +#else + static const uint16 SIZE = 4; +#endif + + uint32 value() const { return bits_; } + + void SetValue(uint32 bits) { bits_ = bits; } + + bool GetBit(int index) const; + void SetBit(int index, bool value); + + bool Read(ByteBuffer* buf); + void Write(ByteBuffer* buf) const; + +private: + uint32 bits_; +}; + +// Implements STUN/TURN attributs that record an arbitrary byte string +class StunByteStringAttribute : public StunAttribute { +public: + StunByteStringAttribute(uint16 type, uint16 length); + ~StunByteStringAttribute(); + + const char* bytes() const { return bytes_; } + + void SetBytes(char* bytes, uint16 length); + + void CopyBytes(const char* bytes); // uses strlen + void CopyBytes(const void* bytes, uint16 length); + + uint8 GetByte(int index) const; + void SetByte(int index, uint8 value); + + bool Read(ByteBuffer* buf); + void Write(ByteBuffer* buf) const; + +private: + char* bytes_; +}; + +// Implements STUN/TURN attributs that record an error code. +class StunErrorCodeAttribute : public StunAttribute { +public: + StunErrorCodeAttribute(uint16 type, uint16 length); + ~StunErrorCodeAttribute(); + +#if (_MSC_VER < 1300) + enum { MIN_SIZE = 4 }; +#else + static const uint16 MIN_SIZE = 4; +#endif + + uint32 error_code() const { return (class_ << 8) | number_; } + uint8 error_class() const { return class_; } + uint8 number() const { return number_; } + const std::string& reason() const { return reason_; } + + void SetErrorCode(uint32 code); + void SetErrorClass(uint8 eclass) { class_ = eclass; } + void SetNumber(uint8 number) { number_ = number; } + void SetReason(const std::string& reason); + + bool Read(ByteBuffer* buf); + void Write(ByteBuffer* buf) const; + +private: + uint8 class_; + uint8 number_; + std::string reason_; +}; + +// Implements STUN/TURN attributs that record a list of attribute names. +class StunUInt16ListAttribute : public StunAttribute { +public: + StunUInt16ListAttribute(uint16 type, uint16 length); + ~StunUInt16ListAttribute(); + + size_t Size() const; + uint16 GetType(int index) const; + void SetType(int index, uint16 value); + void AddType(uint16 value); + + bool Read(ByteBuffer* buf); + void Write(ByteBuffer* buf) const; + +private: + std::vector<uint16>* attr_types_; +}; + +// Implements the TURN TRANSPORT-PREFS attribute, which provides information +// about the ports to allocate. +class StunTransportPrefsAttribute : public StunAttribute { +public: + StunTransportPrefsAttribute(uint16 type, uint16 length); + ~StunTransportPrefsAttribute(); + +#if (_MSC_VER < 1300) + enum { SIZE1 = 4, SIZE2 = 12 }; +#else + static const uint16 SIZE1 = 4; + static const uint16 SIZE2 = 12; +#endif + + bool preallocate() const { return preallocate_; } + uint8 preference_type() const { return prefs_; } + const StunAddressAttribute* address() const { return addr_; } + + void SetPreferenceType(uint8 prefs) { prefs_ = prefs; } + + // Sets the preallocate address to the given value, or if 0 is given, it sets + // to not preallocate. + void SetPreallocateAddress(StunAddressAttribute* addr); + + bool Read(ByteBuffer* buf); + void Write(ByteBuffer* buf) const; + +private: + bool preallocate_; + uint8 prefs_; + StunAddressAttribute* addr_; +}; + +// The special MAGIC-COOKIE attribute is used to distinguish TURN packets from +// other kinds of traffic. +const char STUN_MAGIC_COOKIE_VALUE[] = { 0x72, char(0xc6), 0x4b, char(0xc6) }; + +// Returns the (successful) response type for the given request type. +StunMessageType GetStunResponseType(StunMessageType request_type); + +// Returns the error response type for the given request type. +StunMessageType GetStunErrorResponseType(StunMessageType request_type); + +} // namespace cricket + +#endif // __STUN_H__ diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunport.cc b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunport.cc new file mode 100644 index 00000000..6d1dc6b1 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunport.cc @@ -0,0 +1,171 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#if defined(_MSC_VER) && _MSC_VER < 1300 +#pragma warning(disable:4786) +#endif +#include "talk/base/logging.h" +#include "talk/p2p/base/stunport.h" +#include "talk/p2p/base/helpers.h" +#include <iostream> +#include <cassert> + +#if defined(_MSC_VER) && _MSC_VER < 1300 +namespace std { + using ::strerror; +} +#endif + +#ifdef POSIX +extern "C" { +#include <errno.h> +} +#endif // POSIX + +namespace cricket { + +const int KEEPALIVE_DELAY = 10 * 1000; // 10 seconds - sort timeouts +const int RETRY_DELAY = 50; // 50ms, from ICE spec +const uint32 RETRY_TIMEOUT = 50 * 1000; // ICE says 50 secs + +// Handles a binding request sent to the STUN server. +class StunPortBindingRequest : public StunRequest { +public: + StunPortBindingRequest(StunPort* port) : port_(port) { + start_time_ = GetMillisecondCount(); + } + + virtual ~StunPortBindingRequest() { + } + + virtual void Prepare(StunMessage* request) { + request->SetType(STUN_BINDING_REQUEST); + } + + virtual void OnResponse(StunMessage* response) { + const StunAddressAttribute* addr_attr = + response->GetAddress(STUN_ATTR_MAPPED_ADDRESS); + if (!addr_attr) { + LOG(LERROR) << "Binding response missing mapped address."; + } else if (addr_attr->family() != 1) { + LOG(LERROR) << "Binding address has bad family"; + } else { + SocketAddress addr(addr_attr->ip(), addr_attr->port()); + if (port_->candidates().empty()) + port_->add_address(addr, "udp"); + } + + // We will do a keep-alive regardless of whether this request suceeds. + // This should have almost no impact on network usage. + port_->requests_.SendDelayed(new StunPortBindingRequest(port_), KEEPALIVE_DELAY); + } + + virtual void OnErrorResponse(StunMessage* response) { + const StunErrorCodeAttribute* attr = response->GetErrorCode(); + if (!attr) { + LOG(LERROR) << "Bad allocate response error code"; + } else { + LOG(LERROR) << "Binding error response:" + << " class=" << attr->error_class() + << " number=" << attr->number() + << " reason='" << attr->reason() << "'"; + } + + if (GetMillisecondCount() - start_time_ <= RETRY_TIMEOUT) + port_->requests_.SendDelayed(new StunPortBindingRequest(port_), KEEPALIVE_DELAY); + } + + virtual void OnTimeout() { + LOG(LERROR) << "Binding request timed out"; + if (GetMillisecondCount() - start_time_ <= RETRY_TIMEOUT) + port_->requests_.SendDelayed(new StunPortBindingRequest(port_), RETRY_DELAY); + } + +private: + uint32 start_time_; + StunPort* port_; +}; + +const std::string STUN_PORT_TYPE("stun"); + +StunPort::StunPort(Thread* thread, SocketFactory* factory, Network* network, + const SocketAddress& local_addr, + const SocketAddress& server_addr) + : UDPPort(thread, STUN_PORT_TYPE, factory, network), + server_addr_(server_addr), requests_(thread), error_(0) { + + socket_ = CreatePacketSocket(PROTO_UDP); + socket_->SignalReadPacket.connect(this, &StunPort::OnReadPacket); + if (socket_->Bind(local_addr) < 0) + PLOG(LERROR, socket_->GetError()) << "bind"; + + requests_.SignalSendPacket.connect(this, &StunPort::OnSendPacket); +} + +StunPort::~StunPort() { + delete socket_; +} + +void StunPort::PrepareAddress() { + requests_.Send(new StunPortBindingRequest(this)); +} + +int StunPort::SendTo( + const void* data, size_t size, const SocketAddress& addr, bool payload) { + int sent = socket_->SendTo(data, size, addr); + if (sent < 0) + error_ = socket_->GetError(); + return sent; +} + +int StunPort::SetOption(Socket::Option opt, int value) { + return socket_->SetOption(opt, value); +} + +int StunPort::GetError() { + return error_; +} + +void StunPort::OnReadPacket( + const char* data, size_t size, const SocketAddress& remote_addr, + AsyncPacketSocket* socket) { + assert(socket == socket_); + + // Look for a response to a binding request. + if (requests_.CheckResponse(data, size)) + return; + + // Process this data packet in the normal manner. + UDPPort::OnReadPacket(data, size, remote_addr); +} + +void StunPort::OnSendPacket(const void* data, size_t size) { + if (socket_->SendTo(data, size, server_addr_) < 0) + PLOG(LERROR, socket_->GetError()) << "sendto"; +} + +} // namespace cricket diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunport.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunport.h new file mode 100644 index 00000000..f042ae14 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunport.h @@ -0,0 +1,72 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __STUNPORT_H__ +#define __STUNPORT_H__ + +#include "talk/base/asyncudpsocket.h" +#include "talk/p2p/base/udpport.h" +#include "talk/p2p/base/stunrequest.h" + +namespace cricket { + +extern const std::string STUN_PORT_TYPE; + +// Communicates using the address on the outside of a NAT. +class StunPort : public UDPPort { +public: + StunPort(Thread* thread, SocketFactory* factory, Network* network, + const SocketAddress& local_addr, const SocketAddress& server_addr); + virtual ~StunPort(); + + virtual void PrepareAddress(); + + virtual int SetOption(Socket::Option opt, int value); + virtual int GetError(); + +protected: + virtual int SendTo(const void* data, size_t size, const SocketAddress& addr, bool payload); + + void OnReadPacket( + const char* data, size_t size, const SocketAddress& remote_addr, + AsyncPacketSocket* socket); + +private: + AsyncPacketSocket* socket_; + SocketAddress server_addr_; + StunRequestManager requests_; + int error_; + + friend class StunPortBindingRequest; + + // Sends STUN requests to the server. + void OnSendPacket(const void* data, size_t size); +}; + +} // namespace cricket + +#endif // __STUNPORT_H__ diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunrequest.cc b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunrequest.cc new file mode 100644 index 00000000..14d64735 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunrequest.cc @@ -0,0 +1,198 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#if defined(_MSC_VER) && _MSC_VER < 1300 +#pragma warning(disable:4786) +#endif +#include "talk/base/logging.h" +#include "talk/p2p/base/stunrequest.h" +#include "talk/p2p/base/helpers.h" +#include <iostream> +#include <cassert> + +namespace cricket { + +const uint32 MSG_STUN_SEND = 1; + +const int MAX_SENDS = 9; +const int DELAY_UNIT = 100; // 100 milliseconds +const int DELAY_MAX_FACTOR = 16; + +StunRequestManager::StunRequestManager(Thread* thread) : thread_(thread) { +} + +StunRequestManager::~StunRequestManager() { + while (requests_.begin() != requests_.end()) { + StunRequest *request = requests_.begin()->second; + requests_.erase(requests_.begin()); + delete request; + } +} + +void StunRequestManager::Send(StunRequest* request) { + SendDelayed(request, 0); +} + +void StunRequestManager::SendDelayed(StunRequest* request, int delay) { + request->set_manager(this); + assert(requests_.find(request->id()) == requests_.end()); + requests_[request->id()] = request; + thread_->PostDelayed(delay, request, MSG_STUN_SEND, NULL); +} + +void StunRequestManager::Remove(StunRequest* request) { + assert(request->manager() == this); + RequestMap::iterator iter = requests_.find(request->id()); + if (iter != requests_.end()) { + assert(iter->second == request); + requests_.erase(iter); + thread_->Clear(request); + } +} + +void StunRequestManager::Clear() { + std::vector<StunRequest*> requests; + for (RequestMap::iterator i = requests_.begin(); i != requests_.end(); ++i) + requests.push_back(i->second); + + for (uint32 i = 0; i < requests.size(); ++i) + Remove(requests[i]); +} + +bool StunRequestManager::CheckResponse(StunMessage* msg) { + RequestMap::iterator iter = requests_.find(msg->transaction_id()); + if (iter == requests_.end()) + return false; + + StunRequest* request = iter->second; + if (msg->type() == GetStunResponseType(request->type())) { + request->OnResponse(msg); + } else if (msg->type() == GetStunErrorResponseType(request->type())) { + request->OnErrorResponse(msg); + } else { + LOG(LERROR) << "Received response with wrong type: " << msg->type() + << " (expecting " << GetStunResponseType(request->type()) << ")"; + return false; + } + + delete request; + return true; +} + +bool StunRequestManager::CheckResponse(const char* data, size_t size) { + // Check the appropriate bytes of the stream to see if they match the + // transaction ID of a response we are expecting. + + if (size < 20) + return false; + + std::string id; + id.append(data + 4, 16); + + RequestMap::iterator iter = requests_.find(id); + if (iter == requests_.end()) + return false; + + // Parse the STUN message and continue processing as usual. + + ByteBuffer buf(data, size); + StunMessage msg; + if (!msg.Read(&buf)) + return false; + + return CheckResponse(&msg); +} + +StunRequest::StunRequest() + : manager_(0), id_(CreateRandomString(16)), msg_(0), count_(0), + timeout_(false), tstamp_(0) { +} + +StunRequest::StunRequest(StunMessage* request) + : manager_(0), id_(request->transaction_id()), msg_(request), + count_(0), timeout_(false) { +} + +StunRequest::~StunRequest() { + assert(manager_ != NULL); + if (manager_) { + manager_->Remove(this); + manager_->thread_->Clear(this); + } + delete msg_; +} + +const StunMessageType StunRequest::type() { + assert(msg_); + return msg_->type(); +} + +void StunRequest::set_manager(StunRequestManager* manager) { + assert(!manager_); + manager_ = manager; +} + +void StunRequest::OnMessage(Message* pmsg) { + assert(manager_); + assert(pmsg->message_id == MSG_STUN_SEND); + + if (!msg_) { + msg_ = new StunMessage(); + msg_->SetTransactionID(id_); + Prepare(msg_); + assert(msg_->transaction_id() == id_); + } + + if (timeout_) { + OnTimeout(); + delete this; + return; + } + + tstamp_ = GetMillisecondCount(); + + ByteBuffer buf; + msg_->Write(&buf); + manager_->SignalSendPacket(buf.Data(), buf.Length()); + + int delay = GetNextDelay(); + manager_->thread_->PostDelayed(delay, this, MSG_STUN_SEND, NULL); +} + +uint32 StunRequest::Elapsed() const { + return (GetMillisecondCount() - tstamp_); +} + +int StunRequest::GetNextDelay() { + int delay = DELAY_UNIT * _min(1 << count_, DELAY_MAX_FACTOR); + count_ += 1; + if (count_ == MAX_SENDS) + timeout_ = true; + return delay; +} + +} // namespace cricket diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunrequest.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunrequest.h new file mode 100644 index 00000000..86acff91 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunrequest.h @@ -0,0 +1,126 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __STUNREQUESTMANAGER_H__ +#define __STUNREQUESTMANAGER_H__ + +#include "talk/base/sigslot.h" +#include "talk/base/thread.h" +#include "talk/p2p/base/stun.h" +#include <map> +#include <string> + +namespace cricket { + +class StunRequest; + +// Manages a set of STUN requests, sending and resending until we receive a +// response or determine that the request has timed out. +class StunRequestManager { +public: + StunRequestManager(Thread* thread); + ~StunRequestManager(); + + // Starts sending the given request (perhaps after a delay). + void Send(StunRequest* request); + void SendDelayed(StunRequest* request, int delay); + + // Removes a stun request that was added previously. This will happen + // automatically when a request succeeds, fails, or times out. + void Remove(StunRequest* request); + + // Removes all stun requests that were added previously. + void Clear(); + + // Determines whether the given message is a response to one of the + // outstanding requests, and if so, processes it appropriately. + bool CheckResponse(StunMessage* msg); + bool CheckResponse(const char* data, size_t size); + + // Raised when there are bytes to be sent. + sigslot::signal2<const void*, size_t> SignalSendPacket; + +private: + typedef std::map<std::string, StunRequest*> RequestMap; + + Thread* thread_; + RequestMap requests_; + + friend class StunRequest; +}; + +// Represents an individual request to be sent. The STUN message can either be +// constructed beforehand or built on demand. +class StunRequest : public MessageHandler { +public: + StunRequest(); + StunRequest(StunMessage* request); + virtual ~StunRequest(); + + // The manager handling this request (if it has been scheduled for sending). + StunRequestManager* manager() { return manager_; } + + // Returns the transaction ID of this request. + const std::string& id() { return id_; } + + // Returns the STUN type of the request message. + const StunMessageType type(); + + // Handles messages for sending and timeout. + void OnMessage(Message* pmsg); + + // Time elapsed since last send (in ms) + uint32 Elapsed() const; + +protected: + int count_; + bool timeout_; + + // Fills in the actual request to be sent. Note that the transaction ID will + // already be set and cannot be changed. + virtual void Prepare(StunMessage* request) {} + + // Called when the message receives a response or times out. + virtual void OnResponse(StunMessage* response) {} + virtual void OnErrorResponse(StunMessage* response) {} + virtual void OnTimeout() {} + virtual int GetNextDelay(); + +private: + StunRequestManager* manager_; + std::string id_; + StunMessage* msg_; + uint32 tstamp_; + + void set_manager(StunRequestManager* manager); + + friend class StunRequestManager; +}; + +} // namespace cricket + +#endif // __STUNREQUESTMANAGER_H__ diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunserver.cc b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunserver.cc new file mode 100644 index 00000000..6e4f6b66 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunserver.cc @@ -0,0 +1,160 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "talk/base/bytebuffer.h" +#include "talk/p2p/base/stunserver.h" +#include <iostream> + +#ifdef POSIX +extern "C" { +#include <errno.h> +} +#endif // POSIX + +namespace cricket { + +StunServer::StunServer(AsyncUDPSocket* socket) : socket_(socket) { + socket_->SignalReadPacket.connect(this, &StunServer::OnPacket); +} + +StunServer::~StunServer() { + socket_->SignalReadPacket.disconnect(this); +} + +void StunServer::OnPacket( + const char* buf, size_t size, const SocketAddress& remote_addr, + AsyncPacketSocket* socket) { + + // TODO: If appropriate, look for the magic cookie before parsing. + + // Parse the STUN message. + ByteBuffer bbuf(buf, size); + StunMessage msg; + if (!msg.Read(&bbuf)) { + SendErrorResponse(msg, remote_addr, 400, "Bad Request"); + return; + } + + // TODO: If this is UDP, then we shouldn't allow non-fully-parsed messages. + + // TODO: If unknown non-optiional (<= 0x7fff) attributes are found, send a + // 420 "Unknown Attribute" response. + + // TODO: Check that a message-integrity attribute was given (or send 401 + // "Unauthorized"). Check that a username attribute was given (or send + // 432 "Missing Username"). Look up the username and password. If it + // is missing or the HMAC is wrong, send 431 "Integrity Check Failure". + + // Send the message to the appropriate handler function. + switch (msg.type()) { + case STUN_BINDING_REQUEST: + OnBindingRequest(&msg, remote_addr); + return; + + case STUN_ALLOCATE_REQUEST: + OnAllocateRequest(&msg, remote_addr); + return; + + default: + SendErrorResponse(msg, remote_addr, 600, "Operation Not Supported"); + } +} + +void StunServer::OnBindingRequest( + StunMessage* msg, const SocketAddress& remote_addr) { + StunMessage response; + response.SetType(STUN_BINDING_RESPONSE); + response.SetTransactionID(msg->transaction_id()); + + // Tell the user the address that we received their request from. + StunAddressAttribute* mapped_addr = + StunAttribute::CreateAddress(STUN_ATTR_MAPPED_ADDRESS); + mapped_addr->SetFamily(1); + mapped_addr->SetPort(remote_addr.port()); + mapped_addr->SetIP(remote_addr.ip()); + response.AddAttribute(mapped_addr); + + // Tell the user the address that we are sending the response from. + SocketAddress local_addr = socket_->GetLocalAddress(); + StunAddressAttribute* source_addr = + StunAttribute::CreateAddress(STUN_ATTR_SOURCE_ADDRESS); + source_addr->SetFamily(1); + source_addr->SetPort(local_addr.port()); + source_addr->SetIP(local_addr.ip()); + response.AddAttribute(source_addr); + + // TODO: Add username and message-integrity. + + // TODO: Add changed-address. (Keep information about three other servers.) + + SendResponse(response, remote_addr); +} + +void StunServer::OnAllocateRequest( + StunMessage* msg, const SocketAddress& addr) { + SendErrorResponse(*msg, addr, 600, "Operation Not Supported"); +} + +void StunServer::OnSharedSecretRequest( + StunMessage* msg, const SocketAddress& addr) { + SendErrorResponse(*msg, addr, 600, "Operation Not Supported"); +} + +void StunServer::OnSendRequest(StunMessage* msg, const SocketAddress& addr) { + SendErrorResponse(*msg, addr, 600, "Operation Not Supported"); +} + +void StunServer::SendErrorResponse( + const StunMessage& msg, const SocketAddress& addr, int error_code, + const char* error_desc) { + + StunMessage err_msg; + err_msg.SetType(GetStunErrorResponseType(msg.type())); + err_msg.SetTransactionID(msg.transaction_id()); + + StunErrorCodeAttribute* err_code = StunAttribute::CreateErrorCode(); + err_code->SetErrorClass(error_code / 100); + err_code->SetNumber(error_code % 100); + err_code->SetReason(error_desc); + err_msg.AddAttribute(err_code); + + SendResponse(err_msg, addr); +} + +void StunServer::SendResponse( + const StunMessage& msg, const SocketAddress& addr) { + + ByteBuffer buf; + msg.Write(&buf); + + // TODO: Allow response addr attribute if sent from another stun server. + + if (socket_->SendTo(buf.Data(), buf.Length(), addr) < 0) + std::cerr << "sendto: " << std::strerror(errno) << std::endl; +} + +} // namespace cricket diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunserver.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunserver.h new file mode 100644 index 00000000..3043645d --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunserver.h @@ -0,0 +1,73 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __STUNSERVER_H__ +#define __STUNSERVER_H__ + +#include "talk/base/asyncudpsocket.h" +#include "talk/p2p/base/stun.h" + +namespace cricket { + +const int STUN_SERVER_PORT = 3478; + +class StunServer : public sigslot::has_slots<> { +public: + // Creates a STUN server, which will listen on the given socket. + StunServer(AsyncUDPSocket* socket); + + // Removes the STUN server from the socket, but does not delete the socket. + ~StunServer(); + +protected: + + // Slot for AsyncSocket.PacketRead: + void OnPacket( + const char* buf, size_t size, const SocketAddress& remote_addr, + AsyncPacketSocket* socket); + + // Handlers for the different types of STUN/TURN requests: + void OnBindingRequest(StunMessage* msg, const SocketAddress& addr); + void OnAllocateRequest(StunMessage* msg, const SocketAddress& addr); + void OnSharedSecretRequest(StunMessage* msg, const SocketAddress& addr); + void OnSendRequest(StunMessage* msg, const SocketAddress& addr); + + // Sends an error response to the given message back to the user. + void SendErrorResponse( + const StunMessage& msg, const SocketAddress& addr, int error_code, + const char* error_desc); + + // Sends the given message to the appropriate destination. + void SendResponse(const StunMessage& msg, const SocketAddress& addr); + +private: + AsyncUDPSocket* socket_; +}; + +} // namespace cricket + +#endif // __STUNSERVER_H__ diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunserver.pro b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunserver.pro new file mode 100644 index 00000000..dce92ec4 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunserver.pro @@ -0,0 +1,14 @@ +TEMPLATE = app +INCLUDEPATH = ../../.. +DEFINES += POSIX + +include(../../../../../conf.pri) + +# Input +SOURCES += \ + stunserver.cc \ + stunserver_main.cc \ + ../../base/host.cc #\ +# ../../base/socketaddresspair.cc + +LIBS += ../../../liblibjingle.a diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunserver_main.cc b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunserver_main.cc new file mode 100644 index 00000000..bd8a96e5 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/stunserver_main.cc @@ -0,0 +1,66 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "talk/base/host.h" +#include "talk/base/thread.h" +#include "talk/p2p/base/stunserver.h" +#include <iostream> + +#ifdef POSIX +extern "C" { +#include <errno.h> +} +#endif // POSIX + +using namespace cricket; + +int main(int argc, char* argv[]) { + if (argc != 1) { + std::cerr << "usage: stunserver" << std::endl; + return 1; + } + + SocketAddress server_addr(LocalHost().networks()[1]->ip(), 7000); + + Thread *pthMain = Thread::Current(); + + AsyncUDPSocket* server_socket = CreateAsyncUDPSocket(pthMain->socketserver()); + if (server_socket->Bind(server_addr) < 0) { + std::cerr << "bind: " << std::strerror(errno) << std::endl; + return 1; + } + + StunServer* server = new StunServer(server_socket); + + std::cout << "Listening at " << server_addr.ToString() << std::endl; + + pthMain->Loop(); + + delete server; + delete server_socket; + return 0; +} diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/tcpport.cc b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/tcpport.cc new file mode 100644 index 00000000..a2d2adc6 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/tcpport.cc @@ -0,0 +1,250 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#if defined(_MSC_VER) && _MSC_VER < 1300 +#pragma warning(disable:4786) +#endif +#include "talk/p2p/base/tcpport.h" +#include "talk/base/logging.h" +#ifdef WIN32 +#include "talk/base/winfirewall.h" +#endif // WIN32 +#include <iostream> +#include <cassert> + +#if defined(_MSC_VER) && _MSC_VER < 1300 +namespace std { + using ::strerror; +} +#endif + +#ifdef POSIX +extern "C" { +#include <errno.h> +} +#endif // POSIX + +namespace cricket { + +#ifdef WIN32 +static WinFirewall win_firewall; +#endif // WIN32 + +TCPPort::TCPPort(Thread* thread, SocketFactory* factory, Network* network, + const SocketAddress& address) + : Port(thread, LOCAL_PORT_TYPE, factory, network), error_(0) { + incoming_only_ = (address.port() != 0); + socket_ = thread->socketserver()->CreateAsyncSocket(SOCK_STREAM); + socket_->SignalReadEvent.connect(this, &TCPPort::OnAcceptEvent); + if (socket_->Bind(address) < 0) + LOG(INFO) << "bind: " << std::strerror(socket_->GetError()); +} + +TCPPort::~TCPPort() { + delete socket_; +} + +Connection* TCPPort::CreateConnection(const Candidate& address, CandidateOrigin origin) { + // We only support TCP protocols + if ((address.protocol() != "tcp") && (address.protocol() != "ssltcp")) + return 0; + + // We can't accept TCP connections incoming on other ports + if (origin == ORIGIN_OTHER_PORT) + return 0; + + // Check if we are allowed to make outgoing TCP connections + if (incoming_only_ && (origin == ORIGIN_MESSAGE)) + return 0; + + // We don't know how to act as an ssl server yet + if ((address.protocol() == "ssltcp") && (origin == ORIGIN_THIS_PORT)) + return 0; + + TCPConnection* conn = 0; + if (AsyncTCPSocket * socket = GetIncoming(address.address(), true)) { + socket->SignalReadPacket.disconnect(this); + conn = new TCPConnection(this, address, socket); + } else { + conn = new TCPConnection(this, address); + } + AddConnection(conn); + return conn; +} + +void TCPPort::PrepareAddress() { + assert(socket_); + + bool allow_listen = true; +#ifdef WIN32 + if (win_firewall.Initialize()) { + char module_path[MAX_PATH + 1] = { 0 }; + ::GetModuleFileNameA(NULL, module_path, MAX_PATH); + if (win_firewall.Enabled() && !win_firewall.Authorized(module_path)) { + allow_listen = false; + } + } +#endif // WIN32 + if (allow_listen) { + if (socket_->Listen(5) < 0) + LOG(INFO) << "listen: " << std::strerror(socket_->GetError()); + } else { + LOG(INFO) << "not listening due to firewall restrictions"; + } + // Note: We still add the address, since otherwise the remote side won't recognize + // our incoming TCP connections. + add_address(socket_->GetLocalAddress(), "tcp"); +} + +int TCPPort::SendTo(const void* data, size_t size, const SocketAddress& addr, bool payload) { + AsyncTCPSocket * socket = 0; + + if (TCPConnection * conn = static_cast<TCPConnection*>(GetConnection(addr))) { + socket = conn->socket(); + } else { + socket = GetIncoming(addr); + } + if (!socket) { + LOG(INFO) << "Unknown destination for SendTo: " << addr.ToString(); + return -1; // TODO: Set error_ + } + + //LOG(INFO) << "TCPPort::SendTo(" << size << ", " << addr.ToString() << ")"; + + int sent = socket->Send(data, size); + if (sent < 0) + error_ = socket->GetError(); + return sent; +} + +int TCPPort::SetOption(Socket::Option opt, int value) { + return socket_->SetOption(opt, value); +} + +int TCPPort::GetError() { + assert(socket_); + return error_; +} + +void TCPPort::OnAcceptEvent(AsyncSocket* socket) { + assert(socket == socket_); + + Incoming incoming; + AsyncSocket * newsocket = static_cast<AsyncSocket *>(socket->Accept(&incoming.addr)); + if (!newsocket) { + // TODO: Do something better like forwarding the error to the user. + LOG(INFO) << "accept: " << socket_->GetError() << " " << std::strerror(socket_->GetError()); + return; + } + incoming.socket = new AsyncTCPSocket(newsocket); + incoming.socket->SignalReadPacket.connect(this, &TCPPort::OnReadPacket); + + LOG(INFO) << "accepted incoming connection from " << incoming.addr.ToString(); + incoming_.push_back(incoming); + + // Prime a read event in case data is waiting + newsocket->SignalReadEvent(newsocket); +} + +AsyncTCPSocket * TCPPort::GetIncoming(const SocketAddress& addr, bool remove) { + AsyncTCPSocket * socket = 0; + for (std::list<Incoming>::iterator it = incoming_.begin(); it != incoming_.end(); ++it) { + if (it->addr == addr) { + socket = it->socket; + if (remove) + incoming_.erase(it); + break; + } + } + return socket; +} + +void TCPPort::OnReadPacket(const char* data, size_t size, const SocketAddress& remote_addr, + AsyncPacketSocket* socket) { + Port::OnReadPacket(data, size, remote_addr); +} + +TCPConnection::TCPConnection(TCPPort* port, const Candidate& candidate, AsyncTCPSocket* socket) + : Connection(port, 0, candidate), socket_(socket), error_(0) { + bool outgoing = (socket_ == 0); + if (outgoing) { + socket_ = static_cast<AsyncTCPSocket *>(port->CreatePacketSocket( + (candidate.protocol() == "ssltcp") ? PROTO_SSLTCP : PROTO_TCP)); + } + socket_->SignalReadPacket.connect(this, &TCPConnection::OnReadPacket); + socket_->SignalClose.connect(this, &TCPConnection::OnClose); + if (outgoing) { + connected_ = false; + socket_->SignalConnect.connect(this, &TCPConnection::OnConnect); + socket_->Connect(candidate.address()); + LOG(INFO) << "Connecting to " << candidate.address().ToString(); + } +} + +TCPConnection::~TCPConnection() { +} + +int TCPConnection::Send(const void* data, size_t size) { + if (write_state() != STATE_WRITABLE) + return 0; + + int sent = socket_->Send(data, size); + if (sent < 0) { + error_ = socket_->GetError(); + } else { + sent_total_bytes_ += sent; + } + return sent; +} + +int TCPConnection::GetError() { + return error_; +} + +TCPPort* TCPConnection::tcpport() { + return static_cast<TCPPort*>(port_); +} + +void TCPConnection::OnConnect(AsyncTCPSocket* socket) { + assert(socket == socket_); + LOG(INFO) << "tcp connected to " << socket->GetRemoteAddress().ToString(); + set_connected(true); +} + +void TCPConnection::OnClose(AsyncTCPSocket* socket, int error) { + assert(socket == socket_); + LOG(INFO) << "tcp closed with error: " << error; + set_connected(false); +} + +void TCPConnection::OnReadPacket(const char* data, size_t size, const SocketAddress& remote_addr, + AsyncPacketSocket* socket) { + assert(socket == socket_); + Connection::OnReadPacket(data, size); +} + +} // namespace cricket diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/tcpport.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/tcpport.h new file mode 100644 index 00000000..f6a9beb7 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/tcpport.h @@ -0,0 +1,116 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __TCPPORT_H__ +#define __TCPPORT_H__ + +#include <list> +#include "talk/base/asynctcpsocket.h" +#include "talk/p2p/base/port.h" + +namespace cricket { + +class TCPConnection; + +extern const std::string LOCAL_PORT_TYPE; // type of TCP ports + +// Communicates using a local TCP port. +// +// This class is designed to allow subclasses to take advantage of the +// connection management provided by this class. A subclass should take of all +// packet sending and preparation, but when a packet is received, it should +// call this TCPPort::OnReadPacket (3 arg) to dispatch to a connection. +class TCPPort : public Port { +public: + TCPPort(Thread* thread, SocketFactory* factory, Network* network, + const SocketAddress& address); + virtual ~TCPPort(); + + virtual Connection* CreateConnection(const Candidate& address, CandidateOrigin origin); + + virtual void PrepareAddress(); + + virtual int SetOption(Socket::Option opt, int value); + virtual int GetError(); + +protected: + // Handles sending using the local TCP socket. + virtual int SendTo(const void* data, size_t size, const SocketAddress& addr, bool payload); + + // Creates TCPConnection for incoming sockets + void OnAcceptEvent(AsyncSocket* socket); + + AsyncSocket* socket() { return socket_; } + +private: + bool incoming_only_; + AsyncSocket* socket_; + int error_; + + struct Incoming { + SocketAddress addr; + AsyncTCPSocket * socket; + }; + std::list<Incoming> incoming_; + + AsyncTCPSocket * GetIncoming(const SocketAddress& addr, bool remove = false); + + // Receives packet signal from the local TCP Socket. + void OnReadPacket(const char* data, size_t size, const SocketAddress& remote_addr, + AsyncPacketSocket* socket); + + friend class TCPConnection; +}; + +class TCPConnection : public Connection { +public: + // Connection is outgoing unless socket is specified + TCPConnection(TCPPort* port, const Candidate& candidate, AsyncTCPSocket* socket = 0); + virtual ~TCPConnection(); + + virtual int Send(const void* data, size_t size); + virtual int GetError(); + + AsyncTCPSocket * socket() { return socket_; } + +private: + TCPPort* tcpport(); + AsyncTCPSocket* socket_; + bool connected_; + int error_; + + void OnConnect(AsyncTCPSocket* socket); + void OnClose(AsyncTCPSocket* socket, int error); + void OnReadPacket(const char* data, size_t size, const SocketAddress& remote_addr, + AsyncPacketSocket* socket); + + friend class TCPPort; +}; + +} // namespace cricket + +#endif // __TCPPORT_H__ diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/udpport.cc b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/udpport.cc new file mode 100644 index 00000000..fabbb25b --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/udpport.cc @@ -0,0 +1,117 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#if defined(_MSC_VER) && _MSC_VER < 1300 +#pragma warning(disable:4786) +#endif +#include "talk/base/logging.h" +#include "talk/p2p/base/udpport.h" +#include <iostream> +#include <cassert> + +#if defined(_MSC_VER) && _MSC_VER < 1300 +namespace std { + using ::strerror; +} +#endif + +#ifdef POSIX +extern "C" { +#include <errno.h> +} +#endif // POSIX + +namespace cricket { + +const std::string LOCAL_PORT_TYPE("local"); + +UDPPort::UDPPort(Thread* thread, SocketFactory* factory, Network* network, + const SocketAddress& address) + : Port(thread, LOCAL_PORT_TYPE, factory, network), error_(0) { + socket_ = CreatePacketSocket(PROTO_UDP); + socket_->SignalReadPacket.connect(this, &UDPPort::OnReadPacketSlot); + if (socket_->Bind(address) < 0) + PLOG(LERROR, socket_->GetError()) << "bind"; +} + +UDPPort::UDPPort(Thread* thread, const std::string &type, + SocketFactory* factory, Network* network) + : Port(thread, type, factory, network), socket_(0), error_(0) { +} + +UDPPort::~UDPPort() { + delete socket_; +} + +void UDPPort::PrepareAddress() { + assert(socket_); + add_address(socket_->GetLocalAddress(), "udp"); +} + +Connection* UDPPort::CreateConnection(const Candidate& address, CandidateOrigin origin) { + if (address.protocol() != "udp") + return 0; + + Connection * conn = new ProxyConnection(this, 0, address); + AddConnection(conn); + return conn; +} + +int UDPPort::SendTo(const void* data, size_t size, const SocketAddress& addr, bool payload) { + assert(socket_); + int sent = socket_->SendTo(data, size, addr); + if (sent < 0) + error_ = socket_->GetError(); + return sent; +} + +int UDPPort::SetOption(Socket::Option opt, int value) { + return socket_->SetOption(opt, value); +} + +int UDPPort::GetError() { + assert(socket_); + return error_; +} + +void UDPPort::OnReadPacketSlot( + const char* data, size_t size, const SocketAddress& remote_addr, + AsyncPacketSocket* socket) { + assert(socket == socket_); + OnReadPacket(data, size, remote_addr); +} + +void UDPPort::OnReadPacket( + const char* data, size_t size, const SocketAddress& remote_addr) { + if (Connection* conn = GetConnection(remote_addr)) { + conn->OnReadPacket(data, size); + } else { + Port::OnReadPacket(data, size, remote_addr); + } +} + +} // namespace cricket diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/udpport.h b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/udpport.h new file mode 100644 index 00000000..4bcd113e --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/udpport.h @@ -0,0 +1,81 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __UDPPORT_H__ +#define __UDPPORT_H__ + +#include "talk/base/asyncudpsocket.h" +#include "talk/p2p/base/port.h" + +namespace cricket { + +extern const std::string LOCAL_PORT_TYPE; // type of UDP ports + +// Communicates using a local UDP port. +// +// This class is designed to allow subclasses to take advantage of the +// connection management provided by this class. A subclass should take of all +// packet sending and preparation, but when a packet is received, it should +// call this UDPPort::OnReadPacket (3 arg) to dispatch to a connection. +class UDPPort : public Port { +public: + UDPPort(Thread* thread, SocketFactory* factory, Network* network, + const SocketAddress& address); + virtual ~UDPPort(); + + virtual void PrepareAddress(); + virtual Connection* CreateConnection(const Candidate& address, CandidateOrigin origin); + + virtual int SetOption(Socket::Option opt, int value); + virtual int GetError(); + +protected: + UDPPort(Thread* thread, const std::string &type, SocketFactory* factory, + Network* network); + + // Handles sending using the local UDP socket. + virtual int SendTo(const void* data, size_t size, const SocketAddress& addr, bool payload); + + // Dispatches the given packet to the port or connection as appropriate. + void OnReadPacket( + const char* data, size_t size, const SocketAddress& remote_addr); + + AsyncPacketSocket* socket() { return socket_; } + +private: + AsyncPacketSocket* socket_; + int error_; + + // Receives packet signal from the local UDP Socket. + void OnReadPacketSlot( + const char* data, size_t size, const SocketAddress& remote_addr, + AsyncPacketSocket* socket); +}; + +} // namespace cricket + +#endif // __UDPPORT_H__ |