more robust,more log

This commit is contained in:
wangyu- 2017-10-17 12:35:03 -05:00
parent 9e96a9432b
commit 93cbe528cf
7 changed files with 192 additions and 68 deletions

View File

@ -15,7 +15,7 @@ int about_to_exit=0;
raw_mode_t raw_mode=mode_faketcp;
unordered_map<int, const char*> raw_mode_tostring = {{mode_faketcp, "faketcp"}, {mode_udp, "udp"}, {mode_icmp, "icmp"}};
int max_pending_packet=0;
int delay_capacity=0;
//static int random_number_fd=-1;
char iptables_rule[200]="";
//int is_client = 0, is_server = 0;

View File

@ -63,7 +63,7 @@ typedef short i16_t;
typedef u64_t my_time_t;
const int max_data_len=2000;
const int max_data_len=2200;
const int buf_len=max_data_len+200;
const u32_t conv_clear_interval=200;
@ -117,7 +117,7 @@ enum program_mode_t {unset_mode=0,client_mode,server_mode};
extern program_mode_t program_mode;
extern unordered_map<int, const char*> raw_mode_tostring ;
extern int max_pending_packet;
extern int delay_capacity;
typedef u32_t id_t;

View File

@ -11,6 +11,7 @@ const int disable_conv_clear=0;//a udp connection in the multiplexer is called c
const int disable_conn_clear=0;//a raw connection is called conn.
int report_interval=0;
void server_clear_function(u64_t u64)//used in conv_manager in server mode.for server we have to use one udp fd for one conv(udp connection),
//so we have to close the fd when conv expires

View File

@ -17,6 +17,7 @@ extern int disable_anti_replay;
#include "fd_manager.h"
#include "fec_manager.h"
extern int report_interval;
struct conv_manager_t // manage the udp connections
{
@ -51,6 +52,54 @@ struct conv_manager_t // manage the udp connections
int clear_inactive(char * ip_port=0);
int clear_inactive0(char * ip_port);
};
struct inner_stat_t
{
u64_t input_packet_num;
u64_t input_packet_size;
u64_t output_packet_num;
u64_t output_packet_size;
};
struct stat_t
{
u64_t last_report_time;
inner_stat_t normal_to_fec;
inner_stat_t fec_to_normal;
stat_t()
{
memset(this,0,sizeof(stat_t));
}
void report_as_client()
{
if(report_interval!=0 &&get_current_time()-last_report_time>u64_t(report_interval)*1000)
{
last_report_time=get_current_time();
inner_stat_t &a=normal_to_fec;
inner_stat_t &b=fec_to_normal;
mylog(log_info,"[report]client-->server:(original:%llu pkt;%llu byte) (fec:%llu pkt,%llu byte) server-->client:(original:%llu pkt;%llu byte) (fec:%llu pkt;%llu byte)\n",
a.input_packet_num,a.input_packet_size,a.output_packet_num,a.output_packet_size,
b.output_packet_num,b.output_packet_size,b.input_packet_num,b.input_packet_size
);
}
}
void report_as_server(ip_port_t &ip_port)
{
if(report_interval!=0 &&get_current_time()-last_report_time>u64_t(report_interval)*1000)
{
last_report_time=get_current_time();
inner_stat_t &a=fec_to_normal;
inner_stat_t &b=normal_to_fec;
mylog(log_info,"[report][%s]client-->server:(original:%llu pkt;%llu byte) (fec:%llu pkt;%llu byte) server-->client:(original:%llu pkt;%llu byte) (fec:%llu pkt;%llu byte)\n",
ip_port.to_s(),
a.output_packet_num,a.output_packet_size,a.input_packet_num,a.input_packet_size,
b.input_packet_num,b.input_packet_size,b.output_packet_num,b.output_packet_size
);
}
}
};
struct conn_info_t //stores info for a raw connection.for client ,there is only one connection,for server there can be thousand of connection since server can
//handle multiple clients
{
@ -60,6 +109,7 @@ struct conn_info_t //stores info for a raw connection.for client ,there is o
my_timer_t timer;
ip_port_t ip_port;
u64_t last_active_time;
stat_t stat;
conn_info_t()
{
}

View File

@ -47,7 +47,7 @@ int blob_encode_t::input(char *s,int len)
assert(current_len+len+sizeof(u16_t) +100<sizeof(input_buf));
assert(len<=65535&&len>=0);
counter++;
assert(counter<=max_fec_pending_packet_num);
assert(counter<=max_blob_packet_num);
write_u16(input_buf+current_len,len);
current_len+=sizeof(u16_t);
memcpy(input_buf+current_len,s,len);
@ -99,7 +99,7 @@ int blob_decode_t::output(int &n,char ** &s_arr,int *&len_arr)
if(parser_pos+(int)sizeof(u32_t)>current_len) {mylog(log_info,"failed 0\n");return -1;}
n=(int)read_u32(input_buf+parser_pos);
if(n>max_fec_pending_packet_num) {mylog(log_info,"failed 1\n");return -1;}
if(n>max_blob_packet_num) {mylog(log_info,"failed 1\n");return -1;}
s_arr=output_buf;
len_arr=output_len;
@ -146,7 +146,7 @@ int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int pen
fec_pending_time=pending_time;
this->type=type;
assert(data_num+redundant_num<255);
assert(data_num+redundant_num<max_fec_packet_num);
counter=0;
blob_encode.clear();
ready_for_output=0;
@ -171,11 +171,11 @@ int fec_encode_manager_t::append(char *s,int len/*,int &is_first_packet*/)
its.it_value.tv_nsec=(tmp_time%1000000llu)*1000llu;
timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
}
if(type==0)
if(type==0)//for type 0 use blob
{
assert(blob_encode.input(s,len)==0);
}
else if(type==1)
else if(type==1)//for tpe 1 use input_buf and counter
{
mylog(log_trace,"counter=%d\n",counter);
assert(len<=65535&&len>=0);
@ -221,7 +221,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
if(type==0&& blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) {about_to_fec=1; delayed_append=1;}//fec then add packet
if(type==0) assert(counter<fec_pending_num);//counter cant = fec_pending_num,if that happens fec should already been done.
if(type==0) assert(counter<fec_pending_num);//counter will never equal fec_pending_num,if that happens fec should already been done.
if(type==1) assert(counter<fec_data_num);
@ -230,7 +230,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
append(s,len);
}
if(type==0&& counter==fec_pending_num) {about_to_fec=1;} //
if(type==0&& counter==fec_pending_num) about_to_fec=1;
if(type==1&& counter==fec_data_num) about_to_fec=1;
@ -408,27 +408,27 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
{
assert(counter>=1);
assert(counter<=255);
int buf_idx=counter-1;
int input_buf_idx=counter-1;
assert(ready_for_output==0);
ready_for_output=1;
first_packet_time_for_output=0;
output_n=1;
int tmp_idx=0;
write_u32(input_buf[buf_idx]+tmp_idx,seq);
write_u32(input_buf[input_buf_idx]+tmp_idx,seq);
tmp_idx+=sizeof(u32_t);
input_buf[buf_idx][tmp_idx++]=(unsigned char)type;
input_buf[buf_idx][tmp_idx++]=(unsigned char)0;
input_buf[buf_idx][tmp_idx++]=(unsigned char)0;
input_buf[buf_idx][tmp_idx++]=(unsigned char)((u32_t)buf_idx);
input_buf[input_buf_idx][tmp_idx++]=(unsigned char)type;
input_buf[input_buf_idx][tmp_idx++]=(unsigned char)0;
input_buf[input_buf_idx][tmp_idx++]=(unsigned char)0;
input_buf[input_buf_idx][tmp_idx++]=(unsigned char)((u32_t)input_buf_idx);
output_len[0]=input_len[buf_idx]+tmp_idx;
output_buf[0]=input_buf[buf_idx];
output_len[0]=input_len[input_buf_idx]+tmp_idx;
output_buf[0]=input_buf[input_buf_idx];
if(0)
{
printf("seq=%u,buf_idx=%d\n",seq,buf_idx);
printf("seq=%u,buf_idx=%d\n",seq,input_buf_idx);
for(int j=0;j<output_len[0];j++)
{
log_bare(log_warn,"0x%02x,",(u32_t)(unsigned char)output_buf[0][j]);
@ -481,7 +481,15 @@ int fec_decode_manager_t::re_init()
int fec_decode_manager_t::input(char *s,int len)
{
assert(s!=0);
assert(len+100<buf_len);//guarenteed by upper level
int tmp_idx=0;
int tmp_header_len=sizeof(u32_t)+sizeof(char)*4;
if(len<tmp_header_len)
{
mylog(log_warn,"len =%d\n",len);
return -1;
}
u32_t seq=read_u32(s+tmp_idx);
tmp_idx+=sizeof(u32_t);
int type=(unsigned char)s[tmp_idx++];
@ -490,20 +498,21 @@ int fec_decode_manager_t::input(char *s,int len)
int inner_index=(unsigned char)s[tmp_idx++];
len=len-tmp_idx;
mylog(log_trace,"input\n");
assert(len+100<buf_len);
//mylog(log_trace,"input\n");
if(len<0)
{
mylog(log_warn,"len<0\n");
return -1;
}
if(type==1&&len<(int)sizeof(u16_t))
{
mylog(log_warn,"type==1&&len<2\n");
return -1;
}
if(type==1)
{
if(len<(int)sizeof(u16_t))
{
mylog(log_warn,"type==1&&len<2\n");
return -1;
}
if(data_num==0&&(int)( read_u16(s+tmp_idx)+sizeof(u16_t))!=len)
{
mylog(log_warn,"inner_index<data_num&&read_u16(s+tmp_idx)+sizeof(u16_t)!=len %d %d\n",(int)( read_u16(s+tmp_idx)+sizeof(u16_t)),len);
@ -511,9 +520,14 @@ int fec_decode_manager_t::input(char *s,int len)
}
}
if(type==0&&data_num==0)
{
mylog(log_warn,"unexpected type==0&&data_num==0\n");
return -1;
}
if(data_num+redundant_num>=max_fec_packet_num)
{
mylog(log_warn,"failed here\n");
mylog(log_warn,"data_num+redundant_num>=max_fec_packet_num\n");
return -1;
}
if(!anti_replay.is_vaild(seq))
@ -524,18 +538,11 @@ int fec_decode_manager_t::input(char *s,int len)
if(mp[seq].group_mp.find(inner_index)!=mp[seq].group_mp.end() )
{
mylog(log_debug,"dup fec index\n");
mylog(log_debug,"dup fec index\n");//duplicate can happen on a normal network, so its just log_debug
return -1;
}
if(type==0&&data_num==0)
{
mylog(log_warn,"unexpected type==0&&data_num==0\n");
return -1;
}
int ok=1;
if(mp[seq].type==-1)
mp[seq].type=type;
else
@ -543,7 +550,7 @@ int fec_decode_manager_t::input(char *s,int len)
if(mp[seq].type!=type)
{
mylog(log_warn,"type mismatch\n");
ok=0;
return -1;
}
}
@ -561,18 +568,12 @@ int fec_decode_manager_t::input(char *s,int len)
{
if(mp[seq].data_num!=data_num||mp[seq].redundant_num!=redundant_num||mp[seq].len!=len)
{
mylog(log_warn,"unexpected here\n");
ok=0;
mylog(log_warn,"unexpected mp[seq].data_num!=data_num||mp[seq].redundant_num!=redundant_num||mp[seq].len!=len\n");
return -1;
}
}
}
if(ok==0)
{
mylog(log_warn,"fec packets invaild\n");
return -1;
}
if(fec_data[index].used!=0)
{
u32_t tmp_seq=fec_data[index].seq;
@ -607,7 +608,13 @@ int fec_decode_manager_t::input(char *s,int len)
int about_to_fec=0;
if(type==0)
{
assert((int)inner_mp.size()<=data_num);
//assert((int)inner_mp.size()<=data_num);
if((int)inner_mp.size()>data_num)
{
mylog(log_warn,"inner_mp.size()>data_num\n");
anti_replay.set_invaild(seq);
goto end;
}
if((int)inner_mp.size()==data_num)
about_to_fec=1;
}
@ -615,6 +622,12 @@ int fec_decode_manager_t::input(char *s,int len)
{
if(mp[seq].data_num!=-1)
{
if((int)inner_mp.size()>data_num+1)
{
mylog(log_warn,"inner_mp.size()>data_num+1\n");
anti_replay.set_invaild(seq);
goto end;
}
if((int)inner_mp.size()>=mp[seq].data_num)
{
about_to_fec=1;
@ -637,17 +650,25 @@ int fec_decode_manager_t::input(char *s,int len)
fec_tmp_arr[it->first]=fec_data[it->second].buf;
}
assert(rs_decode2(group_data_num,group_data_num+group_redundant_num,fec_tmp_arr,len)==0); //the input data has been modified in-place
//this line should always succeed
blob_decode.clear();
for(int i=0;i<group_data_num;i++)
{
blob_decode.input(fec_tmp_arr[i],len);
}
blob_decode.output(output_n,output_s_arr,output_len_arr);
if(blob_decode.output(output_n,output_s_arr,output_len_arr)==0)
{
mylog(log_warn,"blob_decode failed\n");
//ready_for_output=0;
anti_replay.set_invaild(seq);
goto end;
}
assert(ready_for_output==0);
ready_for_output=1;
anti_replay.set_invaild(seq);
}
else
else//type==1
{
@ -679,7 +700,6 @@ int fec_decode_manager_t::input(char *s,int len)
if(fec_data[it->second].len > max_len)
max_len=fec_data[it->second].len;
}
if(max_len!=mp[seq].len)
{
data_check_ok=0;
@ -687,13 +707,15 @@ int fec_decode_manager_t::input(char *s,int len)
}
if(data_check_ok==0)
{
ready_for_output=0;
//ready_for_output=0;
mylog(log_warn,"data_check_ok==0\n");
anti_replay.set_invaild(seq);
goto end;
}
for(auto it=inner_mp.begin();it!=inner_mp.end();it++)
{
int tmp_idx=it->second;
assert(max_len>=fec_data[tmp_idx].len);//guarenteed by data_check_ok
memset(fec_data[tmp_idx].buf+fec_data[tmp_idx].len,0,max_len-fec_data[tmp_idx].len);
}
@ -706,7 +728,7 @@ int fec_decode_manager_t::input(char *s,int len)
}
mylog(log_trace,"fec done,%d %d,missed_packet_counter=%d\n",group_data_num,group_redundant_num,missed_packet_counter);
assert(rs_decode2(group_data_num,group_data_num+group_redundant_num,output_s_arr_buf,max_len)==0);
assert(rs_decode2(group_data_num,group_data_num+group_redundant_num,output_s_arr_buf,max_len)==0);//this should always succeed
for(int i=0;i<group_data_num;i++)
{
@ -751,9 +773,9 @@ int fec_decode_manager_t::input(char *s,int len)
ready_for_output=0;
}
anti_replay.set_invaild(seq);
}
}// end of type==1
}
else
else //not about_to_fec
{
if(decode_fast_send)

View File

@ -12,10 +12,11 @@
#include "log.h"
#include "lib/rs.h"
const int max_fec_pending_packet_num=1000;
const int max_fec_packet_num=255;
const u32_t anti_replay_buff_size=10000;
const u32_t fec_buff_num=1000;
const int max_blob_packet_num=20000;//how many packet can be contain in a blob_t ,can be set very large
const u32_t anti_replay_buff_size=20000;//can be set very large
const int max_fec_packet_num=255;// this is the limitation of the rs lib
const u32_t fec_buff_num=2000;// how many packet can fec_decode_manager hold. shouldnt be very large,or it will cost huge memory
struct anti_replay_t
@ -83,8 +84,8 @@ struct blob_decode_t
int last_len;
int counter;
char *output_buf[max_fec_pending_packet_num+100];
int output_len[max_fec_pending_packet_num+100];
char *output_buf[max_blob_packet_num+100];
int output_len[max_blob_packet_num+100];
blob_decode_t();
int clear();

View File

@ -50,8 +50,8 @@ u32_t local_ip_uint32,remote_ip_uint32=0;
char local_ip[100], remote_ip[100];
int local_port = -1, remote_port = -1;
u64_t last_report_time=0;
int report_interval=0;
//u64_t last_report_time=0;
conn_manager_t conn_manager;
delay_manager_t delay_manager;
@ -144,9 +144,15 @@ int from_normal_to_fec(conn_info_t & conn_info,char *data,int len,int & out_n,ch
//static int counter=0;
out_delay=out_delay_buf;
//out_len=out_len_buf;
inner_stat_t &inner_stat=conn_info.stat.normal_to_fec;
if(disable_fec)
{
assert(data!=0);
inner_stat.input_packet_num++;
inner_stat.input_packet_size+=len;
inner_stat.output_packet_num++;
inner_stat.output_packet_size+=len;
if(data==0) return 0;
out_n=1;
static char *data_static;
@ -156,9 +162,15 @@ int from_normal_to_fec(conn_info_t & conn_info,char *data,int len,int & out_n,ch
out_arr=&data_static;
out_len=&len_static;
out_delay[0]=0;
}
else
{
if(data!=0)
{
inner_stat.input_packet_num++;
inner_stat.input_packet_size+=len;
}
//counter++;
conn_info.fec_encode_manager.input(data,len);
@ -177,9 +189,10 @@ int from_normal_to_fec(conn_info_t & conn_info,char *data,int len,int & out_n,ch
{
my_time_t common_latency=0;
my_time_t first_packet_time=conn_info.fec_encode_manager.get_first_packet_time();
my_time_t current_time=get_current_time_us();
if(fix_latency==1&&first_packet_time!=0)
{
my_time_t current_time=get_current_time_us();
my_time_t tmp;
if((my_time_t)fec_pending_time >=(current_time - first_packet_time))
{
@ -209,7 +222,11 @@ int from_normal_to_fec(conn_info_t & conn_info,char *data,int len,int & out_n,ch
}
for(int i=0;i<out_n;i++)
{
inner_stat.output_packet_num++;
inner_stat.output_packet_size+=out_len[i];
log_bare(log_trace,"%d ",out_len[i]);
}
log_bare(log_trace,"\n");
@ -223,10 +240,17 @@ int from_normal_to_fec(conn_info_t & conn_info,char *data,int len,int & out_n,ch
}
int from_fec_to_normal(conn_info_t & conn_info,char *data,int len,int & out_n,char **&out_arr,int *&out_len,my_time_t *&out_delay)
{
static my_time_t out_delay_buf[max_fec_pending_packet_num+100]={0};
static my_time_t out_delay_buf[max_blob_packet_num+100]={0};
out_delay=out_delay_buf;
inner_stat_t &inner_stat=conn_info.stat.fec_to_normal;
if(disable_fec)
{
assert(data!=0);
inner_stat.input_packet_num++;
inner_stat.input_packet_size+=len;
inner_stat.output_packet_num++;
inner_stat.output_packet_size+=len;
if(data==0) return 0;
out_n=1;
static char *data_static;
@ -240,6 +264,12 @@ int from_fec_to_normal(conn_info_t & conn_info,char *data,int len,int & out_n,ch
else
{
if(data!=0)
{
inner_stat.input_packet_num++;
inner_stat.input_packet_size+=len;
}
conn_info.fec_decode_manager.input(data,len);
//int n;char ** s_arr;int* len_arr;
@ -247,8 +277,12 @@ int from_fec_to_normal(conn_info_t & conn_info,char *data,int len,int & out_n,ch
for(int i=0;i<out_n;i++)
{
out_delay_buf[i]=0;
inner_stat.output_packet_num++;
inner_stat.output_packet_size+=out_len[i];
}
}
mylog(log_trace,"from_fec_to_normal input_len=%d,output_n=%d,input_seq=%u\n",len,out_n,read_u32(data));
@ -368,6 +402,8 @@ int client_event_loop()
conn_info.conv_manager.clear_inactive();
mylog(log_trace,"events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()\n");
conn_info.stat.report_as_client();
if(debug_force_flush_fec)
{
int out_n;char **out_arr;int *out_len;my_time_t *out_delay;
@ -835,6 +871,8 @@ int server_event_loop()
{
from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay);
}
conn_info.stat.report_as_server(ip_port);
continue;
}
else
@ -1166,6 +1204,8 @@ void process_arg(int argc, char *argv[])
{"sock-buf", required_argument, 0, 1},
{"random-drop", required_argument, 0, 1},
{"report", required_argument, 0, 1},
{"delay-capacity", required_argument, 0, 1},
{"mtu", required_argument, 0, 'm'},
{NULL, 0, 0, 0}
};
int option_index = 0;
@ -1312,9 +1352,9 @@ void process_arg(int argc, char *argv[])
else
{
sscanf(optarg,"%d:%d\n",&fec_data_num,&fec_redundant_num);
if(fec_data_num<1 ||fec_redundant_num<0||fec_data_num+fec_redundant_num>255)
if(fec_data_num<1 ||fec_redundant_num<0||fec_data_num+fec_redundant_num>254)
{
mylog(log_fatal,"fec_data_num<1 ||fec_redundant_num<0||fec_data_num+fec_redundant_num>255\n");
mylog(log_fatal,"fec_data_num<1 ||fec_redundant_num<0||fec_data_num+fec_redundant_num>254\n");
myexit(-1);
}
}
@ -1329,9 +1369,9 @@ void process_arg(int argc, char *argv[])
break;
case 'm':
sscanf(optarg,"%d",&fec_mtu);
if(fec_mtu<500||fec_mtu>1600)
if(fec_mtu<100||fec_mtu>2000)
{
mylog(log_fatal,"fec_mtu should be between 500 and 1600\n");
mylog(log_fatal,"fec_mtu should be between 100 and 2000\n");
myexit(-1);
}
break;
@ -1452,6 +1492,16 @@ void process_arg(int argc, char *argv[])
myexit(-1);
}
}
else if(strcmp(long_options[option_index].name,"delay-capacity")==0)
{
sscanf(optarg,"%d",&delay_capacity);
if(delay_capacity<0)
{
mylog(log_fatal,"delay_capacity must be >=0 \n");
myexit(-1);
}
}
else if(strcmp(long_options[option_index].name,"report")==0)
{
sscanf(optarg,"%d",&report_interval);
@ -1541,7 +1591,7 @@ int main(int argc, char *argv[])
int i, j, k;
process_arg(argc,argv);
delay_manager.set_capacity(max_pending_packet);
delay_manager.set_capacity(delay_capacity);
local_ip_uint32=inet_addr(local_ip);
remote_ip_uint32=inet_addr(remote_ip);