www.gusucode.com > VC++三子棋游戏源码(类似五子棋)-源码程序 > VC++三子棋游戏源码(类似五子棋)-源码程序\code\Server\Tcpipcon.cpp

    #include "DataType.h"
#include "Global.h"
#include "tcpipcon.h"
////////////////////////////////////////////////////////////////////
// Download by http://www.NewXing.com
// Thread entry point for the listening thread.
// lpParam carries the CTcpIpConnection that owns the thread.  The routine
// bumps the owner's m_nThreadCount, runs ProcessListenClients() until that
// call returns, and always reports 0 as its exit value.
UINT CTcpIpConnection::ListenClientsThread(LPVOID lpParam)
{
	printf("Enter ListenClientsThread(..)...\n");

	CTcpIpConnection &rOwner = *static_cast<CTcpIpConnection *>(lpParam);

	// Signal that this thread has actually started running.
	rOwner.m_nThreadCount++;
	rOwner.ProcessListenClients();

	printf("Leave ListenClientsThread(..)...\n");

	return 0;
}

// Thread entry point for the receiving thread.
// lpParam carries the CTcpIpConnection that owns the thread.  The routine
// bumps the owner's m_nThreadCount (ConnectToServer() polls that counter),
// runs ProcessRecvMessage() until it returns, and always reports 0.
UINT CTcpIpConnection::ReceiveMessageThread(LPVOID lpParam)
{
	printf("Enter ReceiveMessageThread(..)...\n");

	CTcpIpConnection &rOwner = *static_cast<CTcpIpConnection *>(lpParam);

	// Signal that this thread has actually started running.
	rOwner.m_nThreadCount++;
	rOwner.ProcessRecvMessage();

	printf("Leave ReceiveMessageThread(..)...\n");

	return 0;
}

// Thread entry point for the sending thread.
// lpParam carries the CTcpIpConnection that owns the thread.  The routine
// bumps the owner's m_nThreadCount (ConnectToServer() polls that counter),
// runs ProcessSendMessage() until it returns, and always reports 0.
UINT CTcpIpConnection::SendMessageThread(LPVOID lpParam)
{
	printf("Enter SendMessageThread(..)...\n");

	CTcpIpConnection &rOwner = *static_cast<CTcpIpConnection *>(lpParam);

	// Signal that this thread has actually started running.
	rOwner.m_nThreadCount++;
	rOwner.ProcessSendMessage();

	printf("Leave SendMessageThread(..)...\n");

	return 0;
}

////////////////////////////////////////////////////////////////////
// Puts the connection object into its idle state: no worker threads, an
// empty connection list and empty descriptor sets.
CTcpIpConnection::CTcpIpConnection()
{
	// Thread bookkeeping.
	m_bListening = FALSE;
	m_hListenThreadHandle = NULL;

	m_bThreadRunning = FALSE;
	m_hRecvMsgThreadHandle = NULL;
	m_hSendMsgThreadHandle = NULL;

	// The thread entry points increment this counter.  StartServer() does
	// not reset it, so it must start from a defined value.
	m_nThreadCount = 0;

	// m_tTCPIPInfo is the list head: its socket stays invalid and its
	// byDeviceNo field is used as the number of nodes in the list
	// (see AddTCPIPInfo / RemoveTCPIPInfo).
	m_tTCPIPInfo.tConInfo.byDeviceNo = 0;
	m_tTCPIPInfo.tConInfo.nSocket = INVALID_SOCKET;
	m_tTCPIPInfo.pNext = NULL;

	FD_ZERO(&m_tRecvFDSetBuf);
	FD_ZERO(&m_tListenFDSetBuf);
}

// Destructor.  Intentionally does nothing.
// NOTE(review): list nodes allocated by AddTCPIPInfo() and any still-open
// sockets or thread handles are not released here; callers presumably have
// to call StopServer() / RemoveTCPIPInfo() beforehand - confirm.
CTcpIpConnection::~CTcpIpConnection()
{
}

// Appends a (socket, device number) record to the end of the connection
// list, increments the node count kept in the list head and adds the socket
// to the descriptor set the receive thread selects on.  The whole update
// runs under the "LOCK_TCPIPINFO" lock.
void CTcpIpConnection::AddTCPIPInfo(SOCKET nSocket, BYTE byDevNo)
{
	CMutexLock lock("LOCK_TCPIPINFO");

	// Prepare the new tail node.
	CTCPIPInfo *pNode = new CTCPIPInfo();
	pNode->tConInfo.byDeviceNo = byDevNo;
	pNode->tConInfo.nSocket = nSocket;
	pNode->pNext = NULL;

	// Walk to the current tail (the head itself when the list is empty).
	CTCPIPInfo *pTail = &m_tTCPIPInfo;
	for( ; NULL != pTail->pNext; pTail = pTail->pNext )
	{
	}
	pTail->pNext = pNode;

	// The head's byDeviceNo doubles as the node counter.
	m_tTCPIPInfo.tConInfo.byDeviceNo++;

	FD_SET(nSocket, &m_tRecvFDSetBuf);
}

// Removes the connection record that belongs to device byDevNo.
// Passing INVALID_ID_B (the default) resolves to INVALID_SOCKET and thereby
// removes every record, exactly as RemoveTCPIPInfo(SOCKET) does for
// INVALID_SOCKET.
// A specific device number that is not in the list is now ignored.
// Previously the failed lookup also produced INVALID_SOCKET, which wiped
// every connection by accident.
void CTcpIpConnection::RemoveTCPIPInfo(BYTE byDevNo /*= INVALID_ID_B*/)
{
	SOCKET nSocket = GetConnection(byDevNo);

	if( (INVALID_SOCKET == nSocket) && (INVALID_ID_B != byDevNo) )
	{
		// Unknown device: nothing to remove.
		return;
	}

	RemoveTCPIPInfo(nSocket);
}

// Removes connection records while holding the "LOCK_TCPIPINFO" lock.
//   nSocket == INVALID_SOCKET : frees every node, resets the node counter
//                               and clears the receive descriptor set.
//   otherwise                 : frees the first node that owns nSocket,
//                               decrements the counter and clears that
//                               socket from the receive descriptor set.
// The sockets themselves are not closed here.
void CTcpIpConnection::RemoveTCPIPInfo(SOCKET nSocket /*= INVALID_SOCKET*/)
{
	CMutexLock lock("LOCK_TCPIPINFO");

	if( INVALID_SOCKET != nSocket )
	{
		// Single removal: keep a trailing pointer so the node can be unlinked.
		CTCPIPInfo *pPrev = &m_tTCPIPInfo;
		CTCPIPInfo *pCur = pPrev->pNext;
		for( ; NULL != pCur; pPrev = pCur, pCur = pCur->pNext )
		{
			if( pCur->tConInfo.nSocket != nSocket ) continue;

			pPrev->pNext = pCur->pNext;
			delete pCur;

			m_tTCPIPInfo.tConInfo.byDeviceNo--;

			FD_CLR(nSocket, &m_tRecvFDSetBuf);

			break;
		}
	}
	else
	{
		// Wholesale removal of every node behind the head.
		CTCPIPInfo *pWalk = m_tTCPIPInfo.pNext;
		while( NULL != pWalk )
		{
			CTCPIPInfo *pDoomed = pWalk;
			pWalk = pWalk->pNext;

			delete pDoomed;
		}

		m_tTCPIPInfo.pNext = NULL;
		m_tTCPIPInfo.tConInfo.byDeviceNo = 0;

		FD_ZERO(&m_tRecvFDSetBuf);
	}

	// With no connections left the descriptor set must be empty as well.
	int nRemaining = m_tTCPIPInfo.tConInfo.byDeviceNo;
	if( nRemaining <= 0 )
	{
		FD_ZERO(&m_tRecvFDSetBuf);
	}
}

// Looks up the connection record for device byDevNo.
// Returns the matching CTCPIPInfo node as void*, or NULL when byDevNo is
// INVALID_ID_B or no node carries that device number.  The list is searched
// under the "LOCK_TCPIPINFO" lock; the lock is released on return.
void *CTcpIpConnection::FindTCPIPInfo(BYTE byDevNo)
{
	CMutexLock lock("LOCK_TCPIPINFO");

	if( INVALID_ID_B != byDevNo )
	{
		for( CTCPIPInfo *pNode = m_tTCPIPInfo.pNext; NULL != pNode; pNode = pNode->pNext )
		{
			if( pNode->tConInfo.byDeviceNo == byDevNo )
			{
				return (void *)pNode;
			}
		}
	}

	return NULL;
}

// Looks up the connection record that owns nSocket.
// Returns the matching CTCPIPInfo node as void*, or NULL when nSocket is
// INVALID_SOCKET or no node carries that socket.  The list is searched
// under the "LOCK_TCPIPINFO" lock; the lock is released on return.
void *CTcpIpConnection::FindTCPIPInfo(SOCKET nSocket)
{
	CMutexLock lock("LOCK_TCPIPINFO");

	if( INVALID_SOCKET != nSocket )
	{
		for( CTCPIPInfo *pNode = m_tTCPIPInfo.pNext; NULL != pNode; pNode = pNode->pNext )
		{
			if( pNode->tConInfo.nSocket == nSocket )
			{
				return (void *)pNode;
			}
		}
	}

	return NULL;
}

// Maps a device number to its socket.
// Returns INVALID_SOCKET when FindTCPIPInfo() has no record for byDevNo.
SOCKET CTcpIpConnection::GetConnection(BYTE byDevNo)
{
	CTCPIPInfo *pRecord = (CTCPIPInfo *)FindTCPIPInfo(byDevNo);

	return ( NULL == pRecord ) ? INVALID_SOCKET : pRecord->tConInfo.nSocket;
}

// Maps a socket to its device number.
// Returns INVALID_ID_B when FindTCPIPInfo() has no record for nSocket.
BYTE CTcpIpConnection::GetDeviceNo(SOCKET nSocket)
{
	CTCPIPInfo *pRecord = (CTCPIPInfo *)FindTCPIPInfo(nSocket);

	return ( NULL == pRecord ) ? INVALID_ID_B : pRecord->tConInfo.byDeviceNo;
}

// Returns the address of the outgoing-message queue member
// (m_tSendMsgQueue).  The pointer refers to storage inside this object.
CMessageQueue *CTcpIpConnection::GetSendMsgQueue()
{
	CMessageQueue *pQueue = &m_tSendMsgQueue;

	return pQueue;
}

// Closes sockets while holding the "LOCK_TCPIPINFO" lock.
//   nSocket != INVALID_SOCKET : closes just that socket; the list is not
//                               touched.
//   nSocket == INVALID_SOCKET : closes every valid socket in the list and
//                               marks each node as unused (INVALID_ID_B /
//                               INVALID_SOCKET).  Nodes are not freed here.
void CTcpIpConnection::CloseConnection(SOCKET nSocket /*= INVALID_SOCKET*/)
{
	CMutexLock lock("LOCK_TCPIPINFO");

	if( INVALID_SOCKET != nSocket )
	{
		closesocket(nSocket);
		return;
	}

	for( CTCPIPInfo *pNode = m_tTCPIPInfo.pNext; NULL != pNode; pNode = pNode->pNext )
	{
		if( INVALID_SOCKET != pNode->tConInfo.nSocket )
		{
			closesocket(pNode->tConInfo.nSocket);
		}

		pNode->tConInfo.byDeviceNo = INVALID_ID_B;
		pNode->tConInfo.nSocket = INVALID_SOCKET;
	}
}

// Connects to szServerName:nPort, registers the new socket under byDevNo
// and makes sure the receive and send worker threads are running.
//
//   szServerName : dotted IPv4 address or a host name to resolve.
//   nPort        : TCP port in host byte order.
//   byDevNo      : device number to associate with the connection.
//
// Returns TRUE once the socket is connected and registered, FALSE when
// Winsock start-up, socket creation, name resolution or connect() fails,
// or when an exception is caught.
//
// Changes compared with the previous revision:
//   * every failure path after a successful WSAStartup() now calls
//     WSACleanup(), as ProcessListenClients() already does;
//   * only threads that CreateThread() actually produced are waited for,
//     so a failed creation no longer stalls the caller for the full
//     start-up time-out;
//   * a NULL server name is rejected instead of being dereferenced.
BOOL CTcpIpConnection::ConnectToServer(char *szServerName, int nPort, BYTE byDevNo)
{
	if( NULL == szServerName ) return FALSE;

	try
	{
		// Winsock initialisation.
		WSADATA wSData;
		WORD wVersionReq = MAKEWORD(1,1);

		int nWSAResult = WSAStartup(wVersionReq, &wSData);
		if( nWSAResult != 0 ) return FALSE;

		// Create the socket.
		SOCKET nSocket = socket(AF_INET, SOCK_STREAM, 0);
		if( INVALID_SOCKET == nSocket )
		{
			WSACleanup();

			return FALSE;
		}

		// Build the peer address; fall back to a name lookup when the
		// string is not a dotted address.
		sockaddr_in sai;
		memset(&sai, 0, sizeof(sockaddr_in));
		sai.sin_port = htons((u_short)nPort);
		sai.sin_family = AF_INET;
		sai.sin_addr.s_addr = inet_addr(szServerName);
		if( INADDR_NONE == sai.sin_addr.s_addr )
		{
			hostent *pHostent = gethostbyname(szServerName);
			if( pHostent == NULL )
			{
				CloseConnection(nSocket);
				WSACleanup();

				return FALSE;
			}

			sai.sin_addr.s_addr = ((struct in_addr FAR *)(pHostent->h_addr))->s_addr;
		}

		// Connect.
		int nResult = connect(nSocket, (sockaddr*)&sai, sizeof(sockaddr_in));
		if( SOCKET_ERROR == nResult )
		{
			CloseConnection(nSocket);
			WSACleanup();

			return FALSE;
		}

		BindConnection(nSocket, byDevNo);

		// Start the message worker threads (only those not yet running).
		int nStartedThreads = 0;
		m_nThreadCount = 0;
		m_bThreadRunning = TRUE;
		if( NULL == m_hRecvMsgThreadHandle )
		{
			m_hRecvMsgThreadHandle = CreateThread(NULL, 1024,
					(LPTHREAD_START_ROUTINE)ReceiveMessageThread,
					(LPVOID)this, 0, NULL);

			if( NULL != m_hRecvMsgThreadHandle ) nStartedThreads++;
		}
		if( NULL == m_hSendMsgThreadHandle )
		{
			m_hSendMsgThreadHandle = CreateThread(NULL, 1024,
					(LPTHREAD_START_ROUTINE)SendMessageThread,
					(LPVOID)this, 0, NULL);

			if( NULL != m_hSendMsgThreadHandle ) nStartedThreads++;
		}
		m_bThreadRunning = ( (NULL != m_hRecvMsgThreadHandle) &&
							 (NULL != m_hSendMsgThreadHandle) );

		// Wait until every thread created above has announced itself by
		// incrementing m_nThreadCount (bounded by nMaxCount polls).
		int nMaxCount = 2000;
		int nTimeOutCount = 0;
		while( m_nThreadCount < nStartedThreads )
		{
			Sleep(50);
			nTimeOutCount++;
			if( nTimeOutCount > nMaxCount ) break;
		}

		return TRUE;
	}
	catch(...)
	{
		// Any exception is reported to the caller as a failed connect.
	}

	return FALSE;
}

// Configures a freshly connected socket and stores it, together with its
// device number, in the connection list.
//
// Fix: SO_KEEPALIVE is a socket-level option and must be set with level
// SOL_SOCKET.  It was previously passed with level IPPROTO_TCP, where the
// same numeric value names a different option or none at all, so
// keep-alive was never enabled.  Both options are boolean, so a BOOL TRUE
// is passed instead of the arbitrary value 1024.
void CTcpIpConnection::BindConnection(SOCKET nSocket, BYTE byDevNo)
{
	BOOL bEnable = TRUE;

	// Disable Nagle's algorithm so small messages are sent immediately.
	setsockopt(nSocket, IPPROTO_TCP, TCP_NODELAY, (const char*)&bEnable, sizeof(bEnable));
	// Let the stack probe idle connections.
	setsockopt(nSocket, SOL_SOCKET, SO_KEEPALIVE, (const char*)&bEnable, sizeof(bEnable));

	AddTCPIPInfo(nSocket, byDevNo);
}

// Disconnects the connection that belongs to device byDevNo.
// Returns FALSE when byDevNo is a specific device number with no
// registered socket; otherwise forwards to DisconnectServer(SOCKET).
//
// Fix: a failed lookup used to forward INVALID_SOCKET, which makes
// RemoveTCPIPInfo(SOCKET) drop every connection record.  Only an explicit
// INVALID_ID_B is still forwarded that way.
BOOL CTcpIpConnection::DisconnectServer(BYTE byDevNo)
{
	SOCKET nSocket = GetConnection(byDevNo);

	if( (INVALID_SOCKET == nSocket) && (INVALID_ID_B != byDevNo) )
	{
		// Unknown device: nothing to disconnect.
		return FALSE;
	}

	return DisconnectServer(nSocket);
}

// Closes nSocket (when it is a valid handle) and removes its record from
// the connection list.  With INVALID_SOCKET nothing is closed, but
// RemoveTCPIPInfo(SOCKET) then drops every record.  Always returns TRUE.
BOOL CTcpIpConnection::DisconnectServer(SOCKET nSocket)
{
	const BOOL bHasSocket = ( INVALID_SOCKET != nSocket );
	if( bHasSocket )
	{
		CloseConnection(nSocket);
	}

	RemoveTCPIPInfo(nSocket);

	return TRUE;
}

// Starts the listening thread unless one already exists.
// Returns TRUE when a listening thread handle is held afterwards.
//
// Fix: a failed CreateThread() used to be reported as success with
// m_bListening left TRUE.  The flag is now cleared and FALSE is returned.
BOOL CTcpIpConnection::StartServer()
{
	m_bListening = TRUE;
	if( NULL == m_hListenThreadHandle )
	{
		m_hListenThreadHandle = CreateThread(NULL, 1024,
				(LPTHREAD_START_ROUTINE)ListenClientsThread,
				(LPVOID)this, 0, NULL);

		if( NULL == m_hListenThreadHandle )
		{
			m_bListening = FALSE;

			return FALSE;
		}
	}

	return TRUE;
}

void CTcpIpConnection::StopServer()
{
	CloseConnection();

	m_bThreadRunning = FALSE;
    if( NULL != m_hSendMsgThreadHandle )
	{
		HANDLE hQueueHandle = GetSendMsgQueue()->GetLockQueueHandle();
		SetEvent(hQueueHandle);
		WaitForSingleObject(m_hSendMsgThreadHandle, 1000*30);
		CloseHandle(m_hSendMsgThreadHandle);
		m_hSendMsgThreadHandle = NULL;
	}
	if( NULL != m_hRecvMsgThreadHandle )
	{		
		WaitForSingleObject(m_hRecvMsgThreadHandle, 1000*30);
		CloseHandle(m_hRecvMsgThreadHandle);
		m_hRecvMsgThreadHandle = NULL;
	}

	m_bListening = FALSE;
	if( NULL != m_hListenThreadHandle )
	{
		WaitForSingleObject(m_hListenThreadHandle, 1000*30);
		CloseHandle(m_hListenThreadHandle);
		m_hListenThreadHandle = NULL;
	}
}

void CTcpIpConnection::ProcessListenClients()
{
	//Start the server firstly...
	int nPort = (int)PORTID;

	////通信端口初始化
	WSADATA wSData;
	WORD wVersionReq = MAKEWORD(1,1);		
	
	int nWSAResult = WSAStartup(wVersionReq, &wSData);
	if(nWSAResult != 0) return;
	
	///////////////////////

	SOCKET nListenSocket = socket(PF_INET,SOCK_STREAM,0);
    if(nListenSocket==INVALID_SOCKET)
    {
        printf("socket()\n");
		WSACleanup();

        return;
    }

	SOCKADDR_IN sin;
    sin.sin_family=PF_INET;
    sin.sin_port=htons(nPort);
    sin.sin_addr.s_addr=INADDR_ANY;
    int iTmp=sizeof(sin);

    if(bind(nListenSocket,(LPSOCKADDR)&sin,iTmp)==SOCKET_ERROR)
    {
        printf("bind()\n");
        closesocket(nListenSocket);
        WSACleanup();

        return;
    }

    if(listen(nListenSocket, 5)==SOCKET_ERROR)
    {
        printf("listen()\n");
        closesocket(nListenSocket);
        WSACleanup();

        return;
    }

    char szHostName[128];
	gethostname(szHostName, sizeof(szHostName));
	HOSTENT *pHostent = gethostbyname(szHostName);
	if( NULL != pHostent )
	{
		struct in_addr *pIPAddress = (struct in_addr *) pHostent->h_addr_list[0];
		printf("\nThe server is ready: %d.%d.%d.%d[%s], Port = %d...\n\n", 
				pIPAddress->S_un.S_un_b.s_b1, 
				pIPAddress->S_un.S_un_b.s_b2, 
				pIPAddress->S_un.S_un_b.s_b3, 
				pIPAddress->S_un.S_un_b.s_b4, 
				pHostent->h_name,
				nPort);
	}
	else
	{
		printf("\nThe server is ready! \nServer Name = %s, Port = %d...\n\n", 
				szHostName, nPort);
	}

	//wait for a new connection
	SOCKADDR_IN tSockPeer;	
	int nPeerLen = sizeof( tSockPeer );
	
	while(m_bListening)
	{		
		SOCKET sNewClient = accept(nListenSocket, (LPSOCKADDR)&tSockPeer, &nPeerLen);	

		if(!m_bListening) break;

		if ( sNewClient <= 0 )
		{			
			printf("accept error! errno!\n");

			continue;
		}
		//insert the new_fd to a list...
		{
		}

		//send accept notification to the client...
	}
}

// Receives one complete message (main header + sub header + data) from
// nSocket.
//
//   nResult : set to SOCKET_OK, or to the error reported by the low-level
//             receiver when the connection failed or was closed.
//
// Returns a buffer allocated with new CHAR[] that holds the main header
// followed by the rest of the message, or NULL when nothing usable was
// received.  NULL with nResult == SOCKET_OK means the header was rejected
// or a read timed out.
//
// Changes compared with the previous revision:
//   * the buffers are released on every failure path; they used to leak
//     whenever the low-level receiver returned NULL on a socket error;
//   * the total length is computed in 32 bits, so a size field close to
//     65535 can no longer wrap the WORD sum and under-allocate the buffer;
//   * a size field of 0 is rejected explicitly with SOCKET_ERROR, the same
//     outcome the zero-length recv() produced before.
void* CTcpIpConnection::ReceiveMessage(SOCKET nSocket, int &nResult)
{
	nResult = SOCKET_OK;

	// Total length = main header + sub header + data field.
	const WORD wHeaderLen = g_tMessageMgr.wPL_GetMainHerderLen();

	// Step 1: read the main header.
	CHAR *pcHeader = new CHAR[wHeaderLen];
	memset(pcHeader, 0, wHeaderLen);

	WORD wHeaderGot = wHeaderLen;
	ReceiveMessage(nSocket, pcHeader, nResult, wHeaderGot);

	if( nResult != SOCKET_OK || wHeaderGot != wHeaderLen ||
		!g_tMessageMgr.bPL_IsRcvdMainHeaderCorrect(pcHeader) )
	{
		delete [] pcHeader;

		return NULL;
	}

	// Step 2: the main header carries the length of sub header + data.
	const WORD wMsgSize = g_tMessageMgr.wPL_GetMessageSizeFromHeader(pcHeader);
	if( 0 == wMsgSize )
	{
		nResult = SOCKET_ERROR;
		delete [] pcHeader;

		return NULL;
	}

	const DWORD dwAllLen = (DWORD)wHeaderLen + (DWORD)wMsgSize;
	CHAR *pcAllMsg = new CHAR[dwAllLen];
	memset(pcAllMsg, 0, dwAllLen);

	// The header is complete; move it into place and drop the scratch copy.
	memcpy(pcAllMsg, pcHeader, wHeaderLen);
	delete [] pcHeader;
	pcHeader = NULL;

	// Step 3: read the remainder in pieces of at most MESSAGE_LEN bytes
	// (one piece for a small message, several for a large one).
	CHAR *pcCursor = pcAllMsg + wHeaderLen;
	DWORD dwRemaining = wMsgSize;
	while( dwRemaining > 0 )
	{
		const WORD wWanted = ( dwRemaining > (DWORD)MESSAGE_LEN )
								? (WORD)MESSAGE_LEN : (WORD)dwRemaining;
		WORD wGot = wWanted;

		ReceiveMessage(nSocket, pcCursor, nResult, wGot);
		if( nResult != SOCKET_OK || wGot != wWanted )
		{
			delete [] pcAllMsg;

			return NULL;
		}

		pcCursor += wGot;
		dwRemaining -= wGot;
	}

	return pcAllMsg;
}

// Reads up to wLength bytes from nSocket into pcBuf.
//
//   nResult : SOCKET_OK on entry; on a recv() error or a closed connection
//             it receives WSAGetLastError(), or SOCKET_ERROR when that
//             value is 0.
//   wLength : in  - number of bytes wanted;
//             out - number of bytes actually stored (only written when the
//                   function returns pcBuf).
//
// Returns pcBuf when the loop finished (all bytes read, or the retry limit
// was exceeded after a partial read) and NULL when recv() failed or
// returned 0.
void* CTcpIpConnection::ReceiveMessage(SOCKET nSocket, CHAR *pcBuf, 
									   int &nResult, WORD &wLength)
{
	const int nRetryLimit = 200;
	int nRetries = 0;

	WORD wTotal = 0;
	char *pcCursor = pcBuf;

	nResult = SOCKET_OK;

	for(;;)
	{
		// Pull the next piece off the network.
		int nChunk = recv(nSocket, pcCursor, wLength-wTotal, 0);
		if( (SOCKET_ERROR == nChunk) || (0 == nChunk) )
		{
			int nErrorCode = WSAGetLastError();

			// Never report a failure with a "no error" code.
			nResult = ( 0 == nErrorCode ) ? SOCKET_ERROR : nErrorCode;

			return NULL;
		}

		wTotal += nChunk;
		pcCursor += nChunk;

		if( wTotal >= wLength ) break;		// everything arrived

		// Incomplete: pause, then retry unless the limit is exhausted.
		Sleep(50);
		nRetries++;
		if( nRetries > nRetryLimit ) break;
	}

	wLength = wTotal;

	return pcBuf;
}

// Sends a message, splitting it into several packets when its data field
// is longer than MESSAGE_LEN.
//
//   dwSendMsgID : identifier written into every packet of a split message.
//   pSendMsg    : message to send (a TSingleMessage).  A small message is
//                 handed on to SendMessage(void*, BOOL) unchanged; a split
//                 message is released with bPL_deleteMsg() after its
//                 packets have been built.
//   bSendAtOnce : forwarded to SendMessage(void*, BOOL).
//
// Returns FALSE for a NULL message, the result of SendMessage(void*, BOOL)
// for a small message, and for a split message TRUE only when every
// packet was accepted.
//
// Changes compared with the previous revision:
//   * the full packets and the trailing short packet are built by one
//     loop; the old code used the loop index after its for statement,
//     which only compiles with the non-standard VC6 scoping rule;
//   * pSendMsg is checked for NULL before it is dereferenced;
//   * the packet count is computed in 32 bits, and a message that needs
//     more packets than the 16-bit count field can carry is rejected;
//   * the result reflects all packets, not just the last one.
BOOL CTcpIpConnection::SendMessage(DWORD dwSendMsgID, void *pSendMsg, 
								   BOOL bSendAtOnce /*= FALSE*/)
{
	if( NULL == pSendMsg ) return FALSE;

	TSingleMessage *ptSource = (TSingleMessage*)pSendMsg;
	TMainHeader *ptMainHead = &(ptSource->tmainHeader);
	TSubHeader *ptSubHead = &(ptSource->tsubHeader);
	CHAR *pcMsgData = (CHAR *)(&(ptSource->pvMsgData));

	DWORD dwDataLen = ntohl( ptSubHead->dwDatalen );

	if( dwDataLen <= MESSAGE_LEN )	// fits into a single packet
	{
		return SendMessage(pSendMsg, bSendAtOnce);
	}

	// Large message: split the data field into packets.
	DWORD dwMsgMainSubHerderLen = g_tMessageMgr.wPL_GetMainSubHerderLen();
	DWORD dwLength = dwMsgMainSubHerderLen + dwDataLen;
	DWORD dwPacketStLen = g_tMessageMgr.wPL_GetPacketStLen();
	DWORD dwPacketHeaderLen = dwMsgMainSubHerderLen + dwPacketStLen;

	WORD wPacketLen = (WORD)PACKET_LEN;
	DWORD dwFullCount = dwDataLen / wPacketLen;
	WORD wLastPacketLen = (WORD)(dwDataLen % wPacketLen);
	DWORD dwPacketCount = dwFullCount + ( (wLastPacketLen > 0) ? 1 : 0 );

	if( dwPacketCount > 0xFFFF )
	{
		// The packet count / number fields are 16 bits wide.
		g_tMessageMgr.bPL_deleteMsg(pSendMsg);

		return FALSE;
	}

	BOOL bResult = TRUE;
	DWORD dwCopyDataLen = 0;
	for( DWORD dwIndex = 0; dwIndex < dwPacketCount; dwIndex++ )
	{
		// Every packet is full-sized except possibly the last one.
		WORD wThisLen = ( dwIndex < dwFullCount ) ? wPacketLen : wLastPacketLen;

		DWORD dwSinglePacketLen = dwPacketHeaderLen + wThisLen;
		CHAR *chSinglePacket = new CHAR[dwSinglePacketLen];
		memset(chSinglePacket, 0, dwSinglePacketLen);

		TSingleMessage *ptSendMessage = (TSingleMessage *)chSinglePacket;

		// Main and sub header, copied from the source message.
		TMainHeader *ptPacketMainHead = &(ptSendMessage->tmainHeader);
		g_tMessageMgr.BuildMainSubHeaderofMsg(ptPacketMainHead, ptMainHead, g_tMessageMgr.MAIN_HEADER);
		WORD wTmpDataLen = (WORD)(dwSinglePacketLen - g_tMessageMgr.wPL_GetMainHerderLen());
		ptPacketMainHead->wDatalen = htons(wTmpDataLen);

		TSubHeader *ptPacketSubHead = &(ptSendMessage->tsubHeader);
		g_tMessageMgr.BuildMainSubHeaderofMsg(ptPacketSubHead, ptSubHead, g_tMessageMgr.SUB_HEADER);

		// Packet descriptor.
		TPacketStruct *ptPacketStruct = (TPacketStruct *)(chSinglePacket + dwMsgMainSubHerderLen);
		ptPacketStruct->dwPacketID = htonl(dwSendMsgID);
		ptPacketStruct->wPacketCount = htons((WORD)dwPacketCount);
		ptPacketStruct->wPacketNo = htons((WORD)(dwIndex + 1));	// packet numbers start at 1
		ptPacketStruct->dwDataLen = htonl(dwLength);
		ptPacketStruct->wPacketLen = htons(wThisLen);
		ptPacketStruct->wReserve = htons(0);

		// Payload slice.
		memcpy((void *)(chSinglePacket + dwPacketHeaderLen), (void *)(pcMsgData + dwCopyDataLen), wThisLen);
		dwCopyDataLen += wThisLen;

		if( !SendMessage((void *)ptSendMessage, bSendAtOnce) )
		{
			bResult = FALSE;
		}
	}

	// The original message has been copied into the packets.
	g_tMessageMgr.bPL_deleteMsg(pSendMsg);

	return bResult;
}

// Sends pSendMsg either immediately or through the send queue.
//
//   bSendAtOnce == FALSE : the message is appended to the send queue and
//                          TRUE is returned.
//   bSendAtOnce == TRUE  : the destination device is read from the main
//                          header (byDstAddr) and the message is written
//                          to that device's socket; the result of
//                          SendMessage(SOCKET, void*) is returned.
//
// Returns FALSE for a NULL message or when no connection exists for the
// destination device.
//
// Fix: when no connection exists the message is now released with
// bPL_deleteMsg().  ProcessSendMessage() drops its pointer after calling
// this function, so the message used to leak.
// NOTE(review): this assumes callers never reuse pSendMsg after a FALSE
// result - confirm against external callers.
BOOL CTcpIpConnection::SendMessage(void* pSendMsg, BOOL bSendAtOnce /*= FALSE*/)
{
	if( NULL == pSendMsg ) return FALSE;

	if( !bSendAtOnce )	// queue first, send later
	{
		GetSendMsgQueue()->AddMessage((TSingleMessage *)pSendMsg);

		return TRUE;
	}

	// Send immediately.
	BYTE byDevNo = (BYTE)((TSingleMessage*)pSendMsg)->tmainHeader.byDstAddr;
	SOCKET nSocket = GetConnection(byDevNo);
	if( INVALID_SOCKET == nSocket )
	{
		g_tMessageMgr.bPL_deleteMsg(pSendMsg);

		return FALSE;
	}

	return SendMessage(nSocket, pSendMsg);
}

// Writes one message (or one packet of a split message) to nSocket and
// releases it with bPL_deleteMsg().
//
// The number of bytes to send is derived from the sub header: header
// lengths plus data length for a small message, or header lengths plus
// packet-descriptor length plus the packet's own length when the data
// length exceeds MESSAGE_LEN.
//
// Returns TRUE when every byte was sent; FALSE for a NULL message, a
// send() error, a partial send that exceeded the retry limit, or a caught
// exception.
//
// Changes compared with the previous revision:
//   * the message is released on the send() error path too, as it already
//     was on the success and time-out paths;
//   * byte counts are unsigned throughout, removing the signed/unsigned
//     comparisons;
//   * a NULL message is rejected before it is dereferenced.
BOOL CTcpIpConnection::SendMessage(SOCKET nSocket, void* pSendMsg)
{
	if( NULL == pSendMsg ) return FALSE;

	TSubHeader *ptSubHead = &(((TSingleMessage*)pSendMsg)->tsubHeader);
	void *pvMsgData = (void *)(&(((TSingleMessage*)pSendMsg)->pvMsgData));

	DWORD dwMsgMainSubHerderLen = g_tMessageMgr.wPL_GetMainSubHerderLen();
	DWORD dwDataLen = ntohl( ptSubHead->dwDatalen );

	// Small message: main/sub header + data field.
	DWORD dwLength = dwMsgMainSubHerderLen + dwDataLen;

	if( dwDataLen > MESSAGE_LEN )
	{
		// Packet of a split message:
		// main/sub header + packet descriptor + this packet's data.
		DWORD dwPacketStLen = g_tMessageMgr.wPL_GetPacketStLen();

		TPacketStruct *ptPacket = (TPacketStruct *)(pvMsgData);
		WORD wPacketLen = ntohs(ptPacket->wPacketLen);
		dwLength = dwMsgMainSubHerderLen + dwPacketStLen + wPacketLen;
	}

	const int nMaxCount = 200;
	int nTimeOutCount = 0;
	BOOL bAllSent = FALSE;

	try
	{
		char *pcSendBuf = (char*)pSendMsg;
		DWORD dwSentLen = 0;

		while( dwSentLen < dwLength )
		{
			// Push the remaining bytes onto the network.
			int nSendingLen = send(nSocket, pcSendBuf, (int)(dwLength - dwSentLen), 0);
			if( SOCKET_ERROR == nSendingLen )
			{
				break;
			}

			dwSentLen += (DWORD)nSendingLen;
			pcSendBuf += nSendingLen;

			if( dwSentLen < dwLength )	// not finished yet
			{
				Sleep(50);
				nTimeOutCount++;
				if( nTimeOutCount > nMaxCount )	// timed out
				{
					break;
				}
			}
		}

		bAllSent = ( dwSentLen == dwLength );

		g_tMessageMgr.bPL_deleteMsg(pSendMsg);
	}
	catch(...)
	{
		return FALSE;
	}

	return bAllSent;
}

void CTcpIpConnection::ProcessRecvMessage()
{
	fd_set tReadFD;
	FD_ZERO(&tReadFD);

	int nFDs = 1025;
	int nSelectSocketNum = -1;
	int nActiveSocketNum = 0;
	
	struct timeval tTimeOut;
	tTimeOut.tv_sec = 0;
	tTimeOut.tv_usec = 100 * 1000;
	
	int nResult = SOCKET_OK;
	WORD wLength = 0;
	void *pRecvMsg = NULL;

	int nConnectionCount = 0;
	TConnectionInfo *pConnectionArray = NULL;

	while( m_bThreadRunning )
	{
		Sleep(50);

		//获取 Socket 句柄,保存在 pSocketArray 数组中		
		{
			//注意: 增加额外的 { } 是为了使 CMutexLock 能自动解锁, 
			//		否则要使用 UnLock() 解锁

			CMutexLock lock("LOCK_TCPIPINFO");

			if( NULL != pConnectionArray )
			{
				delete pConnectionArray;
				pConnectionArray = NULL;
			}

			FD_ZERO(&tReadFD);
			memcpy(&tReadFD, &m_tRecvFDSetBuf, sizeof(fd_set));

			nSelectSocketNum = select(nFDs, &tReadFD, NULL, NULL, &tTimeOut);
			
			nActiveSocketNum = nSelectSocketNum;
			nConnectionCount = m_tTCPIPInfo.tConInfo.byDeviceNo;

			if( (nSelectSocketNum <= 0) || (nConnectionCount <= 0) )
			{
				continue;
			}
			
			//先把 Socket 句柄 Copy 到数组中
			pConnectionArray = new TConnectionInfo[nConnectionCount];

			int nIndex = 0;
			CTCPIPInfo *pTCPIPInfo = m_tTCPIPInfo.pNext;	
			while( NULL != pTCPIPInfo )
			{
				pConnectionArray[nIndex].byDeviceNo = 
								pTCPIPInfo->tConInfo.byDeviceNo;
				pConnectionArray[nIndex].nSocket = 
								pTCPIPInfo->tConInfo.nSocket;
				nIndex++;

				pTCPIPInfo = pTCPIPInfo->pNext;
			}
			////
		}
		////////////////////
		
		//接收消息和处理消息
		for(int nIndex = 0; nIndex < nConnectionCount; nIndex++)
		{
			TConnectionInfo *tConInfo = &pConnectionArray[nIndex];
			SOCKET nSocket = tConInfo->nSocket;
			BYTE byDecNo = tConInfo->byDeviceNo;

			nResult = SOCKET_OK;
			wLength = 0;
			pRecvMsg = NULL;

			if( INVALID_SOCKET == nSocket ) continue;

			//接收一条消息
			if (FD_ISSET(nSocket, &tReadFD))			
			{
				pRecvMsg = ReceiveMessage(nSocket, nResult);
			
				if( !m_bThreadRunning ) break;

				if( nResult == SOCKET_OK )
				{
					if( NULL != pRecvMsg )
					{
						//处理接收到的一条消息
						ProcessRecvMessage(nSocket, pRecvMsg);
						////
					}
				}
				else
				{
					DisconnectServer(nSocket);
					//gMCUMgr.bDev_DisConnection(byDecNo);

					FD_ZERO(&tReadFD);
					memcpy(&tReadFD, &m_tRecvFDSetBuf, sizeof(fd_set));

					continue;
				}
				
				Sleep(10);

				nActiveSocketNum--;
				if( nActiveSocketNum <= 0 ) break;
			} //end if		
		} //end for
		//////////////////////////

		if( !m_bThreadRunning ) break;
	}

	if( NULL != pConnectionArray )
	{
		delete pConnectionArray;
		pConnectionArray = NULL;
	}
}

// Handles one received message.
// The message is passed through ptPL_CheckRegHeaderofRecMsg(); when that
// returns NULL the message is released with bPL_deleteMsg().  When it
// returns a message, nothing further happens here: the handling code is
// still a placeholder and the release call is commented out.
void CTcpIpConnection::ProcessRecvMessage(SOCKET nSocket, void *pRecvMsg)
{
	TSingleMessage *ptChecked = g_tMessageMgr.ptPL_CheckRegHeaderofRecMsg(pRecvMsg);

	if( NULL == ptChecked )
	{
		g_tMessageMgr.bPL_deleteMsg(pRecvMsg);
		return;
	}

	// Message handling goes here.
	//........
	//g_tMessageMgr.bPL_deleteMsg(pRecvMsg);
}

void CTcpIpConnection::ProcessSendMessage()
{
	CMessageQueue *pSendMsgQueue = GetSendMsgQueue();
	HANDLE hSendMsgThread = pSendMsgQueue->GetLockQueueHandle();

	while( m_bThreadRunning )
	{
		DWORD dwRt = WaitForSingleObject( hSendMsgThread, INFINITE );

		if( !m_bThreadRunning ) break;

		if( 0 == dwRt - WAIT_OBJECT_0 )
        {
			//先 Copy 再处理
			pSendMsgQueue->BlockQueue();

			CMessageQueue tSendMsgQ;
			WORD dwMsgCount = pSendMsgQueue->GetMsgCount();
			for(WORD dwIndex = 0; dwIndex < dwMsgCount; dwIndex++ )
			{
				TSingleMessage *pMessage = pSendMsgQueue->GetMessage(dwIndex);
				tSendMsgQ.AddMessage((TSingleMessage *)pMessage);
			}
			pSendMsgQueue->ClearAllMessages();

			pSendMsgQueue->UnBlockQueue();

			//处理发送消息
			dwMsgCount = tSendMsgQ.GetMsgCount();
			for(dwIndex = 0; dwIndex < dwMsgCount; dwIndex++ )
			{
				TSingleMessage *pMessage = tSendMsgQ.GetMessage(dwIndex);

				BOOL bResult = SendMessage(pMessage, TRUE);
			}
			tSendMsgQ.ClearAllMessages();
		}
        else
		{
			break; // Terminate this thread by existing the proc.  
		}

		Sleep(50);
	}
}