diff options
Diffstat (limited to 'libktorrent/torrent/downloader.cpp')
-rw-r--r-- | libktorrent/torrent/downloader.cpp | 688 |
1 files changed, 688 insertions, 0 deletions
diff --git a/libktorrent/torrent/downloader.cpp b/libktorrent/torrent/downloader.cpp new file mode 100644 index 0000000..b8acdc7 --- /dev/null +++ b/libktorrent/torrent/downloader.cpp @@ -0,0 +1,688 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * joris.guisson@gmail.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <util/file.h> +#include <util/log.h> +#include "downloader.h" +#include "chunkmanager.h" +#include "torrent.h" +#include "peermanager.h" +#include <util/error.h> +#include "chunkdownload.h" +#include <util/sha1hash.h> +#include <util/array.h> +#include "peer.h" +#include "piece.h" +#include "peerdownloader.h" +#include <interfaces/functions.h> +#include <interfaces/monitorinterface.h> +#include "packetwriter.h" +#include "chunkselector.h" +#include "ipblocklist.h" +#include "ktversion.h" + +namespace bt +{ + + + + Downloader::Downloader(Torrent & tor,PeerManager & pman,ChunkManager & cman) + : tor(tor),pman(pman),cman(cman),downloaded(0),tmon(0) + { + chunk_selector = new ChunkSelector(cman,*this,pman); + Uint64 total = tor.getFileLength(); + downloaded = (total - cman.bytesLeft()); + curr_chunks_downloaded = 0; + unnecessary_data = 0; + + current_chunks.setAutoDelete(true); + connect(&pman,SIGNAL(newPeer(Peer* )),this,SLOT(onNewPeer(Peer* ))); + connect(&pman,SIGNAL(peerKilled(Peer* )),this,SLOT(onPeerKilled(Peer*))); + } + + + Downloader::~Downloader() + { + delete chunk_selector; + } + + void Downloader::pieceRecieved(const Piece & p) + { + if (cman.completed()) + return; + + ChunkDownload* cd = 0; + + for (CurChunkItr j = current_chunks.begin();j != current_chunks.end();++j) + { + if (p.getIndex() != j->first) + continue; + + cd = j->second; + break; + } + + if (!cd) + { + unnecessary_data += p.getLength(); + Out(SYS_DIO|LOG_DEBUG) << + "Unnecessary piece, total unnecessary data : " << kt::BytesToString(unnecessary_data) << endl; + return; + } + + // if the chunk is not in memory, reload it + if (cd->getChunk()->getStatus() == Chunk::ON_DISK) + { + cman.prepareChunk(cd->getChunk(),true); + } + + bool ok = false; + + if (cd->piece(p,ok)) + { + if (tmon) + tmon->downloadRemoved(cd); + + if (ok) + downloaded += p.getLength(); + + if (!finished(cd)) + { + // if the chunk fails don't count the bytes downloaded + if (cd->getChunk()->getSize() > downloaded) + downloaded = 0; + else + downloaded -= cd->getChunk()->getSize(); + } + current_chunks.erase(p.getIndex()); + update(); // run an update to assign new pieces + } + else + { + if (ok) + downloaded += p.getLength(); + + // save to disk again, if it is idle + if (cd->isIdle() && cd->getChunk()->getStatus() == Chunk::MMAPPED) + { + cman.saveChunk(cd->getChunk()->getIndex(),false); + } + } + + if (!ok) + { + unnecessary_data += p.getLength(); + Out(SYS_DIO|LOG_DEBUG) << + "Unnecessary piece, total unnecessary data : " << kt::BytesToString(unnecessary_data) << endl; + } + } + + void Downloader::update() + { + if (cman.completed()) + return; + + /* + Normal update should now handle all modes properly. + */ + normalUpdate(); + + // now see if there aren't any timed out pieces + for (Uint32 i = 0;i < pman.getNumConnectedPeers();i++) + { + Peer* p = pman.getPeer(i); + p->getPeerDownloader()->checkTimeouts(); + } + } + + + void Downloader::normalUpdate() + { + for (CurChunkItr j = current_chunks.begin();j != current_chunks.end();++j) + { + ChunkDownload* cd = j->second; + if (cd->isIdle()) // idle chunks do not need to be in memory + { + Chunk* c = cd->getChunk(); + if (c->getStatus() == Chunk::MMAPPED) + { + cman.saveChunk(cd->getChunk()->getIndex(),false); + } + } + else if (cd->isChoked()) + { + cd->releaseAllPDs(); + Chunk* c = cd->getChunk(); + if (c->getStatus() == Chunk::MMAPPED) + { + cman.saveChunk(cd->getChunk()->getIndex(),false); + } + } + else if (cd->needsToBeUpdated()) + { + cd->update(); + } + } + + for (Uint32 i = 0; i < pman.getNumConnectedPeers();++i) + { + PeerDownloader* pd = pman.getPeer(i)->getPeerDownloader(); + + if (pd->isNull()) + continue; + + bool ok = + (pd->getNumGrabbed() < pd->getMaxChunkDownloads() || + pd->isNearlyDone()) && + pd->canAddRequest(); + + + if (ok) + { + if (!pd->isChoked()) + downloadFrom(pd); + + pd->setNearlyDone(false); + } + } + } + + Uint32 Downloader::maxMemoryUsage() + { + Uint32 max = 1024 * 1024; + switch (mem_usage) + { + case 1: // Medium + max *= 60; // 60 MB + break; + case 2: // High + max *= 80; // 90 MB + break; + case 0: // LOW + default: + max *= 40; // 30 MB + break; + } + return max; + } + + Uint32 Downloader::numNonIdle() + { + Uint32 num_non_idle = 0; + for (CurChunkItr j = current_chunks.begin();j != current_chunks.end();++j) + { + ChunkDownload* cd = j->second; + if (!cd->isIdle()) + num_non_idle++; + } + return num_non_idle; + } + + ChunkDownload* Downloader::selectCD(PeerDownloader* pd,Uint32 num) + { + ChunkDownload* sel = 0; + Uint32 sel_left = 0xFFFFFFFF; + + for (CurChunkItr j = current_chunks.begin();j != current_chunks.end();++j) + { + ChunkDownload* cd = j->second; + if (pd->isChoked() || !pd->hasChunk(cd->getChunk()->getIndex())) + continue; + + if (cd->getNumDownloaders() == num) + { + // lets favor the ones which are nearly finished + if (!sel || cd->getTotalPieces() - cd->getPiecesDownloaded() < sel_left) + { + sel = cd; + sel_left = sel->getTotalPieces() - sel->getPiecesDownloaded(); + } + } + } + return sel; + } + + bool Downloader::findDownloadForPD(PeerDownloader* pd,bool warmup) + { + ChunkDownload* sel = 0; + + // first see if there are ChunkDownload's which need a PeerDownloader + sel = selectCD(pd,0); + + if (!sel && warmup) + { + // if we couldn't find one, try to select another + // which only has one downloader + // so that during warmup, there are at the most 2 downloaders + // assigned to one peer + sel = selectCD(pd,1); + } + + if (sel) + { + // if it is on disk, reload it + if (sel->getChunk()->getStatus() == Chunk::ON_DISK) + cman.prepareChunk(sel->getChunk(),true); + + sel->assignPeer(pd); + return true; + } + + return false; + } + + ChunkDownload* Downloader::selectWorst(PeerDownloader* pd) + { + ChunkDownload* cdmin = NULL; + for (CurChunkItr j = current_chunks.begin();j != current_chunks.end();++j) + { + ChunkDownload* cd = j->second; + if (!pd->hasChunk(cd->getChunk()->getIndex()) || cd->containsPeer(pd)) + continue; + + if (!cdmin) + cdmin = cd; + else if (cd->getDownloadSpeed() < cdmin->getDownloadSpeed()) + cdmin = cd; + else if (cd->getNumDownloaders() < cdmin->getNumDownloaders()) + cdmin = cd; + } + + return cdmin; + } + + void Downloader::downloadFrom(PeerDownloader* pd) + { + // calculate the max memory usage + Uint32 max = maxMemoryUsage(); + // calculate number of non idle chunks + Uint32 num_non_idle = numNonIdle(); + + // first see if we can use an existing dowload + if (findDownloadForPD(pd,cman.getNumChunks() - cman.chunksLeft() <= 4)) + return; + + bool limit_exceeded = num_non_idle * tor.getChunkSize() >= max; + + Uint32 chunk = 0; + if (!limit_exceeded && chunk_selector->select(pd,chunk)) + { + Chunk* c = cman.getChunk(chunk); + if (cman.prepareChunk(c)) + { + ChunkDownload* cd = new ChunkDownload(c); + current_chunks.insert(chunk,cd); + cd->assignPeer(pd); + if (tmon) + tmon->downloadStarted(cd); + } + } + else if (pd->getNumGrabbed() == 0) + { + // If the peer hasn't got a chunk we want, + ChunkDownload *cdmin = selectWorst(pd); + + if (cdmin) + { + // if it is on disk, reload it + if (cdmin->getChunk()->getStatus() == Chunk::ON_DISK) + { + cman.prepareChunk(cdmin->getChunk(),true); + } + + cdmin->assignPeer(pd); + } + } + } + + + bool Downloader::areWeDownloading(Uint32 chunk) const + { + return current_chunks.find(chunk) != 0; + } + + void Downloader::onNewPeer(Peer* peer) + { + PeerDownloader* pd = peer->getPeerDownloader(); + connect(pd,SIGNAL(downloaded(const Piece& )), + this,SLOT(pieceRecieved(const Piece& ))); + } + + void Downloader::onPeerKilled(Peer* peer) + { + PeerDownloader* pd = peer->getPeerDownloader(); + if (pd) + { + for (CurChunkItr i = current_chunks.begin();i != current_chunks.end();++i) + { + ChunkDownload* cd = i->second; + cd->peerKilled(pd); + } + } + } + + bool Downloader::finished(ChunkDownload* cd) + { + Chunk* c = cd->getChunk(); + // verify the data + SHA1Hash h; + if (cd->usingContinuousHashing()) + h = cd->getHash(); + else + h = SHA1Hash::generate(c->getData(),c->getSize()); + + if (tor.verifyHash(h,c->getIndex())) + { + // hash ok so save it + try + { + cman.saveChunk(c->getIndex()); + Out(SYS_GEN|LOG_NOTICE) << "Chunk " << c->getIndex() << " downloaded " << endl; + // tell everybody we have the Chunk + for (Uint32 i = 0;i < pman.getNumConnectedPeers();i++) + { + pman.getPeer(i)->getPacketWriter().sendHave(c->getIndex()); + } + } + catch (Error & e) + { + Out(SYS_DIO|LOG_IMPORTANT) << "Error " << e.toString() << endl; + emit ioError(e.toString()); + return false; + } + } + else + { + Out(SYS_GEN|LOG_IMPORTANT) << "Hash verification error on chunk " << c->getIndex() << endl; + Out(SYS_GEN|LOG_IMPORTANT) << "Is : " << h << endl; + Out(SYS_GEN|LOG_IMPORTANT) << "Should be : " << tor.getHash(c->getIndex()) << endl; + + cman.resetChunk(c->getIndex()); + chunk_selector->reinsert(c->getIndex()); + Uint32 pid; + if (cd->getOnlyDownloader(pid)) + { + Peer* p = pman.findPeer(pid); + if (!p) + return false; + QString IP(p->getIPAddresss()); + Out(SYS_GEN|LOG_NOTICE) << "Peer " << IP << " sent bad data" << endl; + IPBlocklist & ipfilter = IPBlocklist::instance(); + ipfilter.insert( IP ); + p->kill(); + } + return false; + } + return true; + } + + void Downloader::clearDownloads() + { + for (CurChunkItr i = current_chunks.begin();i != current_chunks.end();++i) + { + Uint32 ch = i->first; + Chunk* c = i->second->getChunk(); + if (c->getStatus() == Chunk::MMAPPED) + cman.saveChunk(ch,false); + + c->setStatus(Chunk::NOT_DOWNLOADED); + } + current_chunks.clear(); + } + + Uint32 Downloader::downloadRate() const + { + // sum of the download rate of each peer + Uint32 rate = 0; + for (Uint32 i = 0;i < pman.getNumConnectedPeers();i++) + { + Peer* p = pman.getPeer(i); + rate += p->getDownloadRate(); + } + return rate; + } + + void Downloader::setMonitor(kt::MonitorInterface* tmo) + { + tmon = tmo; + if (!tmon) + return; + + for (CurChunkItr i = current_chunks.begin();i != current_chunks.end();++i) + { + ChunkDownload* cd = i->second; + tmon->downloadStarted(cd); + } + } + + + + void Downloader::saveDownloads(const QString & file) + { + File fptr; + if (!fptr.open(file,"wb")) + return; + + // Save all the current downloads to a file + CurrentChunksHeader hdr; + hdr.magic = CURRENT_CHUNK_MAGIC; + hdr.major = kt::MAJOR; + hdr.minor = kt::MINOR; + hdr.num_chunks = current_chunks.count(); + fptr.write(&hdr,sizeof(CurrentChunksHeader)); + +// Out() << "sizeof(CurrentChunksHeader)" << sizeof(CurrentChunksHeader) << endl; + Out() << "Saving " << current_chunks.count() << " chunk downloads" << endl; + for (CurChunkItr i = current_chunks.begin();i != current_chunks.end();++i) + { + ChunkDownload* cd = i->second; + cd->save(fptr); + } + } + + void Downloader::loadDownloads(const QString & file) + { + // don't load stuff if download is finished + if (cman.completed()) + return; + + // Load all partial downloads + File fptr; + if (!fptr.open(file,"rb")) + return; + + // recalculate downloaded bytes + downloaded = (tor.getFileLength() - cman.bytesLeft()); + + CurrentChunksHeader chdr; + fptr.read(&chdr,sizeof(CurrentChunksHeader)); + if (chdr.magic != CURRENT_CHUNK_MAGIC) + { + Out() << "Warning : current_chunks file corrupted" << endl; + return; + } + + Out() << "Loading " << chdr.num_chunks << " active chunk downloads" << endl; + for (Uint32 i = 0;i < chdr.num_chunks;i++) + { + ChunkDownloadHeader hdr; + // first read header + fptr.read(&hdr,sizeof(ChunkDownloadHeader)); + Out() << "Loading chunk " << hdr.index << endl; + if (hdr.index >= tor.getNumChunks()) + { + Out() << "Warning : current_chunks file corrupted, invalid index " << hdr.index << endl; + return; + } + + if (!cman.getChunk(hdr.index) || current_chunks.contains(hdr.index)) + { + Out() << "Illegal chunk " << hdr.index << endl; + return; + } + Chunk* c = cman.getChunk(hdr.index); + if (!c->isExcluded() && cman.prepareChunk(c)) + { + ChunkDownload* cd = new ChunkDownload(c); + bool ret = false; + try + { + ret = cd->load(fptr,hdr); + } + catch (...) + { + ret = false; + } + + if (!ret) + { + delete cd; + } + else + { + current_chunks.insert(hdr.index,cd); + downloaded += cd->bytesDownloaded(); + + if (tmon) + tmon->downloadStarted(cd); + } + } + } + + // reset curr_chunks_downloaded to 0 + curr_chunks_downloaded = 0; + } + + Uint32 Downloader::getDownloadedBytesOfCurrentChunksFile(const QString & file) + { + // Load all partial downloads + File fptr; + if (!fptr.open(file,"rb")) + return 0; + + // read the number of chunks + CurrentChunksHeader chdr; + fptr.read(&chdr,sizeof(CurrentChunksHeader)); + if (chdr.magic != CURRENT_CHUNK_MAGIC) + { + Out() << "Warning : current_chunks file corrupted" << endl; + return 0; + } + Uint32 num_bytes = 0; + + // load all chunks and calculate how much is downloaded + for (Uint32 i = 0;i < chdr.num_chunks;i++) + { + // read the chunkdownload header + ChunkDownloadHeader hdr; + fptr.read(&hdr,sizeof(ChunkDownloadHeader)); + + Chunk* c = cman.getChunk(hdr.index); + if (!c) + return num_bytes; + + Uint32 last_size = c->getSize() % MAX_PIECE_LEN; + if (last_size == 0) + last_size = MAX_PIECE_LEN; + + // create the bitset and read it + BitSet bs(hdr.num_bits); + fptr.read(bs.getData(),bs.getNumBytes()); + + for (Uint32 j = 0;j < hdr.num_bits;j++) + { + if (bs.get(j)) + num_bytes += j == hdr.num_bits - 1 ? + last_size : MAX_PIECE_LEN; + } + + if (hdr.buffered) + fptr.seek(File::CURRENT,c->getSize()); + } + curr_chunks_downloaded = num_bytes; + return num_bytes; + } + + bool Downloader::isFinished() const + { + return cman.completed(); + } + + void Downloader::onExcluded(Uint32 from,Uint32 to) + { + for (Uint32 i = from;i <= to;i++) + { + ChunkDownload* cd = current_chunks.find(i); + // let only seed chunks finish + if (!cd || cman.getChunk(i)->getPriority() == ONLY_SEED_PRIORITY) + continue; + + cd->cancelAll(); + cd->releaseAllPDs(); + if (tmon) + tmon->downloadRemoved(cd); + current_chunks.erase(i); + cman.resetChunk(i); // reset chunk it is not fully downloaded yet + } + } + + void Downloader::onIncluded(Uint32 from,Uint32 to) + { + chunk_selector->reincluded(from,to); + } + + void Downloader::corrupted(Uint32 chunk) + { + chunk_selector->reinsert(chunk); + } + + Uint32 Downloader::mem_usage = 0; + + void Downloader::setMemoryUsage(Uint32 m) + { + mem_usage = m; +// PeerDownloader::setMemoryUsage(m); + } + + void Downloader::dataChecked(const BitSet & ok_chunks) + { + for (Uint32 i = 0;i < ok_chunks.getNumBits();i++) + { + ChunkDownload* cd = current_chunks.find(i); + if (ok_chunks.get(i) && cd) + { + // we have a chunk and we are downloading it so kill it + cd->releaseAllPDs(); + if (tmon) + tmon->downloadRemoved(cd); + + current_chunks.erase(i); + } + } + chunk_selector->dataChecked(ok_chunks); + } + + void Downloader::recalcDownloaded() + { + Uint64 total = tor.getFileLength(); + downloaded = (total - cman.bytesLeft()); + } +} + +#include "downloader.moc" |