www.gusucode.com > eMule电驴下载VC++源代码-源码程序 > eMule电驴下载VC++源代码-源码程序\code\srchybrid\UploadBandwidthThrottler.cpp
//Download by http://www.NewXing.com //this file is part of eMule //Copyright (C)2002 Merkur ( merkur-@users.sourceforge.net / http://www.emule-project.net ) // //This program is free software; you can redistribute it and/or //modify it under the terms of the GNU General Public License //as published by the Free Software Foundation; either //version 2 of the License, or (at your option) any later version. // //This program is distributed in the hope that it will be useful, //but WITHOUT ANY WARRANTY; without even the implied warranty of //MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the //GNU General Public License for more details. // //You should have received a copy of the GNU General Public License //along with this program; if not, write to the Free Software //Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. #include "stdafx.h" #include "emule.h" #include "UploadBandwidthThrottler.h" #include "EMSocket.h" #include "opcodes.h" #include "UploadQueue.h" #include "LastCommonRouteFinder.h" #include "OtherFunctions.h" #ifndef _CONSOLE #include "emuledlg.h" #endif #ifdef _DEBUG #undef THIS_FILE static char THIS_FILE[]=__FILE__; #define new DEBUG_NEW #endif /** * The constructor starts the thread. */ UploadBandwidthThrottler::UploadBandwidthThrottler(void) { m_SentBytesSinceLastCall = 0; m_SentBytesSinceLastCallExcludingOverhead = 0; m_highestNumberOfFullyActivatedSlots = 0; threadEndedEvent = new CEvent(0, 1); doRun = true; AfxBeginThread(RunProc, (LPVOID)this); } /** * The destructor stops the thread. If the thread has already stoppped, destructor does nothing. */ UploadBandwidthThrottler::~UploadBandwidthThrottler(void) { EndThread(); delete threadEndedEvent; } void UploadBandwidthThrottler::SetAllowedDataRate(uint32 newValue) { sendLocker.Lock(); m_allowedDataRate = newValue; sendLocker.Unlock(); } /** * Find out how many bytes that has been put on the sockets since the last call to this * method. Includes overhead of control packets. * * @return the number of bytes that has been put on the sockets since the last call */ uint64 UploadBandwidthThrottler::GetNumberOfSentBytesSinceLastCallAndReset() { sendLocker.Lock(); uint64 numberOfSentBytesSinceLastCall = m_SentBytesSinceLastCall; m_SentBytesSinceLastCall = 0; sendLocker.Unlock(); return numberOfSentBytesSinceLastCall; } /** * Find out how many bytes that has been put on the sockets since the last call to this * method. Excludes overhead of control packets. * * @return the number of bytes that has been put on the sockets since the last call */ uint64 UploadBandwidthThrottler::GetNumberOfSentBytesExcludingOverheadSinceLastCallAndReset() { sendLocker.Lock(); uint64 numberOfSentBytesSinceLastCall = m_SentBytesSinceLastCallExcludingOverhead; m_SentBytesSinceLastCallExcludingOverhead = 0; sendLocker.Unlock(); return numberOfSentBytesSinceLastCall; } /** * Find out the highest number of slots that has been fed data in the normal standard loop * of the thread since the last call of this method. This means all slots that haven't * been in the trickle state during the entire time since the last call. * * @return the highest number of fully activated slots during any loop since last call */ uint32 UploadBandwidthThrottler::GetHighestNumberOfFullyActivatedSlotsSinceLastCallAndReset() { sendLocker.Lock(); uint64 highestNumberOfFullyActivatedSlots = m_highestNumberOfFullyActivatedSlots; m_highestNumberOfFullyActivatedSlots = 0; sendLocker.Unlock(); return highestNumberOfFullyActivatedSlots; } /** * Add a socket to the list of sockets that have upload slots. The main thread will * continously call send on these sockets, to give them chance to work off their queues. * The sockets are called in the order they exist in the list, so the top socket (index 0) * will be given a chance first to use bandwidth, and then the next socket (index 1) etc. * * It is possible to add a socket several times to the list without removing it inbetween, * but that should be avoided. * * @param index insert the socket at this place in the list. An index that is higher than the * current number of sockets in the list will mean that the socket should be inserted * last in the list. * * @param socket the address to the socket that should be added to the list. If the address is NULL, * this method will do nothing. */ void UploadBandwidthThrottler::AddToStandardList(uint32 index, CEMSocket* socket) { if(socket != NULL) { sendLocker.Lock(); RemoveFromStandardListNoLock(socket); if(index > (uint32)m_StandardOrder_list.GetSize()) { index = m_StandardOrder_list.GetSize(); } m_StandardOrder_list.InsertAt(index, socket); sendLocker.Unlock(); } else { //if (thePrefs.GetVerbose()) // theApp.emuledlg->AddDebugLogLine(true,"Tried to add NULL socket to UploadBandwidthThrottler Standard list! Prevented."); } } /** * Remove a socket from the list of sockets that have upload slots. * * If the socket has mistakenly been added several times to the list, this method * will return all of the entries for the socket. * * @param socket the address of the socket that should be removed from the list. If this socket * does not exist in the list, this method will do nothing. */ void UploadBandwidthThrottler::RemoveFromStandardList(CEMSocket* socket) { sendLocker.Lock(); RemoveFromStandardListNoLock(socket); sendLocker.Unlock(); } /** * Remove a socket from the list of sockets that have upload slots. NOT THREADSAFE! * This is an internal method that doesn't take the necessary lock before it removes * the socket. This method should only be called when the current thread already owns * the sendLocker lock! * * @param socket address of the socket that should be removed from the list. If this socket * does not exist in the list, this method will do nothing. */ void UploadBandwidthThrottler::RemoveFromStandardListNoLock(CEMSocket* socket) { // Find the slot int slotCounter = 0; bool foundSocket = false; while(slotCounter < m_StandardOrder_list.GetSize() && foundSocket == false) { if(m_StandardOrder_list.GetAt(slotCounter) == socket) { // Remove the slot m_StandardOrder_list.RemoveAt(slotCounter); foundSocket = true; } else { slotCounter++; } } } /** * Notifies the send thread that it should try to call controlpacket send * for the supplied socket. It is allowed to call this method several times * for the same socket, without having controlpacket send called for the socket * first. The doublette entries are never filtered, since it is incurs less cpu * overhead to simply call Send() in the socket for each double. Send() will * already have done its work when the second Send() is called, and will just * return with little cpu overhead. * * @param socket address to the socket that requests to have controlpacket send * to be called on it */ void UploadBandwidthThrottler::QueueForSendingControlPacket(CEMSocket* socket) { // Get critical section tempQueueLocker.Lock(); if(doRun) { m_TempControlQueue_list.AddTail(socket); } // End critical section tempQueueLocker.Unlock(); } /** * Remove the socket from all lists and queues. This will make it safe to * erase/delete the socket. It will also cause the main thread to stop calling * send() for the socket. * * @param socket address to the socket that should be removed */ void UploadBandwidthThrottler::RemoveFromAllQueues(CEMSocket* socket) { // Get critical section sendLocker.Lock(); if(doRun) { // Remove this socket from control packet queue { POSITION pos1, pos2; for (pos1 = m_ControlQueue_list.GetHeadPosition();( pos2 = pos1 ) != NULL;) { m_ControlQueue_list.GetNext(pos1); CEMSocket* socketinQueue = m_ControlQueue_list.GetAt(pos2); if(socketinQueue == socket) { m_ControlQueue_list.RemoveAt(pos2); } } } tempQueueLocker.Lock(); { POSITION pos1, pos2; for (pos1 = m_TempControlQueue_list.GetHeadPosition();( pos2 = pos1 ) != NULL;) { m_TempControlQueue_list.GetNext(pos1); CEMSocket* socketinQueue = m_TempControlQueue_list.GetAt(pos2); if(socketinQueue == socket) { m_TempControlQueue_list.RemoveAt(pos2); } } } tempQueueLocker.Unlock(); // And remove it from upload slots RemoveFromStandardListNoLock(socket); } // End critical section sendLocker.Unlock(); } /** * Make the thread exit. This method will not return until the thread has stopped * looping. This guarantees that the thread will not access the CEMSockets after this * call has exited. */ void UploadBandwidthThrottler::EndThread() { sendLocker.Lock(); // signal the thread to stop looping and exit. doRun = false; sendLocker.Unlock(); // wait for the thread to signal that it has stopped looping. threadEndedEvent->Lock(); } /** * Start the thread. Called from the constructor in this class. * * @param pParam * * @return */ UINT AFX_CDECL UploadBandwidthThrottler::RunProc(LPVOID pParam) { DbgSetThreadName("UploadBandwidthThrottler"); UploadBandwidthThrottler* uploadBandwidthThrottler = (UploadBandwidthThrottler*)pParam; return uploadBandwidthThrottler->RunInternal(); } /** * The thread method that handles calling send for the individual sockets. * * This method decides to which slot the currently available bandwidth chunk * will go to. There are several algorithms that could be inserted here. The * SlotFocus algorithm available in ZZUL tries to send as fast as possible to each * slot, before going to the next slot. The current experimental algorithm tries to * feed all slots an equal amount. If not all slots can get at least UPLOAD_CLIENT_DATARATE * each, the scheduler puts the last slots on trickle, and just gives UPLOAD_CLIENT_DATARATE * to as many slots as it has enough bandwidth for. * * Control packets will always be tried to be sent first. * * Upload slots will not be allowed to go without having sent * called for more than a defined amount of time (i.e. two seconds). * * @return always returns 0. */ UINT UploadBandwidthThrottler::RunInternal() { DWORD lastLoopTick = ::GetTickCount(); sint64 bytesToSpend = 0; uint32 allowedDataRate = 0; while(doRun) { DWORD timeSinceLastLoop = ::GetTickCount() - lastLoopTick; #define TIME_BETWEEN_UPLOAD_LOOPS 10 if(timeSinceLastLoop < TIME_BETWEEN_UPLOAD_LOOPS) { Sleep(TIME_BETWEEN_UPLOAD_LOOPS-timeSinceLastLoop); } sendLocker.Lock(); // PENDING: This would be used if UploadSpeedSense wasn't there. // This direct connection could be removed between UploadSpeedSense // and the throttler, by moving the value via CUploadQueue::UploadTimer, // but I haven't decided about that yet. // allowedDataRate = m_allowedDataRate; // Get current speed from UploadSpeedSense allowedDataRate = theApp.lastCommonRouteFinder->GetUpload(); //uint32 minFragSize = 512; uint32 minFragSize = allowedDataRate / 50; if(minFragSize < 512) { minFragSize = 512; } else if(minFragSize > 2800) { minFragSize = 2800; } const DWORD thisLoopTick = ::GetTickCount(); timeSinceLastLoop = thisLoopTick - lastLoopTick; if(timeSinceLastLoop > 1*1000) { // theApp.QueueDebugLogLine(false,"UploadBandwidthThrottler: Time since last loop too long (%i).", timeSinceLastLoop); timeSinceLastLoop = 1*1000; lastLoopTick = thisLoopTick - timeSinceLastLoop; } // Calculate how many bytes we can spend if(allowedDataRate != 0) { bytesToSpend += allowedDataRate*(thisLoopTick-lastLoopTick)/1000; } else { bytesToSpend = _I64_MAX; } //lastLoopTick = thisLoopTick; uint64 spentBytes = 0; uint64 spentOverhead = 0; tempQueueLocker.Lock(); // are there any sockets in m_TempControlQueue_list? Move them to normal m_ControlQueue_list; while(!m_TempControlQueue_list.IsEmpty()) { CEMSocket* moveSocket = m_TempControlQueue_list.RemoveHead(); m_ControlQueue_list.AddTail(moveSocket); } tempQueueLocker.Unlock(); // Send any queued up control packets first while(bytesToSpend > 0 && spentBytes <= (uint64)bytesToSpend && !m_ControlQueue_list.IsEmpty()) { CEMSocket* socket = m_ControlQueue_list.RemoveHead(); if(socket != NULL) { SocketSentBytes socketSentBytes = socket->Send(bytesToSpend-spentBytes, true); spentBytes += socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets; spentOverhead += socketSentBytes.sentBytesControlPackets; } } // Check if any sockets haven't gotten data for a long time. Then trickle them a package. for(uint32 slotCounter = 0; slotCounter < (uint32)m_StandardOrder_list.GetSize(); slotCounter++) { CEMSocket* socket = m_StandardOrder_list.GetAt(slotCounter); if(socket != NULL) { if((thisLoopTick-socket->GetLastCalledSend()) > 1000) { // trickle SocketSentBytes socketSentBytes = socket->Send(512); spentBytes += socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets; spentOverhead += socketSentBytes.sentBytesControlPackets; } } else { theApp.QueueDebugLogLine(false,"UploadBandwidthThrottler: There was a NULL socket in the standard list (trickle)! Prevented usage. Index: %i Size: %i", slotCounter, m_StandardOrder_list.GetSize()); } } uint32 leftoverDueToRounding = 0; // how many slots are fully saturated? uint32 currentFullyActivatedSlots = 0; if(bytesToSpend > 0 && (uint64)bytesToSpend > spentBytes) { // calc number of clients to feed equally uint64 numberOfClientsToFeed = 0; if(thisLoopTick-lastLoopTick > 0) { numberOfClientsToFeed = (uint64)(bytesToSpend-spentBytes)*1000/(UPLOAD_CLIENT_DATARATE*(thisLoopTick-lastLoopTick)); } if(numberOfClientsToFeed > (uint64)m_StandardOrder_list.GetSize()) { numberOfClientsToFeed = m_StandardOrder_list.GetSize(); } uint64 bytesPerClient = bytesToSpend-spentBytes; if(numberOfClientsToFeed > 1) { bytesPerClient = (bytesToSpend-spentBytes)/numberOfClientsToFeed; leftoverDueToRounding = (bytesToSpend-spentBytes)%numberOfClientsToFeed; } // "Full" speed sockets for(uint32 slotCounter = 0; slotCounter < (uint32)m_StandardOrder_list.GetSize() && bytesToSpend > leftoverDueToRounding && spentBytes <= (uint64)bytesToSpend-leftoverDueToRounding; slotCounter++) { CEMSocket* socket = m_StandardOrder_list.GetAt(slotCounter); if(socket != NULL) { SocketSentBytes socketSentBytes = socket->Send(min(bytesPerClient, bytesToSpend-leftoverDueToRounding-spentBytes)); spentBytes += socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets; spentOverhead += socketSentBytes.sentBytesControlPackets; } else { theApp.QueueDebugLogLine(false,"UploadBandwidthThrottler: There was a NULL socket in the standard list (full)! Prevented usage. Index: %i Size: %i", slotCounter, m_StandardOrder_list.GetSize()); } } // Any data that is left over at this point is given to any slot that wants it. First come first serve. uint32 saturatedSlotCounter = 0; for(saturatedSlotCounter = 0; saturatedSlotCounter < (uint32)m_StandardOrder_list.GetSize() && bytesToSpend > leftoverDueToRounding+minFragSize && spentBytes <= (uint64)bytesToSpend-(leftoverDueToRounding+minFragSize); saturatedSlotCounter++) { CEMSocket* socket = m_StandardOrder_list.GetAt(saturatedSlotCounter); if(socket != NULL) { SocketSentBytes socketSentBytes = socket->Send(bytesToSpend-leftoverDueToRounding-spentBytes); spentBytes += socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets; spentOverhead += socketSentBytes.sentBytesControlPackets; } else { theApp.QueueDebugLogLine(false,"UploadBandwidthThrottler: There was a NULL socket in the standard list (leftovers)! Prevented usage. Index: %i Size: %i", saturatedSlotCounter, m_StandardOrder_list.GetSize()); } } if(bytesToSpend-spentBytes > leftoverDueToRounding+minFragSize) { currentFullyActivatedSlots = saturatedSlotCounter; } else if(saturatedSlotCounter > 0) { currentFullyActivatedSlots = saturatedSlotCounter-1; } m_highestNumberOfFullyActivatedSlots = max(m_highestNumberOfFullyActivatedSlots, currentFullyActivatedSlots); } lastLoopTick = thisLoopTick; bytesToSpend -= spentBytes; // These are old limiting calculation valid for SlotFocus packet scheduler. I haven't redone them for // the "equal for all slots" packet scheduler yet, since that scheduler isn't finished yet. if(bytesToSpend < -((sint64)m_StandardOrder_list.GetSize()*minFragSize)) { sint64 newBytesToSpend = -((sint64)m_StandardOrder_list.GetSize()*minFragSize); TRACE("UploadBandwidthThrottler: Overcharged bytesToSpend. Limiting negative value. Old value: %I64i New value: %i\n", bytesToSpend, newBytesToSpend); bytesToSpend = newBytesToSpend; } else if(bytesToSpend > leftoverDueToRounding+minFragSize) { //theApp.QueueDebugLogLine(false,"UploadBandwidthThrottler::RunInternal(): Too much in bytesToSpend. Limiting positive value. Old value: %I64i New value: %i", bytesToSpend, leftoverDueToRounding+minFragSize); bytesToSpend = leftoverDueToRounding+minFragSize; //m_highestNumberOfFullyActivatedSlots = m_StandardOrder_list.GetSize(); } m_SentBytesSinceLastCall += spentBytes; m_SentBytesSinceLastCallExcludingOverhead += spentBytes-spentOverhead; sendLocker.Unlock(); } threadEndedEvent->SetEvent(); tempQueueLocker.Lock(); m_TempControlQueue_list.RemoveAll(); tempQueueLocker.Unlock(); sendLocker.Lock(); m_ControlQueue_list.RemoveAll(); m_StandardOrder_list.RemoveAll(); sendLocker.Unlock(); return 0; }