www.gusucode.com > VC_C++源码,界面编程,网页爬虫源码程序 > VC_C++源码,界面编程,网页爬虫源码程序/code/webpageloader_SourceCode/ThreadManager.cpp

    // ThreadManager.cpp: implementation of the CThreadManager class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "WebPageLoader.h"
#include "ThreadManager.h"

#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif

//////////////////////////////////////////////////////////////////////
// Construction/Destruction

CThreadManager::CThreadManager()
{
   m_nActiveThreads = m_nTotalThreads = m_nMaxThreads = 0;
}

CThreadManager::~CThreadManager()
{
   TRACE("Running ThreadManager cleanup...\n");
   // Remove all thread queues and stuff
   CSingleLock lock(&m_cs, TRUE);
   TRY {
      while( !m_IdleQueue.IsEmpty() ) {
         CSession *pSession = m_IdleQueue.RemoveHead();
         ASSERT_VALID(pSession);
         TRACE(_T("Deleting thread %d.\n"), pSession->m_hThread);
         delete pSession;
      };
      while( !m_StoppedQueue.IsEmpty() ) {
         CSession *pSession = m_StoppedQueue.RemoveHead();
         ASSERT_VALID(pSession);
         TRACE(_T("Deleting thread %d.\n"), pSession->m_hThread);
         delete pSession;
      };
      while( !m_SessionQueue.IsEmpty() ) {
         CSession *pSession = m_SessionQueue.RemoveHead();
         ASSERT_VALID(pSession);
         TRACE(_T("Deleting thread %d.\n"), pSession->m_hThread);
         delete pSession;
      };
   }
   CATCH_ALL( e )
   {
      // dummy
   }
   END_CATCH_ALL
}


//////////////////////////////////////////////////////////////////////
// Master functions

void CThreadManager::Close()
{
   // Stop running threads
   StopRunningThreads();
   // Wait for running threads
   DWORD dwTick = ::GetTickCount();
   while( HasRunningThreads() ) {
      ::Sleep(200L);
      if( ::GetTickCount() - dwTick > 2000L ) break;
   }
}

BOOL CThreadManager::ScheduleRun()
// LOCKS! Locks in each function call.
{
   BOOL bRefresh = FALSE;
   bRefresh |= MoveStoppedThreads();
   bRefresh |= ScheduleNewThreads();
   bRefresh |= CheckForKilledThreads();
   bRefresh |= RemoveKilledThreads();
   return bRefresh;
};

CSession *CThreadManager::FindSession(LONG ID, SessionQueueType Type/*=QUEUE_ALL*/)
// DOES NOT LOCK!
{
   CSession *session;
   POSITION pos;
   if( Type & QUEUE_WAIT ) {
      pos = m_IdleQueue.GetHeadPosition();
      while( pos!=NULL ) {
         session = m_IdleQueue.GetNext(pos);
         ASSERT_VALID(session);
         if( session->m_iUniqueID==ID ) return session;
      };
   };
   if( Type & QUEUE_RUNNING ) {
      pos = m_SessionQueue.GetHeadPosition();
      while( pos!=NULL ) {
         session = m_SessionQueue.GetNext(pos);
         ASSERT_VALID(session);
         if( session->m_iUniqueID==ID ) return session;
      };
   };
   if( Type & QUEUE_STOPPED ) {
      pos = m_StoppedQueue.GetHeadPosition();
      while( pos!=NULL ) {
         session = m_StoppedQueue.GetNext(pos);
         ASSERT_VALID(session);
         if( session->m_iUniqueID==ID ) return session;
      };
   };
   return NULL;
};

void CThreadManager::GetSessions(CSessionList &List, SessionQueueType Type/*=QUEUE_ALL*/)
// DOES NOT LOCK!
{
   List.RemoveAll();
   POSITION pos;
   if( Type & QUEUE_WAIT ) {
      pos = m_IdleQueue.GetHeadPosition();
      while( pos!=NULL ) {
         List.AddTail( m_IdleQueue.GetNext(pos) );
      };
   };
   if( Type & QUEUE_RUNNING ) {
      pos = m_SessionQueue.GetHeadPosition();
      while( pos!=NULL ) {
         List.AddTail( m_SessionQueue.GetNext(pos) );
      };
   };
   if( Type & QUEUE_STOPPED ) {
      pos = m_StoppedQueue.GetHeadPosition();
      while( pos!=NULL ) {
         List.AddTail( m_StoppedQueue.GetNext(pos) );
      };
   };
};


//////////////////////////////////////////////////////////////////////
// Management functions

void CThreadManager::AddSession(CSession *pSession)
// LOCKS!
{
   ASSERT_VALID(pSession);
   CSingleLock lock(&m_cs, TRUE);
   pSession->SetState(STATE_QUEUED);
   m_IdleQueue.AddTail( pSession );
   AfxGetMainWnd()->PostMessage(WM_SCHEDULE);
};

void CThreadManager::ThreadSleep()
{
};

void CThreadManager::ThreadResume()
{
};

void CThreadManager::ThreadStop()
{
};

BOOL CThreadManager::MoveStoppedThreads()
// LOCKS!
{
   CSingleLock lock(&m_cs, TRUE);
   BOOL bProcessed = FALSE;

   // Move stopped threads to idle list...
   POSITION pos;
   for( pos = m_SessionQueue.GetHeadPosition(); pos!=NULL; ) {
      POSITION oldpos = pos;
      CSession *pSession = m_SessionQueue.GetNext(pos);
      ASSERT_VALID(pSession);
      if( pSession->m_State==STATE_STOPPED ) {
         // Lock the sesison
         CSingleLock lock( *pSession, TRUE );
         // Remove the session from the running queue
         m_SessionQueue.RemoveAt(oldpos);
         // Sessions state is no longer running
         pSession->m_bRunning = false;
         // Now add session to idle queue...
         m_StoppedQueue.AddHead(pSession);         
         // Statistics...
         m_nActiveThreads--;
         // Done
         bProcessed = TRUE;
         // Start from the head again...
         pos = m_SessionQueue.GetHeadPosition();      
      };
   };
   return bProcessed;
}

BOOL CThreadManager::CheckForKilledThreads()
// LOCKS!
{
   CSingleLock lock(&m_cs, TRUE);
   BOOL bProcessed = FALSE;
   // If any session has been marked as killed, we should
   // make sure it will be removed when the thread is done!
   CSessionList all;
   GetSessions(all);

   POSITION pos = all.GetHeadPosition();
   while( pos!=NULL ) {
      CSession *pSession = all.GetNext(pos);
      bool bRemove = false;
      switch( pSession->m_State ) {
      case STATE_STOPPED:
         if( pSession->m_bKillRequest ) bRemove = true;
         if( pSession->m_Preferences->m_bDeleteThreadWhenDone ) bRemove = true;
         break;
      case STATE_QUEUED:
      case STATE_SLEEPING:
         if( pSession->m_bKillRequest ) bRemove = true;
         break;
      case STATE_RUNNING:
      case STATE_CONNECTING:
         if( pSession->m_bKillRequest ) {
            ::Sleep(200);
            // Nuke the download file
            CSingleLock lock( *pSession, TRUE );
            CHttpFile *pFile = pSession->m_pCurrentDownloadFile;
            if( pFile!=NULL ) {
               TRACE("Have to abort download!\n");
               TRY
               {
                  pFile->Abort();
               }
               CATCH_ALL(e)
               {
                  TRACE("Download abort failed!!!\n");
               }
               END_CATCH_ALL
               lock.Unlock();
               bProcessed = TRUE; // yes, refresh again
            }
         }
         break;
      };
      if( bRemove ) {
         ASSERT_VALID(pSession);
         CSingleLock lock( *pSession, TRUE );
         // Session is now dead.
         // Cleanup code will remove it from queue later.
         pSession->m_State = STATE_KILLED;         
         bProcessed = TRUE;
      }
   };

   return bProcessed;
};

BOOL CThreadManager::ScheduleNewThreads()
// LOCKS!
{
   CSingleLock lock(&m_cs, TRUE);
   BOOL bProcessed = FALSE;
   
   // If any vacant slots, we remove the first thread from the
   // session queue and schedule it...
   if( m_SessionQueue.GetCount() < m_nMaxThreads && m_IdleQueue.GetCount()>0 ) {
      CSession *pSession = m_IdleQueue.RemoveHead();
      ASSERT_VALID(pSession);
      if( !pSession->m_sURL.IsEmpty() ) {
         {
            CSingleLock lock( *pSession, TRUE );
            // Session is now connecting
            pSession->m_State = STATE_CONNECTING;
            pSession->m_bRunning = true;
            // So add it to the running queue
            m_SessionQueue.AddTail(pSession);
         }
         // Spawn download thread
         CWinThread *pThread = AfxBeginThread( DownloadSessionThread, pSession );
         {
            CSingleLock lock( *pSession, TRUE );
            pSession->m_hThread = pThread->m_hThread;
         }
         m_nActiveThreads++;
         m_nTotalThreads++;
         // Done for now..
         bProcessed = TRUE;
      };
   };

   return bProcessed;
};

BOOL CThreadManager::RemoveKilledThreads()
// LOCKS!
{
   CSingleLock lock(&m_cs, TRUE);
   BOOL bProcessed = FALSE;

   // Move stopped threads to idle list...
   CSessionList all;
   GetSessions(all);

   POSITION pos = all.GetHeadPosition();
   while( pos!=NULL ) {
      CSession *pSession = all.GetNext(pos);
      ASSERT_VALID(pSession);
      if( pSession->m_State==STATE_KILLED ) {
         CWaitCursor cursor;
         // Ok, we cannot use CSingleLock here, because we're going
         // to nuke the object from memory
         CSyncObject *pLock = *pSession;
         ASSERT_VALID(pLock);
         pLock->Lock();
         TRACE(_T("Checking thread %d for termination.\n"), pSession->m_hThread);
         // First, remove it from queues...
         POSITION delpos;
#define DeleteQueueEntry(queue,entry) delpos = queue.Find(entry); if( delpos!=NULL ) queue.RemoveAt(delpos)
         DeleteQueueEntry(m_StoppedQueue,pSession);
         DeleteQueueEntry(m_IdleQueue,pSession);
         DeleteQueueEntry(m_SessionQueue,pSession);
         DeleteQueueEntry(all,pSession);
#undef DeleteQueueEntry
         // Check if thread is really dead?
         DWORD dwCode;
         if( ::GetExitCodeThread(pSession->m_hThread, &dwCode)==FALSE ) {
            dwCode = ::GetLastError();
            TRACE(_T("Thread %d not available. Err Code=%d!\n"), pSession->m_hThread, dwCode);
         }
         if( dwCode==STILL_ACTIVE ) {
            pLock->Unlock();
            TRACE(_T("Thread %d not complete. Err=%d!\n"), pSession->m_hThread, ::GetLastError());
            // Give it some time to die
            ::Sleep(500);
            // TODO: Do a loop until thread is killed!
            //       Beware 'm_hThread' may actually point to a new thread (which re-uses
            //       the same id)? So we don't do any fancy stuff here!!!
            //       MSDN "Multithreading: Terminating Threads" explains how to do it correctly.
            pLock->Lock();
         }
         // It dead...
         TRY 
         {
            delete pSession;
         }
         CATCH_ALL(e)
         {
            // Dummy
         }
         END_CATCH_ALL
         // We don't unlock!!! Destructor will do it!
         bProcessed = TRUE;
         // Restart search...
         pos = all.GetHeadPosition();
      };
   };

   return bProcessed;
};

BOOL CThreadManager::HasRunningThreads()
// LOCKS!
{
   CSingleLock lock(&m_cs, TRUE);

   CSessionList all;
   GetSessions(all);

   POSITION pos = all.GetHeadPosition();
   while( pos!=NULL ) {
      CSession *pSession = all.GetNext(pos);
      switch( pSession->m_State ) {
      case STATE_RUNNING:
      case STATE_CONNECTING:
         return TRUE;
      }
   }
   return FALSE;
};

void CThreadManager::StopRunningThreads()
// LOCKS!
{
   int nCount = 0;
   {
      CSingleLock lock(&m_cs, TRUE);
      nCount = m_SessionQueue.GetCount();
   }
   for( int i = 0; i < nCount; i++ ) AfxGetMainWnd()->SendMessage(WM_COMMAND, MAKEWPARAM(ID_EDIT_DELETE, 0), 0L);
   MoveStoppedThreads();
   CheckForKilledThreads();
   RemoveKilledThreads();
}