diff --git a/connection.h b/connection.h index a4302d0..f8a9076 100644 --- a/connection.h +++ b/connection.h @@ -15,6 +15,7 @@ extern int disable_anti_replay; #include "log.h" #include "delay_manager.h" #include "fd_manager.h" +#include "fec_manager.h" /* @@ -67,7 +68,8 @@ struct conn_info_t //stores info for a raw connection.for client ,there is o //handle multiple clients { conv_manager_t conv_manager; - //anti_replay_t anti_replay; + fec_encode_manager_t fec_encode_manager; + fec_decode_manager_t fec_decode_manager; fd64_t timer_fd; ip_port_t ip_port; };//g_conn_info; diff --git a/fec_manager.cpp b/fec_manager.cpp index 2506573..559bd8b 100644 --- a/fec_manager.cpp +++ b/fec_manager.cpp @@ -9,8 +9,6 @@ #include "log.h" #include "common.h" #include "lib/rs.h" -u32_t seq=0; - blob_encode_t::blob_encode_t() @@ -66,7 +64,6 @@ int blob_encode_t::output(int n,char ** &s_arr,int & len) blob_decode_t::blob_decode_t() { clear(); - } int blob_decode_t::clear() { @@ -118,46 +115,50 @@ int blob_decode_t::output(int &n,char ** &s_arr,int *&len_arr) fec_encode_manager_t::fec_encode_manager_t() { - re_init(); + re_init(4,2,1200); } -int fec_encode_manager_t::re_init() +int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu) { - fec_data_num=4; - fec_redundant_num=2; - fec_mtu=1200; + fec_data_num=data_num; + fec_redundant_num=redundant_num; + fec_mtu=mtu; counter=0; blob_encode.clear(); ready_for_output=0; + seq=0; return 0; } -int fec_encode_manager_t::input(char *s,int len,int &is_first_packet) +int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) { - is_first_packet=0; if(s==0 ||blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) { char ** blob_output; int blob_len; - assert(counter!=0); + if(counter==0) + { + if(s==0) return 0;//relax this restriction temporarily + else mylog(log_warn,"message too long,ignored\n"); + } blob_encode.output(fec_data_num,blob_output,blob_len); for(int i=0;i255) + { + return -1; + } + if(!anti_replay.is_vaild(seq)) + { + return 0; + } + if(!mp[seq].empty()) + { + int first_idx=mp[seq].begin()->second; + int ok=1; + if(fec_data[first_idx].data_num!=data_num) + ok=0; + if(fec_data[first_idx].redundant_num!=redundant_num) + ok=0; + if(fec_data[first_idx].len!=len) + ok=0; + if(ok==0) + { + return 0; + } + } + if(fec_data[index].used!=0) + { + int tmp_seq=fec_data[index].seq; + anti_replay.set_invaild(tmp_seq); + if(mp.find(tmp_seq)!=mp.end()) + { + mp.erase(tmp_seq); + } + } + fec_data[index].used=1; + fec_data[index].seq=seq; + fec_data[index].type=type; + fec_data[index].data_num=data_num; + fec_data[index].redundant_num=redundant_num; + fec_data[index].idx=inner_index; + fec_data[index].len=len; + memcpy(fec_data[index].buf,s+tmp_idx,len); + mp[seq][inner_index]=index; + + index++; + if(index==int(anti_replay_buff_size)) index=0; + + map &inner_mp=mp[seq]; + assert((int)inner_mp.size()<=data_num); + if((int)inner_mp.size()==data_num) + { + + char *fec_tmp_arr[256+5]={0}; + for(auto it=inner_mp.begin();it!=inner_mp.end();it++) + { + fec_tmp_arr[it->first]=fec_data[it->second].buf; + } + rs_decode2(data_num,data_num+redundant_num,fec_tmp_arr,len); //the input data has been modified in-place + blob_decode.clear(); + for(int i=0;i > mp; blob_decode_t blob_decode; - fec_decode_manager_t() - { - for(int i=0;i<(int)fec_buff_size;i++) - fec_data[i].used=0; - ready_for_output=0; - } - int output_n; char ** output_s_arr; int * output_len_arr; - int ready_for_output; - int input(char *s,int len) - { - char *ori_s=s; - u32_t seq=read_u32(s); - s+=sizeof(u32_t); - int data_num=(unsigned char)*(s++); - int redundant_num=(unsigned char)*(s++); - int innder_index=(unsigned char)*(s++); - int type=(unsigned char)*(s++); - len=len-int(s-ori_s); - if(len<0) - { - return -1; - } - - if(!anti_replay.is_vaild(seq)) - { - return 0; - } - if(!mp[seq].empty()) - { - int tmp_idx=mp[seq].begin()->second; - int ok=1; - if(data_num+redundant_num>255) - ok=0; - if(fec_data[tmp_idx].data_num!=data_num||fec_data[tmp_idx].redundant_num!=redundant_num||fec_data[tmp_idx].len!=len) - { - ok=0; - } - if(ok==0) - { - return 0; - } - } - if(fec_data[index].used!=0) - { - int tmp_seq=fec_data[index].seq; - anti_replay.set_invaild(tmp_seq); - if(mp.find(tmp_seq)!=mp.end()) - { - mp.erase(tmp_seq); - } - } - - fec_data[index].used=1; - fec_data[index].seq=seq; - fec_data[index].data_num=data_num; - fec_data[index].redundant_num=redundant_num; - fec_data[index].idx=innder_index; - fec_data[index].type=type; - fec_data[index].len=len; - mp[seq][innder_index]=index; - - - map &inner_mp=mp[seq]; - if((int)inner_mp.size()>=data_num) - { - anti_replay.set_invaild(seq); - char *fec_tmp_arr[256+5]={0}; - for(auto it=inner_mp.begin();it!=inner_mp.end();it++) - { - fec_tmp_arr[it->first]=fec_data[it->second].buf; - } - rs_decode2(data_num,redundant_num,fec_tmp_arr,len); - blob_decode.clear(); - for(int i=0;i",n); + for(int i=0;i\n",s_arr[i]); + } + //my_send(dest,data,len); return 0; } int client_event_loop() @@ -125,10 +160,12 @@ int client_event_loop() int remote_fd; fd64_t remote_fd64; - conn_info_t conn_info; - + conn_info_t *conn_info_p=new conn_info_t; + conn_info_t &conn_info=*conn_info_p; //huge size of conn_info,do not allocate on stack init_listen_socket(); + conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu); + epoll_fd = epoll_create1(0); const int max_events = 4096; @@ -231,8 +268,12 @@ int client_event_loop() dest_t dest; dest.type=type_fd64_conv; dest.inner.fd64=remote_fd64; + + //char * new_data; + //int new_len; + //put_conv(conv,data,data_len,new_data,new_len); dest.conv=conv; - from_normal_to_fec(dest,data,data_len); + from_normal_to_fec(conn_info,dest,data,data_len); //my_send(dest,data,data_len); } else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) { @@ -277,7 +318,7 @@ int client_event_loop() dest_t dest; dest.inner.ip_port.from_u64(u64); dest.type=type_ip_port; - from_fec_to_normal(dest,new_data,new_len); + from_fec_to_normal(conn_info,dest,new_data,new_len); mylog(log_trace,"[%s] send packet\n",dest.inner.ip_port.to_s()); } @@ -311,7 +352,7 @@ int server_event_loop() int epoll_fd; int remote_fd; - conn_info_t conn_info; +// conn_info_t conn_info; init_listen_socket(); @@ -397,10 +438,12 @@ int server_event_loop() { conn_manager.insert(ip_port); conn_info_t &conn_info=conn_manager.find(ip_port); + conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu); conn_info.conv_manager.reserve(); } conn_info_t &conn_info=conn_manager.find(ip_port); + u32_t conv; char *new_data; int new_len; @@ -438,7 +481,7 @@ int server_event_loop() dest_t dest; dest.type=type_fd64; dest.inner.fd64=fd64; - from_fec_to_normal(dest,new_data,new_len); + from_fec_to_normal(conn_info,dest,new_data,new_len); //int fd = int((u64 << 32u) >> 32u); //////////////////////////////todo @@ -531,7 +574,12 @@ int server_event_loop() dest.type=type_ip_port_conv; dest.conv=conv; dest.inner.ip_port=ip_port; - from_normal_to_fec(dest,data,data_len); + + //char * new_data; + //int new_len; + //put_conv(conv,data,data_len,new_data,new_len); + + from_normal_to_fec(conn_info,dest,data,data_len); mylog(log_trace,"[%s] send packet\n",ip_port.to_s()); } @@ -619,9 +667,83 @@ int unit_test() printf("<%d:%s>",len_arr[i],buf); } printf("\n"); + static fec_encode_manager_t fec_encode_manager; + static fec_decode_manager_t fec_decode_manager; + + { + + string a = "11111"; + string b = "22"; + string c = "33333333"; + + fec_encode_manager.input((char *) a.c_str(), a.length()); + fec_encode_manager.input((char *) b.c_str(), b.length()); + fec_encode_manager.input((char *) c.c_str(), c.length()); + fec_encode_manager.input(0, 0); + + int n; + char **s_arr; + int len; + + + fec_encode_manager.output(n,s_arr,len); + printf("",n,len); + + for(int i=0;i",n); + for(int i=0;i\n",s_arr[i]); + } + } + + + } + + { + string a = "aaaaaaa"; + string b = "bbbbbbbbbbbbb"; + string c = "ccc"; + + fec_encode_manager.input((char *) a.c_str(), a.length()); + fec_encode_manager.input((char *) b.c_str(), b.length()); + fec_encode_manager.input((char *) c.c_str(), c.length()); + fec_encode_manager.input(0, 0); + + int n; + char **s_arr; + int len; + + + fec_encode_manager.output(n,s_arr,len); + printf("",n,len); + + for(int i=0;i",n); + for(int i=0;i\n",s_arr[i]); + } + } + } - fec_encode_manager_t fec_encode_manager; - fec_decode_manager_t fec_decode_manager; return 0; } diff --git a/makefile b/makefile index bd35cf2..ef08b3b 100755 --- a/makefile +++ b/makefile @@ -17,10 +17,7 @@ TAR=${NAME}_binaries.tar.gz `echo ${TARGETS}|sed -r 's/([^ ]+)/speeder_\1/g'` all:git_version rm -f ${NAME} - ${cc_local} -o ${NAME} -I. ${SOURCES} ${FLAGS} -lrt -ggdb -static -O3 -fast: git_version - rm -f ${NAME} - ${cc_local} -o ${NAME} -I. ${SOURCES} ${FLAGS} -lrt -ggdb + ${cc_local} -o ${NAME} -I. ${SOURCES} ${FLAGS} -lrt -ggdb -static debug: git_version rm -f ${NAME} ${cc_local} -o ${NAME} -I. ${SOURCES} ${FLAGS} -lrt -Wformat-nonliteral -D MY_DEBUG