From 93cbe528cfe699bd3b49c64126cc6de46d36ebde Mon Sep 17 00:00:00 2001 From: wangyu- Date: Tue, 17 Oct 2017 12:35:03 -0500 Subject: [PATCH] more robust,more log --- common.cpp | 2 +- common.h | 4 +- connection.cpp | 1 + connection.h | 50 ++++++++++++++++++++ fec_manager.cpp | 120 ++++++++++++++++++++++++++++-------------------- fec_manager.h | 13 +++--- main.cpp | 70 ++++++++++++++++++++++++---- 7 files changed, 192 insertions(+), 68 deletions(-) diff --git a/common.cpp b/common.cpp index c2cdf7f..d7648fd 100644 --- a/common.cpp +++ b/common.cpp @@ -15,7 +15,7 @@ int about_to_exit=0; raw_mode_t raw_mode=mode_faketcp; unordered_map raw_mode_tostring = {{mode_faketcp, "faketcp"}, {mode_udp, "udp"}, {mode_icmp, "icmp"}}; -int max_pending_packet=0; +int delay_capacity=0; //static int random_number_fd=-1; char iptables_rule[200]=""; //int is_client = 0, is_server = 0; diff --git a/common.h b/common.h index 0911165..5994d69 100644 --- a/common.h +++ b/common.h @@ -63,7 +63,7 @@ typedef short i16_t; typedef u64_t my_time_t; -const int max_data_len=2000; +const int max_data_len=2200; const int buf_len=max_data_len+200; const u32_t conv_clear_interval=200; @@ -117,7 +117,7 @@ enum program_mode_t {unset_mode=0,client_mode,server_mode}; extern program_mode_t program_mode; extern unordered_map raw_mode_tostring ; -extern int max_pending_packet; +extern int delay_capacity; typedef u32_t id_t; diff --git a/connection.cpp b/connection.cpp index 12fdd7f..d5d6a3b 100644 --- a/connection.cpp +++ b/connection.cpp @@ -11,6 +11,7 @@ const int disable_conv_clear=0;//a udp connection in the multiplexer is called c const int disable_conn_clear=0;//a raw connection is called conn. +int report_interval=0; void server_clear_function(u64_t u64)//used in conv_manager in server mode.for server we have to use one udp fd for one conv(udp connection), //so we have to close the fd when conv expires diff --git a/connection.h b/connection.h index e9c57d0..2c326ae 100644 --- a/connection.h +++ b/connection.h @@ -17,6 +17,7 @@ extern int disable_anti_replay; #include "fd_manager.h" #include "fec_manager.h" +extern int report_interval; struct conv_manager_t // manage the udp connections { @@ -51,6 +52,54 @@ struct conv_manager_t // manage the udp connections int clear_inactive(char * ip_port=0); int clear_inactive0(char * ip_port); }; + + +struct inner_stat_t +{ + u64_t input_packet_num; + u64_t input_packet_size; + u64_t output_packet_num; + u64_t output_packet_size; +}; +struct stat_t +{ + u64_t last_report_time; + inner_stat_t normal_to_fec; + inner_stat_t fec_to_normal; + stat_t() + { + memset(this,0,sizeof(stat_t)); + } + void report_as_client() + { + if(report_interval!=0 &&get_current_time()-last_report_time>u64_t(report_interval)*1000) + { + last_report_time=get_current_time(); + inner_stat_t &a=normal_to_fec; + inner_stat_t &b=fec_to_normal; + mylog(log_info,"[report]client-->server:(original:%llu pkt;%llu byte) (fec:%llu pkt,%llu byte) server-->client:(original:%llu pkt;%llu byte) (fec:%llu pkt;%llu byte)\n", + a.input_packet_num,a.input_packet_size,a.output_packet_num,a.output_packet_size, + b.output_packet_num,b.output_packet_size,b.input_packet_num,b.input_packet_size + ); + } + } + void report_as_server(ip_port_t &ip_port) + { + if(report_interval!=0 &&get_current_time()-last_report_time>u64_t(report_interval)*1000) + { + last_report_time=get_current_time(); + inner_stat_t &a=fec_to_normal; + inner_stat_t &b=normal_to_fec; + mylog(log_info,"[report][%s]client-->server:(original:%llu pkt;%llu byte) (fec:%llu pkt;%llu byte) server-->client:(original:%llu pkt;%llu byte) (fec:%llu pkt;%llu byte)\n", + ip_port.to_s(), + a.output_packet_num,a.output_packet_size,a.input_packet_num,a.input_packet_size, + b.input_packet_num,b.input_packet_size,b.output_packet_num,b.output_packet_size + ); + } + } +}; + + struct conn_info_t //stores info for a raw connection.for client ,there is only one connection,for server there can be thousand of connection since server can //handle multiple clients { @@ -60,6 +109,7 @@ struct conn_info_t //stores info for a raw connection.for client ,there is o my_timer_t timer; ip_port_t ip_port; u64_t last_active_time; + stat_t stat; conn_info_t() { } diff --git a/fec_manager.cpp b/fec_manager.cpp index 2e3583e..03baca7 100644 --- a/fec_manager.cpp +++ b/fec_manager.cpp @@ -47,7 +47,7 @@ int blob_encode_t::input(char *s,int len) assert(current_len+len+sizeof(u16_t) +100=0); counter++; - assert(counter<=max_fec_pending_packet_num); + assert(counter<=max_blob_packet_num); write_u16(input_buf+current_len,len); current_len+=sizeof(u16_t); memcpy(input_buf+current_len,s,len); @@ -99,7 +99,7 @@ int blob_decode_t::output(int &n,char ** &s_arr,int *&len_arr) if(parser_pos+(int)sizeof(u32_t)>current_len) {mylog(log_info,"failed 0\n");return -1;} n=(int)read_u32(input_buf+parser_pos); - if(n>max_fec_pending_packet_num) {mylog(log_info,"failed 1\n");return -1;} + if(n>max_blob_packet_num) {mylog(log_info,"failed 1\n");return -1;} s_arr=output_buf; len_arr=output_len; @@ -146,7 +146,7 @@ int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int pen fec_pending_time=pending_time; this->type=type; - assert(data_num+redundant_num<255); + assert(data_num+redundant_num=0); @@ -221,7 +221,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) if(type==0&& blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) {about_to_fec=1; delayed_append=1;}//fec then add packet - if(type==0) assert(counter=1); assert(counter<=255); - int buf_idx=counter-1; + int input_buf_idx=counter-1; assert(ready_for_output==0); ready_for_output=1; first_packet_time_for_output=0; output_n=1; int tmp_idx=0; - write_u32(input_buf[buf_idx]+tmp_idx,seq); + write_u32(input_buf[input_buf_idx]+tmp_idx,seq); tmp_idx+=sizeof(u32_t); - input_buf[buf_idx][tmp_idx++]=(unsigned char)type; - input_buf[buf_idx][tmp_idx++]=(unsigned char)0; - input_buf[buf_idx][tmp_idx++]=(unsigned char)0; - input_buf[buf_idx][tmp_idx++]=(unsigned char)((u32_t)buf_idx); + input_buf[input_buf_idx][tmp_idx++]=(unsigned char)type; + input_buf[input_buf_idx][tmp_idx++]=(unsigned char)0; + input_buf[input_buf_idx][tmp_idx++]=(unsigned char)0; + input_buf[input_buf_idx][tmp_idx++]=(unsigned char)((u32_t)input_buf_idx); - output_len[0]=input_len[buf_idx]+tmp_idx; - output_buf[0]=input_buf[buf_idx]; + output_len[0]=input_len[input_buf_idx]+tmp_idx; + output_buf[0]=input_buf[input_buf_idx]; if(0) { - printf("seq=%u,buf_idx=%d\n",seq,buf_idx); + printf("seq=%u,buf_idx=%d\n",seq,input_buf_idx); for(int j=0;j=max_fec_packet_num) { - mylog(log_warn,"failed here\n"); + mylog(log_warn,"data_num+redundant_num>=max_fec_packet_num\n"); return -1; } if(!anti_replay.is_vaild(seq)) @@ -524,18 +538,11 @@ int fec_decode_manager_t::input(char *s,int len) if(mp[seq].group_mp.find(inner_index)!=mp[seq].group_mp.end() ) { - mylog(log_debug,"dup fec index\n"); + mylog(log_debug,"dup fec index\n");//duplicate can happen on a normal network, so its just log_debug return -1; } - if(type==0&&data_num==0) - { - mylog(log_warn,"unexpected type==0&&data_num==0\n"); - return -1; - } - - int ok=1; if(mp[seq].type==-1) mp[seq].type=type; else @@ -543,7 +550,7 @@ int fec_decode_manager_t::input(char *s,int len) if(mp[seq].type!=type) { mylog(log_warn,"type mismatch\n"); - ok=0; + return -1; } } @@ -561,18 +568,12 @@ int fec_decode_manager_t::input(char *s,int len) { if(mp[seq].data_num!=data_num||mp[seq].redundant_num!=redundant_num||mp[seq].len!=len) { - mylog(log_warn,"unexpected here\n"); - ok=0; + mylog(log_warn,"unexpected mp[seq].data_num!=data_num||mp[seq].redundant_num!=redundant_num||mp[seq].len!=len\n"); + return -1; } } } - if(ok==0) - { - mylog(log_warn,"fec packets invaild\n"); - return -1; - } - if(fec_data[index].used!=0) { u32_t tmp_seq=fec_data[index].seq; @@ -607,7 +608,13 @@ int fec_decode_manager_t::input(char *s,int len) int about_to_fec=0; if(type==0) { - assert((int)inner_mp.size()<=data_num); + //assert((int)inner_mp.size()<=data_num); + if((int)inner_mp.size()>data_num) + { + mylog(log_warn,"inner_mp.size()>data_num\n"); + anti_replay.set_invaild(seq); + goto end; + } if((int)inner_mp.size()==data_num) about_to_fec=1; } @@ -615,6 +622,12 @@ int fec_decode_manager_t::input(char *s,int len) { if(mp[seq].data_num!=-1) { + if((int)inner_mp.size()>data_num+1) + { + mylog(log_warn,"inner_mp.size()>data_num+1\n"); + anti_replay.set_invaild(seq); + goto end; + } if((int)inner_mp.size()>=mp[seq].data_num) { about_to_fec=1; @@ -637,17 +650,25 @@ int fec_decode_manager_t::input(char *s,int len) fec_tmp_arr[it->first]=fec_data[it->second].buf; } assert(rs_decode2(group_data_num,group_data_num+group_redundant_num,fec_tmp_arr,len)==0); //the input data has been modified in-place + //this line should always succeed + blob_decode.clear(); for(int i=0;isecond].len > max_len) max_len=fec_data[it->second].len; } - if(max_len!=mp[seq].len) { data_check_ok=0; @@ -687,13 +707,15 @@ int fec_decode_manager_t::input(char *s,int len) } if(data_check_ok==0) { - ready_for_output=0; + //ready_for_output=0; + mylog(log_warn,"data_check_ok==0\n"); anti_replay.set_invaild(seq); goto end; } for(auto it=inner_mp.begin();it!=inner_mp.end();it++) { int tmp_idx=it->second; + assert(max_len>=fec_data[tmp_idx].len);//guarenteed by data_check_ok memset(fec_data[tmp_idx].buf+fec_data[tmp_idx].len,0,max_len-fec_data[tmp_idx].len); } @@ -706,7 +728,7 @@ int fec_decode_manager_t::input(char *s,int len) } mylog(log_trace,"fec done,%d %d,missed_packet_counter=%d\n",group_data_num,group_redundant_num,missed_packet_counter); - assert(rs_decode2(group_data_num,group_data_num+group_redundant_num,output_s_arr_buf,max_len)==0); + assert(rs_decode2(group_data_num,group_data_num+group_redundant_num,output_s_arr_buf,max_len)==0);//this should always succeed for(int i=0;i=(current_time - first_packet_time)) { @@ -209,7 +222,11 @@ int from_normal_to_fec(conn_info_t & conn_info,char *data,int len,int & out_n,ch } for(int i=0;i255) + if(fec_data_num<1 ||fec_redundant_num<0||fec_data_num+fec_redundant_num>254) { - mylog(log_fatal,"fec_data_num<1 ||fec_redundant_num<0||fec_data_num+fec_redundant_num>255\n"); + mylog(log_fatal,"fec_data_num<1 ||fec_redundant_num<0||fec_data_num+fec_redundant_num>254\n"); myexit(-1); } } @@ -1329,9 +1369,9 @@ void process_arg(int argc, char *argv[]) break; case 'm': sscanf(optarg,"%d",&fec_mtu); - if(fec_mtu<500||fec_mtu>1600) + if(fec_mtu<100||fec_mtu>2000) { - mylog(log_fatal,"fec_mtu should be between 500 and 1600\n"); + mylog(log_fatal,"fec_mtu should be between 100 and 2000\n"); myexit(-1); } break; @@ -1452,6 +1492,16 @@ void process_arg(int argc, char *argv[]) myexit(-1); } } + else if(strcmp(long_options[option_index].name,"delay-capacity")==0) + { + sscanf(optarg,"%d",&delay_capacity); + + if(delay_capacity<0) + { + mylog(log_fatal,"delay_capacity must be >=0 \n"); + myexit(-1); + } + } else if(strcmp(long_options[option_index].name,"report")==0) { sscanf(optarg,"%d",&report_interval); @@ -1541,7 +1591,7 @@ int main(int argc, char *argv[]) int i, j, k; process_arg(argc,argv); - delay_manager.set_capacity(max_pending_packet); + delay_manager.set_capacity(delay_capacity); local_ip_uint32=inet_addr(local_ip); remote_ip_uint32=inet_addr(remote_ip);