// www.gusucode.com > VC_C++源码,界面编程,网页爬虫源码程序 > VC_C++源码,界面编程,网页爬虫源码程序/code/webpageloader_SourceCode/ThreadManager.cpp
// ThreadManager.cpp: implementation of the CThreadManager class. // ////////////////////////////////////////////////////////////////////// #include "stdafx.h" #include "WebPageLoader.h" #include "ThreadManager.h" #ifdef _DEBUG #undef THIS_FILE static char THIS_FILE[]=__FILE__; #define new DEBUG_NEW #endif ////////////////////////////////////////////////////////////////////// // Construction/Destruction CThreadManager::CThreadManager() { m_nActiveThreads = m_nTotalThreads = m_nMaxThreads = 0; } CThreadManager::~CThreadManager() { TRACE("Running ThreadManager cleanup...\n"); // Remove all thread queues and stuff CSingleLock lock(&m_cs, TRUE); TRY { while( !m_IdleQueue.IsEmpty() ) { CSession *pSession = m_IdleQueue.RemoveHead(); ASSERT_VALID(pSession); TRACE(_T("Deleting thread %d.\n"), pSession->m_hThread); delete pSession; }; while( !m_StoppedQueue.IsEmpty() ) { CSession *pSession = m_StoppedQueue.RemoveHead(); ASSERT_VALID(pSession); TRACE(_T("Deleting thread %d.\n"), pSession->m_hThread); delete pSession; }; while( !m_SessionQueue.IsEmpty() ) { CSession *pSession = m_SessionQueue.RemoveHead(); ASSERT_VALID(pSession); TRACE(_T("Deleting thread %d.\n"), pSession->m_hThread); delete pSession; }; } CATCH_ALL( e ) { // dummy } END_CATCH_ALL } ////////////////////////////////////////////////////////////////////// // Master functions void CThreadManager::Close() { // Stop running threads StopRunningThreads(); // Wait for running threads DWORD dwTick = ::GetTickCount(); while( HasRunningThreads() ) { ::Sleep(200L); if( ::GetTickCount() - dwTick > 2000L ) break; } } BOOL CThreadManager::ScheduleRun() // LOCKS! Locks in each function call. { BOOL bRefresh = FALSE; bRefresh |= MoveStoppedThreads(); bRefresh |= ScheduleNewThreads(); bRefresh |= CheckForKilledThreads(); bRefresh |= RemoveKilledThreads(); return bRefresh; }; CSession *CThreadManager::FindSession(LONG ID, SessionQueueType Type/*=QUEUE_ALL*/) // DOES NOT LOCK! 
{ CSession *session; POSITION pos; if( Type & QUEUE_WAIT ) { pos = m_IdleQueue.GetHeadPosition(); while( pos!=NULL ) { session = m_IdleQueue.GetNext(pos); ASSERT_VALID(session); if( session->m_iUniqueID==ID ) return session; }; }; if( Type & QUEUE_RUNNING ) { pos = m_SessionQueue.GetHeadPosition(); while( pos!=NULL ) { session = m_SessionQueue.GetNext(pos); ASSERT_VALID(session); if( session->m_iUniqueID==ID ) return session; }; }; if( Type & QUEUE_STOPPED ) { pos = m_StoppedQueue.GetHeadPosition(); while( pos!=NULL ) { session = m_StoppedQueue.GetNext(pos); ASSERT_VALID(session); if( session->m_iUniqueID==ID ) return session; }; }; return NULL; }; void CThreadManager::GetSessions(CSessionList &List, SessionQueueType Type/*=QUEUE_ALL*/) // DOES NOT LOCK! { List.RemoveAll(); POSITION pos; if( Type & QUEUE_WAIT ) { pos = m_IdleQueue.GetHeadPosition(); while( pos!=NULL ) { List.AddTail( m_IdleQueue.GetNext(pos) ); }; }; if( Type & QUEUE_RUNNING ) { pos = m_SessionQueue.GetHeadPosition(); while( pos!=NULL ) { List.AddTail( m_SessionQueue.GetNext(pos) ); }; }; if( Type & QUEUE_STOPPED ) { pos = m_StoppedQueue.GetHeadPosition(); while( pos!=NULL ) { List.AddTail( m_StoppedQueue.GetNext(pos) ); }; }; }; ////////////////////////////////////////////////////////////////////// // Management functions void CThreadManager::AddSession(CSession *pSession) // LOCKS! { ASSERT_VALID(pSession); CSingleLock lock(&m_cs, TRUE); pSession->SetState(STATE_QUEUED); m_IdleQueue.AddTail( pSession ); AfxGetMainWnd()->PostMessage(WM_SCHEDULE); }; void CThreadManager::ThreadSleep() { }; void CThreadManager::ThreadResume() { }; void CThreadManager::ThreadStop() { }; BOOL CThreadManager::MoveStoppedThreads() // LOCKS! { CSingleLock lock(&m_cs, TRUE); BOOL bProcessed = FALSE; // Move stopped threads to idle list... 
POSITION pos; for( pos = m_SessionQueue.GetHeadPosition(); pos!=NULL; ) { POSITION oldpos = pos; CSession *pSession = m_SessionQueue.GetNext(pos); ASSERT_VALID(pSession); if( pSession->m_State==STATE_STOPPED ) { // Lock the sesison CSingleLock lock( *pSession, TRUE ); // Remove the session from the running queue m_SessionQueue.RemoveAt(oldpos); // Sessions state is no longer running pSession->m_bRunning = false; // Now add session to idle queue... m_StoppedQueue.AddHead(pSession); // Statistics... m_nActiveThreads--; // Done bProcessed = TRUE; // Start from the head again... pos = m_SessionQueue.GetHeadPosition(); }; }; return bProcessed; } BOOL CThreadManager::CheckForKilledThreads() // LOCKS! { CSingleLock lock(&m_cs, TRUE); BOOL bProcessed = FALSE; // If any session has been marked as killed, we should // make sure it will be removed when the thread is done! CSessionList all; GetSessions(all); POSITION pos = all.GetHeadPosition(); while( pos!=NULL ) { CSession *pSession = all.GetNext(pos); bool bRemove = false; switch( pSession->m_State ) { case STATE_STOPPED: if( pSession->m_bKillRequest ) bRemove = true; if( pSession->m_Preferences->m_bDeleteThreadWhenDone ) bRemove = true; break; case STATE_QUEUED: case STATE_SLEEPING: if( pSession->m_bKillRequest ) bRemove = true; break; case STATE_RUNNING: case STATE_CONNECTING: if( pSession->m_bKillRequest ) { ::Sleep(200); // Nuke the download file CSingleLock lock( *pSession, TRUE ); CHttpFile *pFile = pSession->m_pCurrentDownloadFile; if( pFile!=NULL ) { TRACE("Have to abort download!\n"); TRY { pFile->Abort(); } CATCH_ALL(e) { TRACE("Download abort failed!!!\n"); } END_CATCH_ALL lock.Unlock(); bProcessed = TRUE; // yes, refresh again } } break; }; if( bRemove ) { ASSERT_VALID(pSession); CSingleLock lock( *pSession, TRUE ); // Session is now dead. // Cleanup code will remove it from queue later. 
pSession->m_State = STATE_KILLED; bProcessed = TRUE; } }; return bProcessed; }; BOOL CThreadManager::ScheduleNewThreads() // LOCKS! { CSingleLock lock(&m_cs, TRUE); BOOL bProcessed = FALSE; // If any vacant slots, we remove the first thread from the // session queue and schedule it... if( m_SessionQueue.GetCount() < m_nMaxThreads && m_IdleQueue.GetCount()>0 ) { CSession *pSession = m_IdleQueue.RemoveHead(); ASSERT_VALID(pSession); if( !pSession->m_sURL.IsEmpty() ) { { CSingleLock lock( *pSession, TRUE ); // Session is now connecting pSession->m_State = STATE_CONNECTING; pSession->m_bRunning = true; // So add it to the running queue m_SessionQueue.AddTail(pSession); } // Spawn download thread CWinThread *pThread = AfxBeginThread( DownloadSessionThread, pSession ); { CSingleLock lock( *pSession, TRUE ); pSession->m_hThread = pThread->m_hThread; } m_nActiveThreads++; m_nTotalThreads++; // Done for now.. bProcessed = TRUE; }; }; return bProcessed; }; BOOL CThreadManager::RemoveKilledThreads() // LOCKS! { CSingleLock lock(&m_cs, TRUE); BOOL bProcessed = FALSE; // Move stopped threads to idle list... CSessionList all; GetSessions(all); POSITION pos = all.GetHeadPosition(); while( pos!=NULL ) { CSession *pSession = all.GetNext(pos); ASSERT_VALID(pSession); if( pSession->m_State==STATE_KILLED ) { CWaitCursor cursor; // Ok, we cannot use CSingleLock here, because we're going // to nuke the object from memory CSyncObject *pLock = *pSession; ASSERT_VALID(pLock); pLock->Lock(); TRACE(_T("Checking thread %d for termination.\n"), pSession->m_hThread); // First, remove it from queues... POSITION delpos; #define DeleteQueueEntry(queue,entry) delpos = queue.Find(entry); if( delpos!=NULL ) queue.RemoveAt(delpos) DeleteQueueEntry(m_StoppedQueue,pSession); DeleteQueueEntry(m_IdleQueue,pSession); DeleteQueueEntry(m_SessionQueue,pSession); DeleteQueueEntry(all,pSession); #undef DeleteQueueEntry // Check if thread is really dead? 
DWORD dwCode; if( ::GetExitCodeThread(pSession->m_hThread, &dwCode)==FALSE ) { dwCode = ::GetLastError(); TRACE(_T("Thread %d not available. Err Code=%d!\n"), pSession->m_hThread, dwCode); } if( dwCode==STILL_ACTIVE ) { pLock->Unlock(); TRACE(_T("Thread %d not complete. Err=%d!\n"), pSession->m_hThread, ::GetLastError()); // Give it some time to die ::Sleep(500); // TODO: Do a loop until thread is killed! // Beware 'm_hThread' may actually point to a new thread (which re-uses // the same id)? So we don't do any fancy stuff here!!! // MSDN "Multithreading: Terminating Threads" explains how to do it correctly. pLock->Lock(); } // It dead... TRY { delete pSession; } CATCH_ALL(e) { // Dummy } END_CATCH_ALL // We don't unlock!!! Destructor will do it! bProcessed = TRUE; // Restart search... pos = all.GetHeadPosition(); }; }; return bProcessed; }; BOOL CThreadManager::HasRunningThreads() // LOCKS! { CSingleLock lock(&m_cs, TRUE); CSessionList all; GetSessions(all); POSITION pos = all.GetHeadPosition(); while( pos!=NULL ) { CSession *pSession = all.GetNext(pos); switch( pSession->m_State ) { case STATE_RUNNING: case STATE_CONNECTING: return TRUE; } } return FALSE; }; void CThreadManager::StopRunningThreads() // LOCKS! { int nCount = 0; { CSingleLock lock(&m_cs, TRUE); nCount = m_SessionQueue.GetCount(); } for( int i = 0; i < nCount; i++ ) AfxGetMainWnd()->SendMessage(WM_COMMAND, MAKEWPARAM(ID_EDIT_DELETE, 0), 0L); MoveStoppedThreads(); CheckForKilledThreads(); RemoveKilledThreads(); }