From 489805d90e1671c84e9ccaa11140065e70c3cda0 Mon Sep 17 00:00:00 2001 From: wangyu- Date: Sun, 5 Aug 2018 10:51:33 -0500 Subject: [PATCH] prepare for better tuneable fec parameter --- fec_manager.cpp | 107 ++++++++++++++++++++++++++---------------------- fec_manager.h | 81 ++++++++++++++++++++++++++++-------- misc.cpp | 78 ++++++++++++++++++++++++----------- 3 files changed, 177 insertions(+), 89 deletions(-) diff --git a/fec_manager.cpp b/fec_manager.cpp index 1ade0da..97eb045 100644 --- a/fec_manager.cpp +++ b/fec_manager.cpp @@ -11,14 +11,16 @@ #include "lib/rs.h" #include "fd_manager.h" -int g_fec_data_num=20; -int g_fec_redundant_num=10; -int g_fec_mtu=1250; -int g_fec_queue_len=200; -int g_fec_timeout=8*1000; //8ms -int g_fec_mode=0; +//int g_fec_data_num=20; +//int g_fec_redundant_num=10; +//int g_fec_mtu=1250; +//int g_fec_queue_len=200; +//int g_fec_timeout=8*1000; //8ms +//int g_fec_mode=0; -int dynamic_update_fec=1; +fec_parameter_t g_fec_par; + +//int dynamic_update_fec=1; const int encode_fast_send=1; const int decode_fast_send=1; @@ -153,10 +155,13 @@ fec_encode_manager_t::fec_encode_manager_t() timer_fd64=fd_manager.create(timer_fd);*/ - reset_fec_parameter(g_fec_data_num,g_fec_redundant_num,g_fec_mtu,g_fec_queue_len,g_fec_timeout,g_fec_mode); + /////reset_fec_parameter(g_fec_data_num,g_fec_redundant_num,g_fec_mtu,g_fec_queue_len,g_fec_timeout,g_fec_mode); + fec_par.clone(g_fec_par); + clear_data(); } +/* int fec_encode_manager_t::reset_fec_parameter(int data_num,int redundant_num,int mtu,int queue_len,int timeout,int mode) { fec_data_num=data_num; @@ -172,7 +177,7 @@ int fec_encode_manager_t::reset_fec_parameter(int data_num,int redundant_num,int clear_data(); return 0; -} +}*/ int fec_encode_manager_t::append(char *s,int len/*,int &is_first_packet*/) { if(counter==0) @@ -180,7 +185,7 @@ int fec_encode_manager_t::append(char *s,int len/*,int &is_first_packet*/) my_itimerspec its; memset(&its.it_interval,0,sizeof(its.it_interval)); first_packet_time=get_current_time_us(); - my_time_t tmp_time=fec_timeout+first_packet_time; + my_time_t tmp_time=fec_par.timeout+first_packet_time; its.it_value.tv_sec=tmp_time/1000000llu; its.it_value.tv_nsec=(tmp_time%1000000llu)*1000llu; //timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0); @@ -191,11 +196,11 @@ int fec_encode_manager_t::append(char *s,int len/*,int &is_first_packet*/) //ev_timer_set(loop,) } - if(fec_mode==0)//for type 0 use blob + if(fec_par.mode==0)//for type 0 use blob { assert(blob_encode.input(s,len)==0); } - else if(fec_mode==1)//for tpe 1 use input_buf and counter + else if(fec_par.mode==1)//for tpe 1 use input_buf and counter { mylog(log_trace,"counter=%d\n",counter); assert(len<=65535&&len>=0); @@ -217,33 +222,28 @@ int fec_encode_manager_t::append(char *s,int len/*,int &is_first_packet*/) } int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) { - if(counter==0&&dynamic_update_fec) + if(counter==0&&fec_par.version!=g_fec_par.version) { - fec_data_num=g_fec_data_num; - fec_redundant_num=g_fec_redundant_num; - fec_mtu=g_fec_mtu; - fec_queue_len=g_fec_queue_len; - fec_timeout=g_fec_timeout; - fec_mode=g_fec_mode; + fec_par.clone(g_fec_par); } int about_to_fec=0; int delayed_append=0; //int counter_back=counter; - assert(fec_mode==0||fec_mode==1); + assert(fec_par.mode==0||fec_par.mode==1); - if(fec_mode==0&& s!=0 &&counter==0) + if(fec_par.mode==0&& s!=0 &&counter==0) { - int out_len=blob_encode.get_shard_len(fec_data_num,len); - if(out_len>fec_mtu) + int out_len=blob_encode.get_shard_len(fec_par.get_tail().x,len); + if(out_len>fec_par.mtu) { - mylog(log_warn,"message too long ori_len=%d out_len=%d fec_mtu=%d,ignored\n",len,out_len,fec_mtu); + mylog(log_warn,"message too long ori_len=%d out_len=%d fec_mtu=%d,ignored\n",len,out_len,fec_par.mtu); return -1; } } - if(fec_mode==1&&s!=0&&len>fec_mtu) + if(fec_par.mode==1&&s!=0&&len>fec_par.mtu) { - mylog(log_warn,"mode==1,message len=%d,len>fec_mtu,fec_mtu=%d,packet may not be delivered\n",len,fec_mtu); + mylog(log_warn,"mode==1,message len=%d,len>fec_mtu,fec_mtu=%d,packet may not be delivered\n",len,fec_par.mtu); //return -1; } if(s==0&&counter==0) @@ -253,10 +253,10 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) } if(s==0) about_to_fec=1;//now - if(fec_mode==0&& blob_encode.get_shard_len(fec_data_num,len)>fec_mtu) {about_to_fec=1; delayed_append=1;}//fec then add packet + if(fec_par.mode==0&& blob_encode.get_shard_len(fec_par.get_tail().x,len)>fec_par.mtu) {about_to_fec=1; delayed_append=1;}//fec then add packet - if(fec_mode==0) assert(counter(u32_t)fec_mtu) continue; + assert(fec_par.rs_par[i-1].x==i); + int tmp_x=fec_par.rs_par[i-1].x; + int tmp_y=fec_par.rs_par[i-1].y; + assert(tmp_x==i); + u32_t shard_len=blob_encode.get_shard_len(tmp_x,0); + if(shard_len>(u32_t)fec_par.mtu) continue; - u32_t new_len=(shard_len+header_overhead)*(i+fec_redundant_num); + u32_t new_len=(shard_len+header_overhead)*(tmp_x+tmp_y); if(new_len=1&&best_data_num<=fec_par.rs_cnt); + actual_redundant_num=fec_par.rs_par[best_data_num-1].y; + mylog(log_trace,"actual_data_num=%d actual_redundant_num=%d\n",actual_data_num,actual_redundant_num); } assert(blob_encode.output(actual_data_num,blob_output,fec_len)==0); } else { + assert(counter<=fec_par.rs_cnt); actual_data_num=counter; - actual_redundant_num=fec_redundant_num; + actual_redundant_num=fec_par.rs_par[counter-1].y; for(int i=0;i=1); assert(counter<=255); @@ -458,7 +465,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) write_u32(input_buf[input_buf_idx]+tmp_idx,seq); tmp_idx+=sizeof(u32_t); - input_buf[input_buf_idx][tmp_idx++]=(unsigned char)fec_mode; + input_buf[input_buf_idx][tmp_idx++]=(unsigned char)fec_par.mode; input_buf[input_buf_idx][tmp_idx++]=(unsigned char)0; input_buf[input_buf_idx][tmp_idx++]=(unsigned char)0; input_buf[input_buf_idx][tmp_idx++]=(unsigned char)((u32_t)input_buf_idx); @@ -480,7 +487,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) if(s!=0&&delayed_append) { - assert(fec_mode!=1); + assert(fec_par.mode!=1); append(s,len); } diff --git a/fec_manager.h b/fec_manager.h index 01ed22a..7d59e0e 100644 --- a/fec_manager.h +++ b/fec_manager.h @@ -19,15 +19,58 @@ const int max_fec_packet_num=255;// this is the limitation of the rs lib extern u32_t fec_buff_num; -/*begin for first time init or dynamic update*/ -extern int g_fec_data_num; -extern int g_fec_redundant_num; -extern int g_fec_mtu; -extern int g_fec_queue_len; -extern int g_fec_timeout; //8ms -extern int g_fec_mode; -extern int dynamic_update_fec; -/*end for first time init or dynamic update*/ +struct fec_parameter_t +{ + int version=0; + int mtu=1250; + int queue_len=200; + int timeout=8*1000; + int mode=0; + + int rs_cnt; + struct rs_parameter_t //parameters for reed solomon + { + unsigned char x;//AKA fec_data_num (x should be same as +1 at the moment) + unsigned char y;//fec_redundant_num + }rs_par[255+10]; + + int rs_from_str(char * s) + { + return 0; + } + + char *rs_to_str() + { + return 0; + } + + rs_parameter_t get_tail() + { + assert(rs_cnt>=1); + return rs_par[rs_cnt-1]; + } + + + int clone(fec_parameter_t & other) + { + version=other.version; + mtu=other.mtu; + queue_len=other.queue_len; + timeout=other.timeout; + mode=other.mode; + + assert(other.rs_cnt>=1); + rs_cnt=other.rs_cnt; + memcpy(rs_par,other.rs_par,sizeof(rs_parameter_t)*rs_cnt); + + return 0; + } + + +}; + +extern fec_parameter_t g_fec_par; +//extern int dynamic_update_fec; const int anti_replay_timeout=60*1000;// 60s @@ -134,11 +177,13 @@ class fec_encode_manager_t private: u32_t seq; - int fec_mode; - int fec_data_num,fec_redundant_num; - int fec_mtu; - int fec_queue_len; - int fec_timeout; + //int fec_mode; + //int fec_data_num,fec_redundant_num; + //int fec_mtu; + //int fec_queue_len; + //int fec_timeout; + fec_parameter_t fec_par; + my_time_t first_packet_time; my_time_t first_packet_time_for_output; @@ -168,6 +213,10 @@ public: fec_encode_manager_t(); ~fec_encode_manager_t(); + fec_parameter_t & get_fec_par() + { + return fec_par; + } void set_data(void * data) { timer.data=data; @@ -221,12 +270,12 @@ public: int get_pending_time() { - return fec_timeout; + return fec_par.timeout; } int get_type() { - return fec_mode; + return fec_par.mode; } //u64_t get_timer_fd64(); int reset_fec_parameter(int data_num,int redundant_num,int mtu,int pending_num,int pending_time,int type); diff --git a/misc.cpp b/misc.cpp index 573d5e1..2959223 100644 --- a/misc.cpp +++ b/misc.cpp @@ -53,6 +53,8 @@ int tun_mtu=1500; int mssfix=1; +char rs_par_str[max_fec_packet_num*10+100]; + int from_normal_to_fec(conn_info_t & conn_info,char *data,int len,int & out_n,char **&out_arr,int *&out_len,my_time_t *&out_delay) { @@ -252,9 +254,10 @@ int delay_send(my_time_t delay,const dest_t &dest,char *data,int len) int print_parameter() { - mylog(log_info,"jitter_min=%d jitter_max=%d output_interval_min=%d output_interval_max=%d fec_timeout=%d fec_data_num=%d fec_redundant_num=%d fec_mtu=%d fec_queue_len=%d fec_mode=%d\n", - jitter_min/1000,jitter_max/1000,output_interval_min/1000,output_interval_max/1000,g_fec_timeout/1000, - g_fec_data_num,g_fec_redundant_num,g_fec_mtu,g_fec_queue_len,g_fec_mode); + mylog(log_info,"jitter_min=%d jitter_max=%d output_interval_min=%d output_interval_max=%d fec_timeout=%d fec_mtu=%d fec_queue_len=%d fec_mode=%d\n", + jitter_min/1000,jitter_max/1000,output_interval_min/1000,output_interval_max/1000,g_fec_par.timeout/1000,g_fec_par.mtu,g_fec_par.queue_len,g_fec_par.mode); + mylog(log_info,"fec_str=%s\n",rs_par_str); + mylog(log_info,"fec_inner_parameter=%s\n",g_fec_par.rs_to_str()); return 0; } int handle_command(char *s) @@ -267,14 +270,27 @@ int handle_command(char *s) if(strncmp(s,"fec",strlen("fec"))==0) { mylog(log_info,"got command [fec]\n"); - sscanf(s,"fec %d:%d",&a,&b); + char tmp_str[max_fec_packet_num*10+100]; + fec_parameter_t tmp_par; + sscanf(s,"fec %s",tmp_str); + /* if(a<1||b<0||a+b>254) { mylog(log_warn,"invaild value\n"); return -1; + }*/ + int ret=tmp_par.rs_from_str(tmp_str); + if(ret!=0) + { + mylog(log_warn,"failed to parse [%s]\n",tmp_str); + return -1; } - g_fec_data_num=a; - g_fec_redundant_num=b; + int version=g_fec_par.version; + g_fec_par.clone(tmp_par); + g_fec_par.version=version; + g_fec_par.version++; + //g_fec_data_num=a; + //g_fec_redundant_num=b; } else if(strncmp(s,"mtu",strlen("mtu"))==0) { @@ -285,7 +301,7 @@ int handle_command(char *s) mylog(log_warn,"invaild value\n"); return -1; } - g_fec_mtu=a; + g_fec_par.mtu=a; } else if(strncmp(s,"queue-len",strlen("queue-len"))==0) { @@ -296,7 +312,7 @@ int handle_command(char *s) mylog(log_warn,"invaild value\n"); return -1; } - g_fec_queue_len=a; + g_fec_par.queue_len=a; } else if(strncmp(s,"mode",strlen("mode"))==0) { @@ -307,7 +323,7 @@ int handle_command(char *s) mylog(log_warn,"invaild value\n"); return -1; } - g_fec_mode=a; + g_fec_par.mode=a; } else if(strncmp(s,"timeout",strlen("timeout"))==0) { @@ -318,7 +334,7 @@ int handle_command(char *s) mylog(log_warn,"invaild value\n"); return -1; } - g_fec_timeout=a*1000; + g_fec_par.timeout=a*1000; } else { @@ -440,7 +456,7 @@ int unit_test() static fec_encode_manager_t fec_encode_manager; static fec_decode_manager_t fec_decode_manager; - dynamic_update_fec=0; + //dynamic_update_fec=0; fec_encode_manager.set_loop_and_cb(ev_default_loop(0),empty_cb); @@ -534,7 +550,14 @@ int unit_test() int * len; fec_decode_manager.output(n,s_arr,len); - fec_encode_manager.reset_fec_parameter(3,2,g_fec_mtu,g_fec_queue_len,g_fec_timeout,1); + //fec_encode_manager.reset_fec_parameter(3,2,g_fec_mtu,g_fec_queue_len,g_fec_timeout,1); + + fec_parameter_t &fec_par=fec_encode_manager.get_fec_par(); + fec_par.mtu=g_fec_par.mtu; + fec_par.queue_len=g_fec_par.queue_len; + fec_par.timeout=g_fec_par.timeout; + fec_par.mode=1; + fec_par.rs_from_str((char *)"3:2"); fec_encode_manager.input((char *) a.c_str(), a.length()); fec_encode_manager.output(n,s_arr,len); @@ -749,17 +772,19 @@ void process_arg(int argc, char *argv[]) } else { - sscanf(optarg,"%d:%d\n",&g_fec_data_num,&g_fec_redundant_num); + strcpy(rs_par_str,optarg); + //sscanf(optarg,"%d:%d\n",&g_fec_data_num,&g_fec_redundant_num); + /* if(g_fec_data_num<1 ||g_fec_redundant_num<0||g_fec_data_num+g_fec_redundant_num>254) { mylog(log_fatal,"fec_data_num<1 ||fec_redundant_num<0||fec_data_num+fec_redundant_num>254\n"); myexit(-1); - } + }*/ } break; case 'q': - sscanf(optarg,"%d",&g_fec_queue_len); - if(g_fec_queue_len<1||g_fec_queue_len>10000) + sscanf(optarg,"%d",&g_fec_par.queue_len); + if(g_fec_par.queue_len<1||g_fec_par.queue_len>10000) { mylog(log_fatal,"fec_pending_num should be between 1 and 10000\n"); @@ -879,8 +904,8 @@ void process_arg(int argc, char *argv[]) } else if(strcmp(long_options[option_index].name,"mode")==0) { - sscanf(optarg,"%d",&g_fec_mode); - if(g_fec_mode!=0&&g_fec_mode!=1) + sscanf(optarg,"%d",&g_fec_par.mode); + if(g_fec_par.mode!=0&&g_fec_par.mode!=1) { mylog(log_fatal,"mode should be 0 or 1\n"); myexit(-1); @@ -888,8 +913,8 @@ void process_arg(int argc, char *argv[]) } else if(strcmp(long_options[option_index].name,"mtu")==0) { - sscanf(optarg,"%d",&g_fec_mtu); - if(g_fec_mtu<100||g_fec_mtu>2000) + sscanf(optarg,"%d",&g_fec_par.mtu); + if(g_fec_par.mtu<100||g_fec_par.mtu>2000) { mylog(log_fatal,"fec_mtu should be between 100 and 2000\n"); myexit(-1); @@ -897,14 +922,14 @@ void process_arg(int argc, char *argv[]) } else if(strcmp(long_options[option_index].name,"timeout")==0) { - sscanf(optarg,"%d",&g_fec_timeout); - if(g_fec_timeout<0||g_fec_timeout>1000) + sscanf(optarg,"%d",&g_fec_par.timeout); + if(g_fec_par.timeout<0||g_fec_par.timeout>1000) { mylog(log_fatal,"fec_pending_time should be between 0 and 1000(1s)\n"); myexit(-1); } - g_fec_timeout*=1000; + g_fec_par.timeout*=1000; } else if(strcmp(long_options[option_index].name,"fifo")==0) { @@ -996,6 +1021,13 @@ void process_arg(int argc, char *argv[]) } } + int ret=g_fec_par.rs_from_str(rs_par_str); + if(ret!=0) + { + mylog(log_fatal,"failed to parse [rs_par_str]\n"); + myexit(-1); + } + print_parameter(); }