add option fifo

This commit is contained in:
wangyu-
2017-10-24 14:16:17 -05:00
parent cf08bb735d
commit 4dd37700e6
5 changed files with 195 additions and 49 deletions

View File

@@ -11,6 +11,15 @@
#include "lib/rs.h"
#include "fd_manager.h"
int g_fec_data_num=20;
int g_fec_redundant_num=10;
int g_fec_mtu=1250;
int g_fec_pending_num=200;
int g_fec_pending_time=8*1000; //8ms
int g_fec_type=1;
int dynamic_update_fec=1;
const int encode_fast_send=1;
const int decode_fast_send=1;
@@ -128,7 +137,8 @@ fec_encode_manager_t::fec_encode_manager_t()
}
timer_fd64=fd_manager.create(timer_fd);
re_init(4,2,1200,100,10000,0);
re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time,fec_type);
seq=(u32_t)get_true_random_number(); //TODO temp solution for a bug.
}
fec_encode_manager_t::~fec_encode_manager_t()
@@ -146,7 +156,7 @@ int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int pen
fec_mtu=mtu;
fec_pending_num=pending_num;
fec_pending_time=pending_time;
this->type=type;
fec_type=type;
assert(data_num+redundant_num<max_fec_packet_num);
counter=0;
@@ -173,11 +183,11 @@ int fec_encode_manager_t::append(char *s,int len/*,int &is_first_packet*/)
its.it_value.tv_nsec=(tmp_time%1000000llu)*1000llu;
timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
}
if(type==0)//for type 0 use blob
if(fec_type==0)//for type 0 use blob
{
assert(blob_encode.input(s,len)==0);
}
else if(type==1)//for tpe 1 use input_buf and counter
else if(fec_type==1)//for tpe 1 use input_buf and counter
{
mylog(log_trace,"counter=%d\n",counter);
assert(len<=65535&&len>=0);
@@ -199,17 +209,27 @@ int fec_encode_manager_t::append(char *s,int len/*,int &is_first_packet*/)
}
int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
{
if(counter==0&&dynamic_update_fec)
{
fec_data_num=g_fec_data_num;
fec_redundant_num=g_fec_redundant_num;
fec_mtu=g_fec_mtu;
fec_pending_num=g_fec_pending_num;
fec_pending_time=g_fec_pending_time;
fec_type=g_fec_type;
}
int about_to_fec=0;
int delayed_append=0;
//int counter_back=counter;
assert(type==0||type==1);
assert(fec_type==0||fec_type==1);
if(type==0&& s!=0 &&counter==0&&blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu)
if(fec_type==0&& s!=0 &&counter==0&&blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu)
{
mylog(log_warn,"message too long len=%d,ignored\n",len);
return -1;
}
if(type==1&&s!=0&&len>=fec_mtu)
if(fec_type==1&&s!=0&&len>=fec_mtu)
{
mylog(log_warn,"message too long len=%d fec_mtu=%d,ignored\n",len,fec_mtu);
return -1;
@@ -221,10 +241,10 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
}
if(s==0) about_to_fec=1;//now
if(type==0&& blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) {about_to_fec=1; delayed_append=1;}//fec then add packet
if(fec_type==0&& blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) {about_to_fec=1; delayed_append=1;}//fec then add packet
if(type==0) assert(counter<fec_pending_num);//counter will never equal fec_pending_num,if that happens fec should already been done.
if(type==1) assert(counter<fec_data_num);
if(fec_type==0) assert(counter<fec_pending_num);//counter will never equal fec_pending_num,if that happens fec should already been done.
if(fec_type==1) assert(counter<fec_data_num);
if(s!=0&&!delayed_append)
@@ -232,9 +252,9 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
append(s,len);
}
if(type==0&& counter==fec_pending_num) about_to_fec=1;
if(fec_type==0&& counter==fec_pending_num) about_to_fec=1;
if(type==1&& counter==fec_data_num) about_to_fec=1;
if(fec_type==1&& counter==fec_data_num) about_to_fec=1;
if(about_to_fec)
@@ -252,7 +272,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
int actual_data_num;
int actual_redundant_num;
if(type==0)
if(fec_type==0)
{
actual_data_num=fec_data_num;
@@ -299,8 +319,8 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
write_u32(input_buf[i] + tmp_idx, seq);
tmp_idx += sizeof(u32_t);
input_buf[i][tmp_idx++] = (unsigned char) type;
if (type == 1 && i < actual_data_num)
input_buf[i][tmp_idx++] = (unsigned char) fec_type;
if (fec_type == 1 && i < actual_data_num)
{
input_buf[i][tmp_idx++] = (unsigned char) 0;
input_buf[i][tmp_idx++] = (unsigned char) 0;
@@ -313,7 +333,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
tmp_output_buf[i]=input_buf[i]+tmp_idx; //////caution ,trick here.
if(type==0)
if(fec_type==0)
{
output_len[i]=tmp_idx+fec_len;
if(i<actual_data_num)
@@ -383,7 +403,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
memset(&its,0,sizeof(its));
timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
if(encode_fast_send&&type==1)
if(encode_fast_send&&fec_type==1)
{
int packet_to_send[max_fec_packet_num+5]={0};
int packet_to_send_counter=0;
@@ -407,7 +427,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
}
else
{
if(encode_fast_send&&s!=0&&type==1)
if(encode_fast_send&&s!=0&&fec_type==1)
{
assert(counter>=1);
assert(counter<=255);
@@ -422,7 +442,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
write_u32(input_buf[input_buf_idx]+tmp_idx,seq);
tmp_idx+=sizeof(u32_t);
input_buf[input_buf_idx][tmp_idx++]=(unsigned char)type;
input_buf[input_buf_idx][tmp_idx++]=(unsigned char)fec_type;
input_buf[input_buf_idx][tmp_idx++]=(unsigned char)0;
input_buf[input_buf_idx][tmp_idx++]=(unsigned char)0;
input_buf[input_buf_idx][tmp_idx++]=(unsigned char)((u32_t)input_buf_idx);
@@ -444,7 +464,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
if(s!=0&&delayed_append)
{
assert(type!=1);
assert(fec_type!=1);
append(s,len);
}