From a71576c1805de0900d351e34906aee8bae5e0c92 Mon Sep 17 00:00:00 2001 From: wangyu- Date: Wed, 27 Sep 2017 10:47:32 -0500 Subject: [PATCH] just commit --- common.cpp | 2 +- connection.cpp | 35 +++++++++++++--------- connection.h | 16 ++++------ delay_manager.cpp | 15 ++++++---- delay_manager.h | 4 +-- fd_manager.cpp | 11 ++++--- fd_manager.h | 2 +- lib/rs.c | 28 ++++++++++++++++++ lib/rs.h | 4 +++ main.cpp | 75 ++++++++++++++++++++++++++++++++++++++--------- packet.cpp | 2 +- packet.h | 2 +- 12 files changed, 140 insertions(+), 56 deletions(-) diff --git a/common.cpp b/common.cpp index c4d3c6e..f999390 100644 --- a/common.cpp +++ b/common.cpp @@ -22,7 +22,7 @@ char iptables_rule[200]=""; program_mode_t program_mode=unset_mode;//0 unset; 1client 2server -u64_t get_current_time() +u64_t get_current_time()//ms { timespec tmp_time; clock_gettime(CLOCK_MONOTONIC, &tmp_time); diff --git a/connection.cpp b/connection.cpp index c357e43..3c6b11c 100644 --- a/connection.cpp +++ b/connection.cpp @@ -92,9 +92,13 @@ conv_manager_t::~conv_manager_t() } int conv_manager_t::insert_conv(u32_t conv,u64_t u64)//////todo add capacity { + int bucket_size_before=conv_last_active_time.bucket_count(); u64_to_conv[u64]=conv; conv_to_u64[conv]=u64; conv_last_active_time[conv]=get_current_time(); + int bucket_size_after=conv_last_active_time.bucket_count(); + if(bucket_size_after!=bucket_size_before) + clear_it=conv_last_active_time.begin(); return 0; } int conv_manager_t::erase_conv(u32_t conv) @@ -168,13 +172,13 @@ conv_manager_t::~conv_manager_t() } conn_manager_t::conn_manager_t() { - ready_num=0; + //ready_num=0; mp.reserve(100007); //fd64_mp.reserve(100007); clear_it=mp.begin(); last_clear_time=0; } - int conn_manager_t::exist_ip_port(ip_port_t ip_port) + int conn_manager_t::exist(ip_port_t ip_port) { u64_t u64=ip_port.to_u64(); if(mp.find(u64)!=mp.end()) @@ -193,27 +197,30 @@ conv_manager_t::~conv_manager_t() mp[u64]; return 0; }*/ - conn_info_t *& conn_manager_t::find_insert_p(ip_port_t ip_port) //todo capacity + + conn_info_t *& conn_manager_t::find_p(ip_port_t ip_port) //todo capacity //be aware,the adress may change after rehash { + assert(exist(ip_port)); u64_t u64=ip_port.to_u64(); - unordered_map::iterator it=mp.find(u64); - if(it==mp.end()) - { - mp[u64]=new conn_info_t; - } return mp[u64]; } - conn_info_t & conn_manager_t::find_insert(ip_port_t ip_port) //be aware,the adress may change after rehash + conn_info_t & conn_manager_t::find(ip_port_t ip_port) //be aware,the adress may change after rehash { + assert(exist(ip_port)); u64_t u64=ip_port.to_u64(); - unordered_map::iterator it=mp.find(u64); - if(it==mp.end()) - { - mp[u64]=new conn_info_t; - } return *mp[u64]; } + int conn_manager_t::insert(ip_port_t ip_port) + { + assert(!exist(ip_port)); + int bucket_size_before=mp.bucket_count(); + mp[ip_port.to_u64()]=new conn_info_t; + int bucket_size_after=mp.bucket_count(); + if(bucket_size_after!=bucket_size_before) + clear_it=mp.begin(); + return 0; + } /* int conn_manager_t::exist_fd64(fd64_t fd64) { diff --git a/connection.h b/connection.h index 72490f1..a4302d0 100644 --- a/connection.h +++ b/connection.h @@ -75,27 +75,21 @@ struct conn_info_t //stores info for a raw connection.for client ,there is o struct conn_manager_t //manager for connections. for client,we dont need conn_manager since there is only one connection.for server we use one conn_manager for all connections { - u32_t ready_num; - -// unordered_map fd64_mp; unordered_map mp;// to conn_info_t; - //put it at end so that it de-consturcts first - unordered_map::iterator clear_it; - long long last_clear_time; conn_manager_t(); - int exist_ip_port(ip_port_t); - conn_info_t *& find_insert_p(ip_port_t); //be aware,the adress may change after rehash - conn_info_t & find_insert(ip_port_t) ; //be aware,the adress may change after rehash + int exist(ip_port_t); + conn_info_t *& find_p(ip_port_t); //be aware,the adress may change after rehash + conn_info_t & find(ip_port_t) ; //be aware,the adress may change after rehash + int insert(ip_port_t); /* int exist_fd64(fd64_t fd64); void insert_fd64(fd64_t fd64,ip_port_t); ip_port_t find_by_fd64(fd64_t fd64);*/ - - int erase(unordered_map::iterator erase_it); +int erase(unordered_map::iterator erase_it); int clear_inactive(); int clear_inactive0(); diff --git a/delay_manager.cpp b/delay_manager.cpp index 1a8360e..5d6190e 100644 --- a/delay_manager.cpp +++ b/delay_manager.cpp @@ -34,14 +34,19 @@ delay_manager_t::~delay_manager_t() { //TODO ,we currently dont need to deconstruct it } -/* + int delay_manager_t::get_timer_fd() { - return delay_timer_fd; -}*/ - -int delay_manager_t::add(my_time_t delay,delay_data_t &delay_data) + return timer_fd; +} +//int add(my_time_t delay,const dest_t &dest,const char *data,int len); +int delay_manager_t::add(my_time_t delay,const dest_t &dest,char *data,int len) { + delay_data_t delay_data; + delay_data.dest=dest; + delay_data.data=data; + delay_data.len=len; + if(capacity!=0&&int(delay_mp.size()) >=capacity) { mylog(log_warn,"max pending packet reached,ignored\n"); diff --git a/delay_manager.h b/delay_manager.h index 93afdcc..d137395 100644 --- a/delay_manager.h +++ b/delay_manager.h @@ -43,9 +43,9 @@ struct delay_manager_t multimap delay_mp; //unit us,1 us=0.001ms delay_manager_t(); ~delay_manager_t(); - //int get_timer_fd(); + int get_timer_fd(); int check(); - int add(my_time_t delay,delay_data_t &delay_data); + int add(my_time_t delay,const dest_t &dest,char *data,int len); }; #endif /* DELAY_MANAGER_H_ */ diff --git a/fd_manager.cpp b/fd_manager.cpp index 99ee673..cf48d44 100644 --- a/fd_manager.cpp +++ b/fd_manager.cpp @@ -48,12 +48,11 @@ void fd_manager_t::close(fd64_t fd64) close(fd); //return 0; } -void fd_manager_t::reserve() +void fd_manager_t::reserve(int n) { - fd_to_fd64_mp.reserve(10007); - fd64_to_fd_mp.reserve(10007); - fd_info_mp.reserve(10007); - //return 0; + fd_to_fd64_mp.reserve(n); + fd64_to_fd_mp.reserve(n); + fd_info_mp.reserve(n); } u64_t fd_manager_t::create(int fd) { @@ -67,7 +66,7 @@ u64_t fd_manager_t::create(int fd) fd_manager_t::fd_manager_t() { counter=u32_t(-1); - counter+=2; + counter+=10; } fd_info_t & fd_manager_t::get_info(fd64_t fd64) { diff --git a/fd_manager.h b/fd_manager.h index 014f0fb..33df80e 100644 --- a/fd_manager.h +++ b/fd_manager.h @@ -21,7 +21,7 @@ struct fd_manager_t //conver fd to a uniq 64bit number,avoid fd value conflict int exist(fd64_t fd64); int to_fd(fd64_t); void close(fd64_t fd64); - void reserve(); + void reserve(int n); u64_t create(int fd); fd_manager_t(); private: diff --git a/lib/rs.c b/lib/rs.c index 813655e..4e5d5ef 100644 --- a/lib/rs.c +++ b/lib/rs.c @@ -5,6 +5,8 @@ * Author: root */ #include "rs.h" +#include "stdlib.h" +#include "string.h" void rs_encode(void *code,char *data[],int size) { @@ -42,3 +44,29 @@ int rs_decode(void *code,char *data[],int size) } return fec_decode(code,(void**)data,index,size); } + +static void * (*table)[256]=0; +void* get_code(int k,int n) +{ + if (table==0) + { + table=(void* (*)[256]) malloc(sizeof(void*)*256*256); + memset(table,0,sizeof(void*)*256*256); + } + if(table[k][n]==0) + { + table[k][n]=fec_new(k,n); + } + return table[k][n]; +} +void rs_encode2(int k,int n,char *data[],int size) +{ + void* code=get_code(k,n); + rs_encode(code,data,size); +} + +int rs_decode2(int k,int n,char *data[],int size) +{ + void* code=get_code(k,n); + return rs_decode(code,data,size); +} diff --git a/lib/rs.h b/lib/rs.h index 1e7fa32..46e89bf 100644 --- a/lib/rs.h +++ b/lib/rs.h @@ -40,6 +40,10 @@ void rs_encode(void *code,char *data[],int size); int rs_decode(void *code,char *data[],int size); +void rs_encode2(int k,int n,char *data[],int size); + +int rs_decode2(int k,int n,char *data[],int size); + diff --git a/main.cpp b/main.cpp index 6525303..32db4dd 100644 --- a/main.cpp +++ b/main.cpp @@ -100,6 +100,21 @@ int new_connected_socket(int &fd,u32_t ip,int port) } return 0; } +int delay_send(my_time_t delay,const dest_t &dest,char *data,int len) +{ + return delay_manager.add(delay,dest,data,len);; +} +int from_normal_to_fec(const dest_t &dest,char *data,int len) +{ + delay_send(0,dest,data,len); + delay_send(1000*1000,dest,data,len); + return 0; +} +int from_fec_to_normal(const dest_t &dest,char *data,int len) +{ + my_send(dest,data,len); + return 0; +} int client_event_loop() { //char buf[buf_len]; @@ -142,6 +157,14 @@ int client_event_loop() myexit(-1); } + ev.events = EPOLLIN; + ev.data.u64 = delay_manager.get_timer_fd(); + ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev); + if (ret!= 0) { + mylog(log_fatal,"add delay_manager.get_timer_fd() error\n"); + myexit(-1); + } + while(1)//////////////////////// { if(about_to_exit) myexit(0); @@ -208,7 +231,14 @@ int client_event_loop() dest.type=type_fd64_conv; dest.inner.fd64=remote_fd64; dest.conv=conv; - my_send(dest,data,data_len); + from_normal_to_fec(dest,data,data_len); + //my_send(dest,data,data_len); + } + else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) { + uint64_t value; + read(delay_manager.get_timer_fd(), &value, 8); + //printf("",delay_mp.size()); + //fflush(stdout); } else if(events[idx].data.u64>u32_t(-1) ) { @@ -246,9 +276,11 @@ int client_event_loop() dest_t dest; dest.inner.ip_port.from_u64(u64); dest.type=type_ip_port; - my_send(dest,new_data,new_len); + from_fec_to_normal(dest,new_data,new_len); mylog(log_trace,"[%s] send packet\n",dest.inner.ip_port.to_s()); } + + /* else if(events[idx].data.u64 ==(u64_t)timer_fd) { @@ -265,6 +297,7 @@ int client_event_loop() myexit(-1); } } + delay_manager.check(); } return 0; } @@ -298,6 +331,13 @@ int server_event_loop() myexit(-1); } + ev.events = EPOLLIN; + ev.data.u64 = delay_manager.get_timer_fd(); + ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev); + if (ret!= 0) { + mylog(log_fatal,"add delay_manager.get_timer_fd() error\n"); + myexit(-1); + } mylog(log_info,"now listening at %s:%d\n",my_ntoa(local_ip_uint32),local_port); while(1)//////////////////////// @@ -352,12 +392,13 @@ int server_event_loop() ip_port_t ip_port; ip_port.ip=udp_new_addr_in.sin_addr.s_addr; ip_port.port=ntohs(udp_new_addr_in.sin_port); - if(!conn_manager.exist_ip_port(ip_port)) + if(!conn_manager.exist(ip_port)) { - conn_info_t &conn_info=conn_manager.find_insert(ip_port); + conn_manager.insert(ip_port); + conn_info_t &conn_info=conn_manager.find(ip_port); conn_info.conv_manager.reserve(); } - conn_info_t &conn_info=conn_manager.find_insert(ip_port); + conn_info_t &conn_info=conn_manager.find(ip_port); u32_t conv; char *new_data; @@ -396,8 +437,7 @@ int server_event_loop() dest_t dest; dest.type=type_fd64; dest.inner.fd64=fd64; - my_send(dest,new_data,new_len); - + from_fec_to_normal(dest,new_data,new_len); //int fd = int((u64 << 32u) >> 32u); //////////////////////////////todo @@ -437,6 +477,12 @@ int server_event_loop() mylog(log_debug,"(events[idx].data.u64 >>32u) == 2u ,%llu,%llu,%llu \n",begin_time,end_time,end_time-begin_time); } }*/ + else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) { + uint64_t value; + read(delay_manager.get_timer_fd(), &value, 8); + //printf("",delay_mp.size()); + //fflush(stdout); + } else if (events[idx].data.u64 >u32_t(-1)) { char data[buf_len]; @@ -452,11 +498,11 @@ int server_event_loop() assert(fd_manager.exist_info(fd64)); ip_port_t ip_port=fd_manager.get_info(fd64).ip_port; - assert(conn_manager.exist_ip_port(ip_port)); + assert(conn_manager.exist(ip_port)); - conn_info_t* p_conn_info=conn_manager.find_insert_p(ip_port); + //conn_info_t* p_conn_info=conn_manager.find_insert_p(ip_port); - conn_info_t &conn_info=*p_conn_info; + conn_info_t &conn_info=conn_manager.find(ip_port); assert(conn_info.conv_manager.is_u64_used(fd64)); @@ -484,7 +530,7 @@ int server_event_loop() dest.type=type_ip_port_conv; dest.conv=conv; dest.inner.ip_port=ip_port; - my_send(dest,data,data_len); + from_normal_to_fec(dest,data,data_len); mylog(log_trace,"[%s] send packet\n",ip_port.to_s()); } @@ -495,6 +541,7 @@ int server_event_loop() } } + delay_manager.check(); } return 0; } @@ -514,7 +561,7 @@ int unit_test() { data[i]=arr[i]; } - rs_encode(code,data,3); + rs_encode2(3,6,data,3); //printf("%d %d",(int)(unsigned char)arr[5][0],(int)('a'^'b'^'c'^'d'^'e')); for(i=0;i<6;i++) @@ -526,7 +573,7 @@ int unit_test() //data[1]=0; //data[5]=0; - int ret=rs_decode(code,data,3); + int ret=rs_decode2(3,6,data,3); printf("ret:%d\n",ret); for(i=0;i<6;i++) @@ -891,7 +938,7 @@ int main(int argc, char *argv[]) local_ip_uint32=inet_addr(local_ip); remote_ip_uint32=inet_addr(remote_ip); - + fd_manager.reserve(10007); if(program_mode==client_mode) { diff --git a/packet.cpp b/packet.cpp index 2d886d3..771c413 100644 --- a/packet.cpp +++ b/packet.cpp @@ -234,7 +234,7 @@ int send_fd (int fd,char * buf, int len,int flags) } //enum delay_type_t {none=0,enum_sendto_u64,enum_send_fd,client_to_local,client_to_remote,server_to_local,server_to_remote}; -int my_send(dest_t &dest,char *data,int len) +int my_send(const dest_t &dest,char *data,int len) { switch(dest.type) { diff --git a/packet.h b/packet.h index 772bd11..95eb484 100644 --- a/packet.h +++ b/packet.h @@ -24,7 +24,7 @@ extern int random_drop; extern int local_listen_fd; -int my_send(dest_t &dest,char *data,int len); +int my_send(const dest_t &dest,char *data,int len); void encrypt_0(char * input,int &len,char *key); void decrypt_0(char * input,int &len,char *key);