change ip_port to address_t in conn_manager

This commit is contained in:
wangyu- 2018-07-20 03:44:53 -05:00
parent f050946ac5
commit f90d1abe05
4 changed files with 84 additions and 61 deletions

View File

@ -118,6 +118,14 @@ struct address_t //TODO scope id
{ {
memset(&inner,0,sizeof(inner)); memset(&inner,0,sizeof(inner));
} }
int from_ip_port(u32_t ip, int port)
{
clear();
inner.ipv4.sin_family=AF_INET;
inner.ipv4.sin_port=htons(port);
inner.ipv4.sin_addr.s_addr=ip;
return 0;
}
int from_str(char * str); int from_str(char * str);
int from_sockaddr(sockaddr *,socklen_t); int from_sockaddr(sockaddr *,socklen_t);

View File

@ -123,6 +123,8 @@ conn_manager_t conn_manager;
assert(blob==0); assert(blob==0);
blob=new blob_t; blob=new blob_t;
blob->conv_manager.s.additional_clear_function=server_clear_function;
} }
conn_info_t::conn_info_t(const conn_info_t&b) conn_info_t::conn_info_t(const conn_info_t&b)
@ -173,7 +175,7 @@ conn_manager_t conn_manager;
{ {
ready_num=0; ready_num=0;
mp.reserve(10007); mp.reserve(10007);
clear_it=mp.begin(); //clear_it=mp.begin();
// timer_fd_mp.reserve(10007); // timer_fd_mp.reserve(10007);
const_id_mp.reserve(10007); const_id_mp.reserve(10007);
// udp_fd_mp.reserve(100007); // udp_fd_mp.reserve(100007);
@ -181,13 +183,13 @@ conn_manager_t conn_manager;
//current_ready_ip=0; //current_ready_ip=0;
// current_ready_port=0; // current_ready_port=0;
} }
int conn_manager_t::exist(u32_t ip,uint16_t port) int conn_manager_t::exist(address_t addr)
{ {
u64_t u64=0; //u64_t u64=0;
u64=ip; //u64=ip;
u64<<=32u; //u64<<=32u;
u64|=port; //u64|=port;
if(mp.find(u64)!=mp.end()) if(mp.find(addr)!=mp.end())
{ {
return 1; return 1;
} }
@ -203,33 +205,33 @@ conn_manager_t conn_manager;
mp[u64]; mp[u64];
return 0; return 0;
}*/ }*/
conn_info_t *& conn_manager_t::find_insert_p(u32_t ip,uint16_t port) //be aware,the adress may change after rehash conn_info_t *& conn_manager_t::find_insert_p(address_t addr) //be aware,the adress may change after rehash
{ {
u64_t u64=0; // u64_t u64=0;
u64=ip; //u64=ip;
u64<<=32u; //u64<<=32u;
u64|=port; //u64|=port;
unordered_map<u64_t,conn_info_t*>::iterator it=mp.find(u64); unordered_map<address_t,conn_info_t*>::iterator it=mp.find(addr);
if(it==mp.end()) if(it==mp.end())
{ {
mp[u64]=new conn_info_t; mp[addr]=new conn_info_t;
} }
return mp[u64]; return mp[addr];
} }
conn_info_t & conn_manager_t::find_insert(u32_t ip,uint16_t port) //be aware,the adress may change after rehash conn_info_t & conn_manager_t::find_insert(address_t addr) //be aware,the adress may change after rehash
{ {
u64_t u64=0; //u64_t u64=0;
u64=ip; //u64=ip;
u64<<=32u; //u64<<=32u;
u64|=port; //u64|=port;
unordered_map<u64_t,conn_info_t*>::iterator it=mp.find(u64); unordered_map<address_t,conn_info_t*>::iterator it=mp.find(addr);
if(it==mp.end()) if(it==mp.end())
{ {
mp[u64]=new conn_info_t; mp[addr]=new conn_info_t;
} }
return *mp[u64]; return *mp[addr];
} }
int conn_manager_t::erase(unordered_map<u64_t,conn_info_t*>::iterator erase_it) int conn_manager_t::erase(unordered_map<address_t,conn_info_t*>::iterator erase_it)
{ {
if(erase_it->second->state.server_current_state==server_ready) if(erase_it->second->state.server_current_state==server_ready)
{ {
@ -280,14 +282,14 @@ int conn_manager_t::clear_inactive()
} }
int conn_manager_t::clear_inactive0() int conn_manager_t::clear_inactive0()
{ {
unordered_map<u64_t,conn_info_t*>::iterator it; //unordered_map<u64_t,conn_info_t*>::iterator it;
unordered_map<u64_t,conn_info_t*>::iterator old_it; // unordered_map<u64_t,conn_info_t*>::iterator old_it;
if(disable_conn_clear) return 0; if(disable_conn_clear) return 0;
//map<uint32_t,uint64_t>::iterator it; //map<uint32_t,uint64_t>::iterator it;
int cnt=0; int cnt=0;
it=clear_it; //it=clear_it;
int size=mp.size(); int size=mp.size();
int num_to_clean=size/conn_clear_ratio+conn_clear_min; //clear 1/10 each time,to avoid latency glitch int num_to_clean=size/conn_clear_ratio+conn_clear_min; //clear 1/10 each time,to avoid latency glitch
@ -299,36 +301,43 @@ int conn_manager_t::clear_inactive0()
for(;;) for(;;)
{ {
if(cnt>=num_to_clean) break; if(cnt>=num_to_clean) break;
if(mp.begin()==mp.end()) break;
if(it==mp.end()) if(lru.empty()) break;
{ address_t key;
it=mp.begin(); my_time_t ts=lru.peek_back(key);
}
//if(mp.begin()==mp.end()) break;
//if(it==mp.end())
//{
// it=mp.begin();
//}
auto it=mp.find(key);
if(it->second->state.server_current_state==server_ready &&current_time - it->second->last_hb_recv_time <=server_conn_timeout) if(it->second->state.server_current_state==server_ready &&current_time - it->second->last_hb_recv_time <=server_conn_timeout)
{ {
it++; //it++;
} }
else if(it->second->state.server_current_state!=server_ready&& current_time - it->second->last_state_time <=server_handshake_timeout ) else if(it->second->state.server_current_state!=server_ready&& current_time - it->second->last_state_time <=server_handshake_timeout )
{ {
it++; //it++;
} }
else if(it->second->blob!=0&&it->second->blob->conv_manager.s.get_size() >0) else if(it->second->blob!=0&&it->second->blob->conv_manager.s.get_size() >0)
{ {
assert(it->second->state.server_current_state==server_ready); assert(it->second->state.server_current_state==server_ready);
it++; //it++;
} }
else else
{ {
mylog(log_info,"[%s:%d]inactive conn cleared \n",my_ntoa(it->second->raw_info.recv_info.src_ip),it->second->raw_info.recv_info.src_port); mylog(log_info,"[%s:%d]inactive conn cleared \n",my_ntoa(it->second->raw_info.recv_info.src_ip),it->second->raw_info.recv_info.src_port);
old_it=it; //old_it=it;
it++; //it++;
erase(old_it); erase(it);
} }
cnt++; cnt++;
} }
clear_it=it; //clear_it=it;
return 0; return 0;
} }

View File

@ -217,16 +217,17 @@ struct conv_manager_t // manage the udp connections
int clear_inactive0(char * ip_port);*/ int clear_inactive0(char * ip_port);*/
};//g_conv_manager; };//g_conv_manager;
struct blob_t:not_copy_able_t //used in conn_info_t. conv_manager_t and anti_replay_t are costly data structures ,we dont allocate them until its necessary struct blob_t:not_copy_able_t //used in conn_info_t.
{ {
struct //TODO change to unconstrained union struct //conv_manager_t is here to avoid copying when a connection is recovered
//TODO maybe an unconstrained union is better, but struct is okay since conv_manger is small when no data is filled in.
{ {
conv_manager_t<address_t> c; conv_manager_t<address_t> c;
conv_manager_t<u64_t> s; conv_manager_t<u64_t> s;
//conv_manager_t<address_t> test; //avoid templates here and there, avoid pointer and type cast
}conv_manager; }conv_manager;
anti_replay_t anti_replay; anti_replay_t anti_replay;//anti_replay_t is here bc its huge,its allocation is delayed.
}; };
struct conn_info_t //stores info for a raw connection.for client ,there is only one connection,for server there can be thousand of connection since server can struct conn_info_t //stores info for a raw connection.for client ,there is only one connection,for server there can be thousand of connection since server can
//handle multiple clients //handle multiple clients
@ -279,14 +280,16 @@ struct conn_manager_t //manager for connections. for client,we dont need conn_m
unordered_map<id_t,conn_info_t *> const_id_mp; unordered_map<id_t,conn_info_t *> const_id_mp;
unordered_map<u64_t,conn_info_t*> mp; //put it at end so that it de-consturcts first unordered_map<address_t,conn_info_t*> mp; //put it at end so that it de-consturcts first
unordered_map<u64_t,conn_info_t*>::iterator clear_it; lru_collector_t<address_t> lru;
//unordered_map<u64_t,conn_info_t*>::iterator clear_it;
long long last_clear_time; long long last_clear_time;
conn_manager_t(); conn_manager_t();
int exist(u32_t ip,uint16_t port); int exist(address_t addr);
/* /*
int insert(uint32_t ip,uint16_t port) int insert(uint32_t ip,uint16_t port)
{ {
@ -297,10 +300,10 @@ struct conn_manager_t //manager for connections. for client,we dont need conn_m
mp[u64]; mp[u64];
return 0; return 0;
}*/ }*/
conn_info_t *& find_insert_p(u32_t ip,uint16_t port); //be aware,the adress may change after rehash conn_info_t *& find_insert_p(address_t addr); //be aware,the adress may change after rehash //not true?
conn_info_t & find_insert(u32_t ip,uint16_t port) ; //be aware,the adress may change after rehash conn_info_t & find_insert(address_t addr) ; //be aware,the adress may change after rehash
int erase(unordered_map<u64_t,conn_info_t*>::iterator erase_it); int erase(unordered_map<address_t,conn_info_t*>::iterator erase_it);
int clear_inactive(); int clear_inactive();
int clear_inactive0(); int clear_inactive0();

View File

@ -565,9 +565,11 @@ int server_on_raw_recv_multi() //called when server received an raw packet
mylog(log_trace,"[%s]peek_raw\n",ip_port); mylog(log_trace,"[%s]peek_raw\n",ip_port);
int data_len; char *data; int data_len; char *data;
address_t addr;
addr.from_ip_port(ip,port);
if(raw_mode==mode_faketcp&&peek_info.syn==1) if(raw_mode==mode_faketcp&&peek_info.syn==1)
{ {
if(!conn_manager.exist(ip,port)||conn_manager.find_insert(ip,port).state.server_current_state!=server_ready) if(!conn_manager.exist(addr)||conn_manager.find_insert(addr).state.server_current_state!=server_ready)
{//reply any syn ,before state become ready {//reply any syn ,before state become ready
raw_info_t tmp_raw_info; raw_info_t tmp_raw_info;
@ -610,7 +612,7 @@ int server_on_raw_recv_multi() //called when server received an raw packet
} }
return 0; return 0;
} }
if(!conn_manager.exist(ip,port)) if(!conn_manager.exist(addr))
{ {
if(conn_manager.mp.size()>=max_handshake_conn_num) if(conn_manager.mp.size()>=max_handshake_conn_num)
{ {
@ -649,7 +651,7 @@ int server_on_raw_recv_multi() //called when server received an raw packet
mylog(log_info,"[%s]got packet from a new ip\n",ip_port); mylog(log_info,"[%s]got packet from a new ip\n",ip_port);
conn_info_t &conn_info=conn_manager.find_insert(ip,port); conn_info_t &conn_info=conn_manager.find_insert(addr);
conn_info.raw_info=tmp_raw_info; conn_info.raw_info=tmp_raw_info;
raw_info_t &raw_info=conn_info.raw_info; raw_info_t &raw_info=conn_info.raw_info;
@ -690,7 +692,7 @@ int server_on_raw_recv_multi() //called when server received an raw packet
conn_info_t & conn_info=conn_manager.find_insert(ip,port);//insert if not exist conn_info_t & conn_info=conn_manager.find_insert(addr);//insert if not exist
packet_info_t &send_info=conn_info.raw_info.send_info; packet_info_t &send_info=conn_info.raw_info.send_info;
packet_info_t &recv_info=conn_info.raw_info.recv_info; packet_info_t &recv_info=conn_info.raw_info.recv_info;
raw_info_t &raw_info=conn_info.raw_info; raw_info_t &raw_info=conn_info.raw_info;
@ -969,7 +971,6 @@ int server_on_raw_recv_pre_ready(conn_info_t &conn_info,char * ip_port,u32_t tmp
} }
conn_info.prepare(); conn_info.prepare();
conn_info.blob->conv_manager.s.additional_clear_function=server_clear_function;
conn_info.state.server_current_state = server_ready; conn_info.state.server_current_state = server_ready;
conn_info.oppsite_const_id=tmp_oppsite_const_id; conn_info.oppsite_const_id=tmp_oppsite_const_id;
conn_manager.ready_num++; conn_manager.ready_num++;
@ -1027,18 +1028,20 @@ int server_on_raw_recv_pre_ready(conn_info_t &conn_info,char * ip_port,u32_t tmp
conn_info.oppsite_const_id=0; conn_info.oppsite_const_id=0;
return 0; return 0;
} }
if(!conn_manager.exist(ori_conn_info.raw_info.recv_info.src_ip,ori_conn_info.raw_info.recv_info.src_port))//TODO remove this address_t addr1;addr1.from_ip_port(ori_conn_info.raw_info.recv_info.src_ip,ori_conn_info.raw_info.recv_info.src_port);
if(!conn_manager.exist(addr1))//TODO remove this
{ {
mylog(log_fatal,"[%s]this shouldnt happen\n",ip_port); mylog(log_fatal,"[%s]this shouldnt happen\n",ip_port);
myexit(-1); myexit(-1);
} }
if(!conn_manager.exist(conn_info.raw_info.recv_info.src_ip,conn_info.raw_info.recv_info.src_port))//TODO remove this address_t addr2;addr2.from_ip_port(conn_info.raw_info.recv_info.src_ip,conn_info.raw_info.recv_info.src_port);
if(!conn_manager.exist(addr2))//TODO remove this
{ {
mylog(log_fatal,"[%s]this shouldnt happen2\n",ip_port); mylog(log_fatal,"[%s]this shouldnt happen2\n",ip_port);
myexit(-1); myexit(-1);
} }
conn_info_t *&p_ori=conn_manager.find_insert_p(ori_conn_info.raw_info.recv_info.src_ip,ori_conn_info.raw_info.recv_info.src_port); conn_info_t *&p_ori=conn_manager.find_insert_p(addr1);
conn_info_t *&p=conn_manager.find_insert_p(conn_info.raw_info.recv_info.src_ip,conn_info.raw_info.recv_info.src_port); conn_info_t *&p=conn_manager.find_insert_p(addr2);
conn_info_t *tmp=p; conn_info_t *tmp=p;
p=p_ori; p=p_ori;
p_ori=tmp; p_ori=tmp;
@ -1227,7 +1230,7 @@ int client_event_loop()
//g_packet_info.src_ip=source_address_uint32; //g_packet_info.src_ip=source_address_uint32;
//g_packet_info.src_port=source_port; //g_packet_info.src_port=source_port;
udp_fd=socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); udp_fd=socket(remote_addr.get_type(), SOCK_DGRAM, IPPROTO_UDP);
set_buf_size(udp_fd,socket_buf_size); set_buf_size(udp_fd,socket_buf_size);
int yes = 1; int yes = 1;
@ -1480,11 +1483,11 @@ int server_event_loop()
if(raw_mode==mode_faketcp) if(raw_mode==mode_faketcp)
{ {
bind_fd=socket(AF_INET,SOCK_STREAM,0); bind_fd=socket(local_addr.get_type(),SOCK_STREAM,0);
} }
else if(raw_mode==mode_udp||raw_mode==mode_icmp)//bind an adress to avoid collision,for icmp,there is no port,just bind a udp port else if(raw_mode==mode_udp||raw_mode==mode_icmp)//bind an adress to avoid collision,for icmp,there is no port,just bind a udp port
{ {
bind_fd=socket(AF_INET,SOCK_DGRAM,0); bind_fd=socket(local_addr.get_type(),SOCK_DGRAM,0);
} }
//struct sockaddr_in temp_bind_addr={0}; //struct sockaddr_in temp_bind_addr={0};