it works again

This commit is contained in:
wangyu- 2017-10-10 13:53:08 -05:00
parent 4680b1a3a1
commit 321afdb627

View File

@ -189,6 +189,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
{ {
int about_to_fec=0; int about_to_fec=0;
int delayed_append=0; int delayed_append=0;
//int counter_back=counter;
if(type==0&& s!=0 &&counter==0&&blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) if(type==0&& s!=0 &&counter==0&&blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu)
{ {
mylog(log_warn,"message too long len=%d,ignored\n",len); mylog(log_warn,"message too long len=%d,ignored\n",len);
@ -302,6 +303,9 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
mylog(log_info,"!!! s= %d\n"); mylog(log_info,"!!! s= %d\n");
ready_for_output=1;
seq++;
counter=0;
output_n=actual_data_num+actual_redundant_num; output_n=actual_data_num+actual_redundant_num;
blob_encode.clear(); blob_encode.clear();
@ -309,19 +313,18 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
memset(&its,0,sizeof(its)); memset(&its,0,sizeof(its));
timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0); timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
if(encode_fast_send&&s!=0&&type==1) if(encode_fast_send&&type==1)
{ {
int packet_to_send[max_fec_packet_num+5]={0}; int packet_to_send[max_fec_packet_num+5]={0};
int packet_to_send_counter=0; int packet_to_send_counter=0;
assert(counter!=0); //assert(counter!=0);
if(s!=0) if(s!=0)
packet_to_send[packet_to_send_counter++]=counter-1; packet_to_send[packet_to_send_counter++]=actual_data_num-1;
for(int i=actual_data_num;i<actual_data_num+actual_redundant_num;i++) for(int i=actual_data_num;i<actual_data_num+actual_redundant_num;i++)
{ {
{
packet_to_send[packet_to_send_counter++]=i; packet_to_send[packet_to_send_counter++]=i;
}
} }
output_n=packet_to_send_counter;//re write output_n=packet_to_send_counter;//re write
for(int i=0;i<packet_to_send_counter;i++) for(int i=0;i<packet_to_send_counter;i++)
@ -331,11 +334,6 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
} }
} }
ready_for_output=1;
seq++;
counter=0;
} }
else else
{ {
@ -351,10 +349,11 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
int tmp_idx=0; int tmp_idx=0;
write_u32(buf[buf_idx]+tmp_idx,seq); write_u32(buf[buf_idx]+tmp_idx,seq);
tmp_idx+=sizeof(u32_t); tmp_idx+=sizeof(u32_t);
buf[buf_idx][tmp_idx++]=(unsigned char)type; buf[buf_idx][tmp_idx++]=(unsigned char)type;
buf[buf_idx][tmp_idx++]=(unsigned char)0; buf[buf_idx][tmp_idx++]=(unsigned char)0;
buf[buf_idx][tmp_idx++]=(unsigned char)0; buf[buf_idx][tmp_idx++]=(unsigned char)0;
buf[buf_idx][tmp_idx++]=(unsigned char)((u32_t)counter); buf[buf_idx][tmp_idx++]=(unsigned char)((u32_t)buf_idx);
output_len[0]=buf_s_len[buf_idx]+tmp_idx; output_len[0]=buf_s_len[buf_idx]+tmp_idx;
output_buf[0]=buf[buf_idx]; output_buf[0]=buf[buf_idx];
@ -440,10 +439,16 @@ int fec_decode_manager_t::input(char *s,int len)
} }
if(!anti_replay.is_vaild(seq)) if(!anti_replay.is_vaild(seq))
{ {
mylog(log_info,"failed here2\n"); //mylog(log_info,"failed here2\n");
return 0; return 0;
} }
if(mp[seq].group_mp.find(inner_index)!=mp[seq].group_mp.end() )
{
mylog(log_info,"dup inner_index\n");
return -1;
}
int ok=1; int ok=1;
if(mp[seq].type==-1) if(mp[seq].type==-1)
mp[seq].type=type; mp[seq].type=type;
@ -452,7 +457,12 @@ int fec_decode_manager_t::input(char *s,int len)
if(mp[seq].type!=type) ok=0; if(mp[seq].type!=type) ok=0;
} }
if(type==0) assert(data_num!=0); if(type==0&&data_num==0)
{
mylog(log_warn,"unexpected here,data_num=0\n");
return -1;
}
if(data_num!=0) if(data_num!=0)
{ {
@ -467,18 +477,21 @@ int fec_decode_manager_t::input(char *s,int len)
else else
{ {
if(mp[seq].data_num!=data_num||mp[seq].redundant_num!=redundant_num||mp[seq].len!=len) if(mp[seq].data_num!=data_num||mp[seq].redundant_num!=redundant_num||mp[seq].len!=len)
{
mylog(log_warn,"unexpected here\n");
ok=0; ok=0;
}
} }
} }
if(ok==0) if(ok==0)
{ {
mylog(log_info,"ok=0\n"); //mylog(log_info,"ok=0\n");
return -1; return -1;
} }
else else
{ {
mylog(log_info,"ok=1\n"); //mylog(log_info,"ok=1\n");
} }
if(fec_data[index].used!=0) if(fec_data[index].used!=0)
@ -519,43 +532,58 @@ int fec_decode_manager_t::input(char *s,int len)
{ {
if(mp[seq].data_num!=-1) if(mp[seq].data_num!=-1)
{ {
data_num=mp[seq].data_num; //int old_data_num=data_num;
redundant_num=mp[seq].redundant_num; //int old_redundant_num=redundant_num;
if(mp[seq].data_counter>data_num) //data_num=mp[seq].data_num;
//redundant_num=mp[seq].redundant_num;
/*if(mp[seq].data_counter>data_num) //invaild
{ {
mylog(log_warn,"unexpected mp[seq].data_counter>mp[seq].data_num\n"); mylog(log_warn,"unexpected mp[seq].data_counter>mp[seq].data_num\n");
anti_replay.set_invaild(seq); anti_replay.set_invaild(seq);
return -1; return -1;
}*/
//assert((int)inner_mp.size()<=data_num);
if((int)inner_mp.size()>=mp[seq].data_num)
{
about_to_fec=1;
} }
else if(mp[seq].data_counter==data_num)
/*
else if(mp[seq].data_counter==data_num) //no need to fec . (received first redundant packet ,or received a data packet after redunant packet)
{ {
anti_replay.set_invaild(seq);// dont do fec,but still set invaild anti_replay.set_invaild(seq);// dont do fec,but still set invaild
} }
else //mp[seq].data_counter < mp[seq].data_num else //mp[seq].data_counter < mp[seq].data_num
{ {
if((int)inner_mp.size()>data_num+1) if((int)inner_mp.size()>data_num)
{ {
mylog(log_warn,"unexpected (int)inner_mp.size()>data_num+1\n"); mylog(log_warn,"unexpected (int)inner_mp.size()>data_num+1\n");
anti_replay.set_invaild(seq); anti_replay.set_invaild(seq);
return -1; return -1;
} }*/
//if((int)inner_mp.size()==data_num)
//about_to_fec=1;
/*
if((int)inner_mp.size()==data_num+1) if((int)inner_mp.size()==data_num+1)
{ {
anti_replay.set_invaild(seq); anti_replay.set_invaild(seq);
if(data_num==0)
if(old_data_num==0)
return 0; return 0;
else else
{ {
//mylog(log_warn,"data_num!=0\n"); //this is possible mylog(log_warn,"data_num=0\n");
return -1; return -1;
} }
} }*/
if((int)inner_mp.size()==data_num)
about_to_fec=1; //}
}
} }
else else
{ {
@ -564,9 +592,13 @@ int fec_decode_manager_t::input(char *s,int len)
//for() //for()
} }
if(about_to_fec) if(about_to_fec)
{ {
mylog(log_error,"fec here!\n"); int group_data_num=mp[seq].data_num;
int group_redundant_num=mp[seq].redundant_num;
//mylog(log_error,"fec here!\n");
if(type==0) if(type==0)
{ {
char *fec_tmp_arr[max_fec_packet_num+5]={0}; char *fec_tmp_arr[max_fec_packet_num+5]={0};
@ -574,9 +606,9 @@ int fec_decode_manager_t::input(char *s,int len)
{ {
fec_tmp_arr[it->first]=fec_data[it->second].buf; fec_tmp_arr[it->first]=fec_data[it->second].buf;
} }
rs_decode2(data_num,data_num+redundant_num,fec_tmp_arr,len); //the input data has been modified in-place rs_decode2(group_data_num,group_data_num+group_redundant_num,fec_tmp_arr,len); //the input data has been modified in-place
blob_decode.clear(); blob_decode.clear();
for(int i=0;i<data_num;i++) for(int i=0;i<group_data_num;i++)
{ {
blob_decode.input(fec_tmp_arr[i],len); blob_decode.input(fec_tmp_arr[i],len);
} }
@ -587,6 +619,8 @@ int fec_decode_manager_t::input(char *s,int len)
} }
else else
{ {
int max_len=-1; int max_len=-1;
int fec_ok=1; int fec_ok=1;
int debug_num=inner_mp.size(); int debug_num=inner_mp.size();
@ -594,7 +628,7 @@ int fec_decode_manager_t::input(char *s,int len)
//memset(output_s_arr_buf,0,sizeof(output_s_arr_buf));//in efficient //memset(output_s_arr_buf,0,sizeof(output_s_arr_buf));//in efficient
for(int i=0;i<data_num+redundant_num;i++) for(int i=0;i<group_data_num+group_redundant_num;i++)
{ {
output_s_arr_buf[i]=0; output_s_arr_buf[i]=0;
} }
@ -613,7 +647,7 @@ int fec_decode_manager_t::input(char *s,int len)
int missed_packet[max_fec_packet_num+5]; int missed_packet[max_fec_packet_num+5];
int missed_packet_counter=0; int missed_packet_counter=0;
for(int i=0;i<data_num;i++) for(int i=0;i<group_data_num;i++)
{ {
if(output_s_arr_buf[i]==0 ||i==inner_index) //only missed packet +current packet if(output_s_arr_buf[i]==0 ||i==inner_index) //only missed packet +current packet
{ {
@ -621,14 +655,14 @@ int fec_decode_manager_t::input(char *s,int len)
} }
} }
rs_decode2(data_num,data_num+redundant_num,output_s_arr_buf,max_len); rs_decode2(group_data_num,group_data_num+group_redundant_num,output_s_arr_buf,max_len);
for(int i=0;i<data_num;i++) for(int i=0;i<group_data_num;i++)
{ {
output_len_arr_buf[i]=read_u16(output_s_arr_buf[i]); output_len_arr_buf[i]=read_u16(output_s_arr_buf[i]);
output_s_arr_buf[i]+=sizeof(u16_t); output_s_arr_buf[i]+=sizeof(u16_t);
if(output_len_arr_buf[i]>max_data_len) if(output_len_arr_buf[i]>max_data_len)
{ {
mylog(log_warn,"invaild len %d,seq= %u,data_num= %d r_num= %d,i= %d\n",output_len_arr_buf[i],seq,data_num,redundant_num,i); mylog(log_warn,"invaild len %d,seq= %u,data_num= %d r_num= %d,i= %d\n",output_len_arr_buf[i],seq,group_data_num,group_redundant_num,i);
fec_ok=0; fec_ok=0;
for(int i=0;i<missed_packet_counter;i++) for(int i=0;i<missed_packet_counter;i++)
{ {
@ -641,7 +675,7 @@ int fec_decode_manager_t::input(char *s,int len)
if(fec_ok) if(fec_ok)
{ {
output_n=data_num; output_n=group_data_num;
if(decode_fast_send) if(decode_fast_send)
{ {