diff --git a/connection.cpp b/connection.cpp index f285448..12fdd7f 100644 --- a/connection.cpp +++ b/connection.cpp @@ -11,7 +11,6 @@ const int disable_conv_clear=0;//a udp connection in the multiplexer is called c const int disable_conn_clear=0;//a raw connection is called conn. -conn_manager_t conn_manager; void server_clear_function(u64_t u64)//used in conv_manager in server mode.for server we have to use one udp fd for one conv(udp connection), //so we have to close the fd when conv expires diff --git a/fec_manager.cpp b/fec_manager.cpp index b41cb77..47f2e54 100644 --- a/fec_manager.cpp +++ b/fec_manager.cpp @@ -40,13 +40,13 @@ int blob_encode_t::get_shard_len(int n,int next_packet_len) int blob_encode_t::input(char *s,int len) { - assert(current_len+len+sizeof(u16_t) <=max_fec_packet_num*buf_len); + assert(current_len+len+sizeof(u16_t) +100=0); counter++; assert(counter<=max_fec_pending_packet_num); - write_u16(buf+current_len,len); + write_u16(input_buf+current_len,len); current_len+=sizeof(u16_t); - memcpy(buf+current_len,s,len); + memcpy(input_buf+current_len,s,len); current_len+=len; return 0; } @@ -54,12 +54,12 @@ int blob_encode_t::input(char *s,int len) int blob_encode_t::output(int n,char ** &s_arr,int & len) { len=round_up_div(current_len,n); - write_u32(buf,counter); + write_u32(input_buf,counter); for(int i=0;icurrent_len) return -1; + if(parser_pos+(int)sizeof(u32_t)>current_len) {mylog(log_info,"failed 0\n");return -1;} - n=(int)read_u32(buf+parser_pos); + n=(int)read_u32(input_buf+parser_pos); if(n>max_fec_pending_packet_num) {mylog(log_info,"failed 1\n");return -1;} - s_arr=s_buf; - len_arr=len_buf; + s_arr=output_buf; + len_arr=output_len; parser_pos+=sizeof(u32_t); for(int i=0;icurrent_len) {mylog(log_info,"failed2 \n");return -1;} - len_arr[i]=(int)read_u16(buf+parser_pos); + len_arr[i]=(int)read_u16(input_buf+parser_pos); parser_pos+=(int)sizeof(u16_t); if(parser_pos+len_arr[i]>current_len) {mylog(log_info,"failed 3 %d %d %d\n",parser_pos,len_arr[i],current_len);return -1;} - s_arr[i]=buf+parser_pos; + s_arr[i]=input_buf+parser_pos; parser_pos+=len_arr[i]; } return 0; @@ -123,6 +123,7 @@ fec_encode_manager_t::fec_encode_manager_t() timer_fd64=fd_manager.create(timer_fd); re_init(4,2,1200,100,10000,0); + seq=(u32_t)get_true_random_number(); //TODO temp solution for a bug. } fec_encode_manager_t::~fec_encode_manager_t() { @@ -141,10 +142,11 @@ int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int pen fec_pending_time=pending_time; this->type=type; + assert(data_num+redundant_num<255); counter=0; blob_encode.clear(); ready_for_output=0; - seq=0; + //seq=0; itimerspec zero_its; memset(&zero_its, 0, sizeof(zero_its)); @@ -166,14 +168,14 @@ int fec_encode_manager_t::append(char *s,int len/*,int &is_first_packet*/) } if(type==0) { - blob_encode.input(s,len); + assert(blob_encode.input(s,len)==0); } else if(type==1) { mylog(log_trace,"counter=%d\n",counter); assert(len<=65535&&len>=0); char * p=input_buf[counter]+sizeof(u32_t)+4*sizeof(char); - write_u16(p,(u16_t)((u32_t)len)); + write_u16(p,(u16_t)((u32_t)len)); //TODO omit this u16 for data packet while sending p+=sizeof(u16_t); memcpy(p,s,len);//remember to change this,if protocol is modified input_len[counter]=len+sizeof(u16_t); @@ -190,6 +192,8 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) int about_to_fec=0; int delayed_append=0; //int counter_back=counter; + assert(type==0||type==1); + if(type==0&& s!=0 &&counter==0&&blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) { mylog(log_warn,"message too long len=%d,ignored\n",len); @@ -200,10 +204,15 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) mylog(log_warn,"message too long len=%d,ignored\n",len); return -1; } + if(s==0&&counter==0) + { + mylog(log_warn,"unexpected s==0&&counter==0\n"); + return -1; + } if(s==0) about_to_fec=1;//now - assert(type==0||type==1); if(type==0&& blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) {about_to_fec=1; delayed_append=1;}//fec then add packet + if(type==0) assert(countermax_fec_packet_num) + if(data_num+redundant_num>=max_fec_packet_num) { mylog(log_info,"failed here\n"); return -1; } if(!anti_replay.is_vaild(seq)) { - //mylog(log_info,"failed here2\n"); + mylog(log_trace,"!anti_replay.is_vaild(seq) ,seq =%u\n",seq); return 0; } if(mp[seq].group_mp.find(inner_index)!=mp[seq].group_mp.end() ) { - mylog(log_info,"dup inner_index\n"); + mylog(log_debug,"dup fec index\n"); + return -1; + } + + + if(type==0&&data_num==0) + { + mylog(log_warn,"unexpected here,data_num=0\n"); return -1; } @@ -457,13 +472,6 @@ int fec_decode_manager_t::input(char *s,int len) if(mp[seq].type!=type) ok=0; } - if(type==0&&data_num==0) - { - mylog(log_warn,"unexpected here,data_num=0\n"); - return -1; - } - - if(data_num!=0) { mp[seq].data_counter++; @@ -486,13 +494,9 @@ int fec_decode_manager_t::input(char *s,int len) if(ok==0) { - //mylog(log_info,"ok=0\n"); + mylog(log_warn,"fec data invaild\n"); return -1; } - else - { - //mylog(log_info,"ok=1\n"); - } if(fec_data[index].used!=0) { @@ -504,6 +508,7 @@ int fec_decode_manager_t::input(char *s,int len) } if(tmp_seq==seq) { + mylog(log_warn,"unexpected tmp_seq==seq ,seq=%d\n",seq); return -1; } } @@ -517,6 +522,7 @@ int fec_decode_manager_t::input(char *s,int len) fec_data[index].len=len; memcpy(fec_data[index].buf,s+tmp_idx,len); mp[seq].group_mp[inner_index]=index; + //index++ at end of function map &inner_mp=mp[seq].group_mp; @@ -606,7 +612,7 @@ int fec_decode_manager_t::input(char *s,int len) { fec_tmp_arr[it->first]=fec_data[it->second].buf; } - rs_decode2(group_data_num,group_data_num+group_redundant_num,fec_tmp_arr,len); //the input data has been modified in-place + assert(rs_decode2(group_data_num,group_data_num+group_redundant_num,fec_tmp_arr,len)==0); //the input data has been modified in-place blob_decode.clear(); for(int i=0;i