diff --git a/fec_manager.cpp b/fec_manager.cpp index aeed512..6f19e5c 100644 --- a/fec_manager.cpp +++ b/fec_manager.cpp @@ -14,9 +14,9 @@ int g_fec_data_num=20; int g_fec_redundant_num=10; int g_fec_mtu=1250; -int g_fec_pending_num=200; -int g_fec_pending_time=8*1000; //8ms -int g_fec_type=1; +int g_fec_queue_len=200; +int g_fec_timeout=8*1000; //8ms +int g_fec_mode=1; int dynamic_update_fec=1; @@ -127,6 +127,15 @@ int blob_decode_t::output(int &n,char ** &s_arr,int *&len_arr) return 0; } + +fec_encode_manager_t::~fec_encode_manager_t() +{ + fd_manager.fd64_close(timer_fd64); +} +u64_t fec_encode_manager_t::get_timer_fd64() +{ + return timer_fd64; +} fec_encode_manager_t::fec_encode_manager_t() { //int timer_fd; @@ -137,26 +146,18 @@ fec_encode_manager_t::fec_encode_manager_t() } timer_fd64=fd_manager.create(timer_fd); - re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time,fec_type); + re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_queue_len,fec_timeout,fec_mode); seq=(u32_t)get_true_random_number(); //TODO temp solution for a bug. } -fec_encode_manager_t::~fec_encode_manager_t() -{ - fd_manager.fd64_close(timer_fd64); -} -u64_t fec_encode_manager_t::get_timer_fd64() -{ - return timer_fd64; -} -int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int pending_num,int pending_time,int type) +int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int queue_len,int timeout,int mode) { fec_data_num=data_num; fec_redundant_num=redundant_num; fec_mtu=mtu; - fec_pending_num=pending_num; - fec_pending_time=pending_time; - fec_type=type; + fec_queue_len=queue_len; + fec_timeout=timeout; + fec_mode=mode; assert(data_num+redundant_num=0); @@ -214,22 +215,22 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) fec_data_num=g_fec_data_num; fec_redundant_num=g_fec_redundant_num; fec_mtu=g_fec_mtu; - fec_pending_num=g_fec_pending_num; - fec_pending_time=g_fec_pending_time; - fec_type=g_fec_type; + fec_queue_len=g_fec_queue_len; + fec_timeout=g_fec_timeout; + fec_mode=g_fec_mode; } int about_to_fec=0; int delayed_append=0; //int counter_back=counter; - assert(fec_type==0||fec_type==1); + assert(fec_mode==0||fec_mode==1); - if(fec_type==0&& s!=0 &&counter==0&&blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) + if(fec_mode==0&& s!=0 &&counter==0&&blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) { mylog(log_warn,"message too long len=%d,ignored\n",len); return -1; } - if(fec_type==1&&s!=0&&len>=fec_mtu) + if(fec_mode==1&&s!=0&&len>=fec_mtu) { mylog(log_warn,"message too long len=%d fec_mtu=%d,ignored\n",len,fec_mtu); return -1; @@ -241,10 +242,10 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) } if(s==0) about_to_fec=1;//now - if(fec_type==0&& blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) {about_to_fec=1; delayed_append=1;}//fec then add packet + if(fec_mode==0&& blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) {about_to_fec=1; delayed_append=1;}//fec then add packet - if(fec_type==0) assert(counter=1); assert(counter<=255); @@ -442,7 +443,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) write_u32(input_buf[input_buf_idx]+tmp_idx,seq); tmp_idx+=sizeof(u32_t); - input_buf[input_buf_idx][tmp_idx++]=(unsigned char)fec_type; + input_buf[input_buf_idx][tmp_idx++]=(unsigned char)fec_mode; input_buf[input_buf_idx][tmp_idx++]=(unsigned char)0; input_buf[input_buf_idx][tmp_idx++]=(unsigned char)0; input_buf[input_buf_idx][tmp_idx++]=(unsigned char)((u32_t)input_buf_idx); @@ -464,7 +465,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) if(s!=0&&delayed_append) { - assert(fec_type!=1); + assert(fec_mode!=1); append(s,len); } diff --git a/fec_manager.h b/fec_manager.h index cfbdd91..7c42a3d 100644 --- a/fec_manager.h +++ b/fec_manager.h @@ -23,9 +23,9 @@ extern u32_t fec_buff_num; extern int g_fec_data_num; extern int g_fec_redundant_num; extern int g_fec_mtu; -extern int g_fec_pending_num; -extern int g_fec_pending_time; //8ms -extern int g_fec_type; +extern int g_fec_queue_len; +extern int g_fec_timeout; //8ms +extern int g_fec_mode; extern int dynamic_update_fec; /*end for first time init or dynamic update*/ @@ -109,11 +109,11 @@ class fec_encode_manager_t private: u32_t seq; - int fec_type; + int fec_mode; int fec_data_num,fec_redundant_num; int fec_mtu; - int fec_pending_num; - int fec_pending_time; + int fec_queue_len; + int fec_timeout; my_time_t first_packet_time; my_time_t first_packet_time_for_output; @@ -147,12 +147,12 @@ public: int get_pending_time() { - return fec_pending_time; + return fec_timeout; } int get_type() { - return fec_type; + return fec_mode; } u64_t get_timer_fd64(); int re_init(int data_num,int redundant_num,int mtu,int pending_num,int pending_time,int type); diff --git a/main.cpp b/main.cpp index 2df0500..d111cce 100644 --- a/main.cpp +++ b/main.cpp @@ -302,6 +302,84 @@ int from_fec_to_normal(conn_info_t & conn_info,char *data,int len,int & out_n,ch //my_send(dest,data,len); return 0; } +int print_parameter() +{ + mylog(log_info,"jitter_min=%d jitter_max=%d output_interval_min=%d output_interval_max=%d fec_timeout=%d fec_data_num=%d fec_redundant_num=%d fec_mtu=%d fec_queue_len=%d fec_mode=%d\n", + jitter_min/1000,jitter_max/1000,output_interval_min/1000,output_interval_max/1000,g_fec_timeout/1000, + g_fec_data_num,g_fec_redundant_num,g_fec_mtu,g_fec_queue_len,g_fec_mode); + return 0; +} +int handle_command(char *s) +{ + int len=strlen(s); + while(len>=1&&s[len-1]=='\n') + s[len-1]=0; + mylog(log_info,"got data from fifo,len=%d,s=[%s]\n",len,s); + int a=-1,b=-1; + if(strncmp(s,"fec",strlen("fec"))==0) + { + mylog(log_info,"got command [fec]\n"); + sscanf(s,"fec %d:%d",&a,&b); + if(a<1||b<0||a+b>254) + { + mylog(log_warn,"invaild value\n"); + return -1; + } + g_fec_data_num=a; + g_fec_redundant_num=b; + } + else if(strncmp(s,"mtu",strlen("mtu"))==0) + { + mylog(log_info,"got command [mtu]\n"); + sscanf(s,"mtu %d",&a); + if(a<100||a>2000) + { + mylog(log_warn,"invaild value\n"); + return -1; + } + g_fec_mtu=a; + } + else if(strncmp(s,"queue-len",strlen("queue-len"))==0) + { + mylog(log_info,"got command [queue-len]\n"); + sscanf(s,"queue-len %d",&a); + if(a<1||a>10000) + { + mylog(log_warn,"invaild value\n"); + return -1; + } + g_fec_queue_len=a; + } + else if(strncmp(s,"mode",strlen("mode"))==0) + { + mylog(log_info,"got command [mode]\n"); + sscanf(s,"mode %d",&a); + if(a!=0&&a!=1) + { + mylog(log_warn,"invaild value\n"); + return -1; + } + g_fec_mode=a; + } + else if(strncmp(s,"timeout",strlen("timeout"))==0) + { + mylog(log_info,"got command [timeout]\n"); + sscanf(s,"timeout %d",&a); + if(a<0||a>1000) + { + mylog(log_warn,"invaild value\n"); + return -1; + } + g_fec_timeout=a*1000; + } + else + { + mylog(log_info,"unknown command\n"); + } + print_parameter(); + + return 0; +} int client_event_loop() { //char buf[buf_len]; @@ -441,19 +519,13 @@ int client_event_loop() { char buf[buf_len]; int len=read (fifo_fd, buf, sizeof (buf)); - assert(len>=0); + if(len<0) + { + mylog(log_warn,"fifo read failed len=%d,errno=%s\n",len,strerror(errno)); + continue; + } buf[len]=0; - while(len>=1&&buf[len-1]=='\n') - buf[len-1]=0; - mylog(log_info,"got data from fifo,len=%d,s=[%s]\n",len,buf); - if(0) - { - } - else - { - mylog(log_info,"unknown command\n"); - } - + handle_command(buf); } else if (events[idx].data.u64 == (u64_t)local_listen_fd||events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64()) { @@ -742,19 +814,13 @@ int server_event_loop() { char buf[buf_len]; int len=read (fifo_fd, buf, sizeof (buf)); - assert(len>=0); + if(len<0) + { + mylog(log_warn,"fifo read failed len=%d,errno=%s\n",len,strerror(errno)); + continue; + } buf[len]=0; - while(len>=1&&buf[len-1]=='\n') - buf[len-1]=0; - mylog(log_info,"got data from fifo,len=%d,s=[%s]\n",len,buf); - if(0) - { - } - else - { - mylog(log_info,"unknown command\n"); - } - + handle_command(buf); } else if (events[idx].data.u64 == (u64_t)local_listen_fd) @@ -1165,7 +1231,7 @@ int unit_test() int * len; fec_decode_manager.output(n,s_arr,len); - fec_encode_manager.re_init(3,2,g_fec_mtu,g_fec_pending_num,g_fec_pending_time,1); + fec_encode_manager.re_init(3,2,g_fec_mtu,g_fec_queue_len,g_fec_timeout,1); fec_encode_manager.input((char *) a.c_str(), a.length()); fec_encode_manager.output(n,s_arr,len); @@ -1442,8 +1508,8 @@ void process_arg(int argc, char *argv[]) } break; case 'q': - sscanf(optarg,"%d",&g_fec_pending_num); - if(g_fec_pending_num<1||g_fec_pending_num>10000) + sscanf(optarg,"%d",&g_fec_queue_len); + if(g_fec_queue_len<1||g_fec_queue_len>10000) { mylog(log_fatal,"fec_pending_num should be between 1 and 10000\n"); @@ -1582,8 +1648,8 @@ void process_arg(int argc, char *argv[]) } else if(strcmp(long_options[option_index].name,"mode")==0) { - sscanf(optarg,"%d",&g_fec_type); - if(g_fec_type!=0&&g_fec_type!=1) + sscanf(optarg,"%d",&g_fec_mode); + if(g_fec_mode!=0&&g_fec_mode!=1) { mylog(log_fatal,"mode should be 0 or 1\n"); myexit(-1); @@ -1600,14 +1666,14 @@ void process_arg(int argc, char *argv[]) } else if(strcmp(long_options[option_index].name,"timeout")==0) { - sscanf(optarg,"%d",&g_fec_pending_time); - if(g_fec_pending_time<0||g_fec_pending_time>1000) + sscanf(optarg,"%d",&g_fec_timeout); + if(g_fec_timeout<0||g_fec_timeout>1000) { mylog(log_fatal,"fec_pending_time should be between 0 and 1000(1s)\n"); myexit(-1); } - g_fec_pending_time*=1000; + g_fec_timeout*=1000; } else if(strcmp(long_options[option_index].name,"fifo")==0) { @@ -1652,9 +1718,8 @@ void process_arg(int argc, char *argv[]) program_mode=server_mode; } - mylog(log_info,"jitter_min=%d jitter_max=%d output_interval_min=%d output_interval_max=%d fec_pending_time=%d fec_data_num=%d fec_redundant_num=%d fec_mtu=%d fec_pending_num=%d fec_type=%d\n", - jitter_min/1000,jitter_max/1000,output_interval_min/1000,output_interval_max/1000,g_fec_pending_time/1000, - g_fec_data_num,g_fec_redundant_num,g_fec_mtu,g_fec_pending_num,g_fec_type); + print_parameter(); + } int main(int argc, char *argv[])