From 946b615acf1041e619ef1362ba39213d81e3e6a4 Mon Sep 17 00:00:00 2001 From: wangyu- Date: Thu, 19 Jul 2018 01:23:25 -0500 Subject: [PATCH] conv_manager_t changed to template --- common.h | 30 ++++++++ connection.cpp | 157 ++--------------------------------------- connection.h | 187 +++++++++++++++++++++++++++++++++++++++++++++---- main.cpp | 37 +++++----- 4 files changed, 230 insertions(+), 181 deletions(-) diff --git a/common.h b/common.h index 2ed0896..780c480 100644 --- a/common.h +++ b/common.h @@ -185,6 +185,36 @@ struct address_t //TODO scope id char* get_ip(); }; +namespace std { +template <> + struct hash + { + std::size_t operator()(const address_t& key) const + { + + //return address_t::hash_function(k); + return sdbm((unsigned char*)&key.inner,sizeof(key.inner)); + } + }; +} + +struct not_copy_able_t +{ + not_copy_able_t() + { + + } + not_copy_able_t(const not_copy_able_t &other) + { + assert(0==1); + } + const not_copy_able_t & operator=(const not_copy_able_t &other) + { + assert(0==1); + return other; + } +}; + const int max_data_len=1800; const int buf_len=max_data_len+400; diff --git a/connection.cpp b/connection.cpp index a4f55cf..9582e14 100644 --- a/connection.cpp +++ b/connection.cpp @@ -12,7 +12,7 @@ int disable_anti_replay=0;//if anti_replay windows is diabled -const int disable_conv_clear=0;//a udp connection in the multiplexer is called conversation in this program,conv for short. + const int disable_conn_clear=0;//a raw connection is called conn. @@ -75,154 +75,6 @@ conn_manager_t conn_manager; } -void server_clear_function(u64_t u64); - -conv_manager_t::conv_manager_t() - { - clear_it=conv_last_active_time.begin(); - long long last_clear_time=0; - //clear_function=0; - } -conv_manager_t::~conv_manager_t() - { - clear(); - } - int conv_manager_t::get_size() - { - return conv_to_u64.size(); - } - void conv_manager_t::reserve() - { - u64_to_conv.reserve(10007); - conv_to_u64.reserve(10007); - conv_last_active_time.reserve(10007); - } - void conv_manager_t::clear() - { - if(disable_conv_clear) return ; - - if(program_mode==server_mode) - { - for(it=conv_to_u64.begin();it!=conv_to_u64.end();it++) - { - //int fd=int((it->second<<32u)>>32u); - server_clear_function( it->second); - } - } - u64_to_conv.clear(); - conv_to_u64.clear(); - conv_last_active_time.clear(); - - clear_it=conv_last_active_time.begin(); - - } - u32_t conv_manager_t::get_new_conv() - { - u32_t conv=get_true_random_number_nz(); - while(conv_to_u64.find(conv)!=conv_to_u64.end()) - { - conv=get_true_random_number_nz(); - } - return conv; - } - int conv_manager_t::is_conv_used(u32_t conv) - { - return conv_to_u64.find(conv)!=conv_to_u64.end(); - } - int conv_manager_t::is_u64_used(u64_t u64) - { - return u64_to_conv.find(u64)!=u64_to_conv.end(); - } - u32_t conv_manager_t::find_conv_by_u64(u64_t u64) - { - return u64_to_conv[u64]; - } - u64_t conv_manager_t::find_u64_by_conv(u32_t conv) - { - return conv_to_u64[conv]; - } - int conv_manager_t::update_active_time(u32_t conv) - { - return conv_last_active_time[conv]=get_current_time(); - } - int conv_manager_t::insert_conv(u32_t conv,u64_t u64) - { - u64_to_conv[u64]=conv; - conv_to_u64[conv]=u64; - conv_last_active_time[conv]=get_current_time(); - return 0; - } - int conv_manager_t::erase_conv(u32_t conv) - { - if(disable_conv_clear) return 0; - u64_t u64=conv_to_u64[conv]; - if(program_mode==server_mode) - { - server_clear_function(u64); - } - conv_to_u64.erase(conv); - u64_to_conv.erase(u64); - conv_last_active_time.erase(conv); - return 0; - } - int conv_manager_t::clear_inactive(char * ip_port) - { - if(get_current_time()-last_clear_time>conv_clear_interval) - { - last_clear_time=get_current_time(); - return clear_inactive0(ip_port); - } - return 0; - } - int conv_manager_t::clear_inactive0(char * ip_port) - { - if(disable_conv_clear) return 0; - - - //map::iterator it; - int cnt=0; - it=clear_it; - int size=conv_last_active_time.size(); - int num_to_clean=size/conv_clear_ratio+conv_clear_min; //clear 1/10 each time,to avoid latency glitch - - num_to_clean=min(num_to_clean,size); - - u64_t current_time=get_current_time(); - for(;;) - { - if(cnt>=num_to_clean) break; - if(conv_last_active_time.begin()==conv_last_active_time.end()) break; - - if(it==conv_last_active_time.end()) - { - it=conv_last_active_time.begin(); - } - - if( current_time -it->second >conv_timeout ) - { - //mylog(log_info,"inactive conv %u cleared \n",it->first); - old_it=it; - it++; - u32_t conv= old_it->first; - erase_conv(old_it->first); - if(ip_port==0) - { - mylog(log_info,"conv %x cleared\n",conv); - } - else - { - mylog(log_info,"[%s]conv %x cleared\n",ip_port,conv); - } - } - else - { - it++; - } - cnt++; - } - clear_it=it; - return 0; - } void conn_info_t::recover(const conn_info_t &conn_info) @@ -268,18 +120,23 @@ conv_manager_t::~conv_manager_t() } void conn_info_t::prepare() { + assert(blob==0); blob=new blob_t; } + conn_info_t::conn_info_t(const conn_info_t&b) { + assert(0==1); //mylog(log_error,"called!!!!!!!!!!!!!\n"); *this=b; if(blob!=0) { blob=new blob_t(*b.blob); + } } + conn_info_t& conn_info_t::operator=(const conn_info_t& b) { mylog(log_fatal,"not allowed\n"); @@ -457,7 +314,7 @@ int conn_manager_t::clear_inactive0() { it++; } - else if(it->second->blob!=0&&it->second->blob->conv_manager.get_size() >0) + else if(it->second->blob!=0&&it->second->blob->conv_manager.s.get_size() >0) { assert(it->second->state.server_current_state==server_ready); it++; diff --git a/connection.h b/connection.h index c476abe..2c70fb4 100644 --- a/connection.h +++ b/connection.h @@ -16,6 +16,8 @@ extern int disable_anti_replay; #include "network.h" #include "misc.h" +const int disable_conv_clear=0;//a udp connection in the multiplexer is called conversation in this program,conv for short. + struct anti_replay_t //its for anti replay attack,similar to openvpn/ipsec 's anti replay window @@ -30,24 +32,177 @@ struct anti_replay_t //its for anti replay attack,similar to openvpn/ipsec 's a int is_vaild(u64_t seq); };//anti_replay; +void server_clear_function(u64_t u64); +#include + +template struct conv_manager_t // manage the udp connections { //typedef hash_map map; - unordered_map u64_to_conv; //conv and u64 are both supposed to be uniq - unordered_map conv_to_u64; + unordered_map data_to_conv; //conv and u64 are both supposed to be uniq + unordered_map conv_to_data; unordered_map conv_last_active_time; unordered_map::iterator clear_it; - unordered_map::iterator it; - unordered_map::iterator old_it; - - //void (*clear_function)(uint64_t u64) ; + void (*additional_clear_function)(T data) =0; long long last_clear_time; + conv_manager_t() + { + clear_it=conv_last_active_time.begin(); + long long last_clear_time=0; + additional_clear_function=0; + } + ~conv_manager_t() + { + clear(); + } + int get_size() + { + return conv_to_data.size(); + } + void reserve() + { + data_to_conv.reserve(10007); + conv_to_data.reserve(10007); + conv_last_active_time.reserve(10007); + } + void clear() + { + if(disable_conv_clear) return ; + + if(additional_clear_function!=0) + { + for(auto it=conv_to_data.begin();it!=conv_to_data.end();it++) + { + //int fd=int((it->second<<32u)>>32u); + additional_clear_function( it->second); + } + } + data_to_conv.clear(); + conv_to_data.clear(); + conv_last_active_time.clear(); + + clear_it=conv_last_active_time.begin(); + + } + u32_t get_new_conv() + { + u32_t conv=get_true_random_number_nz(); + while(conv_to_data.find(conv)!=conv_to_data.end()) + { + conv=get_true_random_number_nz(); + } + return conv; + } + int is_conv_used(u32_t conv) + { + return conv_to_data.find(conv)!=conv_to_data.end(); + } + int is_data_used(T data) + { + return data_to_conv.find(data)!=data_to_conv.end(); + } + u32_t find_conv_by_data(T data) + { + return data_to_conv[data]; + } + u64_t find_data_by_conv(u32_t conv) + { + return conv_to_data[conv]; + } + int update_active_time(u32_t conv) + { + return conv_last_active_time[conv]=get_current_time(); + } + int insert_conv(u32_t conv,u64_t u64) + { + data_to_conv[u64]=conv; + conv_to_data[conv]=u64; + conv_last_active_time[conv]=get_current_time(); + return 0; + } + int erase_conv(u32_t conv) + { + if(disable_conv_clear) return 0; + u64_t u64=conv_to_data[conv]; + if(program_mode==server_mode) + { + server_clear_function(u64); + } + conv_to_data.erase(conv); + data_to_conv.erase(u64); + conv_last_active_time.erase(conv); + return 0; + } + int clear_inactive(char * ip_port=0) + { + if(get_current_time()-last_clear_time>conv_clear_interval) + { + last_clear_time=get_current_time(); + return clear_inactive0(ip_port); + } + return 0; + } + int clear_inactive0(char * ip_port) + { + if(disable_conv_clear) return 0; + + + unordered_map::iterator it; + unordered_map::iterator old_it; + + //map::iterator it; + int cnt=0; + it=clear_it; + int size=conv_last_active_time.size(); + int num_to_clean=size/conv_clear_ratio+conv_clear_min; //clear 1/10 each time,to avoid latency glitch + + num_to_clean=min(num_to_clean,size); + + u64_t current_time=get_current_time(); + for(;;) + { + if(cnt>=num_to_clean) break; + if(conv_last_active_time.begin()==conv_last_active_time.end()) break; + + if(it==conv_last_active_time.end()) + { + it=conv_last_active_time.begin(); + } + + if( current_time -it->second >conv_timeout ) + { + //mylog(log_info,"inactive conv %u cleared \n",it->first); + old_it=it; + it++; + u32_t conv= old_it->first; + erase_conv(old_it->first); + if(ip_port==0) + { + mylog(log_info,"conv %x cleared\n",conv); + } + else + { + mylog(log_info,"[%s]conv %x cleared\n",ip_port,conv); + } + } + else + { + it++; + } + cnt++; + } + clear_it=it; + return 0; + } + + + /* conv_manager_t(); ~conv_manager_t(); int get_size(); @@ -55,19 +210,25 @@ struct conv_manager_t // manage the udp connections void clear(); u32_t get_new_conv(); int is_conv_used(u32_t conv); - int is_u64_used(u64_t u64); - u32_t find_conv_by_u64(u64_t u64); - u64_t find_u64_by_conv(u32_t conv); + int is_u64_used(T u64); + u32_t find_conv_by_u64(T u64); + T find_u64_by_conv(u32_t conv); int update_active_time(u32_t conv); - int insert_conv(u32_t conv,u64_t u64); + int insert_conv(u32_t conv,T u64); int erase_conv(u32_t conv); int clear_inactive(char * ip_port=0); - int clear_inactive0(char * ip_port); + int clear_inactive0(char * ip_port);*/ };//g_conv_manager; -struct blob_t //used in conn_info_t. conv_manager_t and anti_replay_t are costly data structures ,we dont allocate them until its necessary +struct blob_t:not_copy_able_t //used in conn_info_t. conv_manager_t and anti_replay_t are costly data structures ,we dont allocate them until its necessary { - conv_manager_t conv_manager; + struct //TODO change to unconstrained union + { + conv_manager_t c; + conv_manager_t s; + conv_manager_t test; + }conv_manager; + anti_replay_t anti_replay; }; struct conn_info_t //stores info for a raw connection.for client ,there is only one connection,for server there can be thousand of connection since server can diff --git a/main.cpp b/main.cpp index 42d2dea..44748ad 100755 --- a/main.cpp +++ b/main.cpp @@ -23,7 +23,7 @@ int client_on_timer(conn_info_t &conn_info) //for client. called when a timer is packet_info_t &send_info=conn_info.raw_info.send_info; packet_info_t &recv_info=conn_info.raw_info.recv_info; raw_info_t &raw_info=conn_info.raw_info; - conn_info.blob->conv_manager.clear_inactive(); + conn_info.blob->conv_manager.c.clear_inactive(); mylog(log_trace,"timer!\n"); mylog(log_trace,"roller my %d,oppsite %d,%lld\n",int(conn_info.my_roller),int(conn_info.oppsite_roller),conn_info.last_oppsite_roller_time); @@ -310,7 +310,7 @@ int server_on_timer_multi(conn_info_t &conn_info,char * ip_port) //for server. if(conn_info.state.server_current_state==server_ready) { - conn_info.blob->conv_manager.clear_inactive(ip_port); + conn_info.blob->conv_manager.s.clear_inactive(ip_port); /* if( get_current_time()-conn_info.last_hb_recv_time>heartbeat_timeout ) { @@ -496,15 +496,15 @@ int client_on_raw_recv(conn_info_t &conn_info) //called when raw fd received a p memcpy(&tmp_conv_id,&data[0],sizeof(tmp_conv_id)); tmp_conv_id=ntohl(tmp_conv_id); - if(!conn_info.blob->conv_manager.is_conv_used(tmp_conv_id)) + if(!conn_info.blob->conv_manager.c.is_conv_used(tmp_conv_id)) { mylog(log_info,"unknow conv %d,ignore\n",tmp_conv_id); return 0; } - conn_info.blob->conv_manager.update_active_time(tmp_conv_id); + conn_info.blob->conv_manager.c.update_active_time(tmp_conv_id); - u64_t u64=conn_info.blob->conv_manager.find_u64_by_conv(tmp_conv_id); + u64_t u64=conn_info.blob->conv_manager.c.find_data_by_conv(tmp_conv_id); sockaddr_in tmp_sockaddr={0}; @@ -845,8 +845,8 @@ int server_on_raw_recv_ready(conn_info_t &conn_info,char * ip_port,char type,cha conn_info.last_hb_recv_time = get_current_time(); mylog(log_trace, "conv:%u\n", tmp_conv_id); - if (!conn_info.blob->conv_manager.is_conv_used(tmp_conv_id)) { - if (conn_info.blob->conv_manager.get_size() >= max_conv_num) { + if (!conn_info.blob->conv_manager.s.is_conv_used(tmp_conv_id)) { + if (conn_info.blob->conv_manager.s.get_size() >= max_conv_num) { mylog(log_warn, "[%s]ignored new conv %x connect bc max_conv_num exceed\n",ip_port, tmp_conv_id); @@ -907,7 +907,7 @@ int server_on_raw_recv_ready(conn_info_t &conn_info,char * ip_port,char type,cha return -1; } - conn_info.blob->conv_manager.insert_conv(tmp_conv_id, new_udp_fd64); + conn_info.blob->conv_manager.s.insert_conv(tmp_conv_id, new_udp_fd64); @@ -924,9 +924,9 @@ int server_on_raw_recv_ready(conn_info_t &conn_info,char * ip_port,char type,cha } - fd64_t fd64 = conn_info.blob->conv_manager.find_u64_by_conv(tmp_conv_id); + fd64_t fd64 = conn_info.blob->conv_manager.s.find_data_by_conv(tmp_conv_id); - conn_info.blob->conv_manager.update_active_time(tmp_conv_id); + conn_info.blob->conv_manager.s.update_active_time(tmp_conv_id); int fd = fd_manager.to_fd(fd64); @@ -968,6 +968,7 @@ int server_on_raw_recv_pre_ready(conn_info_t &conn_info,char * ip_port,u32_t tmp } conn_info.prepare(); + conn_info.blob->conv_manager.s.additional_clear_function=server_clear_function; conn_info.state.server_current_state = server_ready; conn_info.oppsite_const_id=tmp_oppsite_const_id; conn_manager.ready_num++; @@ -1405,23 +1406,23 @@ int client_event_loop() u64_t u64=((u64_t(udp_new_addr_in.sin_addr.s_addr))<<32u)+ntohs(udp_new_addr_in.sin_port); u32_t conv; - if(!conn_info.blob->conv_manager.is_u64_used(u64)) + if(!conn_info.blob->conv_manager.c.is_data_used(u64)) { - if(conn_info.blob->conv_manager.get_size() >=max_conv_num) + if(conn_info.blob->conv_manager.c.get_size() >=max_conv_num) { mylog(log_warn,"ignored new udp connect bc max_conv_num exceed\n"); continue; } - conv=conn_info.blob->conv_manager.get_new_conv(); - conn_info.blob->conv_manager.insert_conv(conv,u64); + conv=conn_info.blob->conv_manager.c.get_new_conv(); + conn_info.blob->conv_manager.c.insert_conv(conv,u64); mylog(log_info,"new packet from %s:%d,conv_id=%x\n",inet_ntoa(udp_new_addr_in.sin_addr),ntohs(udp_new_addr_in.sin_port),conv); } else { - conv=conn_info.blob->conv_manager.find_conv_by_u64(u64); + conv=conn_info.blob->conv_manager.c.find_conv_by_data(u64); } - conn_info.blob->conv_manager.update_active_time(conv); + conn_info.blob->conv_manager.c.update_active_time(conv); if(conn_info.state.client_current_state==client_ready) { @@ -1713,9 +1714,9 @@ int server_event_loop() conn_info_t &conn_info=*p_conn_info; - assert(conn_info.blob->conv_manager.is_u64_used(fd64)); + assert(conn_info.blob->conv_manager.s.is_data_used(fd64)); - u32_t conv_id=conn_info.blob->conv_manager.find_conv_by_u64(fd64); + u32_t conv_id=conn_info.blob->conv_manager.s.find_conv_by_data(fd64); int fd=fd_manager.to_fd(fd64);