torrentclient.cpp Example File
network/torrent/torrentclient.cpp
#include "connectionmanager.h"
#include "filemanager.h"
#include "metainfo.h"
#include "torrentclient.h"
#include "torrentserver.h"
#include "trackerclient.h"
#include "peerwireclient.h"
#include "ratecontroller.h"
#include <QtCore>
#include <QNetworkInterface>
extern "C" {
#include "3rdparty/sha1.h"
}
These constants could also be configurable by the user.
static const int ServerMinPort = 6881;
static const int ServerMaxPort = 7000;
static const int BlockSize = 16384;
static const int MaxBlocksInProgress = 5;
static const int MaxBlocksInMultiMode = 2;
static const int MaxConnectionPerPeer = 1;
static const int RateControlWindowLength = 10;
static const int RateControlTimerDelay = 1000;
static const int MinimumTimeBeforeRevisit = 30;
static const int MaxUploads = 4;
static const int UploadScheduleInterval = 10000;
static const int EndGamePieces = 5;
class TorrentPiece {
public:
int index;
int length;
QBitArray completedBlocks;
QBitArray requestedBlocks;
bool inProgress;
};
class TorrentClientPrivate
{
public:
TorrentClientPrivate(TorrentClient *qq);
State / error
void setError(TorrentClient::Error error);
void setState(TorrentClient::State state);
TorrentClient::Error error;
TorrentClient::State state;
QString errorString;
QString stateString;
Where to save data
QString destinationFolder;
MetaInfo metaInfo;
Announce tracker and file manager
QByteArray peerId;
QByteArray infoHash;
TrackerClient trackerClient;
FileManager fileManager;
Connections
QList<PeerWireClient *> connections;
QList<TorrentPeer *> peers;
bool schedulerCalled;
void callScheduler();
bool connectingToClients;
void callPeerConnector();
int uploadScheduleTimer;
Pieces
QMap<int, PeerWireClient *> readIds;
QMultiMap<PeerWireClient *, TorrentPiece *> payloads;
QMap<int, TorrentPiece *> pendingPieces;
QBitArray completedPieces;
QBitArray incompletePieces;
int pieceCount;
Progress
int lastProgressValue;
qint64 downloadedBytes;
qint64 uploadedBytes;
int downloadRate[RateControlWindowLength];
int uploadRate[RateControlWindowLength];
int transferRateTimer;
TorrentClient *q;
};
TorrentClientPrivate::TorrentClientPrivate(TorrentClient *qq)
: trackerClient(qq), q(qq)
{
error = TorrentClient::UnknownError;
state = TorrentClient::Idle;
errorString = QT_TRANSLATE_NOOP(TorrentClient, "Unknown error");
stateString = QT_TRANSLATE_NOOP(TorrentClient, "Idle");
schedulerCalled = false;
connectingToClients = false;
uploadScheduleTimer = 0;
lastProgressValue = -1;
pieceCount = 0;
downloadedBytes = 0;
uploadedBytes = 0;
memset(downloadRate, 0, sizeof(downloadRate));
memset(uploadRate, 0, sizeof(uploadRate));
transferRateTimer = 0;
}
void TorrentClientPrivate::setError(TorrentClient::Error errorCode)
{
this->error = errorCode;
switch (error) {
case TorrentClient::UnknownError:
errorString = QT_TRANSLATE_NOOP(TorrentClient, "Unknown error");
break;
case TorrentClient::TorrentParseError:
errorString = QT_TRANSLATE_NOOP(TorrentClient, "Invalid torrent data");
break;
case TorrentClient::InvalidTrackerError:
errorString = QT_TRANSLATE_NOOP(TorrentClient, "Unable to connect to tracker");
break;
case TorrentClient::FileError:
errorString = QT_TRANSLATE_NOOP(TorrentClient, "File error");
break;
case TorrentClient::ServerError:
errorString = QT_TRANSLATE_NOOP(TorrentClient, "Unable to initialize server");
break;
}
emit q->error(errorCode);
}
void TorrentClientPrivate::setState(TorrentClient::State state)
{
this->state = state;
switch (state) {
case TorrentClient::Idle:
stateString = QT_TRANSLATE_NOOP(TorrentClient, "Idle");
break;
case TorrentClient::Paused:
stateString = QT_TRANSLATE_NOOP(TorrentClient, "Paused");
break;
case TorrentClient::Stopping:
stateString = QT_TRANSLATE_NOOP(TorrentClient, "Stopping");
break;
case TorrentClient::Preparing:
stateString = QT_TRANSLATE_NOOP(TorrentClient, "Preparing");
break;
case TorrentClient::Searching:
stateString = QT_TRANSLATE_NOOP(TorrentClient, "Searching");
break;
case TorrentClient::Connecting:
stateString = QT_TRANSLATE_NOOP(TorrentClient, "Connecting");
break;
case TorrentClient::WarmingUp:
stateString = QT_TRANSLATE_NOOP(TorrentClient, "Warming up");
break;
case TorrentClient::Downloading:
stateString = QT_TRANSLATE_NOOP(TorrentClient, "Downloading");
break;
case TorrentClient::Endgame:
stateString = QT_TRANSLATE_NOOP(TorrentClient, "Finishing");
break;
case TorrentClient::Seeding:
stateString = QT_TRANSLATE_NOOP(TorrentClient, "Seeding");
break;
}
emit q->stateChanged(state);
}
void TorrentClientPrivate::callScheduler()
{
if (!schedulerCalled) {
schedulerCalled = true;
QMetaObject::invokeMethod(q, "scheduleDownloads", Qt::QueuedConnection);
}
}
void TorrentClientPrivate::callPeerConnector()
{
if (!connectingToClients) {
connectingToClients = true;
QTimer::singleShot(10000, q, SLOT(connectToPeers()));
}
}
TorrentClient::TorrentClient(QObject *parent)
: QObject(parent), d(new TorrentClientPrivate(this))
{
Connect the file manager
connect(&d->fileManager, SIGNAL(dataRead(int, int, int, const QByteArray &)),
this, SLOT(sendToPeer(int, int, int, const QByteArray &)));
connect(&d->fileManager, SIGNAL(verificationProgress(int)),
this, SLOT(updateProgress(int)));
connect(&d->fileManager, SIGNAL(verificationDone()),
this, SLOT(fullVerificationDone()));
connect(&d->fileManager, SIGNAL(pieceVerified(int, bool)),
this, SLOT(pieceVerified(int, bool)));
connect(&d->fileManager, SIGNAL(error()),
this, SLOT(handleFileError()));
Connect the tracker client
connect(&d->trackerClient, SIGNAL(peerListUpdated(const QList<TorrentPeer> &)),
this, SLOT(addToPeerList(const QList<TorrentPeer> &)));
connect(&d->trackerClient, SIGNAL(stopped()),
this, SIGNAL(stopped()));
}
TorrentClient::~TorrentClient()
{
qDeleteAll(d->peers);
qDeleteAll(d->pendingPieces);
delete d;
}
bool TorrentClient::setTorrent(const QString &fileName)
{
QFile file(fileName);
if (!file.open(QIODevice::ReadOnly) || !setTorrent(file.readAll())) {
d->setError(TorrentParseError);
return false;
}
return true;
}
bool TorrentClient::setTorrent(const QByteArray &torrentData)
{
if (!d->metaInfo.parse(torrentData)) {
d->setError(TorrentParseError);
return false;
}
Calculate SHA1 hash of the "info" section in the torrent
QByteArray infoValue = d->metaInfo.infoValue();
SHA1Context sha;
SHA1Reset(&sha);
SHA1Input(&sha, (const unsigned char *)infoValue.constData(), infoValue.size());
SHA1Result(&sha);
unsigned char *digest = (unsigned char *)sha.Message_Digest;
d->infoHash.resize(20);
for (int i = 0; i < 5; ++i) {
#if Q_BYTE_ORDER == Q_BIG_ENDIAN
d->infoHash[i * 4 + 3] = digest[i * 4 + 3];
d->infoHash[i * 4 + 2] = digest[i * 4 + 2];
d->infoHash[i * 4 + 1] = digest[i * 4 + 1];
d->infoHash[i * 4 + 0] = digest[i * 4 + 0];
#else
d->infoHash[i * 4 + 0] = digest[i * 4 + 3];
d->infoHash[i * 4 + 1] = digest[i * 4 + 2];
d->infoHash[i * 4 + 2] = digest[i * 4 + 1];
d->infoHash[i * 4 + 3] = digest[i * 4 + 0];
#endif
}
return true;
}
MetaInfo TorrentClient::metaInfo() const
{
return d->metaInfo;
}
void TorrentClient::setDestinationFolder(const QString &directory)
{
d->destinationFolder = directory;
}
QString TorrentClient::destinationFolder() const
{
return d->destinationFolder;
}
void TorrentClient::setDumpedState(const QByteArray &dumpedState)
{
Recover partially completed pieces
QDataStream stream(dumpedState);
quint16 version = 0;
stream >> version;
if (version != 2)
return;
stream >> d->completedPieces;
while (!stream.atEnd()) {
int index;
int length;
QBitArray completed;
stream >> index >> length >> completed;
if (stream.status() != QDataStream::Ok) {
d->completedPieces.clear();
break;
}
TorrentPiece *piece = new TorrentPiece;
piece->index = index;
piece->length = length;
piece->completedBlocks = completed;
piece->requestedBlocks.resize(completed.size());
piece->inProgress = false;
d->pendingPieces[index] = piece;
}
}
QByteArray TorrentClient::dumpedState() const
{
QByteArray partials;
QDataStream stream(&partials, QIODevice::WriteOnly);
stream << quint16(2);
stream << d->completedPieces;
Save the state of all partially downloaded pieces into a format
suitable for storing in settings.
QMap<int, TorrentPiece *>::ConstIterator it = d->pendingPieces.constBegin();
while (it != d->pendingPieces.constEnd()) {
TorrentPiece *piece = it.value();
if (blocksLeftForPiece(piece) > 0 && blocksLeftForPiece(piece) < piece->completedBlocks.size()) {
stream << piece->index;
stream << piece->length;
stream << piece->completedBlocks;
}
++it;
}
return partials;
}
qint64 TorrentClient::progress() const
{
return d->lastProgressValue;
}
void TorrentClient::setDownloadedBytes(qint64 bytes)
{
d->downloadedBytes = bytes;
}
qint64 TorrentClient::downloadedBytes() const
{
return d->downloadedBytes;
}
void TorrentClient::setUploadedBytes(qint64 bytes)
{
d->uploadedBytes = bytes;
}
qint64 TorrentClient::uploadedBytes() const
{
return d->uploadedBytes;
}
int TorrentClient::connectedPeerCount() const
{
int tmp = 0;
foreach (PeerWireClient *client, d->connections) {
if (client->state() == QAbstractSocket::ConnectedState)
++tmp;
}
return tmp;
}
int TorrentClient::seedCount() const
{
int tmp = 0;
foreach (PeerWireClient *client, d->connections) {
if (client->availablePieces().count(true) == d->pieceCount)
++tmp;
}
return tmp;
}
TorrentClient::State TorrentClient::state() const
{
return d->state;
}
QString TorrentClient::stateString() const
{
return d->stateString;
}
TorrentClient::Error TorrentClient::error() const
{
return d->error;
}
QString TorrentClient::errorString() const
{
return d->errorString;
}
QByteArray TorrentClient::peerId() const
{
return d->peerId;
}
QByteArray TorrentClient::infoHash() const
{
return d->infoHash;
}
void TorrentClient::start()
{
if (d->state != Idle)
return;
TorrentServer::instance()->addClient(this);
Initialize the file manager
d->setState(Preparing);
d->fileManager.setMetaInfo(d->metaInfo);
d->fileManager.setDestinationFolder(d->destinationFolder);
d->fileManager.setCompletedPieces(d->completedPieces);
d->fileManager.start(QThread::LowestPriority);
d->fileManager.startDataVerification();
}
void TorrentClient::stop()
{
if (d->state == Stopping)
return;
TorrentServer::instance()->removeClient(this);
Update the state
State oldState = d->state;
d->setState(Stopping);
Stop the timer
if (d->transferRateTimer) {
killTimer(d->transferRateTimer);
d->transferRateTimer = 0;
}
Abort all existing connections
foreach (PeerWireClient *client, d->connections) {
RateController::instance()->removeSocket(client);
ConnectionManager::instance()->removeConnection(client);
client->abort();
}
d->connections.clear();
Perhaps stop the tracker
if (oldState > Preparing) {
d->trackerClient.stop();
} else {
d->setState(Idle);
emit stopped();
}
}
void TorrentClient::setPaused(bool paused)
{
if (paused) {
Abort all connections, and set the max number of
connections to 0. Keep the list of peers, so we can quickly
resume later.
d->setState(Paused);
foreach (PeerWireClient *client, d->connections)
client->abort();
d->connections.clear();
TorrentServer::instance()->removeClient(this);
} else {
Restore the max number of connections, and start the peer
connector. We should also quickly start receiving incoming
connections.
d->setState(d->completedPieces.count(true) == d->fileManager.pieceCount()
? Seeding : Searching);
connectToPeers();
TorrentServer::instance()->addClient(this);
}
}
void TorrentClient::timerEvent(QTimerEvent *event)
{
if (event->timerId() == d->uploadScheduleTimer) {
Update the state of who's choked and who's not
scheduleUploads();
return;
}
if (event->timerId() != d->transferRateTimer) {
QObject::timerEvent(event);
return;
}
Calculate average upload/download rate
qint64 uploadBytesPerSecond = 0;
qint64 downloadBytesPerSecond = 0;
for (int i = 0; i < RateControlWindowLength; ++i) {
uploadBytesPerSecond += d->uploadRate[i];
downloadBytesPerSecond += d->downloadRate[i];
}
uploadBytesPerSecond /= qint64(RateControlWindowLength);
downloadBytesPerSecond /= qint64(RateControlWindowLength);
for (int i = RateControlWindowLength - 2; i >= 0; --i) {
d->uploadRate[i + 1] = d->uploadRate[i];
d->downloadRate[i + 1] = d->downloadRate[i];
}
d->uploadRate[0] = 0;
d->downloadRate[0] = 0;
emit uploadRateUpdated(int(uploadBytesPerSecond));
emit downloadRateUpdated(int(downloadBytesPerSecond));
Stop the timer if there is no activity.
if (downloadBytesPerSecond == 0 && uploadBytesPerSecond == 0) {
killTimer(d->transferRateTimer);
d->transferRateTimer = 0;
}
}
void TorrentClient::sendToPeer(int readId, int pieceIndex, int begin, const QByteArray &data)
{
Send the requested block to the peer if the client connection
still exists; otherwise do nothing. This slot is called by the
file manager after it has read a block of data.
PeerWireClient *client = d->readIds.value(readId);
if (client) {
if ((client->peerWireState() & PeerWireClient::ChokingPeer) == 0)
client->sendBlock(pieceIndex, begin, data);
}
d->readIds.remove(readId);
}
void TorrentClient::fullVerificationDone()
{
Update our list of completed and incomplete pieces.
d->completedPieces = d->fileManager.completedPieces();
d->incompletePieces.resize(d->completedPieces.size());
d->pieceCount = d->completedPieces.size();
for (int i = 0; i < d->fileManager.pieceCount(); ++i) {
if (!d->completedPieces.testBit(i))
d->incompletePieces.setBit(i);
}
updateProgress();
If the checksums show that what the dumped state thought was
partial was in fact complete, then we trust the checksums.
QMap<int, TorrentPiece *>::Iterator it = d->pendingPieces.begin();
while (it != d->pendingPieces.end()) {
if (d->completedPieces.testBit(it.key()))
it = d->pendingPieces.erase(it);
else
++it;
}
d->uploadScheduleTimer = startTimer(UploadScheduleInterval);
Start the server
TorrentServer *server = TorrentServer::instance();
if (!server->isListening()) {
Set up the peer wire server
for (int i = ServerMinPort; i <= ServerMaxPort; ++i) {
if (server->listen(QHostAddress::Any, i))
break;
}
if (!server->isListening()) {
d->setError(ServerError);
return;
}
}
d->setState(d->completedPieces.count(true) == d->pieceCount ? Seeding : Searching);
Start the tracker client
d->trackerClient.setUploadCount(d->uploadedBytes);
d->trackerClient.setDownloadCount(d->downloadedBytes);
d->trackerClient.start(d->metaInfo);
}
void TorrentClient::pieceVerified(int pieceIndex, bool ok)
{
TorrentPiece *piece = d->pendingPieces.value(pieceIndex);
Remove this piece from all payloads
QMultiMap<PeerWireClient *, TorrentPiece *>::Iterator it = d->payloads.begin();
while (it != d->payloads.end()) {
if (it.value()->index == pieceIndex)
it = d->payloads.erase(it);
else
++it;
}
if (!ok) {
If a piece did not pass the SHA1 check, we'll simply clear
its state, and the scheduler will re-request it
piece->inProgress = false;
piece->completedBlocks.fill(false);
piece->requestedBlocks.fill(false);
d->callScheduler();
return;
}
Update the peer list so we know who's still interesting.
foreach (TorrentPeer *peer, d->peers) {
if (!peer->interesting)
continue;
bool interesting = false;
for (int i = 0; i < d->pieceCount; ++i) {
if (peer->pieces.testBit(i) && d->incompletePieces.testBit(i)) {
interesting = true;
break;
}
}
peer->interesting = interesting;
}
Delete the piece and update our structures.
delete piece;
d->pendingPieces.remove(pieceIndex);
d->completedPieces.setBit(pieceIndex);
d->incompletePieces.clearBit(pieceIndex);
Notify connected peers.
foreach (PeerWireClient *client, d->connections) {
if (client->state() == QAbstractSocket::ConnectedState
&& !client->availablePieces().testBit(pieceIndex)) {
client->sendPieceNotification(pieceIndex);
}
}
Notify the tracker if we've entered Seeding status; otherwise
call the scheduler.
int completed = d->completedPieces.count(true);
if (completed == d->pieceCount) {
if (d->state != Seeding) {
d->setState(Seeding);
d->trackerClient.start(d->metaInfo);
}
} else {
if (completed == 1)
d->setState(Downloading);
else if (d->incompletePieces.count(true) < 5 && d->pendingPieces.size() > d->incompletePieces.count(true))
d->setState(Endgame);
d->callScheduler();
}
updateProgress();
}
void TorrentClient::handleFileError()
{
if (d->state == Paused)
return;
setPaused(true);
emit error(FileError);
}
void TorrentClient::connectToPeers()
{
d->connectingToClients = false;
if (d->state == Stopping || d->state == Idle || d->state == Paused)
return;
if (d->state == Searching)
d->setState(Connecting);
Find the list of peers we are not currently connected to, where
the more interesting peers are listed more than once.
QList<TorrentPeer *> weighedPeers = weighedFreePeers();
Start as many connections as we can
while (!weighedPeers.isEmpty() && ConnectionManager::instance()->canAddConnection()
&& (qrand() % (ConnectionManager::instance()->maxConnections() / 2))) {
PeerWireClient *client = new PeerWireClient(ConnectionManager::instance()->clientId(), this);
RateController::instance()->addSocket(client);
ConnectionManager::instance()->addConnection(client);
initializeConnection(client);
d->connections << client;
Pick a random peer from the list of weighed peers.
TorrentPeer *peer = weighedPeers.takeAt(qrand() % weighedPeers.size());
weighedPeers.removeAll(peer);
peer->connectStart = QDateTime::currentDateTime().toTime_t();
peer->lastVisited = peer->connectStart;
Connect to the peer.
client->setPeer(peer);
client->connectToHost(peer->address, peer->port);
}
}
QList<TorrentPeer *> TorrentClient::weighedFreePeers() const
{
QList<TorrentPeer *> weighedPeers;
Generate a list of peers that we want to connect to.
uint now = QDateTime::currentDateTime().toTime_t();
QList<TorrentPeer *> freePeers;
QMap<QString, int> connectionsPerPeer;
foreach (TorrentPeer *peer, d->peers) {
bool busy = false;
foreach (PeerWireClient *client, d->connections) {
if (client->state() == PeerWireClient::ConnectedState
&& client->peerAddress() == peer->address
&& client->peerPort() == peer->port) {
if (++connectionsPerPeer[peer->address.toString()] >= MaxConnectionPerPeer) {
busy = true;
break;
}
}
}
if (!busy && (now - peer->lastVisited) > uint(MinimumTimeBeforeRevisit))
freePeers << peer;
}
Nothing to connect to
if (freePeers.isEmpty())
return weighedPeers;
Assign points based on connection speed and pieces available.
QList<QPair<int, TorrentPeer *> > points;
foreach (TorrentPeer *peer, freePeers) {
int tmp = 0;
if (peer->interesting) {
tmp += peer->numCompletedPieces;
if (d->state == Seeding)
tmp = d->pieceCount - tmp;
if (!peer->connectStart) An unknown peer is as interesting as a seed
tmp += d->pieceCount;
1/5 of the total score for each second below 5 it takes to
connect.
if (peer->connectTime < 5)
tmp += (d->pieceCount / 10) * (5 - peer->connectTime);
}
points << QPair<int, TorrentPeer *>(tmp, peer);
}
qSort(points);
Minimize the list so the point difference is never more than 1.
typedef QPair<int,TorrentPeer*> PointPair;
QMultiMap<int, TorrentPeer *> pointMap;
int lowestScore = 0;
int lastIndex = 0;
foreach (PointPair point, points) {
if (point.first > lowestScore) {
lowestScore = point.first;
++lastIndex;
}
pointMap.insert(lastIndex, point.second);
}
Now make up a list of peers where the ones with more points are
listed many times.
QMultiMap<int, TorrentPeer *>::ConstIterator it = pointMap.constBegin();
while (it != pointMap.constEnd()) {
for (int i = 0; i < it.key() + 1; ++i)
weighedPeers << it.value();
++it;
}
return weighedPeers;
}
void TorrentClient::setupIncomingConnection(PeerWireClient *client)
{
Connect signals
initializeConnection(client);
Initialize this client
RateController::instance()->addSocket(client);
d->connections << client;
client->initialize(d->infoHash, d->pieceCount);
client->sendPieceList(d->completedPieces);
emit peerInfoUpdated();
if (d->state == Searching || d->state == Connecting) {
int completed = d->completedPieces.count(true);
if (completed == 0)
d->setState(WarmingUp);
else if (d->incompletePieces.count(true) < 5 && d->pendingPieces.size() > d->incompletePieces.count(true))
d->setState(Endgame);
}
if (d->connections.isEmpty())
scheduleUploads();
}
void TorrentClient::setupOutgoingConnection()
{
PeerWireClient *client = qobject_cast<PeerWireClient *>(sender());
Update connection statistics.
foreach (TorrentPeer *peer, d->peers) {
if (peer->port == client->peerPort() && peer->address == client->peerAddress()) {
peer->connectTime = peer->lastVisited - peer->connectStart;
break;
}
}
Send handshake and piece list
client->initialize(d->infoHash, d->pieceCount);
client->sendPieceList(d->completedPieces);
emit peerInfoUpdated();
if (d->state == Searching || d->state == Connecting) {
int completed = d->completedPieces.count(true);
if (completed == 0)
d->setState(WarmingUp);
else if (d->incompletePieces.count(true) < 5 && d->pendingPieces.size() > d->incompletePieces.count(true))
d->setState(Endgame);
}
}
void TorrentClient::initializeConnection(PeerWireClient *client)
{
connect(client, SIGNAL(connected()),
this, SLOT(setupOutgoingConnection()));
connect(client, SIGNAL(disconnected()),
this, SLOT(removeClient()));
connect(client, SIGNAL(error(QAbstractSocket::SocketError)),
this, SLOT(removeClient()));
connect(client, SIGNAL(piecesAvailable(const QBitArray &)),
this, SLOT(peerPiecesAvailable(const QBitArray &)));
connect(client, SIGNAL(blockRequested(int, int, int)),
this, SLOT(peerRequestsBlock(int, int, int)));
connect(client, SIGNAL(blockReceived(int, int, const QByteArray &)),
this, SLOT(blockReceived(int, int, const QByteArray &)));
connect(client, SIGNAL(choked()),
this, SLOT(peerChoked()));
connect(client, SIGNAL(unchoked()),
this, SLOT(peerUnchoked()));
connect(client, SIGNAL(bytesWritten(qint64)),
this, SLOT(peerWireBytesWritten(qint64)));
connect(client, SIGNAL(bytesReceived(qint64)),
this, SLOT(peerWireBytesReceived(qint64)));
}
void TorrentClient::removeClient()
{
PeerWireClient *client = static_cast<PeerWireClient *>(sender());
Remove the host from our list of known peers if the connection
failed.
if (client->peer() && client->error() == QAbstractSocket::ConnectionRefusedError)
d->peers.removeAll(client->peer());
Remove the client from RateController and all structures.
RateController::instance()->removeSocket(client);
d->connections.removeAll(client);
QMultiMap<PeerWireClient *, TorrentPiece *>::Iterator it = d->payloads.find(client);
while (it != d->payloads.end() && it.key() == client) {
TorrentPiece *piece = it.value();
piece->inProgress = false;
piece->requestedBlocks.fill(false);
it = d->payloads.erase(it);
}
Remove pending read requests.
QMapIterator<int, PeerWireClient *> it2(d->readIds);
while (it2.findNext(client))
d->readIds.remove(it2.key());
Delete the client later.
disconnect(client, SIGNAL(disconnected()), this, SLOT(removeClient()));
client->deleteLater();
ConnectionManager::instance()->removeConnection(client);
emit peerInfoUpdated();
d->callPeerConnector();
}
void TorrentClient::peerPiecesAvailable(const QBitArray &pieces)
{
PeerWireClient *client = qobject_cast<PeerWireClient *>(sender());
Find the peer in our list of announced peers. If it's there,
then we can use the piece list into to gather statistics that
help us decide what peers to connect to.
TorrentPeer *peer = 0;
QList<TorrentPeer *>::Iterator it = d->peers.begin();
while (it != d->peers.end()) {
if ((*it)->address == client->peerAddress() && (*it)->port == client->peerPort()) {
peer = *it;
break;
}
++it;
}
If the peer is a seed, and we are in seeding mode, then the
peer is uninteresting.
if (pieces.count(true) == d->pieceCount) {
if (peer)
peer->seed = true;
emit peerInfoUpdated();
if (d->state == Seeding) {
client->abort();
return;
} else {
if (peer)
peer->interesting = true;
if ((client->peerWireState() & PeerWireClient::InterestedInPeer) == 0)
client->sendInterested();
d->callScheduler();
return;
}
}
Update our list of available pieces.
if (peer) {
peer->pieces = pieces;
peer->numCompletedPieces = pieces.count(true);
}
Check for interesting pieces, and tell the peer whether we are
interested or not.
bool interested = false;
int piecesSize = pieces.size();
for (int pieceIndex = 0; pieceIndex < piecesSize; ++pieceIndex) {
if (!pieces.testBit(pieceIndex))
continue;
if (!d->completedPieces.testBit(pieceIndex)) {
interested = true;
if ((client->peerWireState() & PeerWireClient::InterestedInPeer) == 0) {
if (peer)
peer->interesting = true;
client->sendInterested();
}
QMultiMap<PeerWireClient *, TorrentPiece *>::Iterator it = d->payloads.find(client);
int inProgress = 0;
while (it != d->payloads.end() && it.key() == client) {
if (it.value()->inProgress)
inProgress += it.value()->requestedBlocks.count(true);
++it;
}
if (!inProgress)
d->callScheduler();
break;
}
}
if (!interested && (client->peerWireState() & PeerWireClient::InterestedInPeer)) {
if (peer)
peer->interesting = false;
client->sendNotInterested();
}
}
void TorrentClient::peerRequestsBlock(int pieceIndex, int begin, int length)
{
PeerWireClient *client = qobject_cast<PeerWireClient *>(sender());
Silently ignore requests from choked peers
if (client->peerWireState() & PeerWireClient::ChokingPeer)
return;
Silently ignore requests for pieces we don't have.
if (!d->completedPieces.testBit(pieceIndex))
return;
Request the block from the file manager
d->readIds.insert(d->fileManager.read(pieceIndex, begin, length),
qobject_cast<PeerWireClient *>(sender()));
}
void TorrentClient::blockReceived(int pieceIndex, int begin, const QByteArray &data)
{
PeerWireClient *client = qobject_cast<PeerWireClient *>(sender());
if (data.size() == 0) {
client->abort();
return;
}
Ignore it if we already have this block.
int blockBit = begin / BlockSize;
TorrentPiece *piece = d->pendingPieces.value(pieceIndex);
if (!piece || piece->completedBlocks.testBit(blockBit)) {
Discard blocks that we already have, and fill up the pipeline.
requestMore(client);
return;
}
If we are in warmup or endgame mode, cancel all duplicate
requests for this block.
if (d->state == WarmingUp || d->state == Endgame) {
QMultiMap<PeerWireClient *, TorrentPiece *>::Iterator it = d->payloads.begin();
while (it != d->payloads.end()) {
PeerWireClient *otherClient = it.key();
if (otherClient != client && it.value()->index == pieceIndex) {
if (otherClient->incomingBlocks().contains(TorrentBlock(pieceIndex, begin, data.size())))
it.key()->cancelRequest(pieceIndex, begin, data.size());
}
++it;
}
}
if (d->state != Downloading && d->state != Endgame && d->completedPieces.count(true) > 0)
d->setState(Downloading);
Store this block
d->fileManager.write(pieceIndex, begin, data);
piece->completedBlocks.setBit(blockBit);
piece->requestedBlocks.clearBit(blockBit);
if (blocksLeftForPiece(piece) == 0) {
Ask the file manager to verify the newly downloaded piece
d->fileManager.verifyPiece(piece->index);
Remove this piece from all payloads
QMultiMap<PeerWireClient *, TorrentPiece *>::Iterator it = d->payloads.begin();
while (it != d->payloads.end()) {
if (!it.value() || it.value()->index == piece->index)
it = d->payloads.erase(it);
else
++it;
}
}
Fill up the pipeline.
requestMore(client);
}
void TorrentClient::peerWireBytesWritten(qint64 size)
{
if (!d->transferRateTimer)
d->transferRateTimer = startTimer(RateControlTimerDelay);
d->uploadRate[0] += size;
d->uploadedBytes += size;
emit dataSent(size);
}
void TorrentClient::peerWireBytesReceived(qint64 size)
{
if (!d->transferRateTimer)
d->transferRateTimer = startTimer(RateControlTimerDelay);
d->downloadRate[0] += size;
d->downloadedBytes += size;
emit dataSent(size);
}
int TorrentClient::blocksLeftForPiece(const TorrentPiece *piece) const
{
int blocksLeft = 0;
int completedBlocksSize = piece->completedBlocks.size();
for (int i = 0; i < completedBlocksSize; ++i) {
if (!piece->completedBlocks.testBit(i))
++blocksLeft;
}
return blocksLeft;
}
void TorrentClient::scheduleUploads()
{
Generate a list of clients sorted by their transfer
speeds. When leeching, we sort by download speed, and when
seeding, we sort by upload speed. Seeds are left out; there's
no use in unchoking them.
QList<PeerWireClient *> allClients = d->connections;
QMultiMap<int, PeerWireClient *> transferSpeeds;
foreach (PeerWireClient *client, allClients) {
if (client->state() == QAbstractSocket::ConnectedState
&& client->availablePieces().count(true) != d->pieceCount) {
if (d->state == Seeding) {
transferSpeeds.insert(client->uploadSpeed(), client);
} else {
transferSpeeds.insert(client->downloadSpeed(), client);
}
}
}
Unchoke the top 'MaxUploads' downloaders (peers that we are
uploading to) and choke all others.
int maxUploaders = MaxUploads;
QMapIterator<int, PeerWireClient *> it(transferSpeeds);
it.toBack();
while (it.hasPrevious()) {
PeerWireClient *client = it.previous().value();
bool interested = (client->peerWireState() & PeerWireClient::PeerIsInterested);
if (maxUploaders) {
allClients.removeAll(client);
if (client->peerWireState() & PeerWireClient::ChokingPeer)
client->unchokePeer();
--maxUploaders;
continue;
}
if ((client->peerWireState() & PeerWireClient::ChokingPeer) == 0) {
if ((qrand() % 10) == 0)
client->abort();
else
client->chokePeer();
allClients.removeAll(client);
}
if (!interested)
allClients.removeAll(client);
}
Only interested peers are left in allClients. Unchoke one
random peer to allow it to compete for a position among the
downloaders. (This is known as an "optimistic unchoke".)
if (!allClients.isEmpty()) {
PeerWireClient *client = allClients[qrand() % allClients.size()];
if (client->peerWireState() & PeerWireClient::ChokingPeer)
client->unchokePeer();
}
}
void TorrentClient::scheduleDownloads()
{
d->schedulerCalled = false;
if (d->state == Stopping || d->state == Paused || d->state == Idle)
return;
Check what each client is doing, and assign payloads to those
who are either idle or done.
foreach (PeerWireClient *client, d->connections)
schedulePieceForClient(client);
}
void TorrentClient::schedulePieceForClient(PeerWireClient *client)
{
Only schedule connected clients.
if (client->state() != QTcpSocket::ConnectedState)
return;
The peer has choked us; try again later.
if (client->peerWireState() & PeerWireClient::ChokedByPeer)
return;
Make a list of all the client's pending pieces, and count how
many blocks have been requested.
QList<int> currentPieces;
bool somePiecesAreNotInProgress = false;
TorrentPiece *lastPendingPiece = 0;
QMultiMap<PeerWireClient *, TorrentPiece *>::Iterator it = d->payloads.find(client);
while (it != d->payloads.end() && it.key() == client) {
lastPendingPiece = it.value();
if (lastPendingPiece->inProgress) {
currentPieces << lastPendingPiece->index;
} else {
somePiecesAreNotInProgress = true;
}
++it;
}
Skip clients that already have too many blocks in progress.
if (client->incomingBlocks().size() >= ((d->state == Endgame || d->state == WarmingUp)
? MaxBlocksInMultiMode : MaxBlocksInProgress))
return;
If all pieces are in progress, but we haven't filled up our
block requesting quota, then we need to schedule another piece.
if (!somePiecesAreNotInProgress || client->incomingBlocks().size() > 0)
lastPendingPiece = 0;
TorrentPiece *piece = lastPendingPiece;
In warmup state, all clients request blocks from the same pieces.
if (d->state == WarmingUp && d->pendingPieces.size() >= 4) {
piece = d->payloads.value(client);
if (!piece) {
QList<TorrentPiece *> values = d->pendingPieces.values();
piece = values.value(qrand() % values.size());
piece->inProgress = true;
d->payloads.insert(client, piece);
}
if (piece->completedBlocks.count(false) == client->incomingBlocks().size())
return;
}
If no pieces are currently in progress, schedule a new one.
if (!piece) {
Build up a list of what pieces that we have not completed
are available to this client.
QBitArray incompletePiecesAvailableToClient = d->incompletePieces;
Remove all pieces that are marked as being in progress
already (i.e., pieces that this or other clients are
already waiting for). A special rule applies to warmup and
endgame mode; there, we allow several clients to request
the same piece. In endgame mode, this only applies to
clients that are currently uploading (more than 1.0KB/s).
if ((d->state == Endgame && client->uploadSpeed() < 1024) || d->state != WarmingUp) {
QMap<int, TorrentPiece *>::ConstIterator it = d->pendingPieces.constBegin();
while (it != d->pendingPieces.constEnd()) {
if (it.value()->inProgress)
incompletePiecesAvailableToClient.clearBit(it.key());
++it;
}
}
Remove all pieces that the client cannot download.
incompletePiecesAvailableToClient &= client->availablePieces();
Remove all pieces that this client has already requested.
foreach (int i, currentPieces)
incompletePiecesAvailableToClient.clearBit(i);
Only continue if more pieces can be scheduled. If no pieces
are available and no blocks are in progress, just leave
the connection idle; it might become interesting later.
if (incompletePiecesAvailableToClient.count(true) == 0)
return;
Check if any of the partially completed pieces can be
recovered, and if so, pick a random one of them.
QList<TorrentPiece *> partialPieces;
QMap<int, TorrentPiece *>::ConstIterator it = d->pendingPieces.constBegin();
while (it != d->pendingPieces.constEnd()) {
TorrentPiece *tmp = it.value();
if (incompletePiecesAvailableToClient.testBit(it.key())) {
if (!tmp->inProgress || d->state == WarmingUp || d->state == Endgame) {
partialPieces << tmp;
break;
}
}
++it;
}
if (!partialPieces.isEmpty())
piece = partialPieces.value(qrand() % partialPieces.size());
if (!piece) {
Pick a random piece 3 out of 4 times; otherwise, pick either
one of the most common or the least common pieces available,
depending on the state we're in.
int pieceIndex = 0;
if (d->state == WarmingUp || (qrand() & 4) == 0) {
int *occurrances = new int[d->pieceCount];
memset(occurrances, 0, d->pieceCount * sizeof(int));
Count how many of each piece are available.
foreach (PeerWireClient *peer, d->connections) {
QBitArray peerPieces = peer->availablePieces();
int peerPiecesSize = peerPieces.size();
for (int i = 0; i < peerPiecesSize; ++i) {
if (peerPieces.testBit(i))
++occurrances[i];
}
}
Find the rarest or most common pieces.
int numOccurrances = d->state == WarmingUp ? 0 : 99999;
QList<int> piecesReadyForDownload;
for (int i = 0; i < d->pieceCount; ++i) {
if (d->state == WarmingUp) {
Add common pieces
if (occurrances[i] >= numOccurrances
&& incompletePiecesAvailableToClient.testBit(i)) {
if (occurrances[i] > numOccurrances)
piecesReadyForDownload.clear();
piecesReadyForDownload.append(i);
numOccurrances = occurrances[i];
}
} else {
Add rare pieces
if (occurrances[i] <= numOccurrances
&& incompletePiecesAvailableToClient.testBit(i)) {
if (occurrances[i] < numOccurrances)
piecesReadyForDownload.clear();
piecesReadyForDownload.append(i);
numOccurrances = occurrances[i];
}
}
}
Select one piece randomly
pieceIndex = piecesReadyForDownload.at(qrand() % piecesReadyForDownload.size());
delete [] occurrances;
} else {
Make up a list of available piece indices, and pick
a random one.
QList<int> values;
int incompletePiecesAvailableToClientSize = incompletePiecesAvailableToClient.size();
for (int i = 0; i < incompletePiecesAvailableToClientSize; ++i) {
if (incompletePiecesAvailableToClient.testBit(i))
values << i;
}
pieceIndex = values.at(qrand() % values.size());
}
Create a new TorrentPiece and fill in all initial
properties.
piece = new TorrentPiece;
piece->index = pieceIndex;
piece->length = d->fileManager.pieceLengthAt(pieceIndex);
int numBlocks = piece->length / BlockSize;
if (piece->length % BlockSize)
++numBlocks;
piece->completedBlocks.resize(numBlocks);
piece->requestedBlocks.resize(numBlocks);
d->pendingPieces.insert(pieceIndex, piece);
}
piece->inProgress = true;
d->payloads.insert(client, piece);
}
Request more blocks from all pending pieces.
requestMore(client);
}
void TorrentClient::requestMore(PeerWireClient *client)
{
Make a list of all pieces this client is currently waiting for,
and count the number of blocks in progress.
QMultiMap<PeerWireClient *, TorrentPiece *>::Iterator it = d->payloads.find(client);
int numBlocksInProgress = client->incomingBlocks().size();
QList<TorrentPiece *> piecesInProgress;
while (it != d->payloads.end() && it.key() == client) {
TorrentPiece *piece = it.value();
if (piece->inProgress || (d->state == WarmingUp || d->state == Endgame))
piecesInProgress << piece;
++it;
}
If no pieces are in progress, call the scheduler.
if (piecesInProgress.isEmpty() && d->incompletePieces.count(true)) {
d->callScheduler();
return;
}
If too many pieces are in progress, there's nothing to do.
int maxInProgress = ((d->state == Endgame || d->state == WarmingUp)
? MaxBlocksInMultiMode : MaxBlocksInProgress);
if (numBlocksInProgress == maxInProgress)
return;
Starting with the first piece that we're waiting for, request
blocks until the quota is filled up.
foreach (TorrentPiece *piece, piecesInProgress) {
numBlocksInProgress += requestBlocks(client, piece, maxInProgress - numBlocksInProgress);
if (numBlocksInProgress == maxInProgress)
break;
}
If we still didn't fill up the quota, we need to schedule more
pieces.
if (numBlocksInProgress < maxInProgress && d->state != WarmingUp)
d->callScheduler();
}
int TorrentClient::requestBlocks(PeerWireClient *client, TorrentPiece *piece, int maxBlocks)
{
Generate the list of incomplete blocks for this piece.
QVector<int> bits;
int completedBlocksSize = piece->completedBlocks.size();
for (int i = 0; i < completedBlocksSize; ++i) {
if (!piece->completedBlocks.testBit(i) && !piece->requestedBlocks.testBit(i))
bits << i;
}
Nothing more to request.
if (bits.size() == 0) {
if (d->state != WarmingUp && d->state != Endgame)
return 0;
bits.clear();
for (int i = 0; i < completedBlocksSize; ++i) {
if (!piece->completedBlocks.testBit(i))
bits << i;
}
}
if (d->state == WarmingUp || d->state == Endgame) {
By randomizing the list of blocks to request, we
significantly speed up the warmup and endgame modes, where
the same blocks are requested from multiple peers. The
speedup comes from an increased chance of receiving
different blocks from the different peers.
for (int i = 0; i < bits.size(); ++i) {
int a = qrand() % bits.size();
int b = qrand() % bits.size();
int tmp = bits[a];
bits[a] = bits[b];
bits[b] = tmp;
}
}
Request no more blocks than we've been asked to.
int blocksToRequest = qMin(maxBlocks, bits.size());
Calculate the offset and size of each block, and send requests.
for (int i = 0; i < blocksToRequest; ++i) {
int blockSize = BlockSize;
if ((piece->length % BlockSize) && bits.at(i) == completedBlocksSize - 1)
blockSize = piece->length % BlockSize;
client->requestBlock(piece->index, bits.at(i) * BlockSize, blockSize);
piece->requestedBlocks.setBit(bits.at(i));
}
return blocksToRequest;
}
void TorrentClient::peerChoked()
{
PeerWireClient *client = qobject_cast<PeerWireClient *>(sender());
if (!client)
return;
When the peer chokes us, we immediately forget about all blocks
we've requested from it. We also remove the piece from out
payload, making it available to other clients.
QMultiMap<PeerWireClient *, TorrentPiece *>::Iterator it = d->payloads.find(client);
while (it != d->payloads.end() && it.key() == client) {
it.value()->inProgress = false;
it.value()->requestedBlocks.fill(false);
it = d->payloads.erase(it);
}
}
void TorrentClient::peerUnchoked()
{
PeerWireClient *client = qobject_cast<PeerWireClient *>(sender());
if (!client)
return;
We got unchoked, which means we can request more blocks.
if (d->state != Seeding)
d->callScheduler();
}
void TorrentClient::addToPeerList(const QList<TorrentPeer> &peerList)
{
Add peers we don't already know of to our list of peers.
QList<QHostAddress> addresses = QNetworkInterface::allAddresses();
foreach (TorrentPeer peer, peerList) {
if (addresses.contains(peer.address)
&& peer.port == TorrentServer::instance()->serverPort()) {
Skip our own server.
continue;
}
bool known = false;
foreach (TorrentPeer *knownPeer, d->peers) {
if (knownPeer->port == peer.port
&& knownPeer->address == peer.address) {
known = true;
break;
}
}
if (!known) {
TorrentPeer *newPeer = new TorrentPeer;
*newPeer = peer;
newPeer->interesting = true;
newPeer->seed = false;
newPeer->lastVisited = 0;
newPeer->connectStart = 0;
newPeer->connectTime = 999999;
newPeer->pieces.resize(d->pieceCount);
newPeer->numCompletedPieces = 0;
d->peers << newPeer;
}
}
If we've got more peers than we can connect to, we remove some
of the peers that have no (or low) activity.
int maxPeers = ConnectionManager::instance()->maxConnections() * 3;
if (d->peers.size() > maxPeers) {
Find what peers are currently connected & active
QSet<TorrentPeer *> activePeers;
foreach (TorrentPeer *peer, d->peers) {
foreach (PeerWireClient *client, d->connections) {
if (client->peer() == peer && (client->downloadSpeed() + client->uploadSpeed()) > 1024)
activePeers << peer;
}
}
Remove inactive peers from the peer list until we're below
the max connections count.
QList<int> toRemove;
for (int i = 0; i < d->peers.size() && (d->peers.size() - toRemove.size()) > maxPeers; ++i) {
if (!activePeers.contains(d->peers.at(i)))
toRemove << i;
}
QListIterator<int> toRemoveIterator(toRemove);
toRemoveIterator.toBack();
while (toRemoveIterator.hasPrevious())
d->peers.removeAt(toRemoveIterator.previous());
If we still have too many peers, remove the oldest ones.
while (d->peers.size() > maxPeers)
d->peers.takeFirst();
}
if (d->state != Paused && d->state != Stopping && d->state != Idle) {
if (d->state == Searching || d->state == WarmingUp)
connectToPeers();
else
d->callPeerConnector();
}
}
void TorrentClient::trackerStopped()
{
d->setState(Idle);
emit stopped();
}
void TorrentClient::updateProgress(int progress)
{
if (progress == -1 && d->pieceCount > 0) {
int newProgress = (d->completedPieces.count(true) * 100) / d->pieceCount;
if (d->lastProgressValue != newProgress) {
d->lastProgressValue = newProgress;
emit progressUpdated(newProgress);
}
} else if (d->lastProgressValue != progress) {
d->lastProgressValue = progress;
emit progressUpdated(progress);
}
}