www.gusucode.com > 嵌入式linux系统的网络编程源码程序 > 嵌入式linux系统的网络编程源码程序/视频会议源码/network_video_recv_thread.cpp
/////////////////////////////////////////////////////// // FileName: network_video_recv_thread.cpp // Author: b1gm0use // Project: myvideo #include <iostream> #include <sys/types.h> #include <sys/socket.h> #include <unistd.h> #include <netinet/in.h> #include <arpa/inet.h> #include <qapplication.h> #include <poll.h> #include "network_video_recv_thread.h" #include "video_cap.h" #include "capture_event.h" #include "video_send.h" #include "network_video_send.h" #include "network_ctrl.h" #include "network.h" #include "common.h" #include "video.h" #include "avi.h" #include "signal_def.h" using namespace std; /////////////////////////////////////////////////////// // Public Functions /////////////////////////////////////////////////////// // 构造函数 // 传入参数为 // nc_in 上层network_ctrl对象指针 // stackSize QThread所用的参数 network_video_recv_thread::network_video_recv_thread ( avi * avi_ptr_in, network_ctrl * nc_in, unsigned int stackSize ) :QThread( stackSize ) // {{{ { verbose_output( 2, "create network_video_recv_thread." ); nc = nc_in; fd_video = 0; video_cap_buff[0] = video_cap_buff[1] = video_cap_buff[2] = NULL; video_frame = 0; recv_buff = new BUFF [ PACKET_LENGTH ]; send_buff = new BUFF [ PACKET_LENGTH ]; merge_buff = NULL; max_size = 0; addr_accept = new sockaddr_in; acclen = new socklen_t; packet_num = 0; expect_num = 0; sub_expect_num = 0; avi_ptr = avi_ptr_in; } // }}} // 析构函数 network_video_recv_thread::~network_video_recv_thread ( void ) // {{{ { if ( 0 != fd_video ) { close( fd_video ); } delete [] video_cap_buff[0]; delete [] video_cap_buff[1]; delete [] video_cap_buff[2]; delete [] recv_buff; delete [] send_buff; delete [] merge_buff; delete addr_accept; delete acclen; } // }}} // 运行部分,线程代码在这里 void network_video_recv_thread::run ( void ) // {{{ { verbose_output( 1, "network video recv thread running..." ); // 为缓冲区开辟内存 max_size = avi_ptr->video_opt.min_width * avi_ptr->video_opt.factor * avi_ptr->video_opt.min_height * avi_ptr->video_opt.factor * 3 + VIDEO_BUFF_HEAD_SIZE; merge_buff = new BUFF [ max_size + DATA_OFFSET ]; video_cap_buff[0] = new BUFF [ max_size ]; video_cap_buff[1] = new BUFF [ max_size ]; video_cap_buff[2] = new BUFF [ max_size ]; if ( avi_ptr->use_multicast ) { connect_init_mc(); connect_handle_mc(); } else { connect_init(); connect_handle(); } return; } // }}} /////////////////////////////////////////////////////// // Private Functions /////////////////////////////////////////////////////// // 连接初始化,创建套接字 void network_video_recv_thread::connect_init ( void ) // {{{ { verbose_output( 3, "network_video_recv_thread connect_init." ); ///////////////////////////////////////// // 创建视频端口 ///////////////////////////////////////// *acclen = sizeof( *addr_accept ); memset( addr_accept, 0, *acclen ); // 网络连接的参数设置 addr_accept->sin_family = AF_INET; addr_accept->sin_port = htons( DEFAULT_VIDEO_PORT ); inet_pton( AF_INET, static_cast< const char* > ( avi_ptr->ip ), &addr_accept->sin_addr ); verbose_output( 3, "creating video connect socket..." ); fd_video = socket( AF_INET, SOCK_DGRAM, 0 ); // 与服务端建立连接 BUFF * temp; bool read_ok = false; packet_head_t * packet_head = NULL; do { verbose_output( 3, "send video Hello message ..." ); send_data( fd_video, addr_accept, send_buff, NULL, 0, packet_num, SYN ); recv_line( fd_video, addr_accept, recv_buff, temp, packet_head ); if ( !TEST_CTRL_SYN_BIT( packet_head->opt_bits ) ) { cerr << "Warning! not the video Welcome reply" << endl; continue; } read_ok = true; } while( !read_ok ); } // }}} // 处理连接后的数据接收工作 void network_video_recv_thread::connect_handle ( void ) // {{{ { int size = max_size; // 读出的一帧数据的总大小 int count = 0; int num = 0; // 读出的数据包的序号 int seq_diff = 0; int sub_seq_diff = 0; // 此帧是否有破损 bool broken = false; // 这一帧第一个数据包的序号 int first_packet_of_frame = 0; // 接收从网络读出的数据,指向recv_buff中的数据,不必开辟内存 BUFF * temp = NULL; bool should_exit = false; int lost_data_time = 0; pollfd connect_to = { fd_video, POLLIN, 0 }; int result = 0; packet_head_t * packet_head = NULL; while( 1 ) // {{{ { count = 0; num = 0; broken = false; first_packet_of_frame = expect_num; sub_expect_num = 0; // 临时放置读入的数据 verbose_output( 3, "reading from network..." ); // 从网络上读,并组合成为一帧 while ( 1 ) // {{{ { // 判断是否应该退出 (*(nc->term_sub_thread_sema))++; should_exit = nc->term_sub_thread; (*(nc->term_sub_thread_sema))--; if ( should_exit ) { return; } result = poll( &connect_to, 1, DEFAULT_TIMEOUT ); if ( result < 0 ) { perror( "Error in video listening" ); } // 超时 if ( result == 0 ) // {{{ { lost_data_time++; if ( lost_data_time >= MAX_LOST_HEARTBEAT ) { QCustomEvent * event = new QCustomEvent( SIG_LOST_SERVER ); QApplication::postEvent( avi_ptr, event ); return; } } else { // 有读入数据 num = recv_line( fd_video, addr_accept, recv_buff, temp, packet_head ); lost_data_time = 0; // 当前要接收的包是本帧的第一个数据包 if ( count == 0 ) { if ( TEST_VIDEO_BGN_BIT( packet_head->opt_bits ) ) { // 是第一个数据包,继续接收 } else { // 中间有数据数据包丢失,则接收下一个 cerr << "Warning! Not the first packet." << endl; continue; } } seq_diff = packet_head->sequence_num - expect_num ; sub_seq_diff = packet_head->sub_sequence_num - sub_expect_num; // 检查是否有丢包或数据包的顺序是否有问题 if ( seq_diff == 0 ) // {{{ { // OK! right! nothing to do! } else { if ( seq_diff < 0 ) { // 旧的数据包 cerr << "Warning! old packet received" << endl; continue; } else { // seq_diff > 0, 数据包丢失 if ( count + (seq_diff + 1) * PACKET_DATA_LENGTH > max_size ) { // 已经不在当前包内 broken = true; cerr << "Warning! lost the end packet of last frame" << endl; expect_num = packet_head->sequence_num + 1; break; } else { // 仍在当前帧内 broken = true; cerr << "Warning! video frame lost some packet" << endl; count += seq_diff * PACKET_DATA_LENGTH; } } // if ( seq_diff < 0 ) } // }}} expect_num = packet_head->sequence_num + 1; sub_expect_num = packet_head->sub_sequence_num + 1; // 未发现错误,组合为一个整帧 if ( !broken ) { memcpy( merge_buff + count, temp, PACKET_DATA_LENGTH ); } count += num - PACKET_HEAD_LENGTH; // 如果有一个数据包的开始标明为结束包,则已经到了这一帧的结束 if ( TEST_VIDEO_END_BIT( packet_head->opt_bits ) ) { verbose_output( 4, "End of Video frame" ); break; } if ( count > max_size + DATA_OFFSET ) { cerr << "Error! You shouldn't be here" << endl; break; } } // if ( result == 0 ) }}} } // while ( 1 ) }}} verbose_output( 3, "Reading from network OK!" ); if ( broken ) { cerr << "Warning! Broken Video frame" << endl; continue; } size = count - DATA_OFFSET; if ( memcmp( merge_buff, "VBGN", 4 ) == 0 ) { // Video Capture Part verbose_output( 3, "video frame." ); int data_size = *( (int*) (merge_buff+4) ); verbose_output( 4, "total video frame size", data_size ); verbose_output( 4, "receive video frame size", size ); if ( data_size > max_size ) { cerr << "Warning! Invalid video frame size: " << data_size << endl; } else { if ( data_size > size ) { cerr << "Warning! Not a integral video frame!" << endl; size = data_size; } else { if ( data_size < size ) { size = data_size; } } } memcpy( video_cap_buff[video_frame], merge_buff+DATA_OFFSET, size ); //发送消息 capture_event * event = new capture_event( VIDEO_EVENT, video_cap_buff[video_frame], size ); verbose_output( 3, "sending new video change event." ); QApplication::postEvent( (QObject *) avi_ptr->video_player_ptr, (QEvent *) event ); video_frame = ( video_frame + 1 ) % 3; } else { cerr << "Warning! Unexpected video frame!" << endl; } // if "VBGN" } // }}} return; } // }}} // 组播初始化,创建套接字 void network_video_recv_thread::connect_init_mc ( void ) // {{{ { verbose_output( 3, "network video recv thread Multicast init." ); ///////////////////////////////////////// // 创建视频端口 ///////////////////////////////////////// *acclen = sizeof( *addr_accept ); memset( addr_accept, 0, *acclen ); // 创建控制端监听socket verbose_output( 3, "creating video multicast listen socket..." ); fd_video = socket( AF_INET, SOCK_DGRAM, 0 ); // 绑定指定端口 addr_accept->sin_family = AF_INET; inet_pton( AF_INET, static_cast< const char* >( avi_ptr->mc_addr ), &addr_accept->sin_addr ); addr_accept->sin_port = htons( DEFAULT_MC_VIDEO_PORT ); // 绑定端口 if ( bind( fd_video, (sockaddr*) addr_accept, *acclen ) == -1 ) { cerr << "Can't bind to the listen port [" << DEFAULT_MC_VIDEO_PORT << "]" << endl; ::exit( 1 ); } // 初始化组播 ip_mreq mreq; memcpy( &mreq.imr_multiaddr, &(((sockaddr_in*) addr_accept)->sin_addr), sizeof( in_addr ) ); mreq.imr_interface.s_addr = htonl( INADDR_ANY ); // 加入组播组 setsockopt( fd_video, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof( mreq ) ); return; } // }}} // 处理组播连接后的数据接收工作 void network_video_recv_thread::connect_handle_mc ( void ) // {{{ { int size = max_size; // 读出的一帧数据的总大小 int count = 0; int num = 0; // 读出的数据包的序号 int seq_diff = 0; int sub_seq_diff = 0; // 此帧是否有破损 bool broken = false; // 这一帧第一个数据包的序号 int first_packet_of_frame = 0; // 接收从网络读出的数据,指向recv_buff中的数据,不必开辟内存 BUFF * temp = NULL; bool should_exit = false; int lost_data_time = 0; int old_packet_num = 0; pollfd connect_to = { fd_video, POLLIN, 0 }; int result = 0; packet_head_t * packet_head = NULL; while( 1 ) // {{{ { count = 0; num = 0; broken = false; first_packet_of_frame = expect_num; sub_expect_num = 0; // 临时放置读入的数据 verbose_output( 3, "reading from network..." ); // 从网络上读,并组合成为一帧 while ( 1 ) // {{{ { // 判断是否要退出 (*(nc->term_sub_thread_sema))++; should_exit = nc->term_sub_thread; (*(nc->term_sub_thread_sema))--; if ( should_exit ) { return; } result = poll( &connect_to, 1, DEFAULT_TIMEOUT ); if ( result < 0 ) { perror( "Error in video listening" ); } // 超时 if ( result == 0 ) // {{{ { lost_data_time++; if ( lost_data_time >= MAX_LOST_HEARTBEAT ) { QCustomEvent * event = new QCustomEvent( SIG_LOST_SERVER ); QApplication::postEvent( avi_ptr, event ); return; } } else { // 有读入数据 num = recv_line( fd_video, addr_accept, recv_buff, temp, packet_head ); lost_data_time = 0; seq_diff = packet_head->sequence_num - expect_num; sub_seq_diff = packet_head->sub_sequence_num - sub_expect_num; // 检查是否有丢包或数据包的顺序是否有问题 if ( seq_diff == 0 ) // {{{ { // OK! right! nothing to do! } else { if ( seq_diff < 0 ) { // 旧的数据包 cerr << "Warning! old packet received" << endl; old_packet_num++; if ( old_packet_num >= 5 ) { old_packet_num = 0; expect_num = packet_head->sequence_num + 1; } //continue; } else { // seq_diff > 0, 数据包丢失 if ( sub_seq_diff < 0 ) { // 已经不在当前包内 broken = true; cerr << "Warning! lost the end packet of last frame" << endl; expect_num = packet_head->sequence_num + 1; break; } else { // 仍在当前帧内 broken = true; cerr << "Warning! video frame lost some packet" << endl; count += seq_diff * PACKET_DATA_LENGTH; } } // if ( seq_diff < 0 ) } // }}} expect_num = packet_head->sequence_num + 1; sub_expect_num = packet_head->sub_sequence_num + 1; // 未发现错误,组合为一个整帧 if ( !broken ) { memcpy( merge_buff + count, temp, PACKET_DATA_LENGTH ); } count += num - PACKET_HEAD_LENGTH; // 如果有一个数据包的开始标明为结束包,则已经到了这一帧的结束 if ( TEST_VIDEO_END_BIT( packet_head->opt_bits ) ) { verbose_output( 4, "End of Video frame" ); break; } if ( count > max_size + DATA_OFFSET ) { cerr << "Error! You shouldn't be here" << endl; break; } } // if ( result == 0 ) }}} } // while ( 1 ) }}} verbose_output( 3, "Reading from network OK!" ); if ( broken ) { cerr << "Warning! Broken video frame" << endl; continue; } size = count - DATA_OFFSET; if ( memcmp( merge_buff, "VBGN", 4 ) == 0 ) { // Video Capture Part verbose_output( 3, "video frame." ); int data_size = *( (int*) (merge_buff+4) ); verbose_output( 4, "total video frame size", data_size ); verbose_output( 4, "receive video frame size", size ); if ( data_size > max_size ) { cerr << "Warning! Invalid video frame size: " << data_size << endl; } else { if ( data_size > size ) { cerr << "Warning! Not a integral video frame!" << endl; size = data_size; } else { if ( data_size < size ) { size = data_size; } } } memcpy( video_cap_buff[video_frame], merge_buff+DATA_OFFSET, size ); //发送消息 capture_event * event = new capture_event( VIDEO_EVENT, video_cap_buff[video_frame], size ); verbose_output( 3, "sending new video change event." ); QApplication::postEvent( (QObject *) avi_ptr->video_player_ptr, (QEvent *) event ); video_frame = ( video_frame + 1 ) % 3; } else { cerr << "Warning! Unexpected video frame!" << endl; } // if "VBGN" } // }}} return; } // }}}