diff options
Diffstat (limited to 'libktorrent/net')
-rw-r--r-- | libktorrent/net/Makefile.am | 10 | ||||
-rw-r--r-- | libktorrent/net/address.cpp | 67 | ||||
-rw-r--r-- | libktorrent/net/address.h | 60 | ||||
-rw-r--r-- | libktorrent/net/bufferedsocket.cpp | 217 | ||||
-rw-r--r-- | libktorrent/net/bufferedsocket.h | 150 | ||||
-rw-r--r-- | libktorrent/net/circularbuffer.cpp | 146 | ||||
-rw-r--r-- | libktorrent/net/circularbuffer.h | 89 | ||||
-rw-r--r-- | libktorrent/net/downloadthread.cpp | 137 | ||||
-rw-r--r-- | libktorrent/net/downloadthread.h | 64 | ||||
-rw-r--r-- | libktorrent/net/networkthread.cpp | 165 | ||||
-rw-r--r-- | libktorrent/net/networkthread.h | 113 | ||||
-rw-r--r-- | libktorrent/net/portlist.cpp | 73 | ||||
-rw-r--r-- | libktorrent/net/portlist.h | 103 | ||||
-rw-r--r-- | libktorrent/net/socket.cpp | 326 | ||||
-rw-r--r-- | libktorrent/net/socket.h | 83 | ||||
-rw-r--r-- | libktorrent/net/socketgroup.cpp | 186 | ||||
-rw-r--r-- | libktorrent/net/socketgroup.h | 90 | ||||
-rw-r--r-- | libktorrent/net/socketmonitor.cpp | 173 | ||||
-rw-r--r-- | libktorrent/net/socketmonitor.h | 118 | ||||
-rw-r--r-- | libktorrent/net/speed.cpp | 78 | ||||
-rw-r--r-- | libktorrent/net/speed.h | 51 | ||||
-rw-r--r-- | libktorrent/net/uploadthread.cpp | 91 | ||||
-rw-r--r-- | libktorrent/net/uploadthread.h | 61 |
23 files changed, 2651 insertions, 0 deletions
diff --git a/libktorrent/net/Makefile.am b/libktorrent/net/Makefile.am new file mode 100644 index 0000000..e67354c --- /dev/null +++ b/libktorrent/net/Makefile.am @@ -0,0 +1,10 @@ +INCLUDES = -I$(srcdir)/.. -I$(srcdir)/. $(all_includes) +METASOURCES = AUTO +libnet_la_LDFLAGS = $(all_libraries) +noinst_LTLIBRARIES = libnet.la +noinst_HEADERS = address.h bufferedsocket.h circularbuffer.h downloadthread.h \ + networkthread.h portlist.h socket.h socketmonitor.h speed.h uploadthread.h +libnet_la_SOURCES = address.cpp bufferedsocket.cpp circularbuffer.cpp \ + downloadthread.cpp networkthread.cpp portlist.cpp socket.cpp socketgroup.cpp \ + socketmonitor.cpp speed.cpp uploadthread.cpp +KDE_CXXFLAGS = $(USE_EXCEPTIONS) $(USE_RTTI) diff --git a/libktorrent/net/address.cpp b/libktorrent/net/address.cpp new file mode 100644 index 0000000..4a4da3c --- /dev/null +++ b/libktorrent/net/address.cpp @@ -0,0 +1,67 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <sys/types.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include "address.h" + +namespace net +{ + + Address::Address() : m_ip(0),m_port(0) {} + + Address::Address(const QString & host,Uint16 port) : m_ip(0),m_port(port) + { + struct in_addr a; + if (inet_aton(host.ascii(),&a)) + m_ip = ntohl(a.s_addr); + } + + Address::Address(const Address & addr) : m_ip(addr.ip()),m_port(addr.port()) + { + } + + Address:: ~Address() + {} + + + Address & Address::operator = (const Address & a) + { + m_ip = a.ip(); + m_port = a.port(); + return *this; + } + + + bool Address::operator == (const Address & a) + { + return m_ip == a.ip() && m_port == a.port(); + } + + QString Address::toString() const + { + return QString("%1.%2.%3.%4") + .arg((m_ip & 0xFF000000) >> 24) + .arg((m_ip & 0x00FF0000) >> 16) + .arg((m_ip & 0x0000FF00) >> 8) + .arg(m_ip & 0x000000FF); + } + +} diff --git a/libktorrent/net/address.h b/libktorrent/net/address.h new file mode 100644 index 0000000..28c4e2c --- /dev/null +++ b/libktorrent/net/address.h @@ -0,0 +1,60 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef NETADDRESS_H +#define NETADDRESS_H + +#include <qstring.h> +#include <util/constants.h> + +namespace net +{ + using bt::Uint32; + using bt::Uint16; + + /** + @author Joris Guisson <joris.guisson@gmail.com> + */ + class Address + { + Uint32 m_ip; + Uint16 m_port; + public: + Address(); + Address(const QString & host,Uint16 port); + Address(const Address & addr); + virtual ~Address(); + + + Address & operator = (const Address & a); + bool operator == (const Address & a); + + Uint32 ip() const {return m_ip;} + void setIP(Uint32 ip) {m_ip = ip;} + + Uint16 port() const {return m_port;} + void setPort(Uint16 p) {m_port = p;} + + QString toString() const; + + }; + +} + +#endif diff --git a/libktorrent/net/bufferedsocket.cpp b/libktorrent/net/bufferedsocket.cpp new file mode 100644 index 0000000..2165f70 --- /dev/null +++ b/libktorrent/net/bufferedsocket.cpp @@ -0,0 +1,217 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <util/log.h> +#include <torrent/globals.h> +#include "bufferedsocket.h" +#include "circularbuffer.h" +#include "speed.h" + +using namespace bt; + +namespace net +{ +#define OUTPUT_BUFFER_SIZE 16393 + + BufferedSocket::BufferedSocket(int fd) : Socket(fd),rdr(0),wrt(0),up_gid(0),down_gid(0) + { + bytes_in_output_buffer = 0; + bytes_sent = 0; + down_speed = new Speed(); + up_speed = new Speed(); + output_buffer = new Uint8[OUTPUT_BUFFER_SIZE]; + poll_index = -1; + } + + BufferedSocket::BufferedSocket(bool tcp) : Socket(tcp),rdr(0),wrt(0),up_gid(0),down_gid(0) + { + bytes_in_output_buffer = 0; + bytes_sent = 0; + down_speed = new Speed(); + up_speed = new Speed(); + output_buffer = new Uint8[OUTPUT_BUFFER_SIZE]; + poll_index = -1; + } + + + BufferedSocket::~BufferedSocket() + { + delete [] output_buffer; + delete up_speed; + delete down_speed; + } + + void BufferedSocket::setGroupID(Uint32 gid,bool upload) + { + if (upload) + up_gid = gid; + else + down_gid = gid; + } + + float BufferedSocket::getDownloadRate() const + { + mutex.lock(); + float ret = down_speed->getRate(); + mutex.unlock(); + return ret; + } + + float BufferedSocket::getUploadRate() const + { + mutex.lock(); + float ret = up_speed->getRate(); + mutex.unlock(); + return ret; + } + + static Uint8 input_buffer[OUTPUT_BUFFER_SIZE]; + + Uint32 BufferedSocket::readBuffered(Uint32 max_bytes_to_read,bt::TimeStamp now) + { + Uint32 br = 0; + bool no_limit = (max_bytes_to_read == 0); + + if (bytesAvailable() == 0) + { + close(); + return 0; + } + + while ((br < max_bytes_to_read || no_limit) && bytesAvailable() > 0) + { + Uint32 tr = bytesAvailable(); + if (tr > OUTPUT_BUFFER_SIZE) + tr = OUTPUT_BUFFER_SIZE; + if (!no_limit && tr + br > max_bytes_to_read) + tr = max_bytes_to_read - br; + + int ret = Socket::recv(input_buffer,tr); + if (ret != 0) + { + mutex.lock(); + down_speed->onData(ret,now); + mutex.unlock(); + if (rdr) + rdr->onDataReady(input_buffer,ret); + br += ret; + } + else + { + // connection closed, so just return the number of bytes read + return br; + } + } + return br; + } + + Uint32 BufferedSocket::sendOutputBuffer(Uint32 max,bt::TimeStamp now) + { + if (bytes_in_output_buffer == 0) + return 0; + + if (max == 0 || bytes_in_output_buffer <= max) + { + // try to send everything + Uint32 bw = bytes_in_output_buffer; + Uint32 off = bytes_sent; + Uint32 ret = Socket::send(output_buffer + off,bw); + if (ret > 0) + { + mutex.lock(); + up_speed->onData(ret,now); + mutex.unlock(); + bytes_in_output_buffer -= ret; + bytes_sent += ret; + if (bytes_sent == bytes_in_output_buffer) + bytes_in_output_buffer = bytes_sent = 0; + return ret; + } + else + { + return 0; + } + } + else + { + Uint32 bw = max; + Uint32 off = bytes_sent; + Uint32 ret = Socket::send(output_buffer + off,bw); + if (ret > 0) + { + mutex.lock(); + up_speed->onData(ret,now); + mutex.unlock(); + bytes_in_output_buffer -= ret; + bytes_sent += ret; + return ret; + } + else + { + return 0; + } + } + } + + Uint32 BufferedSocket::writeBuffered(Uint32 max,bt::TimeStamp now) + { + if (!wrt) + return 0; + + Uint32 bw = 0; + bool no_limit = max == 0; + if (bytes_in_output_buffer > 0) + { + Uint32 ret = sendOutputBuffer(max,now); + if (bytes_in_output_buffer > 0) + { + // haven't sent it fully so return + return ret; + } + + bw += ret; + } + + // run as long as we do not hit the limit and we can send everything + while ((no_limit || bw < max) && bytes_in_output_buffer == 0) + { + // fill output buffer + bytes_in_output_buffer = wrt->onReadyToWrite(output_buffer,OUTPUT_BUFFER_SIZE); + bytes_sent = 0; + if (bytes_in_output_buffer > 0) + { + // try to send + bw += sendOutputBuffer(max - bw,now); + } + else + { + // no bytes available in output buffer so break + break; + } + } + + return bw; + } + + void BufferedSocket::updateSpeeds(bt::TimeStamp now) + { + up_speed->update(now); + down_speed->update(now); + } +} diff --git a/libktorrent/net/bufferedsocket.h b/libktorrent/net/bufferedsocket.h new file mode 100644 index 0000000..2c0c3ec --- /dev/null +++ b/libktorrent/net/bufferedsocket.h @@ -0,0 +1,150 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef NETBUFFEREDSOCKET_H +#define NETBUFFEREDSOCKET_H + +#include <qmutex.h> +#include <net/socket.h> + +namespace net +{ + using bt::Uint8; + using bt::Uint32; + + class Speed; + + class SocketReader + { + public: + SocketReader() {} + virtual ~SocketReader() {} + + /** + * Function which will be called whenever data has been read from the socket. + * This data should be dealt with, otherwise it will be discarded. + * @param buf The buffer + * @param size The size of the buffer + */ + virtual void onDataReady(Uint8* buf,Uint32 size) = 0; + }; + + class SocketWriter + { + public: + SocketWriter() {} + virtual ~SocketWriter() {} + + /** + * The socket is ready to write, the writer is asked to provide the data. + * The data will be fully sent, before another request is done. + * @param data The data + * @param max_to_write The maximum number of bytes to put in the buffer + * @param The number of bytes placed in the buffer + */ + virtual Uint32 onReadyToWrite(Uint8* data,Uint32 max_to_write) = 0; + + /// Check if data is ready to write + virtual bool hasBytesToWrite() const = 0; + + }; + + /** + * @author Joris Guisson <joris.guisson@gmail.com> + * + * Extends the Socket class with + */ + class BufferedSocket : public Socket + { + mutable QMutex mutex; + SocketReader* rdr; + SocketWriter* wrt; + Uint8* output_buffer; + Uint32 bytes_in_output_buffer; // bytes in the output buffer + Uint32 bytes_sent; // bytes written of the output buffer + Speed* down_speed; + Speed* up_speed; + int poll_index; + + Uint32 up_gid; + Uint32 down_gid; // group id which this torrent belongs to, group 0 means the default group + + public: + BufferedSocket(int fd); + BufferedSocket(bool tcp); + virtual ~BufferedSocket(); + + /** + * Set the group ID of the socket + * @param gid THe ID (0 is default group) + * @param upload Wether this is an upload group or a download group + */ + void setGroupID(Uint32 gid,bool upload); + + /// Get the download group ID + Uint32 downloadGroupID() const {return down_gid;} + + /// Get the upload group ID + Uint32 uploadGroupID() const {return up_gid;} + + void setReader(SocketReader* r) {rdr = r;} + void setWriter(SocketWriter* r) {wrt = r;} + + /** + * Reads data from the socket to the buffer. + * @param max_bytes_to_read Maximum number of bytes to read (0 is no limit) + * @param now Current time stamp + * @return The number of bytes read + */ + Uint32 readBuffered(Uint32 max_bytes_to_read,bt::TimeStamp now); + + /** + * Writes data from the buffer to the socket. + * @param max The maximum number of bytes to send over the socket (0 = no limit) + * * @param now Current time stamp + * @return The number of bytes written + */ + Uint32 writeBuffered(Uint32 max,bt::TimeStamp now); + + /// See if the socket has something ready to write + bool bytesReadyToWrite() const + { + return bytes_in_output_buffer > 0 || (!wrt ? false : wrt->hasBytesToWrite()); + } + + + /// Get the current download rate + float getDownloadRate() const; + + /// Get the current download rate + float getUploadRate() const; + + /// Update up and down speed + void updateSpeeds(bt::TimeStamp now); + + int getPollIndex() const {return poll_index;} + void setPollIndex(int pi) {poll_index = pi;} + + private: + Uint32 sendOutputBuffer(Uint32 max,bt::TimeStamp now); + }; + +} + +#endif diff --git a/libktorrent/net/circularbuffer.cpp b/libktorrent/net/circularbuffer.cpp new file mode 100644 index 0000000..abce80a --- /dev/null +++ b/libktorrent/net/circularbuffer.cpp @@ -0,0 +1,146 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <util/log.h> +#include <torrent/globals.h> +#include "circularbuffer.h" +#include "bufferedsocket.h" + +using namespace bt; + +namespace net +{ + + CircularBuffer::CircularBuffer(Uint32 max_size) : buf(0),max_size(max_size),first(0),size(0) + { + buf = new Uint8[max_size]; + } + + + CircularBuffer::~CircularBuffer() + { + delete [] buf; + } + + Uint32 CircularBuffer::freeSpace() const + { + return max_size - size; + } + + Uint32 CircularBuffer::write(const Uint8* data,Uint32 dsize) + { + if (size == max_size) + return 0; + + mutex.lock(); + Uint32 wp = (first + size) % max_size; + Uint32 j = 0; + while (size < max_size && (dsize == 0 || j < dsize)) + { + buf[wp] = data[j]; + j++; + wp = (wp + 1) % max_size; + size++; + } + + mutex.unlock(); + return j; + } + + Uint32 CircularBuffer::read(Uint8* data,Uint32 max_to_read) + { + if (!size) + return 0; + + mutex.lock(); + Uint32 j = 0; + while (size > 0 && j < max_to_read) + { + data[j] = buf[first]; + j++; + first = (first + 1) % max_size; + size--; + } + mutex.unlock(); + return j; + } + + Uint32 CircularBuffer::send(BufferedSocket* s,Uint32 max) + { + if (!size) + return 0; + + Uint32 ret = 0; + mutex.lock(); + + if (first + size <= max_size) + { + Uint32 ts = size; + if (max > 0 && size > max) + ts = max; + ret = s->send(buf + first,ts); + first += ret; + size -= ret; + } + else if (max > 0) // if there is a limit + { + // write from first to the end of the buffer + Uint32 to_send = max_size - first; + if (to_send > max) + to_send = max; + + ret = s->send(buf + first,to_send); + + // update first, wrap around if necessary + first = (first + ret) % max_size; + size -= ret; // ret bytes less in the buffer + max -= ret; // decrease limit + + if (max > 0 && ret == to_send && size > 0) + { + // we have sent everything so we can send more + to_send = size > max ? max : size; + Uint32 ret2 = s->send(buf,to_send); + + ret += ret2; + first += ret2; + size -= ret2; + } + } + else // no limit + { + Uint32 to_send = max_size - first; + ret = s->send(buf + first,to_send); + // update first, wrap around if necessary + first = (first + ret) % max_size; + size -= ret; // ret bytes less in the buffer + if (ret == to_send && size > 0) + { + // we have sent everything so we can send more + Uint32 ret2 = s->send(buf,size); + ret += ret2; + first += ret2; + size -= ret2; + } + } + mutex.unlock(); + return ret; + } + +} diff --git a/libktorrent/net/circularbuffer.h b/libktorrent/net/circularbuffer.h new file mode 100644 index 0000000..63e271e --- /dev/null +++ b/libktorrent/net/circularbuffer.h @@ -0,0 +1,89 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef NETCIRCULARBUFFER_H +#define NETCIRCULARBUFFER_H + +#include <qmutex.h> +#include <util/constants.h> + +namespace net +{ + using bt::Uint8; + using bt::Uint32; + + class BufferedSocket; + + /** + * @author Joris Guisson <joris.guisson@gmail.com> + * + * Simple circular buffer, to simulate a queue. + * Writes happen at the end, reads at the beginning. + * The buffer is protected by a mutex. + */ + class CircularBuffer + { + Uint8* buf; + Uint32 max_size; + Uint32 first; // index of first byte in the buffer + Uint32 size; // number of bytes in use + mutable QMutex mutex; + public: + /** + * Create the buffer. + * @param max_size Maximum size of the buffer. + */ + CircularBuffer(Uint32 max_size); + virtual ~CircularBuffer(); + + /// How much capacity does the buffer have + Uint32 capacity() const {return max_size;} + + /// How much free space is there + Uint32 freeSpace() const; + + + /** + * Write a bunch of data at the back of the buffer. + * @param data Data to write + * @param size How many bytes to write + * @return The number of bytes written in the buffer + */ + Uint32 write(const Uint8* data,Uint32 size); + + /** + * Read from the buffer. + * @param data Buffer to store read data + * @param max_to_read Maximum amount of bytes to read + * @return The number of bytes read + */ + Uint32 read(Uint8* data,Uint32 max_to_read); + + /** + * Send the data in the buffer over the socket + * @param s THe socket + * @param max Maximum bytes to send + * @return The number of bytes written + */ + Uint32 send(BufferedSocket* s,Uint32 max); + }; + +} + +#endif diff --git a/libktorrent/net/downloadthread.cpp b/libktorrent/net/downloadthread.cpp new file mode 100644 index 0000000..ae0f0b9 --- /dev/null +++ b/libktorrent/net/downloadthread.cpp @@ -0,0 +1,137 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <math.h> +#include <sys/poll.h> +#include <util/functions.h> +#include "socketgroup.h" +#include "downloadthread.h" +#include "socketmonitor.h" +#include "bufferedsocket.h" + +using namespace bt; + +namespace net +{ + Uint32 DownloadThread::dcap = 0; + Uint32 DownloadThread::sleep_time = 3; + + DownloadThread::DownloadThread(SocketMonitor* sm) : NetworkThread(sm) + { + } + + + DownloadThread::~DownloadThread() + {} + + void DownloadThread::update() + { + sm->lock(); + int num = fillPollVector(); + sm->unlock(); + + int timeout = 10; + if (poll(&fd_vec[0],num,timeout) > 0) + { + sm->lock(); + TimeStamp now = bt::Now(); + Uint32 num_ready = 0; + SocketMonitor::Itr itr = sm->begin(); + while (itr != sm->end()) + { + BufferedSocket* s = *itr; + int pi = s->getPollIndex(); + if (pi >= 0 && s->ok() && fd_vec[pi].revents & POLLIN) + { + // add to the correct group + Uint32 gid = s->downloadGroupID(); + SocketGroup* g = groups.find(gid); + if (!g) + g = groups.find(0); + + g->add(s); + num_ready++; + } + itr++; + } + + if (num_ready > 0) + doGroups(num_ready,now,dcap); + prev_run_time = now; + sm->unlock(); + } + + if (dcap > 0 || groups.count() > 0) + msleep(sleep_time); + } + + int DownloadThread::fillPollVector() + { + TimeStamp ts = bt::Now(); + int i = 0; + + // fill the poll vector with all sockets + SocketMonitor::Itr itr = sm->begin(); + while (itr != sm->end()) + { + BufferedSocket* s = *itr; + if (s && s->ok() && s->fd() > 0) + { + if (fd_vec.size() <= i) + { + // expand pollfd vector if necessary + struct pollfd pfd; + pfd.fd = s->fd(); + pfd.revents = 0; + pfd.events = POLLIN; + fd_vec.push_back(pfd); + } + else + { + // use existing slot + struct pollfd & pfd = fd_vec[i]; + pfd.fd = s->fd(); + pfd.revents = 0; + pfd.events = POLLIN; + } + s->setPollIndex(i); + i++; + s->updateSpeeds(ts); + } + else + { + s->setPollIndex(-1); + } + itr++; + } + + return i; + } + + void DownloadThread::setSleepTime(Uint32 stime) + { + if (stime >= 1 && stime <= 10) + sleep_time = stime; + } + + bool DownloadThread::doGroup(SocketGroup* g,Uint32 & allowance,bt::TimeStamp now) + { + return g->download(allowance,now); + } +} diff --git a/libktorrent/net/downloadthread.h b/libktorrent/net/downloadthread.h new file mode 100644 index 0000000..08e9e46 --- /dev/null +++ b/libktorrent/net/downloadthread.h @@ -0,0 +1,64 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef NETDOWNLOADTHREAD_H +#define NETDOWNLOADTHREAD_H + +#include <vector> +#include "networkthread.h" + +struct pollfd; + +namespace net +{ + + /** + * @author Joris Guisson <joris.guisson@gmail.com> + * + * Thread which processes incoming data + */ + class DownloadThread : public NetworkThread + { + static bt::Uint32 dcap; + static bt::Uint32 sleep_time; + + std::vector<struct pollfd> fd_vec; + + public: + DownloadThread(SocketMonitor* sm); + virtual ~DownloadThread(); + + + /// Set the download cap + static void setCap(bt::Uint32 cap) {dcap = cap;} + + /// Set the sleep time when using download caps + static void setSleepTime(bt::Uint32 stime); + private: + int fillPollVector(); + + virtual void update(); + virtual bool doGroup(SocketGroup* g,Uint32 & allowance,bt::TimeStamp now); + +// void processIncomingData(bt::TimeStamp now); + }; + +} + +#endif diff --git a/libktorrent/net/networkthread.cpp b/libktorrent/net/networkthread.cpp new file mode 100644 index 0000000..40791c9 --- /dev/null +++ b/libktorrent/net/networkthread.cpp @@ -0,0 +1,165 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <math.h> +#include <util/functions.h> +#include <util/log.h> +#include "socketgroup.h" +#include "socketmonitor.h" +#include "networkthread.h" + +using namespace bt; + +namespace net +{ + + NetworkThread::NetworkThread(SocketMonitor* sm) + : sm(sm),running(false) + { + groups.setAutoDelete(true); + groups.insert(0,new SocketGroup(0)); + } + + + NetworkThread::~NetworkThread() + {} + + void NetworkThread::run() + { + running = true; + prev_run_time = bt::Now(); + while (running) + update(); + } + + void NetworkThread::addGroup(Uint32 gid,Uint32 limit) + { + // if group already exists, just change the limit + SocketGroup* g = groups.find(gid); + if (g) + { + g->setLimit(limit); + } + else + { + g = new SocketGroup(limit); + groups.insert(gid,g); + } + } + + void NetworkThread::removeGroup(Uint32 gid) + { + // make sure the 0 group is never erased + if (gid != 0) + groups.erase(gid); + } + + void NetworkThread::setGroupLimit(Uint32 gid,Uint32 limit) + { + SocketGroup* g = groups.find(gid); + if (g) + { + g->setLimit(limit); + } + } + + Uint32 NetworkThread::doGroupsLimited(Uint32 num_ready,bt::TimeStamp now,Uint32 & allowance) + { + Uint32 num_still_ready = 0; + + // this is one pass over all the groups + bt::PtrMap<Uint32,SocketGroup>::iterator itr = groups.begin(); + while (itr != groups.end() && allowance > 0) + { + SocketGroup* g = itr->second; + if (g->numSockets() > 0) + { + Uint32 group_allowance = (Uint32)ceil(((double)g->numSockets() / num_ready) * allowance); + + // lets not do to much and make sure we don't pass 0 to the socket group (0 is unlimited) + if (group_allowance > allowance || group_allowance == 0) + group_allowance = allowance; + + Uint32 ga = group_allowance; + + if (!doGroup(g,ga,now)) + g->clear(); // group is done, so clear it + else + num_still_ready += g->numSockets(); // keep track of the number of sockets which are still ready + + Uint32 done = group_allowance - ga; + if (allowance >= done) + allowance -= done; + else + allowance = 0; + } + itr++; + } + + return num_still_ready > 0; + } + + void NetworkThread::doGroups(Uint32 num_ready,bt::TimeStamp now,bt::Uint32 limit) + { + if (limit == 0) + { + Uint32 allowance = 0; + bt::PtrMap<Uint32,SocketGroup>::iterator itr = groups.begin(); + while (itr != groups.end()) + { + SocketGroup* g = itr->second; + if (g->numSockets() > 0) + { + g->calcAllowance(now); + doGroup(g,allowance,now); + g->clear(); + } + itr++; + } + } + else + { + // calculate group allowance for each group + bt::PtrMap<Uint32,SocketGroup>::iterator itr = groups.begin(); + while (itr != groups.end()) + { + SocketGroup* g = itr->second; + g->calcAllowance(now); + itr++; + } + + Uint32 allowance = (Uint32)ceil(1.02 * limit * (now - prev_run_time) * 0.001); + + while (allowance > 0 && num_ready > 0) + { + // loop until nobody is ready anymore or the allowance is up + num_ready = doGroupsLimited(num_ready,now,allowance); + } + + // make sure all groups are cleared + itr = groups.begin(); + while (itr != groups.end()) + { + SocketGroup* g = itr->second; + g->clear(); + itr++; + } + } + } +} diff --git a/libktorrent/net/networkthread.h b/libktorrent/net/networkthread.h new file mode 100644 index 0000000..7472c15 --- /dev/null +++ b/libktorrent/net/networkthread.h @@ -0,0 +1,113 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef NETNETWORKTHREAD_H +#define NETNETWORKTHREAD_H + +#include <qthread.h> +#include <util/constants.h> +#include <util/ptrmap.h> + +using bt::Uint32; + +namespace net +{ + class SocketMonitor; + class SocketGroup; + class BufferedSocket; + + /** + @author Joris Guisson <joris.guisson@gmail.com> + + Base class for the 2 networking threads. Handles the socket groups. + */ + class NetworkThread : public QThread + { + protected: + SocketMonitor* sm; + bool running; + bt::PtrMap<Uint32,SocketGroup> groups; + bt::TimeStamp prev_run_time; + + public: + NetworkThread(SocketMonitor* sm); + virtual ~NetworkThread(); + + + /** + * Add a new group with a given limit + * @param gid The group ID (cannot be 0, 0 is the default group) + * @param limit The limit in bytes per sec + */ + void addGroup(Uint32 gid,Uint32 limit); + + /** + * Remove a group + * @param gid The group ID + */ + void removeGroup(Uint32 gid); + + /** + * Set the limit for a group + * @param gid The group ID + * @param limit The limit + */ + void setGroupLimit(Uint32 gid,Uint32 limit); + + /** + * The main function of the thread + */ + void run(); + + /** + * Subclasses must implement this function + */ + virtual void update() = 0; + + /** + * Do one SocketGroup + * @param g The group + * @param allowance The groups allowance + * @param now The current time + * @return true if the group can go again + */ + virtual bool doGroup(SocketGroup* g,Uint32 & allowance,bt::TimeStamp now) = 0; + + /// Stop before the next update + void stop() {running = false;} + + /// Is the thread running + bool isRunning() const {return running;} + + protected: + /** + * Go over all groups and do them + * @param num_ready The number of ready sockets + * @param now The current time + * @param limit The global limit in bytes per sec + */ + void doGroups(Uint32 num_ready,bt::TimeStamp now,bt::Uint32 limit); + + private: + Uint32 doGroupsLimited(Uint32 num_ready,bt::TimeStamp now,Uint32 & allowance); + }; + +} + +#endif diff --git a/libktorrent/net/portlist.cpp b/libktorrent/net/portlist.cpp new file mode 100644 index 0000000..56076ed --- /dev/null +++ b/libktorrent/net/portlist.cpp @@ -0,0 +1,73 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include "portlist.h" + +namespace net +{ + Port::Port() : number(0),proto(TCP),forward(false) + { + } + + Port::Port(bt::Uint16 number,Protocol proto,bool forward) + : number(number),proto(proto),forward(forward) + { + } + + Port::Port(const Port & p) : number(p.number),proto(p.proto),forward(p.forward) + { + } + + bool Port::operator == (const Port & p) const + { + return number == p.number && proto == p.proto; + } + + PortList::PortList() : lst(0) + {} + + + PortList::~PortList() + {} + + + void PortList::addNewPort(bt::Uint16 number,Protocol proto,bool forward) + { + Port p = Port(number,proto,forward); + append(p); + if (lst) + lst->portAdded(p); + } + + + void PortList::removePort(bt::Uint16 number,Protocol proto) + { + PortList::iterator itr = find(Port(number,proto,false)); + if (itr == end()) + return; + + if (lst) + lst->portRemoved(*itr); + + erase(itr); + } + + + +} diff --git a/libktorrent/net/portlist.h b/libktorrent/net/portlist.h new file mode 100644 index 0000000..af60c1c --- /dev/null +++ b/libktorrent/net/portlist.h @@ -0,0 +1,103 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef NETPORTLIST_H +#define NETPORTLIST_H + +#include <qvaluelist.h> +#include <util/constants.h> + +namespace net +{ + enum Protocol + { + TCP, + UDP + }; + + struct Port + { + bt::Uint16 number; + Protocol proto; + bool forward; + + Port(); + Port(bt::Uint16 number,Protocol proto,bool forward); + Port(const Port & p); + + bool operator == (const Port & p) const; + }; + + /** + * Listener class for the PortList. + */ + class PortListener + { + public: + /** + * A port has been added. + * @param port The port + */ + virtual void portAdded(const Port & port) = 0; + + /** + * A port has been removed + * @param port The port + */ + virtual void portRemoved(const Port & port) = 0; + }; + + /** + * @author Joris Guisson <joris.guisson@gmail.com> + * + * List of ports which are currently being used. + * + */ + class PortList : public QValueList<Port> + { + PortListener* lst; + public: + PortList(); + virtual ~PortList(); + + /** + * When a port is in use, this function needs to be called. + * @param number Port number + * @param proto Protocol + * @param forward Wether or not it needs to be forwarded + */ + void addNewPort(bt::Uint16 number,Protocol proto,bool forward); + + /** + * Needs to be called when a port is not being using anymore. + * @param number Port number + * @param proto Protocol + */ + void removePort(bt::Uint16 number,Protocol proto); + + /** + * Set the port listener. + * @param pl Port listener + */ + void setListener(PortListener* pl) {lst = pl;} + }; + +} + +#endif diff --git a/libktorrent/net/socket.cpp b/libktorrent/net/socket.cpp new file mode 100644 index 0000000..b9a53f3 --- /dev/null +++ b/libktorrent/net/socket.cpp @@ -0,0 +1,326 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ + +#include <qglobal.h> + +#include <unistd.h> +#include <string.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/ioctl.h> +#include <errno.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <arpa/inet.h> + +#if defined(Q_OS_LINUX) && !defined(__FreeBSD_kernel__) +#include <asm/ioctls.h> +#endif + +#ifdef Q_OS_SOLARIS +#include <sys/filio.h> +#endif + +#ifndef MSG_NOSIGNAL +#define MSG_NOSIGNAL 0 +#endif + +#include <unistd.h> +#include <fcntl.h> + +#include <torrent/globals.h> +#include <util/log.h> +#include "socket.h" + +using namespace bt; + +namespace net +{ + + Socket::Socket(int fd) : m_fd(fd),m_state(IDLE) + { +#if defined(Q_OS_MACX) || defined(Q_OS_DARWIN) || (defined(Q_OS_FREEBSD) && !defined(__DragonFly__) && __FreeBSD_version < 600020) + int val = 1; + if (setsockopt(m_fd,SOL_SOCKET,SO_NOSIGPIPE,&val,sizeof(int)) < 0) + { + Out(SYS_CON|LOG_NOTICE) << QString("Failed to set the NOSIGPIPE option : %1").arg(strerror(errno)) << endl; + } +#endif + cacheAddress(); + } + + Socket::Socket(bool tcp) : m_fd(-1),m_state(IDLE) + { + int fd = socket(PF_INET,tcp ? SOCK_STREAM : SOCK_DGRAM,0); + if (fd < 0) + { + Out(SYS_GEN|LOG_IMPORTANT) << QString("Cannot create socket : %1").arg(strerror(errno)) << endl; + } + m_fd = fd; +#if defined(Q_OS_MACX) || defined(Q_OS_DARWIN) || (defined(Q_OS_FREEBSD) && !defined(__DragonFly__) && __FreeBSD_version < 600020) + int val = 1; + if (setsockopt(m_fd,SOL_SOCKET,SO_NOSIGPIPE,&val,sizeof(int)) < 0) + { + Out(SYS_CON|LOG_NOTICE) << QString("Failed to set the NOSIGPIPE option : %1").arg(strerror(errno)) << endl; + } +#endif + } + + Socket::~Socket() + { + if (m_fd >= 0) + { + shutdown(m_fd, SHUT_RDWR); + ::close(m_fd); + } + } + + void Socket::close() + { + if (m_fd >= 0) + { + shutdown(m_fd, SHUT_RDWR); + ::close(m_fd); + m_fd = -1; + m_state = CLOSED; + } + } + + void Socket::setNonBlocking() + { + fcntl(m_fd, F_SETFL, O_NONBLOCK); + } + + bool Socket::connectTo(const Address & a) + { + struct sockaddr_in addr; + memset(&addr,0,sizeof(struct sockaddr_in)); + addr.sin_family = AF_INET; + addr.sin_port = htons(a.port()); + addr.sin_addr.s_addr = htonl(a.ip()); + + if (::connect(m_fd,(struct sockaddr*)&addr,sizeof(struct sockaddr)) < 0) + { + if (errno == EINPROGRESS) + { + // Out(SYS_CON|LOG_DEBUG) << "Socket is connecting" << endl; + m_state = CONNECTING; + return false; + } + else + { + Out(SYS_CON|LOG_NOTICE) << QString("Cannot connect to host %1:%2 : %3") + .arg(a.toString()).arg(a.port()).arg(strerror(errno)) << endl; + return false; + } + } + m_state = CONNECTED; + cacheAddress(); + return true; + } + + bool Socket::bind(Uint16 port,bool also_listen) + { + struct sockaddr_in addr; + memset(&addr,0,sizeof(struct sockaddr_in)); + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + + if (::bind(m_fd,(struct sockaddr*)&addr,sizeof(struct sockaddr)) < 0) + { + Out(SYS_CON|LOG_IMPORTANT) << QString("Cannot bind to port %1 : %2").arg(port).arg(strerror(errno)) << endl; + return false; + } + + if (also_listen && listen(m_fd,5) < 0) + { + Out(SYS_CON|LOG_IMPORTANT) << QString("Cannot listen to port %1 : %2").arg(port).arg(strerror(errno)) << endl; + return false; + } + + int val = 1; + if (setsockopt(m_fd,SOL_SOCKET,SO_REUSEADDR,&val,sizeof(int)) < 0) + { + Out(SYS_CON|LOG_NOTICE) << QString("Failed to set the reuseaddr option : %1").arg(strerror(errno)) << endl; + } + m_state = BOUND; + return true; + } + + int Socket::send(const bt::Uint8* buf,int len) + { + int ret = ::send(m_fd,buf,len,MSG_NOSIGNAL); + if (ret < 0) + { + if (errno != EAGAIN && errno != EWOULDBLOCK) + { + // Out(SYS_CON|LOG_DEBUG) << "Send error : " << QString(strerror(errno)) << endl; + close(); + } + return 0; + } + return ret; + } + + int Socket::recv(bt::Uint8* buf,int max_len) + { + int ret = ::recv(m_fd,buf,max_len,0); + if (ret < 0) + { + if (errno != EAGAIN && errno != EWOULDBLOCK) + { + // Out(SYS_CON|LOG_DEBUG) << "Receive error : " << QString(strerror(errno)) << endl; + close(); + } + return 0; + } + else if (ret == 0) + { + // connection closed + close(); + return 0; + } + return ret; + } + + int Socket::sendTo(const bt::Uint8* buf,int len,const Address & a) + { + struct sockaddr_in addr; + memset(&addr,0,sizeof(struct sockaddr_in)); + addr.sin_family = AF_INET; + addr.sin_port = htons(a.port()); + addr.sin_addr.s_addr = htonl(a.ip()); + + int ns = 0; + while (ns < len) + { + int left = len - ns; + int ret = ::sendto(m_fd,(char*)buf + ns,left,0,(struct sockaddr*)&addr,sizeof(struct sockaddr)); + if (ret < 0) + { + Out(SYS_CON|LOG_DEBUG) << "Send error : " << QString(strerror(errno)) << endl; + return 0; + } + + ns += ret; + } + return ns; + } + + int Socket::recvFrom(bt::Uint8* buf,int max_len,Address & a) + { + struct sockaddr_in addr; + memset(&addr,0,sizeof(struct sockaddr_in)); + socklen_t sl = sizeof(struct sockaddr); + + int ret = ::recvfrom(m_fd,buf,max_len,0,(struct sockaddr*)&addr,&sl); + if (ret < 0) + { + Out(SYS_CON|LOG_DEBUG) << "Receive error : " << QString(strerror(errno)) << endl; + return 0; + } + + a.setPort(ntohs(addr.sin_port)); + a.setIP(ntohl(addr.sin_addr.s_addr)); + return ret; + } + + int Socket::accept(Address & a) + { + struct sockaddr_in addr; + memset(&addr,0,sizeof(struct sockaddr_in)); + socklen_t slen = sizeof(struct sockaddr_in); + + int sfd = ::accept(m_fd,(struct sockaddr*)&addr,&slen); + if (sfd < 0) + { + Out(SYS_CON|LOG_DEBUG) << "Accept error : " << QString(strerror(errno)) << endl; + return -1; + } + + a.setPort(ntohs(addr.sin_port)); + a.setIP(ntohl(addr.sin_addr.s_addr)); + + Out(SYS_CON|LOG_DEBUG) << "Accepted connection from " << QString(inet_ntoa(addr.sin_addr)) << endl; + return sfd; + } + + bool Socket::setTOS(unsigned char type_of_service) + { +#if defined(Q_OS_MACX) || defined(Q_OS_DARWIN) || (defined(Q_OS_FREEBSD) && __FreeBSD_version < 600020) || defined(Q_OS_NETBSD) || defined(Q_OS_OPENBSD) || defined(Q_OS_BSD4) + unsigned int c = type_of_service; +#else + unsigned char c = type_of_service; +#endif + if (setsockopt(m_fd,IPPROTO_IP,IP_TOS,&c,sizeof(c)) < 0) + { + Out(SYS_CON|LOG_NOTICE) << QString("Failed to set TOS to %1 : %2") + .arg(type_of_service).arg(strerror(errno)) << endl; + return false; + } + return true; + } + + Uint32 Socket::bytesAvailable() const + { + int ret = 0; + if (ioctl(m_fd,FIONREAD,&ret) < 0) + return 0; + + return ret; + } + + bool Socket::connectSuccesFull() + { + if (m_state != CONNECTING) + return false; + + int err = 0; + socklen_t len = sizeof(int); + if (getsockopt(m_fd,SOL_SOCKET,SO_ERROR,&err,&len) < 0) + return false; + + if (err == 0) + { + m_state = CONNECTED; + cacheAddress(); + } + + return err == 0; + } + + void Socket::cacheAddress() + { + struct sockaddr_in raddr; + socklen_t slen = sizeof(struct sockaddr_in); + if (getpeername(m_fd,(struct sockaddr*)&raddr,&slen) == 0) + addr = Address(inet_ntoa(raddr.sin_addr),ntohs(raddr.sin_port)); + } + + /* + void Socket::setReadBufferSize(int rbs) + { + if (setsockopt(m_fd, SOL_SOCKET, SO_RCVBUF, (char *)&rbs,sizeof(int)) < 0) + { + Out(SYS_CON|LOG_DEBUG) << "Failed to set read buffer size " << endl; + } + } + */ +} diff --git a/libktorrent/net/socket.h b/libktorrent/net/socket.h new file mode 100644 index 0000000..db8953b --- /dev/null +++ b/libktorrent/net/socket.h @@ -0,0 +1,83 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef NETSOCKET_H +#define NETSOCKET_H + +#include <util/constants.h> +#include "address.h" + +namespace net +{ + + /** + @author Joris Guisson <joris.guisson@gmail.com> + */ + class Socket + { + public: + enum State + { + IDLE, + CONNECTING, + CONNECTED, + BOUND, + CLOSED + }; + + Socket(int fd); + Socket(bool tcp); + virtual ~Socket(); + + void setNonBlocking(); + bool connectTo(const Address & addr); + /// See if a connectTo was succesfull in non blocking mode + bool connectSuccesFull(); + bool bind(Uint16 port,bool also_listen); + int send(const bt::Uint8* buf,int len); + int recv(bt::Uint8* buf,int max_len); + int sendTo(const bt::Uint8* buf,int size,const Address & addr); + int recvFrom(bt::Uint8* buf,int max_size,Address & addr); + int accept(Address & a); + bool ok() const {return m_fd >= 0;} + int fd() const {return m_fd;} + bool setTOS(unsigned char type_of_service); + const Address & getPeerName() const {return addr;} + void close(); + State state() const {return m_state;} + + /** + * Set the size of the TCP read buffer. + * @param rbs + */ +// void setReadBufferSize(Uint32 rbs); + + Uint32 bytesAvailable() const; + private: + void cacheAddress(); + + private: + int m_fd; + State m_state; + Address addr; + }; + +} + +#endif diff --git a/libktorrent/net/socketgroup.cpp b/libktorrent/net/socketgroup.cpp new file mode 100644 index 0000000..8c9c5e7 --- /dev/null +++ b/libktorrent/net/socketgroup.cpp @@ -0,0 +1,186 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <math.h> +#include <util/log.h> +#include <util/functions.h> +#include "socketgroup.h" +#include "bufferedsocket.h" + +using namespace bt; + +namespace net +{ + + SocketGroup::SocketGroup(Uint32 limit) : limit(limit) + { + prev_run_time = bt::GetCurrentTime(); + group_allowance = 0; + } + + + SocketGroup::~SocketGroup() + {} + + void SocketGroup::processUnlimited(bool up,bt::TimeStamp now) + { + std::list<BufferedSocket*>::iterator i = sockets.begin(); + while (i != sockets.end()) + { + BufferedSocket* s = *i; + if (s) + { + if (up) + s->writeBuffered(0,now); + else + s->readBuffered(0,now); + } + i++; + } + } + + bool SocketGroup::processLimited(bool up,bt::TimeStamp now,Uint32 & allowance) + { + Uint32 bslot = allowance / sockets.size() + 1; + + std::list<BufferedSocket*>::iterator itr = sockets.begin(); + + // while we can send and there are sockets left to send + while (sockets.size() > 0 && allowance > 0) + { + Uint32 as = bslot; + if (as > allowance) + as = allowance; + + BufferedSocket* s = *itr; + if (s) + { + Uint32 ret = 0; + if (up) + ret = s->writeBuffered(as,now); + else + ret = s->readBuffered(as,now); + + // if this socket did what it was supposed to do, + // it can have another go if stuff is leftover + // if it doesn't, we erase it from the list + if (ret != as) + itr = sockets.erase(itr); + else + itr++; + + if (ret > allowance) + allowance = 0; + else + allowance -= ret; + } + else + { + // 0 pointer so just erase + itr = sockets.erase(itr); + } + + // wrap around if necessary + if (itr == sockets.end()) + itr = sockets.begin(); + } + + return sockets.size() > 0; + } + + bool SocketGroup::download(Uint32 & global_allowance,bt::TimeStamp now) + { + return process(false,now,global_allowance); + } + + bool SocketGroup::upload(Uint32 & global_allowance,bt::TimeStamp now) + { + return process(true,now,global_allowance); + } + + void SocketGroup::calcAllowance(bt::TimeStamp now) + { + if (limit > 0) + group_allowance = (Uint32)ceil(1.02 * limit * (now - prev_run_time) * 0.001); + else + group_allowance = 0; + prev_run_time = now; + } + + bool SocketGroup::process(bool up,bt::TimeStamp now,Uint32 & global_allowance) + { + if (limit > 0) + { + bool ret = false; + if (global_allowance == 0) + { + Uint32 p = group_allowance; + ret = processLimited(up,now,p); + group_allowance = p; + } + else if (global_allowance <= group_allowance) + { + Uint32 tmp = global_allowance; + ret = processLimited(up,now,tmp); + + Uint32 done = (global_allowance - tmp); + if (group_allowance < done) + group_allowance = 0; + else + group_allowance -= done; + + global_allowance = tmp; + } + else + { + Uint32 p = group_allowance; + ret = processLimited(up,now,p); + + Uint32 done = (group_allowance - p); + if (global_allowance < done) + global_allowance = 0; + else + global_allowance -= done; + + group_allowance = p; + } + + // if group allowance is used up, this group can no longer do anything + if (group_allowance == 0) + { + clear(); + return false; + } + else + return ret; + } + else if (global_allowance > 0) + { + return processLimited(up,now,global_allowance); + } + else + { + processUnlimited(up,now); + return false; + } + } + + + +} diff --git a/libktorrent/net/socketgroup.h b/libktorrent/net/socketgroup.h new file mode 100644 index 0000000..ba08029 --- /dev/null +++ b/libktorrent/net/socketgroup.h @@ -0,0 +1,90 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef NETSOCKETGROUP_H +#define NETSOCKETGROUP_H + +#include <list> +#include <util/constants.h> + +namespace net +{ + using bt::Uint32; + + class BufferedSocket; + + /** + @author Joris Guisson <joris.guisson@gmail.com> + */ + class SocketGroup + { + Uint32 limit; + std::list<BufferedSocket*> sockets; + bt::TimeStamp prev_run_time; + Uint32 group_allowance; + public: + SocketGroup(Uint32 limit); + virtual ~SocketGroup(); + + /// Clear the lists of sockets + void clear() {sockets.clear();} + + /// Add a socket for processing + void add(BufferedSocket* s) {sockets.push_back(s);} + + /** + Process all the sockets in the vector for download. + @param global_allowance How much the group can do, this will be updated, 0 means no limit + @param now Current time + @return true if we can download more data, false otherwise + */ + bool download(Uint32 & global_allowance,bt::TimeStamp now); + + /** + Process all the sockets in the vector for upload + @param global_allowance How much the group can do, this will be updated, 0 means no limit + @param now Current time + @return true if we can upload more data, false otherwise + */ + bool upload(Uint32 & global_allowance,bt::TimeStamp now); + + /** + * Set the group limit in bytes per sec + * @param lim The limit + */ + void setLimit(Uint32 lim) {limit = lim;} + + /// Get the number of sockets + Uint32 numSockets() const {return sockets.size();} + + /** + * Calculate the allowance for this group + * @param now Current timestamp + */ + void calcAllowance(bt::TimeStamp now); + private: + void processUnlimited(bool up,bt::TimeStamp now); + bool processLimited(bool up,bt::TimeStamp now,Uint32 & allowance); + bool process(bool up,bt::TimeStamp now,Uint32 & global_allowance); + }; + + +} + +#endif diff --git a/libktorrent/net/socketmonitor.cpp b/libktorrent/net/socketmonitor.cpp new file mode 100644 index 0000000..38225ab --- /dev/null +++ b/libktorrent/net/socketmonitor.cpp @@ -0,0 +1,173 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <math.h> +#include <unistd.h> +#include <util/functions.h> +#include <util/log.h> +#include <torrent/globals.h> +#include "socketmonitor.h" +#include "bufferedsocket.h" +#include "uploadthread.h" +#include "downloadthread.h" + +using namespace bt; + +namespace net +{ + SocketMonitor SocketMonitor::self; + + SocketMonitor::SocketMonitor() : ut(0),dt(0),next_group_id(1) + { + dt = new DownloadThread(this); + ut = new UploadThread(this); + } + + + SocketMonitor::~SocketMonitor() + { + if (ut && ut->isRunning()) + { + ut->stop(); + ut->signalDataReady(); // kick it in the nuts, if the thread is waiting for data + if (!ut->wait(250)) + { + ut->terminate(); + ut->wait(); + } + } + + + if (dt && dt->isRunning()) + { + dt->stop(); + if (!dt->wait(250)) + { + dt->terminate(); + dt->wait(); + } + } + + delete ut; + delete dt; + } + + void SocketMonitor::lock() + { + mutex.lock(); + } + + void SocketMonitor::unlock() + { + mutex.unlock(); + } + + void SocketMonitor::setDownloadCap(Uint32 bytes_per_sec) + { + DownloadThread::setCap(bytes_per_sec); + } + + void SocketMonitor::setUploadCap(Uint32 bytes_per_sec) + { + UploadThread::setCap(bytes_per_sec); + } + + void SocketMonitor::setSleepTime(Uint32 sleep_time) + { + DownloadThread::setSleepTime(sleep_time); + UploadThread::setSleepTime(sleep_time); + } + + void SocketMonitor::add(BufferedSocket* sock) + { + QMutexLocker lock(&mutex); + + bool start_threads = smap.count() == 0; + smap.append(sock); + + if (start_threads) + { + Out(SYS_CON|LOG_DEBUG) << "Starting socketmonitor threads" << endl; + + if (!dt->isRunning()) + dt->start(QThread::IdlePriority); + if (!ut->isRunning()) + ut->start(QThread::IdlePriority); + } + } + + void SocketMonitor::remove(BufferedSocket* sock) + { + QMutexLocker lock(&mutex); + if (smap.count() == 0) + return; + + smap.remove(sock); + if (smap.count() == 0) + { + Out(SYS_CON|LOG_DEBUG) << "Stopping socketmonitor threads" << endl; + if (dt && dt->isRunning()) + dt->stop(); + if (ut && ut->isRunning()) + { + ut->stop(); + ut->signalDataReady(); + } + } + } + + void SocketMonitor::signalPacketReady() + { + if (ut) + ut->signalDataReady(); + } + + Uint32 SocketMonitor::newGroup(GroupType type,Uint32 limit) + { + lock(); + Uint32 gid = next_group_id++; + if (type == UPLOAD_GROUP) + ut->addGroup(gid,limit); + else + dt->addGroup(gid,limit); + unlock(); + return gid; + } + + void SocketMonitor::setGroupLimit(GroupType type,Uint32 gid,Uint32 limit) + { + lock(); + if (type == UPLOAD_GROUP) + ut->setGroupLimit(gid,limit); + else + dt->setGroupLimit(gid,limit); + unlock(); + } + + void SocketMonitor::removeGroup(GroupType type,Uint32 gid) + { + lock(); + if (type == UPLOAD_GROUP) + ut->removeGroup(gid); + else + dt->removeGroup(gid); + unlock(); + } + +} diff --git a/libktorrent/net/socketmonitor.h b/libktorrent/net/socketmonitor.h new file mode 100644 index 0000000..79e4a2e --- /dev/null +++ b/libktorrent/net/socketmonitor.h @@ -0,0 +1,118 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef NETSOCKETMONITOR_H +#define NETSOCKETMONITOR_H + + +#include <qmutex.h> +#include <qptrlist.h> +#include <util/constants.h> + + +namespace net +{ + using bt::Uint32; + + class BufferedSocket; + class UploadThread; + class DownloadThread; + + /** + * @author Joris Guisson <joris.guisson@gmail.com> + * + * Monitors all sockets for upload and download traffic. + * It uses two threads to do this. + */ + class SocketMonitor + { + static SocketMonitor self; + + QMutex mutex; + UploadThread* ut; + DownloadThread* dt; + QPtrList<BufferedSocket> smap; + Uint32 next_group_id; + + SocketMonitor(); + public: + virtual ~SocketMonitor(); + + /// Add a new socket, will start the threads if necessary + void add(BufferedSocket* sock); + + /// Remove a socket, will stop threads if no more sockets are left + void remove(BufferedSocket* sock); + + enum GroupType + { + UPLOAD_GROUP, + DOWNLOAD_GROUP + }; + + + /** + * Creata a new upload or download group + * @param type Wether it is an upload or download group + * @param limit Limit of group in bytes/s + * @return The group ID + */ + Uint32 newGroup(GroupType type,Uint32 limit); + + /** + * Change the group limit + * @param type The group type + * @param gid The group id + * @param limit The limit + */ + void setGroupLimit(GroupType type,Uint32 gid,Uint32 limit); + + /** + * Remove a group + * @param type The group type + * @param gid The group id + */ + void removeGroup(GroupType type,Uint32 gid); + + typedef QPtrList<BufferedSocket>::iterator Itr; + + /// Get the begin of the list of sockets + Itr begin() {return smap.begin();} + + /// Get the end of the list of sockets + Itr end() {return smap.end();} + + /// lock the monitor + void lock(); + + /// unlock the monitor + void unlock(); + + /// Tell upload thread a packet is ready + void signalPacketReady(); + + static void setDownloadCap(Uint32 bytes_per_sec); + static void setUploadCap(Uint32 bytes_per_sec); + static void setSleepTime(Uint32 sleep_time); + static SocketMonitor & instance() {return self;} + }; + +} + +#endif diff --git a/libktorrent/net/speed.cpp b/libktorrent/net/speed.cpp new file mode 100644 index 0000000..aa57513 --- /dev/null +++ b/libktorrent/net/speed.cpp @@ -0,0 +1,78 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <util/log.h> +#include <util/timer.h> +#include <util/functions.h> +#include "speed.h" + +using namespace bt; + +namespace net +{ + const Uint64 SPEED_INTERVAL = 5000; + + Speed::Speed() : rate(0),bytes(0) + {} + + + Speed::~Speed() + {} + + void Speed::onData(Uint32 b,bt::TimeStamp ts) + { + dlrate.append(qMakePair(b,ts)); + bytes += b; + } + + void Speed::update(bt::TimeStamp now) + { + QValueList<QPair<Uint32,TimeStamp> >::iterator i = dlrate.begin(); + while (i != dlrate.end()) + { + QPair<Uint32,TimeStamp> & p = *i; + if (now - p.second > SPEED_INTERVAL || now < p.second) + { + if (bytes >= p.first) // make sure we don't wrap around + bytes -= p.first; // subtract bytes + else + bytes = 0; + i = dlrate.erase(i); + } + else + { + // seeing that newer entries are appended, they are in the list chronologically + // so once we hit an entry which is in the interval, we can just break out of the loop + // because all following entries will be in the interval + break; + } + } + + if (bytes == 0) + { + rate = 0; + } + else + { + // Out() << "bytes = " << bytes << " d = " << d << endl; + rate = (float) bytes / (float)(SPEED_INTERVAL * 0.001); + } + } + +} diff --git a/libktorrent/net/speed.h b/libktorrent/net/speed.h new file mode 100644 index 0000000..d5825e9 --- /dev/null +++ b/libktorrent/net/speed.h @@ -0,0 +1,51 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef NETSPEED_H +#define NETSPEED_H + +#include <qpair.h> +#include <qvaluelist.h> +#include <util/constants.h> + +namespace net +{ + + /** + @author Joris Guisson <joris.guisson@gmail.com> + + Measures the download and upload speed. + */ + class Speed + { + float rate; + bt::Uint32 bytes; + QValueList<QPair<bt::Uint32,bt::TimeStamp> > dlrate; + public: + Speed(); + virtual ~Speed(); + + void onData(bt::Uint32 bytes,bt::TimeStamp ts); + void update(bt::TimeStamp now); + float getRate() const {return rate;} + }; + +} + +#endif diff --git a/libktorrent/net/uploadthread.cpp b/libktorrent/net/uploadthread.cpp new file mode 100644 index 0000000..0023cf6 --- /dev/null +++ b/libktorrent/net/uploadthread.cpp @@ -0,0 +1,91 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <math.h> +#include <util/functions.h> +#include "uploadthread.h" +#include "socketmonitor.h" +#include "bufferedsocket.h" +#include "socketgroup.h" + +using namespace bt; + +namespace net +{ + Uint32 UploadThread::ucap = 0; + Uint32 UploadThread::sleep_time = 3; + + UploadThread::UploadThread(SocketMonitor* sm) : NetworkThread(sm) + {} + + + UploadThread::~UploadThread() + {} + + + void UploadThread::update() + { + sm->lock(); + bt::TimeStamp now = bt::Now(); + + Uint32 num_ready = 0; + // loop over all sockets and see which ones have data ready + SocketMonitor::Itr itr = sm->begin(); + while (itr != sm->end()) + { + BufferedSocket* s = *itr; + if (s && s->ok() && s->bytesReadyToWrite()) + { + SocketGroup* g = groups.find(s->uploadGroupID()); + if (!g) + g = groups.find(0); + + g->add(s); + num_ready++; + } + itr++; + } + + if (num_ready > 0) + doGroups(num_ready,now,ucap); + prev_run_time = now; + sm->unlock(); + + if (num_ready == 0) // nobody was ready so go to sleep + data_ready.wait(); + else + msleep(sleep_time); + } + + void UploadThread::signalDataReady() + { + data_ready.wakeOne(); + } + + void UploadThread::setSleepTime(Uint32 stime) + { + if (stime >= 1 && stime <= 10) + sleep_time = stime; + } + + bool UploadThread::doGroup(SocketGroup* g,Uint32 & allowance,bt::TimeStamp now) + { + return g->upload(allowance,now); + } +} diff --git a/libktorrent/net/uploadthread.h b/libktorrent/net/uploadthread.h new file mode 100644 index 0000000..265abac --- /dev/null +++ b/libktorrent/net/uploadthread.h @@ -0,0 +1,61 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef NETUPLOADTHREAD_H +#define NETUPLOADTHREAD_H + + + +#include <qwaitcondition.h> +#include "networkthread.h" + +namespace net +{ + class SocketMonitor; + class BufferedSocket; + + /** + @author Joris Guisson <joris.guisson@gmail.com> + */ + class UploadThread : public NetworkThread + { + static bt::Uint32 ucap; + static bt::Uint32 sleep_time; + + QWaitCondition data_ready; + public: + UploadThread(SocketMonitor* sm); + virtual ~UploadThread(); + + /// Wake up thread, data is ready to be sent + void signalDataReady(); + + /// Set the upload cap + static void setCap(bt::Uint32 uc) {ucap = uc;} + + /// Set the sleep time when using upload caps + static void setSleepTime(bt::Uint32 stime); + private: + virtual void update(); + virtual bool doGroup(SocketGroup* g,Uint32 & allowance,bt::TimeStamp now); + }; + +} + +#endif |