// Origin: www.gusucode.com > VC++三子棋游戏源码(类似五子棋)-源码程序 > VC++三子棋游戏源码(类似五子棋)-源码程序\code\Server\Tcpipcon.cpp
#include "DataType.h"
#include "Global.h"
#include "tcpipcon.h"

////////////////////////////////////////////////////////////////////
// Download by http://www.NewXing.com
//
// CTcpIpConnection implementation.
//
// The class keeps a singly linked list of (socket, device number) pairs
// whose head node is the member m_tTCPIPInfo. The head node carries no
// connection of its own: its tConInfo.byDeviceNo field is used as the
// number of entries in the list. Every access to the list and to
// m_tRecvFDSetBuf is wrapped in CMutexLock("LOCK_TCPIPINFO").
//
// Message ownership: the SendMessage overloads hand the message to
// g_tMessageMgr.bPL_deleteMsg() once they are done with it, on success
// and on failure alike, so a caller must not touch a message after
// passing it in (bPL_deleteMsg is assumed to release the buffer; its
// definition is not in this file).

////////////////////////////////////////////////////////////////////
// Thread entry points. Each receives the owning CTcpIpConnection as
// lpParam, bumps m_nThreadCount so ConnectToServer can see the thread
// has started, and then runs the matching Process* loop until it ends.
// NOTE(review): m_nThreadCount is incremented without synchronization.

UINT CTcpIpConnection::ListenClientsThread(LPVOID lpParam)
{
    printf("Enter ListenClientsThread(..)...\n");

    CTcpIpConnection *pConnection = (CTcpIpConnection *)lpParam;
    pConnection->m_nThreadCount++;
    pConnection->ProcessListenClients();

    printf("Leave ListenClientsThread(..)...\n");
    return 0;
}

UINT CTcpIpConnection::ReceiveMessageThread(LPVOID lpParam)
{
    printf("Enter ReceiveMessageThread(..)...\n");

    CTcpIpConnection *pConnection = (CTcpIpConnection *)lpParam;
    pConnection->m_nThreadCount++;
    pConnection->ProcessRecvMessage();

    printf("Leave ReceiveMessageThread(..)...\n");
    return 0;
}

UINT CTcpIpConnection::SendMessageThread(LPVOID lpParam)
{
    printf("Enter SendMessageThread(..)...\n");

    CTcpIpConnection *pConnection = (CTcpIpConnection *)lpParam;
    pConnection->m_nThreadCount++;
    pConnection->ProcessSendMessage();

    printf("Leave SendMessageThread(..)...\n");
    return 0;
}

////////////////////////////////////////////////////////////////////
// Construction / destruction

// Puts the object into the "nothing running, no connections" state.
CTcpIpConnection::CTcpIpConnection()
{
    m_bListening = FALSE;
    m_hListenThreadHandle = NULL;

    m_bThreadRunning = FALSE;
    m_hRecvMsgThreadHandle = NULL;
    m_hSendMsgThreadHandle = NULL;

    // The thread entry points increment this counter, so it must start
    // from a defined value even if ConnectToServer is never called.
    m_nThreadCount = 0;

    m_tTCPIPInfo.tConInfo.byDeviceNo = 0;   // head node: entry count
    m_tTCPIPInfo.tConInfo.nSocket = INVALID_SOCKET;
    m_tTCPIPInfo.pNext = NULL;

    FD_ZERO(&m_tRecvFDSetBuf);
    FD_ZERO(&m_tListenFDSetBuf);
}

// Frees the connection list nodes. Threads and sockets are NOT stopped
// here; call StopServer() before destroying the object.
CTcpIpConnection::~CTcpIpConnection()
{
    RemoveTCPIPInfo((SOCKET)INVALID_SOCKET);
}

////////////////////////////////////////////////////////////////////
// Connection list maintenance

// Appends a (socket, device number) entry to the end of the list,
// increments the entry count and adds the socket to the receive fd set.
// NOTE(review): the entry count is stored in a BYTE field; nothing here
// prevents it from wrapping if more than 255 entries are added.
void CTcpIpConnection::AddTCPIPInfo(SOCKET nSocket, BYTE byDevNo)
{
    CMutexLock lock("LOCK_TCPIPINFO");

    CTCPIPInfo *pNewTCPIPInfo = new CTCPIPInfo();
    pNewTCPIPInfo->pNext = NULL;
    pNewTCPIPInfo->tConInfo.nSocket = nSocket;
    pNewTCPIPInfo->tConInfo.byDeviceNo = byDevNo;

    // Walk to the last node and link the new entry behind it.
    CTCPIPInfo *pTail = &m_tTCPIPInfo;
    while( NULL != pTail->pNext )
    {
        pTail = pTail->pNext;
    }
    pTail->pNext = pNewTCPIPInfo;

    m_tTCPIPInfo.tConInfo.byDeviceNo++;
    FD_SET(nSocket, &m_tRecvFDSetBuf);
}

// Removes the entry that belongs to byDevNo.
//   byDevNo == INVALID_ID_B : removes every entry (see the SOCKET overload).
//   unknown device number   : does nothing. Previously the failed lookup
//                             yielded INVALID_SOCKET, which the SOCKET
//                             overload treats as "remove everything".
void CTcpIpConnection::RemoveTCPIPInfo(BYTE byDevNo /*= INVALID_ID_B*/)
{
    SOCKET nSocket = GetConnection(byDevNo);
    if( (INVALID_SOCKET == nSocket) && (INVALID_ID_B != byDevNo) )
    {
        return;
    }

    RemoveTCPIPInfo(nSocket);
}

// Removes the entry for nSocket, or every entry when nSocket is
// INVALID_SOCKET. The sockets themselves are not closed here.
void CTcpIpConnection::RemoveTCPIPInfo(SOCKET nSocket /*= INVALID_SOCKET*/)
{
    CMutexLock lock("LOCK_TCPIPINFO");

    if( INVALID_SOCKET == nSocket )
    {
        // Drop the whole list.
        CTCPIPInfo *pCurrent = m_tTCPIPInfo.pNext;
        while( NULL != pCurrent )
        {
            CTCPIPInfo *pFollowing = pCurrent->pNext;
            delete pCurrent;
            pCurrent = pFollowing;
        }

        m_tTCPIPInfo.pNext = NULL;
        m_tTCPIPInfo.tConInfo.byDeviceNo = 0;
        FD_ZERO(&m_tRecvFDSetBuf);
    }
    else
    {
        // Unlink and free the first node holding this socket.
        CTCPIPInfo *pPrevious = &m_tTCPIPInfo;
        CTCPIPInfo *pCurrent = pPrevious->pNext;
        while( NULL != pCurrent )
        {
            if( nSocket == pCurrent->tConInfo.nSocket )
            {
                pPrevious->pNext = pCurrent->pNext;
                delete pCurrent;

                m_tTCPIPInfo.tConInfo.byDeviceNo--;
                FD_CLR(nSocket, &m_tRecvFDSetBuf);
                break;
            }

            pPrevious = pCurrent;
            pCurrent = pCurrent->pNext;
        }
    }

    // With no entries left, make sure the fd set is empty as well.
    int nSocketCount = m_tTCPIPInfo.tConInfo.byDeviceNo;
    if( nSocketCount <= 0 )
    {
        FD_ZERO(&m_tRecvFDSetBuf);
    }
}

// Returns the list node (as void *) whose device number equals byDevNo,
// or NULL when byDevNo is INVALID_ID_B or no node matches.
// NOTE(review): the lock is released on return, so the node may be
// removed by another thread while the caller still holds the pointer.
void *CTcpIpConnection::FindTCPIPInfo(BYTE byDevNo)
{
    CMutexLock lock("LOCK_TCPIPINFO");

    if( INVALID_ID_B == byDevNo )
        return NULL;

    for( CTCPIPInfo *pNode = m_tTCPIPInfo.pNext; NULL != pNode; pNode = pNode->pNext )
    {
        if( byDevNo == pNode->tConInfo.byDeviceNo )
        {
            return (void *)pNode;
        }
    }

    return NULL;
}

// Returns the list node (as void *) whose socket equals nSocket, or NULL
// when nSocket is INVALID_SOCKET or no node matches.
// NOTE(review): same pointer-lifetime caveat as the BYTE overload.
void *CTcpIpConnection::FindTCPIPInfo(SOCKET nSocket)
{
    CMutexLock lock("LOCK_TCPIPINFO");

    if( INVALID_SOCKET == nSocket )
        return NULL;

    for( CTCPIPInfo *pNode = m_tTCPIPInfo.pNext; NULL != pNode; pNode = pNode->pNext )
    {
        if( nSocket == pNode->tConInfo.nSocket )
        {
            return (void *)pNode;
        }
    }

    return NULL;
}

// Looks up the socket registered for a device number.
// Returns INVALID_SOCKET when the device is not in the list.
SOCKET CTcpIpConnection::GetConnection(BYTE byDevNo)
{
    CTCPIPInfo *pTCPIPInfo = (CTCPIPInfo *)FindTCPIPInfo(byDevNo);
    if( NULL == pTCPIPInfo )
    {
        return INVALID_SOCKET;
    }

    return pTCPIPInfo->tConInfo.nSocket;
}

// Looks up the device number registered for a socket.
// Returns INVALID_ID_B when the socket is not in the list.
BYTE CTcpIpConnection::GetDeviceNo(SOCKET nSocket)
{
    CTCPIPInfo *pTCPIPInfo = (CTCPIPInfo *)FindTCPIPInfo(nSocket);
    if( NULL == pTCPIPInfo )
    {
        return INVALID_ID_B;
    }

    return pTCPIPInfo->tConInfo.byDeviceNo;
}

// Returns the queue that holds messages waiting to be sent.
CMessageQueue *CTcpIpConnection::GetSendMsgQueue()
{
    return &m_tSendMsgQueue;
}

////////////////////////////////////////////////////////////////////
// Connecting / disconnecting

// Closes one socket, or every socket in the list when nSocket is
// INVALID_SOCKET. In the "all" case each node is kept but marked
// invalid (INVALID_ID_B / INVALID_SOCKET), and the closed socket is
// taken out of the receive fd set so select() does not poll a dead
// handle. List nodes are not freed here; see RemoveTCPIPInfo.
void CTcpIpConnection::CloseConnection(SOCKET nSocket /*= INVALID_SOCKET*/)
{
    CMutexLock lock("LOCK_TCPIPINFO");

    if( INVALID_SOCKET != nSocket )
    {
        closesocket(nSocket);
        return;
    }

    for( CTCPIPInfo *pNode = m_tTCPIPInfo.pNext; NULL != pNode; pNode = pNode->pNext )
    {
        SOCKET nConSocket = pNode->tConInfo.nSocket;
        if( INVALID_SOCKET != nConSocket )
        {
            FD_CLR(nConSocket, &m_tRecvFDSetBuf);
            closesocket(nConSocket);
        }

        pNode->tConInfo.byDeviceNo = INVALID_ID_B;
        pNode->tConInfo.nSocket = INVALID_SOCKET;
    }
}

// Connects to szServerName:nPort, registers the socket under byDevNo and
// starts the receive and send worker threads if they are not running.
//
//   szServerName  dotted IPv4 address or host name
//   nPort         TCP port in host byte order
//   byDevNo       device number to associate with the new socket
//
// Returns TRUE once the socket is connected and registered, FALSE when
// Winsock start-up, socket creation, name resolution or connect() fails.
// On every failure path the socket is closed and the WSAStartup() made
// by this call is balanced with WSACleanup().
// NOTE(review): a failure to create the worker threads does not change
// the return value; it only leaves m_bThreadRunning FALSE.
BOOL CTcpIpConnection::ConnectToServer(char *szServerName, int nPort, BYTE byDevNo)
{
    if( NULL == szServerName )
        return FALSE;

    BOOL bWinsockStarted = FALSE;
    BOOL bSocketRegistered = FALSE;
    SOCKET nSocket = INVALID_SOCKET;

    try
    {
        // Initialise Winsock.
        WSADATA wSData;
        WORD wVersionReq = MAKEWORD(1,1);
        if( 0 != WSAStartup(wVersionReq, &wSData) )
            return FALSE;
        bWinsockStarted = TRUE;

        // Create the socket.
        nSocket = socket(AF_INET, SOCK_STREAM, 0);
        if( INVALID_SOCKET == nSocket )
        {
            WSACleanup();
            return FALSE;
        }

        // Build the server address: try a dotted address first and fall
        // back to a host name lookup.
        sockaddr_in sai;
        memset(&sai, 0, sizeof(sockaddr_in));
        sai.sin_family = AF_INET;
        sai.sin_port = htons((u_short)nPort);
        sai.sin_addr.s_addr = inet_addr(szServerName);
        if( INADDR_NONE == sai.sin_addr.s_addr )
        {
            hostent *pHostent = gethostbyname(szServerName);
            if( NULL == pHostent )
            {
                CloseConnection(nSocket);
                WSACleanup();
                return FALSE;
            }
            sai.sin_addr.s_addr = ((struct in_addr FAR *)(pHostent->h_addr))->s_addr;
        }

        // Connect.
        if( SOCKET_ERROR == connect(nSocket, (sockaddr*)&sai, sizeof(sockaddr_in)) )
        {
            CloseConnection(nSocket);
            WSACleanup();
            return FALSE;
        }

        BindConnection(nSocket, byDevNo);
        bSocketRegistered = TRUE;

        // Start the message worker threads. Only threads that were really
        // created are counted, otherwise the wait below would spin for its
        // whole time-out waiting for a thread that does not exist.
        int nThreadCount = 0;
        m_nThreadCount = 0;
        m_bThreadRunning = TRUE;
        if( NULL == m_hRecvMsgThreadHandle )
        {
            m_hRecvMsgThreadHandle = CreateThread(NULL, 1024,
                                                  (LPTHREAD_START_ROUTINE)ReceiveMessageThread,
                                                  (LPVOID)this, 0, NULL);
            if( NULL != m_hRecvMsgThreadHandle )
                nThreadCount++;
        }
        if( NULL == m_hSendMsgThreadHandle )
        {
            m_hSendMsgThreadHandle = CreateThread(NULL, 1024,
                                                  (LPTHREAD_START_ROUTINE)SendMessageThread,
                                                  (LPVOID)this, 0, NULL);
            if( NULL != m_hSendMsgThreadHandle )
                nThreadCount++;
        }
        m_bThreadRunning = ( (NULL != m_hRecvMsgThreadHandle) && (NULL != m_hSendMsgThreadHandle) );

        // Wait until the newly created threads have announced themselves.
        int nMaxCount = 2000;
        int nTimeOutCount = 0;
        while( m_nThreadCount < nThreadCount )
        {
            Sleep(50);
            nTimeOutCount++;
            if( nTimeOutCount > nMaxCount )
                break;
        }

        return (INVALID_SOCKET != nSocket);
    }
    catch(...)
    {
        // Release what this call acquired, unless the socket has already
        // been handed over to the connection list.
        if( !bSocketRegistered )
        {
            if( INVALID_SOCKET != nSocket )
                closesocket(nSocket);
            if( bWinsockStarted )
                WSACleanup();
        }
    }

    return FALSE;
}

// Configures a freshly connected socket and stores it in the list
// together with its device number.
void CTcpIpConnection::BindConnection(SOCKET nSocket, BYTE byDevNo)
{
    // Both options are on/off switches; any non-zero value enables them.
    DWORD dwEnable = 1;
    setsockopt(nSocket, IPPROTO_TCP, TCP_NODELAY, (const char*)&dwEnable, sizeof(dwEnable));
    // SO_KEEPALIVE is a socket-level option, so it belongs to SOL_SOCKET
    // (it was previously passed with level IPPROTO_TCP).
    setsockopt(nSocket, SOL_SOCKET, SO_KEEPALIVE, (const char*)&dwEnable, sizeof(dwEnable));

    AddTCPIPInfo(nSocket, byDevNo);
}

// Disconnects the connection registered for byDevNo.
//   byDevNo == INVALID_ID_B : disconnects every connection.
//   unknown device number   : returns FALSE and changes nothing
//                             (previously this tore down all entries).
BOOL CTcpIpConnection::DisconnectServer(BYTE byDevNo)
{
    SOCKET nSocket = GetConnection(byDevNo);
    if( (INVALID_SOCKET == nSocket) && (INVALID_ID_B != byDevNo) )
    {
        return FALSE;
    }

    return DisconnectServer(nSocket);
}

// Closes nSocket and removes its list entry. INVALID_SOCKET means "all":
// every registered socket is closed before the list is emptied, so no
// open handle is left behind without bookkeeping.
BOOL CTcpIpConnection::DisconnectServer(SOCKET nSocket)
{
    CloseConnection(nSocket);
    RemoveTCPIPInfo(nSocket);

    return TRUE;
}

////////////////////////////////////////////////////////////////////
// Server start / stop

// Starts the listening thread if it is not already running.
// Returns TRUE when a listening thread exists after the call.
BOOL CTcpIpConnection::StartServer()
{
    m_bListening = TRUE;
    if( NULL == m_hListenThreadHandle )
    {
        m_hListenThreadHandle = CreateThread(NULL, 1024,
                                             (LPTHREAD_START_ROUTINE)ListenClientsThread,
                                             (LPVOID)this, 0, NULL);
        if( NULL == m_hListenThreadHandle )
        {
            m_bListening = FALSE;
            return FALSE;
        }
    }

    return TRUE;
}

// Closes every connection, asks the worker threads to stop, waits up to
// 30 seconds for each of them and finally frees the connection list.
void CTcpIpConnection::StopServer()
{
    CloseConnection();

    m_bThreadRunning = FALSE;
    if( NULL != m_hSendMsgThreadHandle )
    {
        // The send thread blocks on the queue handle; signal it so the
        // thread wakes up and sees m_bThreadRunning == FALSE.
        HANDLE hQueueHandle = GetSendMsgQueue()->GetLockQueueHandle();
        SetEvent(hQueueHandle);

        WaitForSingleObject(m_hSendMsgThreadHandle, 1000*30);
        CloseHandle(m_hSendMsgThreadHandle);
        m_hSendMsgThreadHandle = NULL;
    }
    if( NULL != m_hRecvMsgThreadHandle )
    {
        WaitForSingleObject(m_hRecvMsgThreadHandle, 1000*30);
        CloseHandle(m_hRecvMsgThreadHandle);
        m_hRecvMsgThreadHandle = NULL;
    }

    m_bListening = FALSE;
    if( NULL != m_hListenThreadHandle )
    {
        WaitForSingleObject(m_hListenThreadHandle, 1000*30);
        CloseHandle(m_hListenThreadHandle);
        m_hListenThreadHandle = NULL;
    }

    // The nodes were only marked invalid by CloseConnection(); free them.
    RemoveTCPIPInfo((SOCKET)INVALID_SOCKET);
}

// Body of the listening thread: opens a listening socket on PORTID and
// accepts clients until m_bListening becomes FALSE.
//
// The loop polls the listening socket with select() instead of blocking
// in accept(), so a StopServer() request is noticed within one polling
// interval. The listening socket and the Winsock reference are released
// when the loop ends.
void CTcpIpConnection::ProcessListenClients()
{
    int nPort = (int)PORTID;

    // Initialise Winsock.
    WSADATA wSData;
    WORD wVersionReq = MAKEWORD(1,1);
    if( 0 != WSAStartup(wVersionReq, &wSData) )
        return;

    SOCKET nListenSocket = socket(PF_INET, SOCK_STREAM, 0);
    if( INVALID_SOCKET == nListenSocket )
    {
        printf("socket()\n");
        WSACleanup();
        return;
    }

    SOCKADDR_IN sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = PF_INET;
    sin.sin_port = htons((u_short)nPort);
    sin.sin_addr.s_addr = INADDR_ANY;
    if( SOCKET_ERROR == bind(nListenSocket, (LPSOCKADDR)&sin, sizeof(sin)) )
    {
        printf("bind()\n");
        closesocket(nListenSocket);
        WSACleanup();
        return;
    }

    if( SOCKET_ERROR == listen(nListenSocket, 5) )
    {
        printf("listen()\n");
        closesocket(nListenSocket);
        WSACleanup();
        return;
    }

    // Report where the server can be reached.
    char szHostName[128];
    gethostname(szHostName, sizeof(szHostName));
    HOSTENT *pHostent = gethostbyname(szHostName);
    if( NULL != pHostent )
    {
        struct in_addr *pIPAddress = (struct in_addr *) pHostent->h_addr_list[0];
        printf("\nThe server is ready: %d.%d.%d.%d[%s], Port = %d...\n\n",
               pIPAddress->S_un.S_un_b.s_b1,
               pIPAddress->S_un.S_un_b.s_b2,
               pIPAddress->S_un.S_un_b.s_b3,
               pIPAddress->S_un.S_un_b.s_b4,
               pHostent->h_name,
               nPort);
    }
    else
    {
        printf("\nThe server is ready! \nServer Name = %s, Port = %d...\n\n", szHostName, nPort);
    }

    // Wait for new connections.
    SOCKADDR_IN tSockPeer;
    while( m_bListening )
    {
        // Poll so the loop can observe m_bListening going FALSE.
        fd_set tAcceptFD;
        FD_ZERO(&tAcceptFD);
        FD_SET(nListenSocket, &tAcceptFD);

        struct timeval tPollInterval;
        tPollInterval.tv_sec = 0;
        tPollInterval.tv_usec = 200 * 1000;

        // The first select() argument is ignored by Winsock.
        int nReady = select(0, &tAcceptFD, NULL, NULL, &tPollInterval);
        if( !m_bListening )
            break;
        if( SOCKET_ERROR == nReady )
        {
            printf("select()\n");
            break;
        }
        if( 0 == nReady )
            continue;   // nothing pending, poll again

        // accept() overwrites the length argument, so reset it every time.
        int nPeerLen = sizeof( tSockPeer );
        SOCKET sNewClient = accept(nListenSocket, (LPSOCKADDR)&tSockPeer, &nPeerLen);
        if( !m_bListening )
        {
            if( INVALID_SOCKET != sNewClient )
                closesocket(sNewClient);
            break;
        }

        // SOCKET is unsigned, so failure must be tested against
        // INVALID_SOCKET; a "<= 0" test never sees it.
        if( INVALID_SOCKET == sNewClient )
        {
            printf("accept error! errno!\n");
            continue;
        }

        //insert the new_fd to a list...
        // NOTE(review): still a placeholder - the accepted socket is
        // neither registered (BindConnection) nor closed here.
        {
        }

        //send accept notification to the client...
    }

    closesocket(nListenSocket);
    WSACleanup();
}

////////////////////////////////////////////////////////////////////
// Receiving

// Receives one complete message from nSocket.
//
// The main header is read first; it tells how many more bytes (sub
// header plus data area) follow. Those bytes are then read in pieces of
// at most MESSAGE_LEN bytes into one buffer that starts with a copy of
// the main header.
//
//   nResult  set to SOCKET_OK, or to the socket error reported by the
//            low-level receive
//
// Returns the buffer (allocated with new CHAR[]) holding the whole
// message, or NULL when the header is incomplete or invalid, a piece is
// short, or a socket error occurred. All temporary buffers are released
// on every failure path.
void* CTcpIpConnection::ReceiveMessage(SOCKET nSocket, int &nResult)
{
    nResult = SOCKET_OK;

    // Total length = main header + sub header + data area.
    const WORD wHeaderLen = g_tMessageMgr.wPL_GetMainHerderLen();

    // Step 1: the main header.
    CHAR *pcHeaderBuf = new CHAR[wHeaderLen];
    memset(pcHeaderBuf, 0, wHeaderLen);

    WORD wHeaderRecvLen = wHeaderLen;
    ReceiveMessage(nSocket, pcHeaderBuf, nResult, wHeaderRecvLen);
    if( nResult != SOCKET_OK
        || wHeaderRecvLen != wHeaderLen
        || !g_tMessageMgr.bPL_IsRcvdMainHeaderCorrect(pcHeaderBuf) )
    {
        delete [] pcHeaderBuf;
        return NULL;
    }

    // Step 2: length of sub header + data area, taken from the main header.
    const WORD wMsgSize = g_tMessageMgr.wPL_GetMessageSizeFromHeader(pcHeaderBuf);

    // Sum in 32 bits: both terms are WORDs and a 16-bit sum could wrap,
    // giving a buffer smaller than the data about to be written into it.
    const DWORD dwAllLen = (DWORD)wHeaderLen + (DWORD)wMsgSize;
    CHAR *pcAllMsg = new CHAR[dwAllLen];
    memset(pcAllMsg, 0, dwAllLen);
    memcpy(pcAllMsg, pcHeaderBuf, wHeaderLen);
    delete [] pcHeaderBuf;
    pcHeaderBuf = NULL;

    // Step 3: the remainder, at most MESSAGE_LEN bytes per receive call.
    // One receive is always issued, even for a zero length, as before.
    CHAR *pcWritePos = pcAllMsg + wHeaderLen;
    WORD wRemaining = wMsgSize;
    do
    {
        const WORD wChunkLen = (wRemaining <= MESSAGE_LEN) ? wRemaining : (WORD)MESSAGE_LEN;
        WORD wChunkRecvLen = wChunkLen;

        ReceiveMessage(nSocket, pcWritePos, nResult, wChunkRecvLen);
        if( nResult != SOCKET_OK || wChunkRecvLen != wChunkLen )
        {
            // Release the buffer whether the piece failed or came up short.
            delete [] pcAllMsg;
            return NULL;
        }

        pcWritePos += wChunkLen;
        wRemaining = (WORD)(wRemaining - wChunkLen);
    } while( wRemaining > 0 );

    return pcAllMsg;
}

// Reads up to wLength bytes from nSocket into pcBuf.
//
//   nResult  SOCKET_OK on success; otherwise WSAGetLastError(), or
//            SOCKET_ERROR when that reports 0
//   wLength  in: bytes wanted; out: bytes actually read (only updated
//            when the function returns pcBuf)
//
// Returns pcBuf, or NULL when recv() fails or reports 0 bytes. After
// more than nMaxCount partial reads the loop gives up and returns what
// it has, which the caller detects through wLength.
void* CTcpIpConnection::ReceiveMessage(SOCKET nSocket, CHAR *pcBuf, int &nResult, WORD &wLength)
{
    nResult = SOCKET_OK;

    const int nMaxCount = 200;
    int nTimeOutCount = 0;

    char *pcWritePos = pcBuf;
    WORD wRecvedLen = 0;
    do
    {
        int nRecvMsgLen = recv(nSocket, pcWritePos, wLength - wRecvedLen, 0);
        if( nRecvMsgLen == SOCKET_ERROR || nRecvMsgLen == 0 )
        {
            int nErrorCode = WSAGetLastError();
            if( 0 == nErrorCode )
            {
                nErrorCode = SOCKET_ERROR;
            }
            nResult = nErrorCode;
            return NULL;
        }

        wRecvedLen += nRecvMsgLen;
        pcWritePos += nRecvMsgLen;

        if( wRecvedLen < wLength )
        {
            // Not complete yet: pause briefly, give up after too many rounds.
            Sleep(50);
            nTimeOutCount++;
            if( nTimeOutCount > nMaxCount )
            {
                break;
            }
        }
    } while( wRecvedLen < wLength );

    wLength = wRecvedLen;
    return pcBuf;
}

////////////////////////////////////////////////////////////////////
// Sending

// Sends a message, splitting it into several packets when its data area
// is longer than MESSAGE_LEN.
//
//   dwSendMsgID  identifier written into every packet of a split message
//   pSendMsg     message to send; consumed by this call
//   bSendAtOnce  TRUE: send on the socket now; FALSE: append to the queue
//
// Returns FALSE for a NULL message or when any packet could not be
// sent/queued, TRUE otherwise.
BOOL CTcpIpConnection::SendMessage(DWORD dwSendMsgID, void *pSendMsg, BOOL bSendAtOnce /*= FALSE*/)
{
    if( NULL == pSendMsg )
        return FALSE;

    TMainHeader *ptMainHead = &(((TSingleMessage*)pSendMsg)->tmainHeader);
    TSubHeader *ptSubHead = &(((TSingleMessage*)pSendMsg)->tsubHeader);
    void *pvMsgData = (void *)(&(((TSingleMessage*)pSendMsg)->pvMsgData));

    DWORD dwDataLen = ntohl( ptSubHead->dwDatalen );
    DWORD dwMsgMainSubHerderLen = g_tMessageMgr.wPL_GetMainSubHerderLen();
    DWORD dwLength = dwMsgMainSubHerderLen + dwDataLen;

    // Small message: goes out unchanged.
    if( dwDataLen <= MESSAGE_LEN )
    {
        return SendMessage(pSendMsg, bSendAtOnce);
    }

    // Large message: wCount full packets of wPacketLen data bytes plus,
    // if something is left over, one shorter final packet.
    DWORD dwPacketStLen = g_tMessageMgr.wPL_GetPacketStLen();
    WORD wPacketLen = (WORD)PACKET_LEN;
    WORD wCount = (WORD)(dwDataLen / wPacketLen);
    WORD wLastPacketLen = (WORD)(dwDataLen - (wPacketLen * wCount));
    WORD wPacketCount = wCount;
    if( wLastPacketLen > 0 )
    {
        wPacketCount += 1;
    }

    DWORD dwPacketHeaderLen = dwMsgMainSubHerderLen + dwPacketStLen;
    DWORD dwCopyDataLen = 0;
    BOOL bAllPacketsOk = TRUE;

    for( WORD wIndex = 0; wIndex < wPacketCount; wIndex++ )
    {
        WORD wPayloadLen = (wIndex < wCount) ? wPacketLen : wLastPacketLen;
        DWORD dwSinglePacketLen = dwPacketHeaderLen + wPayloadLen;

        CHAR *chSinglePacket = new CHAR[dwSinglePacketLen];
        memset(chSinglePacket, 0, dwSinglePacketLen);

        // Main and sub header, derived from the original message.
        TMainHeader *ptPacketMainHead = &(((TSingleMessage*)chSinglePacket)->tmainHeader);
        g_tMessageMgr.BuildMainSubHeaderofMsg(ptPacketMainHead, ptMainHead, g_tMessageMgr.MAIN_HEADER);
        WORD wTmpDataLen = (WORD)(dwSinglePacketLen - g_tMessageMgr.wPL_GetMainHerderLen());
        ptPacketMainHead->wDatalen = htons(wTmpDataLen);

        TSubHeader *ptPacketSubHead = &(((TSingleMessage*)chSinglePacket)->tsubHeader);
        g_tMessageMgr.BuildMainSubHeaderofMsg(ptPacketSubHead, ptSubHead, g_tMessageMgr.SUB_HEADER);

        // Packet descriptor placed right behind the two headers.
        TPacketStruct *ptPacketStruct = (TPacketStruct *)(chSinglePacket + dwMsgMainSubHerderLen);
        ptPacketStruct->dwPacketID = htonl(dwSendMsgID);
        ptPacketStruct->wPacketCount = htons(wPacketCount);
        ptPacketStruct->wPacketNo = htons((wIndex + 1));   // packet numbers start at 1
        ptPacketStruct->dwDataLen = htonl(dwLength);
        ptPacketStruct->wPacketLen = htons(wPayloadLen);
        ptPacketStruct->wReserve = htons(0);

        // This packet's slice of the data area.
        memcpy((void *)(chSinglePacket + dwPacketHeaderLen),
               (void *)((CHAR *)pvMsgData + dwCopyDataLen),
               wPayloadLen);
        dwCopyDataLen += wPayloadLen;

        // Keep going after a failure, but remember it for the result.
        if( !SendMessage((void *)chSinglePacket, bSendAtOnce) )
        {
            bAllPacketsOk = FALSE;
        }
    }

    // The original message has been copied into the packets.
    g_tMessageMgr.bPL_deleteMsg(pSendMsg);

    return bAllPacketsOk;
}

// Sends one message now or queues it for the send thread.
//
//   bSendAtOnce TRUE : the destination socket is looked up from the
//                      byDstAddr field of the main header and the message
//                      is sent at once; returns the result of that send,
//                      or FALSE when no socket is registered for the
//                      destination (the message is released in that case
//                      too, since the caller cannot tell it apart from a
//                      failed send).
//   bSendAtOnce FALSE: the message is appended to the send queue; TRUE.
//
// Returns FALSE for a NULL message.
BOOL CTcpIpConnection::SendMessage(void* pSendMsg, BOOL bSendAtOnce /*= FALSE*/)
{
    if( NULL == pSendMsg )
        return FALSE;

    if( bSendAtOnce )
    {
        BYTE byDevNo = (BYTE)((TSingleMessage*)pSendMsg)->tmainHeader.byDstAddr;
        SOCKET nSocket = GetConnection(byDevNo);
        if( INVALID_SOCKET == nSocket )
        {
            g_tMessageMgr.bPL_deleteMsg(pSendMsg);
            return FALSE;
        }

        return SendMessage(nSocket, pSendMsg);
    }

    // Queue it; the send thread picks it up later.
    GetSendMsgQueue()->AddMessage((TSingleMessage *)pSendMsg);
    return TRUE;
}

// Writes one message to nSocket and releases it afterwards.
//
// The number of bytes on the wire is
//   headers + data area                        for a small message, or
//   headers + packet descriptor + packet data  for a packet of a split
//                                              message (sub header data
//                                              length above MESSAGE_LEN).
//
// Returns TRUE when all bytes were sent. Returns FALSE on a NULL
// message, a socket error, or when the retry limit is hit before all
// bytes are out; the message is released in the last two cases as well.
BOOL CTcpIpConnection::SendMessage(SOCKET nSocket, void* pSendMsg)
{
    if( NULL == pSendMsg )
        return FALSE;

    TSubHeader *ptSubHead = &(((TSingleMessage*)pSendMsg)->tsubHeader);
    void *pvMsgData = (void *)(&(((TSingleMessage*)pSendMsg)->pvMsgData));

    DWORD dwMsgMainSubHerderLen = g_tMessageMgr.wPL_GetMainSubHerderLen();
    DWORD dwDataLen = ntohl( ptSubHead->dwDatalen );
    DWORD dwLength = dwMsgMainSubHerderLen + dwDataLen;
    if( dwDataLen > MESSAGE_LEN )
    {
        // Packet of a split message: the data area starts with the packet
        // descriptor, which carries this packet's own payload length.
        DWORD dwPacketStLen = g_tMessageMgr.wPL_GetPacketStLen();
        TPacketStruct *ptPacket = (TPacketStruct *)(pvMsgData);
        WORD wPacketLen = ntohs(ptPacket->wPacketLen);
        dwLength = dwMsgMainSubHerderLen + dwPacketStLen + wPacketLen;
    }

    const int nMaxCount = 200;
    int nTimeOutCount = 0;

    try
    {
        char *pcSendPos = (char*)pSendMsg;
        DWORD dwSentLen = 0;
        do
        {
            int nSendingLen = send(nSocket, pcSendPos, (int)(dwLength - dwSentLen), 0);
            if( SOCKET_ERROR == nSendingLen )
            {
                // Release the message here too, as every other exit does.
                g_tMessageMgr.bPL_deleteMsg(pSendMsg);
                return FALSE;
            }

            dwSentLen += (DWORD)nSendingLen;
            pcSendPos += nSendingLen;

            if( dwSentLen < dwLength )
            {
                // Not complete yet: pause briefly, give up after too many rounds.
                Sleep(50);
                nTimeOutCount++;
                if( nTimeOutCount > nMaxCount )
                {
                    break;
                }
            }
        } while( dwSentLen < dwLength );

        g_tMessageMgr.bPL_deleteMsg(pSendMsg);

        if( dwSentLen != dwLength )
            return FALSE;
    }
    catch(...)
    {
        return FALSE;
    }

    return TRUE;
}

////////////////////////////////////////////////////////////////////
// Worker thread bodies

// Body of the receive thread. Until m_bThreadRunning becomes FALSE it
//   1. takes a snapshot of the connection list and runs select() on a
//      copy of the receive fd set (both under the list lock),
//   2. reads one message from every socket that select() reported and
//      passes it to ProcessRecvMessage(SOCKET, void *),
//   3. disconnects a socket whose receive reported an error.
void CTcpIpConnection::ProcessRecvMessage()
{
    fd_set tReadFD;
    FD_ZERO(&tReadFD);

    int nFDs = 1025;
    int nSelectSocketNum = -1;
    int nActiveSocketNum = 0;

    struct timeval tTimeOut;
    tTimeOut.tv_sec = 0;
    tTimeOut.tv_usec = 100 * 1000;

    int nResult = SOCKET_OK;
    void *pRecvMsg = NULL;

    int nConnectionCount = 0;
    TConnectionInfo *pConnectionArray = NULL;
    int nConIndex = 0;

    while( m_bThreadRunning )
    {
        Sleep(50);

        // Snapshot the sockets into pConnectionArray.
        {
            // The extra braces make CMutexLock release the lock when this
            // scope ends; without them UnLock() would have to be called.
            CMutexLock lock("LOCK_TCPIPINFO");

            if( NULL != pConnectionArray )
            {
                delete [] pConnectionArray;   // allocated with new[]
                pConnectionArray = NULL;
            }

            FD_ZERO(&tReadFD);
            memcpy(&tReadFD, &m_tRecvFDSetBuf, sizeof(fd_set));
            nSelectSocketNum = select(nFDs, &tReadFD, NULL, NULL, &tTimeOut);
            nActiveSocketNum = nSelectSocketNum;

            nConnectionCount = m_tTCPIPInfo.tConInfo.byDeviceNo;
            if( (nSelectSocketNum <= 0) || (nConnectionCount <= 0) )
            {
                continue;
            }

            // Copy the socket handles; never write past the allocated
            // size, even if the stored count and the list disagree.
            pConnectionArray = new TConnectionInfo[nConnectionCount];
            int nCopied = 0;
            CTCPIPInfo *pTCPIPInfo = m_tTCPIPInfo.pNext;
            while( (NULL != pTCPIPInfo) && (nCopied < nConnectionCount) )
            {
                pConnectionArray[nCopied].byDeviceNo = pTCPIPInfo->tConInfo.byDeviceNo;
                pConnectionArray[nCopied].nSocket = pTCPIPInfo->tConInfo.nSocket;
                nCopied++;

                pTCPIPInfo = pTCPIPInfo->pNext;
            }
            nConnectionCount = nCopied;
        }

        // Receive and handle messages.
        for( nConIndex = 0; nConIndex < nConnectionCount; nConIndex++ )
        {
            SOCKET nSocket = pConnectionArray[nConIndex].nSocket;
            nResult = SOCKET_OK;
            pRecvMsg = NULL;

            if( INVALID_SOCKET == nSocket )
                continue;

            if( !FD_ISSET(nSocket, &tReadFD) )
                continue;

            // Receive one message.
            pRecvMsg = ReceiveMessage(nSocket, nResult);
            if( !m_bThreadRunning )
            {
                // Shutting down: do not leave the message behind.
                if( NULL != pRecvMsg )
                {
                    g_tMessageMgr.bPL_deleteMsg(pRecvMsg);
                    pRecvMsg = NULL;
                }
                break;
            }

            if( nResult != SOCKET_OK )
            {
                DisconnectServer(nSocket);
                //gMCUMgr.bDev_DisConnection(byDecNo);

                // Only drop this socket from the ready set. Reloading the
                // whole registered set here would mark every remaining
                // socket as readable and send the loop into a blocking
                // recv() on sockets with no data.
                FD_CLR(nSocket, &tReadFD);
                continue;
            }

            if( NULL != pRecvMsg )
            {
                // Handle the received message.
                ProcessRecvMessage(nSocket, pRecvMsg);
            }

            Sleep(10);

            nActiveSocketNum--;
            if( nActiveSocketNum <= 0 )
                break;
        }

        if( !m_bThreadRunning )
            break;
    }

    if( NULL != pConnectionArray )
    {
        delete [] pConnectionArray;
        pConnectionArray = NULL;
    }
}

// Handles one received message. The message is validated through
// g_tMessageMgr.ptPL_CheckRegHeaderofRecMsg(); when that returns NULL the
// message is released, otherwise it is left for the (not yet written)
// message handling.
// NOTE(review): an accepted message is not released here - confirm who
// owns it once the handling code is added.
void CTcpIpConnection::ProcessRecvMessage(SOCKET nSocket, void *pRecvMsg)
{
    TSingleMessage* ptMsg = g_tMessageMgr.ptPL_CheckRegHeaderofRecMsg(pRecvMsg);
    if( NULL != ptMsg )
    {
        // Message handling goes here.
        //........
        //g_tMessageMgr.bPL_deleteMsg(pRecvMsg);
    }
    else
    {
        g_tMessageMgr.bPL_deleteMsg(pRecvMsg);
    }
}

// Body of the send thread. It waits on the queue's handle; each time the
// handle is signalled it moves all queued messages into a local queue
// (while the shared queue is blocked) and then sends them one by one.
// The loop ends when m_bThreadRunning becomes FALSE or the wait fails.
void CTcpIpConnection::ProcessSendMessage()
{
    CMessageQueue *pSendMsgQueue = GetSendMsgQueue();
    HANDLE hSendMsgThread = pSendMsgQueue->GetLockQueueHandle();

    while( m_bThreadRunning )
    {
        DWORD dwRt = WaitForSingleObject( hSendMsgThread, INFINITE );
        if( !m_bThreadRunning )
            break;

        if( WAIT_OBJECT_0 != dwRt )
        {
            break; // Terminate this thread by leaving the procedure.
        }

        // Declared once for both loops so the code builds with compilers
        // that leak the for-init variable into the enclosing scope as
        // well as with those that do not.
        WORD wMsgIndex = 0;

        // Copy first, then process.
        pSendMsgQueue->BlockQueue();

        CMessageQueue tSendMsgQ;
        WORD wMsgCount = pSendMsgQueue->GetMsgCount();
        for( wMsgIndex = 0; wMsgIndex < wMsgCount; wMsgIndex++ )
        {
            TSingleMessage *pMessage = pSendMsgQueue->GetMessage(wMsgIndex);
            tSendMsgQ.AddMessage((TSingleMessage *)pMessage);
        }
        pSendMsgQueue->ClearAllMessages();

        pSendMsgQueue->UnBlockQueue();

        // Send the copied messages.
        wMsgCount = tSendMsgQ.GetMsgCount();
        for( wMsgIndex = 0; wMsgIndex < wMsgCount; wMsgIndex++ )
        {
            TSingleMessage *pMessage = tSendMsgQ.GetMessage(wMsgIndex);
            SendMessage(pMessage, TRUE);
        }
        tSendMsgQ.ClearAllMessages();

        Sleep(50);
    }
}