implemented commands

This commit is contained in:
wangyu- 2017-10-25 01:14:51 -05:00
parent 4dd37700e6
commit d6fc5dc072
3 changed files with 147 additions and 81 deletions

View File

@ -14,9 +14,9 @@
int g_fec_data_num=20; int g_fec_data_num=20;
int g_fec_redundant_num=10; int g_fec_redundant_num=10;
int g_fec_mtu=1250; int g_fec_mtu=1250;
int g_fec_pending_num=200; int g_fec_queue_len=200;
int g_fec_pending_time=8*1000; //8ms int g_fec_timeout=8*1000; //8ms
int g_fec_type=1; int g_fec_mode=1;
int dynamic_update_fec=1; int dynamic_update_fec=1;
@ -127,6 +127,15 @@ int blob_decode_t::output(int &n,char ** &s_arr,int *&len_arr)
return 0; return 0;
} }
fec_encode_manager_t::~fec_encode_manager_t()
{
fd_manager.fd64_close(timer_fd64);
}
u64_t fec_encode_manager_t::get_timer_fd64()
{
return timer_fd64;
}
fec_encode_manager_t::fec_encode_manager_t() fec_encode_manager_t::fec_encode_manager_t()
{ {
//int timer_fd; //int timer_fd;
@ -137,26 +146,18 @@ fec_encode_manager_t::fec_encode_manager_t()
} }
timer_fd64=fd_manager.create(timer_fd); timer_fd64=fd_manager.create(timer_fd);
re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time,fec_type); re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_queue_len,fec_timeout,fec_mode);
seq=(u32_t)get_true_random_number(); //TODO temp solution for a bug. seq=(u32_t)get_true_random_number(); //TODO temp solution for a bug.
} }
fec_encode_manager_t::~fec_encode_manager_t() int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int queue_len,int timeout,int mode)
{
fd_manager.fd64_close(timer_fd64);
}
u64_t fec_encode_manager_t::get_timer_fd64()
{
return timer_fd64;
}
int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int pending_num,int pending_time,int type)
{ {
fec_data_num=data_num; fec_data_num=data_num;
fec_redundant_num=redundant_num; fec_redundant_num=redundant_num;
fec_mtu=mtu; fec_mtu=mtu;
fec_pending_num=pending_num; fec_queue_len=queue_len;
fec_pending_time=pending_time; fec_timeout=timeout;
fec_type=type; fec_mode=mode;
assert(data_num+redundant_num<max_fec_packet_num); assert(data_num+redundant_num<max_fec_packet_num);
counter=0; counter=0;
@ -178,16 +179,16 @@ int fec_encode_manager_t::append(char *s,int len/*,int &is_first_packet*/)
itimerspec its; itimerspec its;
memset(&its.it_interval,0,sizeof(its.it_interval)); memset(&its.it_interval,0,sizeof(its.it_interval));
first_packet_time=get_current_time_us(); first_packet_time=get_current_time_us();
my_time_t tmp_time=fec_pending_time+first_packet_time; my_time_t tmp_time=fec_timeout+first_packet_time;
its.it_value.tv_sec=tmp_time/1000000llu; its.it_value.tv_sec=tmp_time/1000000llu;
its.it_value.tv_nsec=(tmp_time%1000000llu)*1000llu; its.it_value.tv_nsec=(tmp_time%1000000llu)*1000llu;
timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0); timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
} }
if(fec_type==0)//for type 0 use blob if(fec_mode==0)//for type 0 use blob
{ {
assert(blob_encode.input(s,len)==0); assert(blob_encode.input(s,len)==0);
} }
else if(fec_type==1)//for tpe 1 use input_buf and counter else if(fec_mode==1)//for tpe 1 use input_buf and counter
{ {
mylog(log_trace,"counter=%d\n",counter); mylog(log_trace,"counter=%d\n",counter);
assert(len<=65535&&len>=0); assert(len<=65535&&len>=0);
@ -214,22 +215,22 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
fec_data_num=g_fec_data_num; fec_data_num=g_fec_data_num;
fec_redundant_num=g_fec_redundant_num; fec_redundant_num=g_fec_redundant_num;
fec_mtu=g_fec_mtu; fec_mtu=g_fec_mtu;
fec_pending_num=g_fec_pending_num; fec_queue_len=g_fec_queue_len;
fec_pending_time=g_fec_pending_time; fec_timeout=g_fec_timeout;
fec_type=g_fec_type; fec_mode=g_fec_mode;
} }
int about_to_fec=0; int about_to_fec=0;
int delayed_append=0; int delayed_append=0;
//int counter_back=counter; //int counter_back=counter;
assert(fec_type==0||fec_type==1); assert(fec_mode==0||fec_mode==1);
if(fec_type==0&& s!=0 &&counter==0&&blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) if(fec_mode==0&& s!=0 &&counter==0&&blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu)
{ {
mylog(log_warn,"message too long len=%d,ignored\n",len); mylog(log_warn,"message too long len=%d,ignored\n",len);
return -1; return -1;
} }
if(fec_type==1&&s!=0&&len>=fec_mtu) if(fec_mode==1&&s!=0&&len>=fec_mtu)
{ {
mylog(log_warn,"message too long len=%d fec_mtu=%d,ignored\n",len,fec_mtu); mylog(log_warn,"message too long len=%d fec_mtu=%d,ignored\n",len,fec_mtu);
return -1; return -1;
@ -241,10 +242,10 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
} }
if(s==0) about_to_fec=1;//now if(s==0) about_to_fec=1;//now
if(fec_type==0&& blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) {about_to_fec=1; delayed_append=1;}//fec then add packet if(fec_mode==0&& blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) {about_to_fec=1; delayed_append=1;}//fec then add packet
if(fec_type==0) assert(counter<fec_pending_num);//counter will never equal fec_pending_num,if that happens fec should already been done. if(fec_mode==0) assert(counter<fec_queue_len);//counter will never equal fec_pending_num,if that happens fec should already been done.
if(fec_type==1) assert(counter<fec_data_num); if(fec_mode==1) assert(counter<fec_data_num);
if(s!=0&&!delayed_append) if(s!=0&&!delayed_append)
@ -252,9 +253,9 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
append(s,len); append(s,len);
} }
if(fec_type==0&& counter==fec_pending_num) about_to_fec=1; if(fec_mode==0&& counter==fec_queue_len) about_to_fec=1;
if(fec_type==1&& counter==fec_data_num) about_to_fec=1; if(fec_mode==1&& counter==fec_data_num) about_to_fec=1;
if(about_to_fec) if(about_to_fec)
@ -272,7 +273,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
int actual_data_num; int actual_data_num;
int actual_redundant_num; int actual_redundant_num;
if(fec_type==0) if(fec_mode==0)
{ {
actual_data_num=fec_data_num; actual_data_num=fec_data_num;
@ -319,8 +320,8 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
write_u32(input_buf[i] + tmp_idx, seq); write_u32(input_buf[i] + tmp_idx, seq);
tmp_idx += sizeof(u32_t); tmp_idx += sizeof(u32_t);
input_buf[i][tmp_idx++] = (unsigned char) fec_type; input_buf[i][tmp_idx++] = (unsigned char) fec_mode;
if (fec_type == 1 && i < actual_data_num) if (fec_mode == 1 && i < actual_data_num)
{ {
input_buf[i][tmp_idx++] = (unsigned char) 0; input_buf[i][tmp_idx++] = (unsigned char) 0;
input_buf[i][tmp_idx++] = (unsigned char) 0; input_buf[i][tmp_idx++] = (unsigned char) 0;
@ -333,7 +334,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
tmp_output_buf[i]=input_buf[i]+tmp_idx; //////caution ,trick here. tmp_output_buf[i]=input_buf[i]+tmp_idx; //////caution ,trick here.
if(fec_type==0) if(fec_mode==0)
{ {
output_len[i]=tmp_idx+fec_len; output_len[i]=tmp_idx+fec_len;
if(i<actual_data_num) if(i<actual_data_num)
@ -403,7 +404,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
memset(&its,0,sizeof(its)); memset(&its,0,sizeof(its));
timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0); timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
if(encode_fast_send&&fec_type==1) if(encode_fast_send&&fec_mode==1)
{ {
int packet_to_send[max_fec_packet_num+5]={0}; int packet_to_send[max_fec_packet_num+5]={0};
int packet_to_send_counter=0; int packet_to_send_counter=0;
@ -427,7 +428,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
} }
else else
{ {
if(encode_fast_send&&s!=0&&fec_type==1) if(encode_fast_send&&s!=0&&fec_mode==1)
{ {
assert(counter>=1); assert(counter>=1);
assert(counter<=255); assert(counter<=255);
@ -442,7 +443,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
write_u32(input_buf[input_buf_idx]+tmp_idx,seq); write_u32(input_buf[input_buf_idx]+tmp_idx,seq);
tmp_idx+=sizeof(u32_t); tmp_idx+=sizeof(u32_t);
input_buf[input_buf_idx][tmp_idx++]=(unsigned char)fec_type; input_buf[input_buf_idx][tmp_idx++]=(unsigned char)fec_mode;
input_buf[input_buf_idx][tmp_idx++]=(unsigned char)0; input_buf[input_buf_idx][tmp_idx++]=(unsigned char)0;
input_buf[input_buf_idx][tmp_idx++]=(unsigned char)0; input_buf[input_buf_idx][tmp_idx++]=(unsigned char)0;
input_buf[input_buf_idx][tmp_idx++]=(unsigned char)((u32_t)input_buf_idx); input_buf[input_buf_idx][tmp_idx++]=(unsigned char)((u32_t)input_buf_idx);
@ -464,7 +465,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
if(s!=0&&delayed_append) if(s!=0&&delayed_append)
{ {
assert(fec_type!=1); assert(fec_mode!=1);
append(s,len); append(s,len);
} }

View File

@ -23,9 +23,9 @@ extern u32_t fec_buff_num;
extern int g_fec_data_num; extern int g_fec_data_num;
extern int g_fec_redundant_num; extern int g_fec_redundant_num;
extern int g_fec_mtu; extern int g_fec_mtu;
extern int g_fec_pending_num; extern int g_fec_queue_len;
extern int g_fec_pending_time; //8ms extern int g_fec_timeout; //8ms
extern int g_fec_type; extern int g_fec_mode;
extern int dynamic_update_fec; extern int dynamic_update_fec;
/*end for first time init or dynamic update*/ /*end for first time init or dynamic update*/
@ -109,11 +109,11 @@ class fec_encode_manager_t
private: private:
u32_t seq; u32_t seq;
int fec_type; int fec_mode;
int fec_data_num,fec_redundant_num; int fec_data_num,fec_redundant_num;
int fec_mtu; int fec_mtu;
int fec_pending_num; int fec_queue_len;
int fec_pending_time; int fec_timeout;
my_time_t first_packet_time; my_time_t first_packet_time;
my_time_t first_packet_time_for_output; my_time_t first_packet_time_for_output;
@ -147,12 +147,12 @@ public:
int get_pending_time() int get_pending_time()
{ {
return fec_pending_time; return fec_timeout;
} }
int get_type() int get_type()
{ {
return fec_type; return fec_mode;
} }
u64_t get_timer_fd64(); u64_t get_timer_fd64();
int re_init(int data_num,int redundant_num,int mtu,int pending_num,int pending_time,int type); int re_init(int data_num,int redundant_num,int mtu,int pending_num,int pending_time,int type);

135
main.cpp
View File

@ -302,6 +302,84 @@ int from_fec_to_normal(conn_info_t & conn_info,char *data,int len,int & out_n,ch
//my_send(dest,data,len); //my_send(dest,data,len);
return 0; return 0;
} }
int print_parameter()
{
mylog(log_info,"jitter_min=%d jitter_max=%d output_interval_min=%d output_interval_max=%d fec_timeout=%d fec_data_num=%d fec_redundant_num=%d fec_mtu=%d fec_queue_len=%d fec_mode=%d\n",
jitter_min/1000,jitter_max/1000,output_interval_min/1000,output_interval_max/1000,g_fec_timeout/1000,
g_fec_data_num,g_fec_redundant_num,g_fec_mtu,g_fec_queue_len,g_fec_mode);
return 0;
}
int handle_command(char *s)
{
int len=strlen(s);
while(len>=1&&s[len-1]=='\n')
s[len-1]=0;
mylog(log_info,"got data from fifo,len=%d,s=[%s]\n",len,s);
int a=-1,b=-1;
if(strncmp(s,"fec",strlen("fec"))==0)
{
mylog(log_info,"got command [fec]\n");
sscanf(s,"fec %d:%d",&a,&b);
if(a<1||b<0||a+b>254)
{
mylog(log_warn,"invaild value\n");
return -1;
}
g_fec_data_num=a;
g_fec_redundant_num=b;
}
else if(strncmp(s,"mtu",strlen("mtu"))==0)
{
mylog(log_info,"got command [mtu]\n");
sscanf(s,"mtu %d",&a);
if(a<100||a>2000)
{
mylog(log_warn,"invaild value\n");
return -1;
}
g_fec_mtu=a;
}
else if(strncmp(s,"queue-len",strlen("queue-len"))==0)
{
mylog(log_info,"got command [queue-len]\n");
sscanf(s,"queue-len %d",&a);
if(a<1||a>10000)
{
mylog(log_warn,"invaild value\n");
return -1;
}
g_fec_queue_len=a;
}
else if(strncmp(s,"mode",strlen("mode"))==0)
{
mylog(log_info,"got command [mode]\n");
sscanf(s,"mode %d",&a);
if(a!=0&&a!=1)
{
mylog(log_warn,"invaild value\n");
return -1;
}
g_fec_mode=a;
}
else if(strncmp(s,"timeout",strlen("timeout"))==0)
{
mylog(log_info,"got command [timeout]\n");
sscanf(s,"timeout %d",&a);
if(a<0||a>1000)
{
mylog(log_warn,"invaild value\n");
return -1;
}
g_fec_timeout=a*1000;
}
else
{
mylog(log_info,"unknown command\n");
}
print_parameter();
return 0;
}
int client_event_loop() int client_event_loop()
{ {
//char buf[buf_len]; //char buf[buf_len];
@ -441,19 +519,13 @@ int client_event_loop()
{ {
char buf[buf_len]; char buf[buf_len];
int len=read (fifo_fd, buf, sizeof (buf)); int len=read (fifo_fd, buf, sizeof (buf));
assert(len>=0); if(len<0)
{
mylog(log_warn,"fifo read failed len=%d,errno=%s\n",len,strerror(errno));
continue;
}
buf[len]=0; buf[len]=0;
while(len>=1&&buf[len-1]=='\n') handle_command(buf);
buf[len-1]=0;
mylog(log_info,"got data from fifo,len=%d,s=[%s]\n",len,buf);
if(0)
{
}
else
{
mylog(log_info,"unknown command\n");
}
} }
else if (events[idx].data.u64 == (u64_t)local_listen_fd||events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64()) else if (events[idx].data.u64 == (u64_t)local_listen_fd||events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64())
{ {
@ -742,19 +814,13 @@ int server_event_loop()
{ {
char buf[buf_len]; char buf[buf_len];
int len=read (fifo_fd, buf, sizeof (buf)); int len=read (fifo_fd, buf, sizeof (buf));
assert(len>=0); if(len<0)
{
mylog(log_warn,"fifo read failed len=%d,errno=%s\n",len,strerror(errno));
continue;
}
buf[len]=0; buf[len]=0;
while(len>=1&&buf[len-1]=='\n') handle_command(buf);
buf[len-1]=0;
mylog(log_info,"got data from fifo,len=%d,s=[%s]\n",len,buf);
if(0)
{
}
else
{
mylog(log_info,"unknown command\n");
}
} }
else if (events[idx].data.u64 == (u64_t)local_listen_fd) else if (events[idx].data.u64 == (u64_t)local_listen_fd)
@ -1165,7 +1231,7 @@ int unit_test()
int * len; int * len;
fec_decode_manager.output(n,s_arr,len); fec_decode_manager.output(n,s_arr,len);
fec_encode_manager.re_init(3,2,g_fec_mtu,g_fec_pending_num,g_fec_pending_time,1); fec_encode_manager.re_init(3,2,g_fec_mtu,g_fec_queue_len,g_fec_timeout,1);
fec_encode_manager.input((char *) a.c_str(), a.length()); fec_encode_manager.input((char *) a.c_str(), a.length());
fec_encode_manager.output(n,s_arr,len); fec_encode_manager.output(n,s_arr,len);
@ -1442,8 +1508,8 @@ void process_arg(int argc, char *argv[])
} }
break; break;
case 'q': case 'q':
sscanf(optarg,"%d",&g_fec_pending_num); sscanf(optarg,"%d",&g_fec_queue_len);
if(g_fec_pending_num<1||g_fec_pending_num>10000) if(g_fec_queue_len<1||g_fec_queue_len>10000)
{ {
mylog(log_fatal,"fec_pending_num should be between 1 and 10000\n"); mylog(log_fatal,"fec_pending_num should be between 1 and 10000\n");
@ -1582,8 +1648,8 @@ void process_arg(int argc, char *argv[])
} }
else if(strcmp(long_options[option_index].name,"mode")==0) else if(strcmp(long_options[option_index].name,"mode")==0)
{ {
sscanf(optarg,"%d",&g_fec_type); sscanf(optarg,"%d",&g_fec_mode);
if(g_fec_type!=0&&g_fec_type!=1) if(g_fec_mode!=0&&g_fec_mode!=1)
{ {
mylog(log_fatal,"mode should be 0 or 1\n"); mylog(log_fatal,"mode should be 0 or 1\n");
myexit(-1); myexit(-1);
@ -1600,14 +1666,14 @@ void process_arg(int argc, char *argv[])
} }
else if(strcmp(long_options[option_index].name,"timeout")==0) else if(strcmp(long_options[option_index].name,"timeout")==0)
{ {
sscanf(optarg,"%d",&g_fec_pending_time); sscanf(optarg,"%d",&g_fec_timeout);
if(g_fec_pending_time<0||g_fec_pending_time>1000) if(g_fec_timeout<0||g_fec_timeout>1000)
{ {
mylog(log_fatal,"fec_pending_time should be between 0 and 1000(1s)\n"); mylog(log_fatal,"fec_pending_time should be between 0 and 1000(1s)\n");
myexit(-1); myexit(-1);
} }
g_fec_pending_time*=1000; g_fec_timeout*=1000;
} }
else if(strcmp(long_options[option_index].name,"fifo")==0) else if(strcmp(long_options[option_index].name,"fifo")==0)
{ {
@ -1652,9 +1718,8 @@ void process_arg(int argc, char *argv[])
program_mode=server_mode; program_mode=server_mode;
} }
mylog(log_info,"jitter_min=%d jitter_max=%d output_interval_min=%d output_interval_max=%d fec_pending_time=%d fec_data_num=%d fec_redundant_num=%d fec_mtu=%d fec_pending_num=%d fec_type=%d\n", print_parameter();
jitter_min/1000,jitter_max/1000,output_interval_min/1000,output_interval_max/1000,g_fec_pending_time/1000,
g_fec_data_num,g_fec_redundant_num,g_fec_mtu,g_fec_pending_num,g_fec_type);
} }
int main(int argc, char *argv[]) int main(int argc, char *argv[])