diff --git a/fec_manager.cpp b/fec_manager.cpp index 559bd8b..2515c65 100644 --- a/fec_manager.cpp +++ b/fec_manager.cpp @@ -9,7 +9,7 @@ #include "log.h" #include "common.h" #include "lib/rs.h" - +#include "fd_manager.h" blob_encode_t::blob_encode_t() { @@ -38,10 +38,10 @@ int blob_encode_t::get_shard_len(int n,int next_packet_len) int blob_encode_t::input(char *s,int len) { - assert(current_len+len+sizeof(u16_t) <=256*buf_len); + assert(current_len+len+sizeof(u16_t) <=max_fec_packet_num*buf_len); assert(len<=65535&&len>=0); counter++; - assert(counter<=max_packet_num); + assert(counter<=max_normal_packet_num); write_u16(buf+current_len,len); current_len+=sizeof(u16_t); memcpy(buf+current_len,s,len); @@ -51,7 +51,6 @@ int blob_encode_t::input(char *s,int len) int blob_encode_t::output(int n,char ** &s_arr,int & len) { - static char *output_arr[256+100]; len=round_up_div(current_len,n); write_u32(buf,counter); for(int i=0;icurrent_len) return -1; n=(int)read_u32(buf+parser_pos); - if(n>max_packet_num) {mylog(log_info,"failed 1\n");return -1;} + if(n>max_normal_packet_num) {mylog(log_info,"failed 1\n");return -1;} s_arr=s_buf; len_arr=len_buf; @@ -115,23 +112,46 @@ int blob_decode_t::output(int &n,char ** &s_arr,int *&len_arr) fec_encode_manager_t::fec_encode_manager_t() { - re_init(4,2,1200); + //int timer_fd; + if ((timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0) + { + mylog(log_fatal,"timer_fd create error"); + myexit(1); + } + timer_fd64=fd_manager.create(timer_fd); + + re_init(4,2,1200,100,10000); } -int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu) +fec_encode_manager_t::~fec_encode_manager_t() +{ + fd_manager.close(timer_fd64); +} +u64_t fec_encode_manager_t::get_timer_fd64() +{ + return timer_fd64; +} +int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int pending_num,int pending_time) { fec_data_num=data_num; fec_redundant_num=redundant_num; fec_mtu=mtu; + fec_pending_num=pending_num; counter=0; blob_encode.clear(); ready_for_output=0; seq=0; + + itimerspec zero_its; + memset(&zero_its, 0, sizeof(zero_its)); + + timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0); + return 0; } int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) { - if(s==0 ||blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) + if(s==0 ||blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu||counter>=fec_pending_num) { char ** blob_output; int blob_len; @@ -226,7 +246,7 @@ int fec_decode_manager_t::input(char *s,int len) { return -1; } - if(data_num+redundant_num>255) + if(data_num+redundant_num>max_fec_packet_num) { return -1; } @@ -277,7 +297,7 @@ int fec_decode_manager_t::input(char *s,int len) if((int)inner_mp.size()==data_num) { - char *fec_tmp_arr[256+5]={0}; + char *fec_tmp_arr[max_fec_packet_num+5]={0}; for(auto it=inner_mp.begin();it!=inner_mp.end();it++) { fec_tmp_arr[it->first]=fec_data[it->second].buf; diff --git a/fec_manager.h b/fec_manager.h index 79fad39..b064d9c 100644 --- a/fec_manager.h +++ b/fec_manager.h @@ -12,8 +12,8 @@ #include "log.h" #include "lib/rs.h" -const int max_packet_num=1000; - +const int max_normal_packet_num=1000; +const int max_fec_packet_num=255; const u32_t anti_replay_buff_size=30000; const u32_t fec_buff_size=3000; @@ -58,10 +58,13 @@ struct anti_replay_t struct blob_encode_t { - char buf[(256+5)*buf_len]; + char buf[(max_fec_packet_num+5)*buf_len]; int current_len; int counter; + + char *output_arr[max_fec_packet_num+100]; + blob_encode_t(); int clear(); @@ -76,11 +79,14 @@ struct blob_encode_t struct blob_decode_t { - char buf[(256+5)*buf_len]; + char buf[(max_fec_packet_num+5)*buf_len]; int current_len; int last_len; int counter; + char *s_buf[max_normal_packet_num+100]; + int len_buf[max_normal_packet_num+100]; + blob_decode_t(); int clear(); int input(char *input,int len); @@ -91,17 +97,23 @@ class fec_encode_manager_t { int fec_data_num,fec_redundant_num; int fec_mtu; - char buf[256+5][buf_len+100]; - char *output_buf[256+5]; + int fec_pending_num; + int fec_pending_time; + char buf[max_fec_packet_num+5][buf_len+100]; + char *output_buf[max_fec_packet_num+5]; int output_len; int ready_for_output; u32_t seq; int counter; + int timer_fd; + u64_t timer_fd64; blob_encode_t blob_encode; public: fec_encode_manager_t(); - int re_init(int data_num,int redundant_num,int mtu); + + u64_t get_timer_fd64(); + int re_init(int data_num,int redundant_num,int mtu,int pending_num,int pending_time); int input(char *s,int len/*,int &is_first_packet*/); int output(int &n,char ** &s_arr,int &len); }; @@ -124,6 +136,7 @@ class fec_decode_manager_t unordered_map > mp; blob_decode_t blob_decode; + int output_n; char ** output_s_arr; int * output_len_arr; diff --git a/main.cpp b/main.cpp index ae9a26a..7bc1b59 100644 --- a/main.cpp +++ b/main.cpp @@ -30,7 +30,8 @@ int mtu_warn=1350; int fec_data_num=3; int fec_redundant_num=2; int fec_mtu=30; - +int fec_pending_num=5; +int fec_pending_time=10000; u32_t local_ip_uint32,remote_ip_uint32=0; char local_ip[100], remote_ip[100]; int local_port = -1, remote_port = -1; @@ -110,44 +111,95 @@ int delay_send(my_time_t delay,const dest_t &dest,char *data,int len) { return delay_manager.add(delay,dest,data,len);; } -int from_normal_to_fec(conn_info_t & conn_info,const dest_t &dest,char *data,int len) +int from_normal_to_fec(conn_info_t & conn_info,char *data,int len,int & out_n,char **&out_arr,int *&out_len,int *&out_delay) { + static int out_delay_buf[max_fec_packet_num+100]={0}; + static int out_len_buf[max_fec_packet_num+100]={0}; static int counter=0; + out_delay=out_delay_buf; + out_len=out_len_buf; + + if(0) + { + if(data==0) return 0; + out_n=1; + static char *data_static; + data_static=data; + static int len_static; + len_static=len; + out_arr=&data_static; + out_len=&len_static; + } + else + { counter++; conn_info.fec_encode_manager.input(data,len); + //if(counter%5==0) //conn_info.fec_encode_manager.input(0,0); - int n; - char **s_arr; - int s_arr_len; + //int n; + //char **s_arr; + //int s_len; - conn_info.fec_encode_manager.output(n,s_arr,s_arr_len); + int tmp_out_len; + conn_info.fec_encode_manager.output(out_n,out_arr,tmp_out_len); - for(int i=0;i",n); + /* for(int i=0;i\n",s_arr[i]); - } + }*/ //my_send(dest,data,len); return 0; } @@ -164,7 +216,7 @@ int client_event_loop() conn_info_t &conn_info=*conn_info_p; //huge size of conn_info,do not allocate on stack init_listen_socket(); - conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu); + conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time); epoll_fd = epoll_create1(0); @@ -203,6 +255,11 @@ int client_event_loop() myexit(-1); } + u64_t fd64=conn_info.fec_encode_manager.get_timer_fd64(); + ev.events = EPOLLIN; + ev.data.u64 = fd64; + ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fd64), &ev); + while(1)//////////////////////// { if(about_to_exit) myexit(0); @@ -266,14 +323,20 @@ int client_event_loop() dest_t dest; - dest.type=type_fd64_conv; + dest.type=type_fd64; dest.inner.fd64=remote_fd64; - //char * new_data; - //int new_len; - //put_conv(conv,data,data_len,new_data,new_len); - dest.conv=conv; - from_normal_to_fec(conn_info,dest,data,data_len); + char * new_data; + int new_len; + put_conv(conv,data,data_len,new_data,new_len); + + int out_n;char **out_arr;int *out_len;int *out_delay; + //dest.conv=conv; + from_normal_to_fec(conn_info,new_data,new_len,out_n,out_arr,out_len,out_delay); + for(int i=0;i%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); } - u32_t conv; - char *new_data; - int new_len; - if(get_conv(conv,data,data_len,new_data,new_len)!=0) - continue; - if(!conn_info.conv_manager.is_conv_used(conv))continue; - u64_t u64=conn_info.conv_manager.find_u64_by_conv(conv); - dest_t dest; - dest.inner.ip_port.from_u64(u64); - dest.type=type_ip_port; - from_fec_to_normal(conn_info,dest,new_data,new_len); - mylog(log_trace,"[%s] send packet\n",dest.inner.ip_port.to_s()); + + + int out_n;char **out_arr;int *out_len;int *out_delay; + from_fec_to_normal(conn_info,data,data_len,out_n,out_arr,out_len,out_delay); + + for(int i=0;i> 32u); //////////////////////////////todo @@ -531,6 +617,7 @@ int server_event_loop() { char data[buf_len]; int data_len; + u32_t conv; fd64_t fd64=events[idx].data.u64; if(!fd_manager.exist(fd64)) //fd64 has been closed { @@ -550,7 +637,7 @@ int server_event_loop() assert(conn_info.conv_manager.is_u64_used(fd64)); - u32_t conv=conn_info.conv_manager.find_conv_by_u64(fd64); + conv=conn_info.conv_manager.find_conv_by_u64(fd64); int fd=fd_manager.to_fd(fd64); data_len=recv(fd,data,max_data_len,0); @@ -571,16 +658,22 @@ int server_event_loop() } dest_t dest; - dest.type=type_ip_port_conv; - dest.conv=conv; + dest.type=type_ip_port; + //dest.conv=conv; dest.inner.ip_port=ip_port; - //char * new_data; - //int new_len; - //put_conv(conv,data,data_len,new_data,new_len); + char * new_data; + int new_len; + put_conv(conv,data,data_len,new_data,new_len); - from_normal_to_fec(conn_info,dest,data,data_len); - mylog(log_trace,"[%s] send packet\n",ip_port.to_s()); + int out_n;char **out_arr;int *out_len;int *out_delay; + + from_normal_to_fec(conn_info,new_data,new_len,out_n,out_arr,out_len,out_delay); + for(int i=0;i