From f90d1abe053d8e187d217584eab6859a5eb01bee Mon Sep 17 00:00:00 2001 From: wangyu- Date: Fri, 20 Jul 2018 03:44:53 -0500 Subject: [PATCH] change ip_port to address_t in conn_manager --- common.h | 8 +++++ connection.cpp | 87 ++++++++++++++++++++++++++++---------------------- connection.h | 23 +++++++------ main.cpp | 27 +++++++++------- 4 files changed, 84 insertions(+), 61 deletions(-) diff --git a/common.h b/common.h index 2127965..49364a0 100644 --- a/common.h +++ b/common.h @@ -118,6 +118,14 @@ struct address_t //TODO scope id { memset(&inner,0,sizeof(inner)); } + int from_ip_port(u32_t ip, int port) + { + clear(); + inner.ipv4.sin_family=AF_INET; + inner.ipv4.sin_port=htons(port); + inner.ipv4.sin_addr.s_addr=ip; + return 0; + } int from_str(char * str); int from_sockaddr(sockaddr *,socklen_t); diff --git a/connection.cpp b/connection.cpp index 9582e14..c169fb2 100644 --- a/connection.cpp +++ b/connection.cpp @@ -123,6 +123,8 @@ conn_manager_t conn_manager; assert(blob==0); blob=new blob_t; + blob->conv_manager.s.additional_clear_function=server_clear_function; + } conn_info_t::conn_info_t(const conn_info_t&b) @@ -173,7 +175,7 @@ conn_manager_t conn_manager; { ready_num=0; mp.reserve(10007); - clear_it=mp.begin(); + //clear_it=mp.begin(); // timer_fd_mp.reserve(10007); const_id_mp.reserve(10007); // udp_fd_mp.reserve(100007); @@ -181,13 +183,13 @@ conn_manager_t conn_manager; //current_ready_ip=0; // current_ready_port=0; } - int conn_manager_t::exist(u32_t ip,uint16_t port) + int conn_manager_t::exist(address_t addr) { - u64_t u64=0; - u64=ip; - u64<<=32u; - u64|=port; - if(mp.find(u64)!=mp.end()) + //u64_t u64=0; + //u64=ip; + //u64<<=32u; + //u64|=port; + if(mp.find(addr)!=mp.end()) { return 1; } @@ -203,33 +205,33 @@ conn_manager_t conn_manager; mp[u64]; return 0; }*/ - conn_info_t *& conn_manager_t::find_insert_p(u32_t ip,uint16_t port) //be aware,the adress may change after rehash + conn_info_t *& conn_manager_t::find_insert_p(address_t addr) //be aware,the adress may change after rehash { - u64_t u64=0; - u64=ip; - u64<<=32u; - u64|=port; - unordered_map::iterator it=mp.find(u64); + // u64_t u64=0; + //u64=ip; + //u64<<=32u; + //u64|=port; + unordered_map::iterator it=mp.find(addr); if(it==mp.end()) { - mp[u64]=new conn_info_t; + mp[addr]=new conn_info_t; } - return mp[u64]; + return mp[addr]; } - conn_info_t & conn_manager_t::find_insert(u32_t ip,uint16_t port) //be aware,the adress may change after rehash + conn_info_t & conn_manager_t::find_insert(address_t addr) //be aware,the adress may change after rehash { - u64_t u64=0; - u64=ip; - u64<<=32u; - u64|=port; - unordered_map::iterator it=mp.find(u64); + //u64_t u64=0; + //u64=ip; + //u64<<=32u; + //u64|=port; + unordered_map::iterator it=mp.find(addr); if(it==mp.end()) { - mp[u64]=new conn_info_t; + mp[addr]=new conn_info_t; } - return *mp[u64]; + return *mp[addr]; } - int conn_manager_t::erase(unordered_map::iterator erase_it) + int conn_manager_t::erase(unordered_map::iterator erase_it) { if(erase_it->second->state.server_current_state==server_ready) { @@ -280,14 +282,14 @@ int conn_manager_t::clear_inactive() } int conn_manager_t::clear_inactive0() { - unordered_map::iterator it; - unordered_map::iterator old_it; + //unordered_map::iterator it; + // unordered_map::iterator old_it; if(disable_conn_clear) return 0; //map::iterator it; int cnt=0; - it=clear_it; + //it=clear_it; int size=mp.size(); int num_to_clean=size/conn_clear_ratio+conn_clear_min; //clear 1/10 each time,to avoid latency glitch @@ -299,36 +301,43 @@ int conn_manager_t::clear_inactive0() for(;;) { if(cnt>=num_to_clean) break; - if(mp.begin()==mp.end()) break; - if(it==mp.end()) - { - it=mp.begin(); - } + if(lru.empty()) break; + address_t key; + my_time_t ts=lru.peek_back(key); + + //if(mp.begin()==mp.end()) break; + + //if(it==mp.end()) + //{ + // it=mp.begin(); + //} + + auto it=mp.find(key); if(it->second->state.server_current_state==server_ready &¤t_time - it->second->last_hb_recv_time <=server_conn_timeout) { - it++; + //it++; } else if(it->second->state.server_current_state!=server_ready&& current_time - it->second->last_state_time <=server_handshake_timeout ) { - it++; + //it++; } else if(it->second->blob!=0&&it->second->blob->conv_manager.s.get_size() >0) { assert(it->second->state.server_current_state==server_ready); - it++; + //it++; } else { mylog(log_info,"[%s:%d]inactive conn cleared \n",my_ntoa(it->second->raw_info.recv_info.src_ip),it->second->raw_info.recv_info.src_port); - old_it=it; - it++; - erase(old_it); + //old_it=it; + //it++; + erase(it); } cnt++; } - clear_it=it; + //clear_it=it; return 0; } diff --git a/connection.h b/connection.h index c434f4e..7eeeebd 100644 --- a/connection.h +++ b/connection.h @@ -217,16 +217,17 @@ struct conv_manager_t // manage the udp connections int clear_inactive0(char * ip_port);*/ };//g_conv_manager; -struct blob_t:not_copy_able_t //used in conn_info_t. conv_manager_t and anti_replay_t are costly data structures ,we dont allocate them until its necessary +struct blob_t:not_copy_able_t //used in conn_info_t. { - struct //TODO change to unconstrained union + struct //conv_manager_t is here to avoid copying when a connection is recovered + //TODO maybe an unconstrained union is better, but struct is okay since conv_manger is small when no data is filled in. { conv_manager_t c; conv_manager_t s; - //conv_manager_t test; + //avoid templates here and there, avoid pointer and type cast }conv_manager; - anti_replay_t anti_replay; + anti_replay_t anti_replay;//anti_replay_t is here bc its huge,its allocation is delayed. }; struct conn_info_t //stores info for a raw connection.for client ,there is only one connection,for server there can be thousand of connection since server can //handle multiple clients @@ -279,14 +280,16 @@ struct conn_manager_t //manager for connections. for client,we dont need conn_m unordered_map const_id_mp; - unordered_map mp; //put it at end so that it de-consturcts first + unordered_map mp; //put it at end so that it de-consturcts first - unordered_map::iterator clear_it; + lru_collector_t lru; + + //unordered_map::iterator clear_it; long long last_clear_time; conn_manager_t(); - int exist(u32_t ip,uint16_t port); + int exist(address_t addr); /* int insert(uint32_t ip,uint16_t port) { @@ -297,10 +300,10 @@ struct conn_manager_t //manager for connections. for client,we dont need conn_m mp[u64]; return 0; }*/ - conn_info_t *& find_insert_p(u32_t ip,uint16_t port); //be aware,the adress may change after rehash - conn_info_t & find_insert(u32_t ip,uint16_t port) ; //be aware,the adress may change after rehash + conn_info_t *& find_insert_p(address_t addr); //be aware,the adress may change after rehash //not true? + conn_info_t & find_insert(address_t addr) ; //be aware,the adress may change after rehash - int erase(unordered_map::iterator erase_it); + int erase(unordered_map::iterator erase_it); int clear_inactive(); int clear_inactive0(); diff --git a/main.cpp b/main.cpp index 2fc59eb..0cad4b3 100755 --- a/main.cpp +++ b/main.cpp @@ -565,9 +565,11 @@ int server_on_raw_recv_multi() //called when server received an raw packet mylog(log_trace,"[%s]peek_raw\n",ip_port); int data_len; char *data; + address_t addr; + addr.from_ip_port(ip,port); if(raw_mode==mode_faketcp&&peek_info.syn==1) { - if(!conn_manager.exist(ip,port)||conn_manager.find_insert(ip,port).state.server_current_state!=server_ready) + if(!conn_manager.exist(addr)||conn_manager.find_insert(addr).state.server_current_state!=server_ready) {//reply any syn ,before state become ready raw_info_t tmp_raw_info; @@ -610,7 +612,7 @@ int server_on_raw_recv_multi() //called when server received an raw packet } return 0; } - if(!conn_manager.exist(ip,port)) + if(!conn_manager.exist(addr)) { if(conn_manager.mp.size()>=max_handshake_conn_num) { @@ -649,7 +651,7 @@ int server_on_raw_recv_multi() //called when server received an raw packet mylog(log_info,"[%s]got packet from a new ip\n",ip_port); - conn_info_t &conn_info=conn_manager.find_insert(ip,port); + conn_info_t &conn_info=conn_manager.find_insert(addr); conn_info.raw_info=tmp_raw_info; raw_info_t &raw_info=conn_info.raw_info; @@ -690,7 +692,7 @@ int server_on_raw_recv_multi() //called when server received an raw packet - conn_info_t & conn_info=conn_manager.find_insert(ip,port);//insert if not exist + conn_info_t & conn_info=conn_manager.find_insert(addr);//insert if not exist packet_info_t &send_info=conn_info.raw_info.send_info; packet_info_t &recv_info=conn_info.raw_info.recv_info; raw_info_t &raw_info=conn_info.raw_info; @@ -969,7 +971,6 @@ int server_on_raw_recv_pre_ready(conn_info_t &conn_info,char * ip_port,u32_t tmp } conn_info.prepare(); - conn_info.blob->conv_manager.s.additional_clear_function=server_clear_function; conn_info.state.server_current_state = server_ready; conn_info.oppsite_const_id=tmp_oppsite_const_id; conn_manager.ready_num++; @@ -1027,18 +1028,20 @@ int server_on_raw_recv_pre_ready(conn_info_t &conn_info,char * ip_port,u32_t tmp conn_info.oppsite_const_id=0; return 0; } - if(!conn_manager.exist(ori_conn_info.raw_info.recv_info.src_ip,ori_conn_info.raw_info.recv_info.src_port))//TODO remove this + address_t addr1;addr1.from_ip_port(ori_conn_info.raw_info.recv_info.src_ip,ori_conn_info.raw_info.recv_info.src_port); + if(!conn_manager.exist(addr1))//TODO remove this { mylog(log_fatal,"[%s]this shouldnt happen\n",ip_port); myexit(-1); } - if(!conn_manager.exist(conn_info.raw_info.recv_info.src_ip,conn_info.raw_info.recv_info.src_port))//TODO remove this + address_t addr2;addr2.from_ip_port(conn_info.raw_info.recv_info.src_ip,conn_info.raw_info.recv_info.src_port); + if(!conn_manager.exist(addr2))//TODO remove this { mylog(log_fatal,"[%s]this shouldnt happen2\n",ip_port); myexit(-1); } - conn_info_t *&p_ori=conn_manager.find_insert_p(ori_conn_info.raw_info.recv_info.src_ip,ori_conn_info.raw_info.recv_info.src_port); - conn_info_t *&p=conn_manager.find_insert_p(conn_info.raw_info.recv_info.src_ip,conn_info.raw_info.recv_info.src_port); + conn_info_t *&p_ori=conn_manager.find_insert_p(addr1); + conn_info_t *&p=conn_manager.find_insert_p(addr2); conn_info_t *tmp=p; p=p_ori; p_ori=tmp; @@ -1227,7 +1230,7 @@ int client_event_loop() //g_packet_info.src_ip=source_address_uint32; //g_packet_info.src_port=source_port; - udp_fd=socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + udp_fd=socket(remote_addr.get_type(), SOCK_DGRAM, IPPROTO_UDP); set_buf_size(udp_fd,socket_buf_size); int yes = 1; @@ -1480,11 +1483,11 @@ int server_event_loop() if(raw_mode==mode_faketcp) { - bind_fd=socket(AF_INET,SOCK_STREAM,0); + bind_fd=socket(local_addr.get_type(),SOCK_STREAM,0); } else if(raw_mode==mode_udp||raw_mode==mode_icmp)//bind an adress to avoid collision,for icmp,there is no port,just bind a udp port { - bind_fd=socket(AF_INET,SOCK_DGRAM,0); + bind_fd=socket(local_addr.get_type(),SOCK_DGRAM,0); } //struct sockaddr_in temp_bind_addr={0};