// Origin: www.gusucode.com > embedded Linux network programming source code > video conference source / network_video_recv_thread.cpp

    ///////////////////////////////////////////////////////
// FileName:	network_video_recv_thread.cpp
// Author:		b1gm0use
// Project:		myvideo

#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <qapplication.h>
#include <poll.h>

#include "network_video_recv_thread.h"
#include "video_cap.h"
#include "capture_event.h"
#include "video_send.h"
#include "network_video_send.h"
#include "network_ctrl.h"
#include "network.h"
#include "common.h"
#include "video.h"
#include "avi.h"
#include "signal_def.h"

using namespace std;

///////////////////////////////////////////////////////
// Public Functions
///////////////////////////////////////////////////////

// Constructor.
// Parameters:
//   avi_ptr_in  pointer to the avi object, kept in avi_ptr
//   nc_in       pointer to the upper-level network_ctrl object, kept in nc
//   stackSize   forwarded to the QThread base-class constructor
// The two packet buffers and the address/length holders are allocated here;
// the frame buffers stay NULL until run() allocates them.
network_video_recv_thread::network_video_recv_thread ( avi * avi_ptr_in, 
		network_ctrl * nc_in, unsigned int stackSize )
		:QThread( stackSize ) // {{{
{
	verbose_output( 2, "create network_video_recv_thread." );

	// Collaborators supplied by the caller
	avi_ptr = avi_ptr_in;
	nc = nc_in;

	// No socket yet; the destructor treats 0 as "nothing to close"
	fd_video = 0;

	// Sequence bookkeeping starts from zero
	packet_num = 0;
	expect_num = 0;
	sub_expect_num = 0;

	// Frame assembly state: nothing is sized or allocated yet
	max_size = 0;
	video_frame = 0;
	merge_buff = NULL;
	for ( int slot = 0; slot < 3; ++slot )
	{
		video_cap_buff[slot] = NULL;
	}

	// One-packet buffers for receiving and sending
	recv_buff = new BUFF [ PACKET_LENGTH ];
	send_buff = new BUFF [ PACKET_LENGTH ];

	// Peer address and the variable holding its length
	addr_accept = new sockaddr_in;
	acclen = new socklen_t;

} // }}}


// Destructor: closes the video socket if one was stored in fd_video and
// releases every buffer owned by this object.
network_video_recv_thread::~network_video_recv_thread ( void ) // {{{
{

	// fd_video keeps the constructor's 0 until a connect_init*() call
	// stores the result of socket() in it
	if ( fd_video != 0 )
	{
		close( fd_video );
	}

	// Frame buffers (still NULL if run() never allocated them; delete[] on
	// NULL is a no-op)
	for ( int slot = 0; slot < 3; ++slot )
	{
		delete [] video_cap_buff[slot];
	}

	// Packet and frame-assembly buffers
	delete [] recv_buff;
	delete [] send_buff;
	delete [] merge_buff;

	// Peer address and its length holder
	delete addr_accept;
	delete acclen;

} // }}}


// Thread body: allocates the frame buffers, then performs the connection
// set-up and the receive loop in either multicast or unicast flavour,
// selected by avi_ptr->use_multicast.
void network_video_recv_thread::run ( void ) // {{{
{

	verbose_output( 1, "network video recv thread running..." );

	// Largest frame accepted: scaled width * scaled height * 3, plus the
	// video buffer header
	max_size = avi_ptr->video_opt.min_width * avi_ptr->video_opt.factor * avi_ptr->video_opt.min_height * avi_ptr->video_opt.factor * 3 + VIDEO_BUFF_HEAD_SIZE;

	// The assembly buffer gets DATA_OFFSET extra bytes in front of the frame data
	merge_buff = new BUFF [ max_size + DATA_OFFSET ];

	// Three frame buffers, used in rotation by the receive loops
	for ( int slot = 0; slot < 3; ++slot )
	{
		video_cap_buff[slot] = new BUFF [ max_size ];
	}

	if ( !avi_ptr->use_multicast )
	{
		connect_init();
		connect_handle();
	}
	else
	{
		connect_init_mc();
		connect_handle_mc();
	}

} // }}}


///////////////////////////////////////////////////////
// Private Functions
///////////////////////////////////////////////////////

// 连接初始化,创建套接字
void network_video_recv_thread::connect_init ( void ) // {{{
{

	verbose_output( 3, "network_video_recv_thread connect_init." );

	/////////////////////////////////////////
	// 创建视频端口
	/////////////////////////////////////////
	
	*acclen = sizeof( *addr_accept );
	memset( addr_accept, 0, *acclen );

	// 网络连接的参数设置
	addr_accept->sin_family = AF_INET;
	addr_accept->sin_port = htons( DEFAULT_VIDEO_PORT );
	inet_pton( AF_INET, static_cast< const char* > ( avi_ptr->ip ), &addr_accept->sin_addr );

	verbose_output( 3, "creating video connect socket..." );
	fd_video = socket( AF_INET, SOCK_DGRAM, 0 );
	
	// 与服务端建立连接
	BUFF * temp;
	bool read_ok = false;
	packet_head_t * packet_head = NULL;

	do
	{
		verbose_output( 3, "send video Hello message ..." );

		send_data( fd_video, addr_accept, send_buff, NULL, 0, packet_num, SYN );

		recv_line( fd_video, addr_accept, recv_buff, temp, packet_head );

		if ( !TEST_CTRL_SYN_BIT( packet_head->opt_bits ) )
		{
			cerr << "Warning! not the video Welcome reply" << endl;
			continue;
		}

		read_ok = true;
	}
	while( !read_ok );
	
} // }}}


// 处理连接后的数据接收工作
void network_video_recv_thread::connect_handle ( void ) // {{{
{

	int size = max_size;
	// 读出的一帧数据的总大小
	int count = 0;
	int num = 0;
	// 读出的数据包的序号
	int seq_diff = 0;
	int sub_seq_diff = 0;
	// 此帧是否有破损
	bool broken = false;
	// 这一帧第一个数据包的序号
	int first_packet_of_frame = 0;
	// 接收从网络读出的数据,指向recv_buff中的数据,不必开辟内存
	BUFF * temp = NULL;
	bool should_exit = false;
	int lost_data_time = 0;
	
	pollfd connect_to = { fd_video, POLLIN, 0 };
	int result = 0;
	packet_head_t * packet_head = NULL;

	while( 1 ) // {{{
	{

		count = 0;
		num = 0;
		broken = false;
		first_packet_of_frame = expect_num;
		sub_expect_num = 0;

		// 临时放置读入的数据
		verbose_output( 3, "reading from network..." );

		// 从网络上读,并组合成为一帧
		while ( 1 ) // {{{
		{

			// 判断是否应该退出
			(*(nc->term_sub_thread_sema))++;
			should_exit = nc->term_sub_thread;
			(*(nc->term_sub_thread_sema))--;

			if ( should_exit )
			{
				return;
			}

			result = poll( &connect_to, 1, DEFAULT_TIMEOUT );
			
			if ( result < 0 )
			{
				perror( "Error in video listening" );
			}

			// 超时
			if ( result == 0 ) // {{{
			{
				lost_data_time++;
				if ( lost_data_time >= MAX_LOST_HEARTBEAT )
				{
					QCustomEvent * event = new QCustomEvent( SIG_LOST_SERVER );
					QApplication::postEvent( avi_ptr, event );
					return;
				}
			}
			else
			{
				// 有读入数据
				num = recv_line( fd_video, addr_accept, recv_buff, temp, packet_head );
				
				lost_data_time = 0;

				// 当前要接收的包是本帧的第一个数据包
				if ( count == 0 )
				{
					if ( TEST_VIDEO_BGN_BIT( packet_head->opt_bits ) )
					{
						// 是第一个数据包,继续接收
					}
					else
					{
						// 中间有数据数据包丢失,则接收下一个
						cerr << "Warning! Not the first packet." << endl;
						continue;
					}
				}

				seq_diff = packet_head->sequence_num - expect_num ;
				sub_seq_diff = packet_head->sub_sequence_num - sub_expect_num;

				// 检查是否有丢包或数据包的顺序是否有问题
				if ( seq_diff == 0 ) // {{{
				{
					// OK! right! nothing to do!
				}
				else
				{
					if ( seq_diff < 0 )
					{
						// 旧的数据包
						cerr << "Warning! old packet received" << endl;
						continue;
					}
					else
					{
						// seq_diff > 0, 数据包丢失 
						if ( count + (seq_diff + 1) * PACKET_DATA_LENGTH > max_size )
						{
							// 已经不在当前包内
							broken = true;
							cerr << "Warning! lost the end packet of last frame" << endl;
							expect_num = packet_head->sequence_num + 1;
							break;
						}
						else
						{
							// 仍在当前帧内
							broken = true;
							cerr << "Warning! video frame lost some packet" << endl;
							count += seq_diff * PACKET_DATA_LENGTH;

						}
					} // if ( seq_diff < 0 )
				} // }}}

				expect_num = packet_head->sequence_num + 1;
				sub_expect_num = packet_head->sub_sequence_num + 1;

				// 未发现错误,组合为一个整帧
				if ( !broken )
				{
					memcpy( merge_buff + count, temp, PACKET_DATA_LENGTH );
				}

				count += num - PACKET_HEAD_LENGTH;

				// 如果有一个数据包的开始标明为结束包,则已经到了这一帧的结束
				if ( TEST_VIDEO_END_BIT( packet_head->opt_bits ) )
				{
					verbose_output( 4, "End of Video frame" );
					break;
				}

				if ( count > max_size + DATA_OFFSET )
				{
					cerr << "Error! You shouldn't be here" << endl;
					break;
				}
				
			}  // if ( result == 0 ) }}}

		} // while ( 1 ) }}}

		verbose_output( 3, "Reading from network OK!" );

		if ( broken )
		{
			cerr << "Warning! Broken Video frame" << endl;
			continue;
		}

		size = count - DATA_OFFSET;

		if ( memcmp( merge_buff, "VBGN", 4 ) == 0 )
		{
			// Video Capture Part
			verbose_output( 3, "video frame." );

			int data_size = *( (int*) (merge_buff+4) );

			verbose_output( 4, "total video frame size", data_size );
			verbose_output( 4, "receive video frame size", size );

			if ( data_size > max_size )
			{
				cerr << "Warning! Invalid video frame size: " << data_size << endl;
			}
			else
			{
				if ( data_size > size )
				{
					cerr << "Warning! Not a integral video frame!" << endl;
					size = data_size;
				}
				else
				{
					if ( data_size < size )
					{
						size = data_size;
					}
				}
			}

			memcpy( video_cap_buff[video_frame], merge_buff+DATA_OFFSET, size );

			//发送消息
			capture_event * event = new capture_event( VIDEO_EVENT, 
					video_cap_buff[video_frame], size );

			verbose_output( 3, "sending new video change event." );

			QApplication::postEvent( (QObject *) avi_ptr->video_player_ptr, (QEvent *) event );

			video_frame = ( video_frame + 1 ) % 3;

		}
		else
		{
			cerr << "Warning! Unexpected video frame!" << endl;
		} // if "VBGN"

	} // }}}

	return;

} // }}}


// Multicast initialisation: creates the UDP socket in fd_video, binds it to
// avi_ptr->mc_addr on DEFAULT_MC_VIDEO_PORT and joins that multicast group on
// the default interface (INADDR_ANY).
//
// A socket or bind failure terminates the process with exit status 1; a
// failure to join the group is reported and the function returns normally.
void network_video_recv_thread::connect_init_mc ( void ) // {{{
{

	verbose_output( 3, "network video recv thread Multicast init." );

	/////////////////////////////////////////
	// Set up the video port
	/////////////////////////////////////////
	
	*acclen = sizeof( *addr_accept );
	memset( addr_accept, 0, *acclen );

	// Create the listening socket
	verbose_output( 3, "creating video multicast listen socket..." );
	fd_video = socket( AF_INET, SOCK_DGRAM, 0 );
	if ( fd_video < 0 )
	{
		// Previously this only surfaced as a bind failure further down;
		// the outcome (exit 1) is unchanged, the message names the real cause
		perror( "Error creating video multicast socket" );
		::exit( 1 );
	}
		
	// Address to bind to: the multicast group and port
	addr_accept->sin_family = AF_INET;
	if ( inet_pton( AF_INET, static_cast< const char* >( avi_ptr->mc_addr ), 
			&addr_accept->sin_addr ) != 1 )
	{
		// Only reported: bind and join below are still attempted, as before
		cerr << "Warning! Invalid video multicast address" << endl;
	}
	addr_accept->sin_port = htons( DEFAULT_MC_VIDEO_PORT );
	
	// Bind the port
	if ( bind( fd_video, (sockaddr*) addr_accept, *acclen ) == -1 )
	{
		cerr << "Can't bind to the listen port [" << DEFAULT_MC_VIDEO_PORT << "]" << endl;
		::exit( 1 );
	}

	// Membership request: the group is the address just bound
	ip_mreq mreq;

	mreq.imr_multiaddr = addr_accept->sin_addr;
	mreq.imr_interface.s_addr = htonl( INADDR_ANY );

	// Join the multicast group
	if ( setsockopt( fd_video, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof( mreq ) ) < 0 )
	{
		// NOTE(review): without membership no group traffic is expected to
		// arrive, so connect_handle_mc() would presumably end through its
		// timeout path -- confirm whether this should be fatal instead.
		perror( "Error joining video multicast group" );
	}

	return;
	
} // }}}


// 处理组播连接后的数据接收工作
void network_video_recv_thread::connect_handle_mc ( void ) // {{{
{

	int size = max_size;
	// 读出的一帧数据的总大小
	int count = 0;
	int num = 0;
	// 读出的数据包的序号
	int seq_diff = 0;
	int sub_seq_diff = 0;
	// 此帧是否有破损
	bool broken = false;
	// 这一帧第一个数据包的序号
	int first_packet_of_frame = 0;
	// 接收从网络读出的数据,指向recv_buff中的数据,不必开辟内存
	BUFF * temp = NULL;
	bool should_exit = false;
	int lost_data_time = 0;
	int old_packet_num = 0;
	
	pollfd connect_to = { fd_video, POLLIN, 0 };
	int result = 0;
	packet_head_t * packet_head = NULL;

	while( 1 ) // {{{
	{

		count = 0;
		num = 0;
		broken = false;
		first_packet_of_frame = expect_num;
		sub_expect_num = 0;

		// 临时放置读入的数据
		verbose_output( 3, "reading from network..." );

		// 从网络上读,并组合成为一帧
		while ( 1 ) // {{{
		{
			// 判断是否要退出
			(*(nc->term_sub_thread_sema))++;
			should_exit = nc->term_sub_thread;
			(*(nc->term_sub_thread_sema))--;

			if ( should_exit )
			{
				return;
			}

			result = poll( &connect_to, 1, DEFAULT_TIMEOUT );
			
			if ( result < 0 )
			{
				perror( "Error in video listening" );
			}

			// 超时
			if ( result == 0 ) // {{{
			{
				lost_data_time++;
				if ( lost_data_time >= MAX_LOST_HEARTBEAT )
				{
					QCustomEvent * event = new QCustomEvent( SIG_LOST_SERVER );
					QApplication::postEvent( avi_ptr, event );
					return;
				}
			}
			else
			{
				// 有读入数据
				num = recv_line( fd_video, addr_accept, recv_buff, temp, packet_head );
				
				lost_data_time = 0;

				seq_diff = packet_head->sequence_num - expect_num;
				sub_seq_diff = packet_head->sub_sequence_num - sub_expect_num;

				// 检查是否有丢包或数据包的顺序是否有问题
				if ( seq_diff == 0 ) // {{{
				{
					// OK! right! nothing to do!
				}
				else
				{
					if ( seq_diff < 0 )
					{
						// 旧的数据包
						cerr << "Warning! old packet received" << endl;
						old_packet_num++;
						if ( old_packet_num >= 5 )
						{
							old_packet_num = 0;
							expect_num = packet_head->sequence_num + 1;
						}
						//continue;
					}
					else
					{
						// seq_diff > 0, 数据包丢失 
						if ( sub_seq_diff < 0 )
						{
							// 已经不在当前包内
							broken = true;
							cerr << "Warning! lost the end packet of last frame" << endl;
							expect_num = packet_head->sequence_num + 1;
							break;
						}
						else
						{
							// 仍在当前帧内
							broken = true;
							cerr << "Warning! video frame lost some packet" << endl;
							count += seq_diff * PACKET_DATA_LENGTH;

						}
					} // if ( seq_diff < 0 )
				} // }}}

				expect_num = packet_head->sequence_num + 1;
				sub_expect_num = packet_head->sub_sequence_num + 1;

				// 未发现错误,组合为一个整帧
				if ( !broken )
				{
					memcpy( merge_buff + count, temp, PACKET_DATA_LENGTH );
				}

				count += num - PACKET_HEAD_LENGTH;

				// 如果有一个数据包的开始标明为结束包,则已经到了这一帧的结束
				if ( TEST_VIDEO_END_BIT( packet_head->opt_bits ) )
				{
					verbose_output( 4, "End of Video frame" );
					break;
				}

				if ( count > max_size + DATA_OFFSET )
				{
					cerr << "Error! You shouldn't be here" << endl;
					break;
				}
				
			}  // if ( result == 0 ) }}}

		} // while ( 1 ) }}}

		verbose_output( 3, "Reading from network OK!" );

		if ( broken )
		{
			cerr << "Warning! Broken video frame" << endl;
			continue;
		}

		size = count - DATA_OFFSET;

		if ( memcmp( merge_buff, "VBGN", 4 ) == 0 )
		{
			// Video Capture Part
			verbose_output( 3, "video frame." );

			int data_size = *( (int*) (merge_buff+4) );

			verbose_output( 4, "total video frame size", data_size );
			verbose_output( 4, "receive video frame size", size );

			if ( data_size > max_size )
			{
				cerr << "Warning! Invalid video frame size: " << data_size << endl;
			}
			else
			{
				if ( data_size > size )
				{
					cerr << "Warning! Not a integral video frame!" << endl;
					size = data_size;
				}
				else
				{
					if ( data_size < size )
					{
						size = data_size;
					}
				}
			}

			memcpy( video_cap_buff[video_frame], merge_buff+DATA_OFFSET, size );

			//发送消息
			capture_event * event = new capture_event( VIDEO_EVENT, 
					video_cap_buff[video_frame], size );

			verbose_output( 3, "sending new video change event." );

			QApplication::postEvent( (QObject *) avi_ptr->video_player_ptr, (QEvent *) event );

			video_frame = ( video_frame + 1 ) % 3;

		}
		else
		{
			cerr << "Warning! Unexpected video frame!" << endl;
		} // if "VBGN"

	} // }}}

	return;

} // }}}