grab connection works

This commit is contained in:
wangyu 2017-07-30 05:53:30 +08:00 committed by wangyu
parent b03c5ebf14
commit b485efc4c3
4 changed files with 97 additions and 55 deletions

View File

@ -65,32 +65,29 @@ using namespace std;
const int max_data_len=65535; const int max_data_len=65535;
const int buf_len=max_data_len+200; const int buf_len=max_data_len+200;
const uint32_t max_handshake_conn_num=10000;
const uint32_t max_ready_conn_num=1000;
const uint32_t anti_replay_window_size=1000;
const int max_conv_num=10000;
const uint32_t client_handshake_timeout=3000; const uint32_t client_handshake_timeout=3000;
const uint32_t server_handshake_timeout=10000; const uint32_t server_handshake_timeout=10000;// this should be much longer than clients. client retry initially ,server retry passtively
const int conv_clear_ratio=10; //conv grabage collecter check 1/10 of all conv one time
const int conn_clear_ratio=10;
const uint32_t heartbeat_timeout=10000;
const uint32_t udp_timeout=3000;
const uint32_t heartbeat_interval=1000; const uint32_t heartbeat_interval=1000;
const uint32_t timer_interval=500; const uint32_t timer_interval=400;//this should be smaller than heartbeat_interval
const int RETRY_TIME=3; const int RETRY_TIME=3;
const uint32_t anti_replay_window_size=1000;
const int max_conv_num=10000;
const uint32_t conv_timeout=120000; //60 second const uint32_t conv_timeout=120000; //60 second
const int conv_clear_ratio=10;
const uint32_t max_handshake_conn_num=10000;
const uint32_t max_ready_conn_num=1000;
const uint32_t conn_timeout=conv_timeout+60000;
const int conn_clear_ratio=10;
const uint32_t client_conn_timeout=10000;
const uint32_t server_conn_timeout=conv_timeout+60000;//this should be 60s+ longer than conv_timeout,so that conv_manager can destruct convs gradually,to avoid latency glicth

View File

@ -269,7 +269,7 @@ int my_decrypt(const char *data,char *output,int &len,char * key)
if(len>max_data_len) return -1; if(len>max_data_len) return -1;
if(cipher_decrypt(data,output,len,key) !=0) {mylog(log_debug,"cipher_decrypt failed \n"); return -1;} if(cipher_decrypt(data,output,len,key) !=0) {mylog(log_debug,"cipher_decrypt failed \n"); return -1;}
if(auth_verify(output,len)!=0) {mylog(log_debug,"auth_verify failed ");return -1;} if(auth_verify(output,len)!=0) {mylog(log_debug,"auth_verify failed\n");return -1;}
return 0; return 0;
} }

123
main.cpp
View File

@ -305,14 +305,7 @@ struct conn_info_t
myexit(-1); myexit(-1);
return *this; return *this;
} }
~conn_info_t() ~conn_info_t();
{
if(conv_manager!=0)
delete conv_manager;
if(anti_replay!=0)
delete anti_replay;
//send_packet_info.protocol=g_packet_info_send.protocol;
}
};//g_conn_info; };//g_conn_info;
struct conn_manager_t struct conn_manager_t
@ -325,11 +318,10 @@ struct conn_manager_t
unordered_map<id_t,conn_info_t *> const_id_mp; unordered_map<id_t,conn_info_t *> const_id_mp;
unordered_map<uint64_t,conn_info_t>::iterator clear_it; unordered_map<uint64_t,conn_info_t*> mp; //put it at end so that it de-consturcts first
unordered_map<uint64_t,conn_info_t*>::iterator clear_it;
unordered_map<uint64_t,conn_info_t> mp; //put it at end so that it de-consturcts first
//uint32_t current_ready_ip;
//uint16_t current_ready_port;
conn_manager_t() conn_manager_t()
{ {
ready_num=0; ready_num=0;
@ -353,6 +345,7 @@ struct conn_manager_t
} }
return 0; return 0;
} }
/*
int insert(uint32_t ip,uint16_t port) int insert(uint32_t ip,uint16_t port)
{ {
uint64_t u64=0; uint64_t u64=0;
@ -361,31 +354,50 @@ struct conn_manager_t
u64|=port; u64|=port;
mp[u64]; mp[u64];
return 0; return 0;
} }*/
conn_info_t & find(uint32_t ip,uint16_t port) //be aware,the adress may change after rehash conn_info_t *& find_insert_p(uint32_t ip,uint16_t port) //be aware,the adress may change after rehash
{ {
uint64_t u64=0; uint64_t u64=0;
u64=ip; u64=ip;
u64<<=32u; u64<<=32u;
u64|=port; u64|=port;
unordered_map<uint64_t,conn_info_t*>::iterator it=mp.find(u64);
if(it==mp.end())
{
mp[u64]=new conn_info_t;
}
return mp[u64]; return mp[u64];
} }
int erase(unordered_map<uint64_t,conn_info_t>::iterator erase_it) conn_info_t & find_insert(uint32_t ip,uint16_t port) //be aware,the adress may change after rehash
{ {
if(erase_it->second.state.server_current_state==server_ready) uint64_t u64=0;
u64=ip;
u64<<=32u;
u64|=port;
unordered_map<uint64_t,conn_info_t*>::iterator it=mp.find(u64);
if(it==mp.end())
{
mp[u64]=new conn_info_t;
}
return *mp[u64];
}
int erase(unordered_map<uint64_t,conn_info_t*>::iterator erase_it)
{
if(erase_it->second->state.server_current_state==server_ready)
{ {
ready_num--; ready_num--;
const_id_mp.erase(erase_it->second.oppsite_const_id); const_id_mp.erase(erase_it->second->oppsite_const_id);
timer_fd_mp.erase(erase_it->second.timer_fd); timer_fd_mp.erase(erase_it->second->timer_fd);
close(erase_it->second.timer_fd);// close will auto delte it from epoll close(erase_it->second->timer_fd);// close will auto delte it from epoll
delete(erase_it->second);
mp.erase(erase_it->first); mp.erase(erase_it->first);
} }
return 0; return 0;
} }
int clear_inactive() int clear_inactive()
{ {
unordered_map<uint64_t,conn_info_t>::iterator it; unordered_map<uint64_t,conn_info_t*>::iterator it;
unordered_map<uint64_t,conn_info_t>::iterator old_it; unordered_map<uint64_t,conn_info_t*>::iterator old_it;
if(disable_conn_clear) return 0; if(disable_conn_clear) return 0;
@ -406,11 +418,11 @@ int clear_inactive()
it=mp.begin(); it=mp.begin();
} }
if(it->second.state.server_current_state==server_ready&& current_time - it->second.last_hb_recv_time <=conn_timeout ) if(it->second->state.server_current_state==server_ready&& current_time - it->second->last_hb_recv_time <=server_conn_timeout )
{ {
it++; it++;
} }
else if(it->second.state.server_current_state!=server_ready&& current_time - it->second.last_state_time <=server_handshake_timeout ) else if(it->second->state.server_current_state!=server_ready&& current_time - it->second->last_state_time <=server_handshake_timeout )
{ {
it++; it++;
} }
@ -428,6 +440,17 @@ int clear_inactive()
}conn_manager; }conn_manager;
conn_info_t::~conn_info_t()
{
if(oppsite_const_id!=0)
conn_manager.const_id_mp.erase(oppsite_const_id);
if(conv_manager!=0)
delete conv_manager;
if(anti_replay!=0)
delete anti_replay;
//send_packet_info.protocol=g_packet_info_send.protocol;
}
int TTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT; int TTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT;
////////==========================type divider======================================================= ////////==========================type divider=======================================================
@ -969,7 +992,7 @@ int keep_connection_client(conn_info_t &conn_info) //for client
if(conn_info.state.client_current_state==client_ready) if(conn_info.state.client_current_state==client_ready)
{ {
mylog(log_trace,"time %lld %lld\n",get_current_time(),conn_info.last_state_time); mylog(log_trace,"time %lld %lld\n",get_current_time(),conn_info.last_state_time);
if(get_current_time()-conn_info.last_hb_recv_time>heartbeat_timeout) if(get_current_time()-conn_info.last_hb_recv_time>client_conn_timeout)
{ {
conn_info.state.client_current_state=client_nothing; conn_info.state.client_current_state=client_nothing;
conn_info.my_id=get_true_random_number_nz(); conn_info.my_id=get_true_random_number_nz();
@ -1111,7 +1134,8 @@ int set_timer(int epollfd,int &timer_fd)
mylog(log_fatal,"timer_fd create error\n"); mylog(log_fatal,"timer_fd create error\n");
myexit(1); myexit(1);
} }
its.it_interval.tv_nsec=timer_interval*1000ll*1000ll; its.it_interval.tv_sec=(timer_interval/1000);
its.it_interval.tv_nsec=(timer_interval%1000)*1000ll*1000ll;
its.it_value.tv_nsec=1; //imidiately its.it_value.tv_nsec=1; //imidiately
timerfd_settime(timer_fd,0,&its,0); timerfd_settime(timer_fd,0,&its,0);
@ -1140,7 +1164,8 @@ int set_timer_server(int epollfd,int &timer_fd)
mylog(log_fatal,"timer_fd create error\n"); mylog(log_fatal,"timer_fd create error\n");
myexit(1); myexit(1);
} }
its.it_interval.tv_nsec=timer_interval*1000ll*1000ll; its.it_interval.tv_sec=(timer_interval/1000);
its.it_interval.tv_nsec=(timer_interval%1000)*1000ll*1000ll;
its.it_value.tv_nsec=1; //imidiately its.it_value.tv_nsec=1; //imidiately
timerfd_settime(timer_fd,0,&its,0); timerfd_settime(timer_fd,0,&its,0);
@ -1164,10 +1189,13 @@ int client_on_raw_recv(conn_info_t &conn_info)
raw_info_t &raw_info=conn_info.raw_info; raw_info_t &raw_info=conn_info.raw_info;
mylog(log_debug,"i m here\n");
if(conn_info.state.client_current_state==client_nothing )
{
recv_raw(raw_info,data,data_len);//todo change it to something else faster
}
if(conn_info.state.client_current_state==client_syn_sent ) if(conn_info.state.client_current_state==client_syn_sent )
{ {
mylog(log_debug,"i m here3\n");
if(recv_bare(raw_info,data,data_len)!=0) if(recv_bare(raw_info,data,data_len)!=0)
{ {
@ -1205,7 +1233,6 @@ int client_on_raw_recv(conn_info_t &conn_info)
if(conn_info.state.client_current_state==client_ack_sent ) if(conn_info.state.client_current_state==client_ack_sent )
{ {
mylog(log_debug,"i m here2\n");
if(recv_bare(raw_info,data,data_len)!=0) if(recv_bare(raw_info,data,data_len)!=0)
{ {
mylog(log_debug,"recv_bare failed!\n"); mylog(log_debug,"recv_bare failed!\n");
@ -1379,7 +1406,7 @@ int server_on_raw_ready(conn_info_t &conn_info)
if (data[0] == 'h' && data_len == 1) { if (data[0] == 'h' && data_len == 1) {
uint32_t tmp = ntohl(*((uint32_t *) &data[1 + sizeof(uint32_t)])); uint32_t tmp = ntohl(*((uint32_t *) &data[1 + sizeof(uint32_t)]));
mylog(log_debug, "received hb <%x,%x>\n", conn_info.oppsite_id, tmp); mylog(log_debug,"[%s:%d]received hb \n",my_ntoa(recv_info.src_ip),recv_info.src_port);
conn_info.last_hb_recv_time = current_time_rough; conn_info.last_hb_recv_time = current_time_rough;
return 0; return 0;
} else if (data[0] == 'd' && data_len >=int( sizeof(uint32_t) + 1)) { } else if (data[0] == 'd' && data_len >=int( sizeof(uint32_t) + 1)) {
@ -1387,7 +1414,7 @@ int server_on_raw_ready(conn_info_t &conn_info)
conn_info.last_hb_recv_time = current_time_rough; conn_info.last_hb_recv_time = current_time_rough;
mylog(log_debug, "<<<<conv:%u>>>>\n", tmp_conv_id); mylog(log_trace, "<<<<conv:%u>>>>\n", tmp_conv_id);
if (!conn_info.conv_manager->is_conv_used(tmp_conv_id)) { if (!conn_info.conv_manager->is_conv_used(tmp_conv_id)) {
if (conn_info.conv_manager->get_size() >= max_conv_num) { if (conn_info.conv_manager->get_size() >= max_conv_num) {
mylog(log_warn, mylog(log_warn,
@ -1453,11 +1480,11 @@ int server_on_raw_ready(conn_info_t &conn_info)
int fd = int((u64 << 32u) >> 32u); int fd = int((u64 << 32u) >> 32u);
mylog(log_debug, "received a data from fake tcp,len:%d\n", data_len); mylog(log_trace, "received a data from fake tcp,len:%d\n", data_len);
int ret = send(fd, data + 1 + sizeof(uint32_t), int ret = send(fd, data + 1 + sizeof(uint32_t),
data_len - (1 + sizeof(uint32_t)), 0); data_len - (1 + sizeof(uint32_t)), 0);
mylog(log_debug, "%d byte sent ,fd :%d\n ", ret, fd); mylog(log_trace, "%d byte sent ,fd :%d\n ", ret, fd);
if (ret < 0) { if (ret < 0) {
mylog(log_warn, "send returned %d\n", ret); mylog(log_warn, "send returned %d\n", ret);
//perror("what happened????"); //perror("what happened????");
@ -1506,7 +1533,7 @@ int server_on_raw_recv_multi()
return 0; return 0;
} }
} }
conn_info_t &conn_info=conn_manager.find(ip,port); conn_info_t &conn_info=conn_manager.find_insert(ip,port);
conn_info.raw_info=tmp_raw_info; conn_info.raw_info=tmp_raw_info;
packet_info_t &send_info=conn_info.raw_info.send_info; packet_info_t &send_info=conn_info.raw_info.send_info;
packet_info_t &recv_info=conn_info.raw_info.recv_info; packet_info_t &recv_info=conn_info.raw_info.recv_info;
@ -1560,7 +1587,7 @@ int server_on_raw_recv_multi()
mylog(log_info,"reached max_handshake_conn_num,ignored new handshake\n"); mylog(log_info,"reached max_handshake_conn_num,ignored new handshake\n");
return 0; return 0;
} }
conn_info_t & conn_info=conn_manager.find(ip,port);//insert if not exist conn_info_t & conn_info=conn_manager.find_insert(ip,port);//insert if not exist
packet_info_t &send_info=conn_info.raw_info.send_info; packet_info_t &send_info=conn_info.raw_info.send_info;
packet_info_t &recv_info=conn_info.raw_info.recv_info; packet_info_t &recv_info=conn_info.raw_info.recv_info;
@ -1615,7 +1642,7 @@ int server_on_raw_recv_multi()
return 0; return 0;
} }
mylog(log_debug,"!!!\n"); //mylog(log_debug,"!!!\n");
uint32_t tmp_session_id= ntohl(* ((uint32_t *)&data[sizeof(id_t)])); uint32_t tmp_session_id= ntohl(* ((uint32_t *)&data[sizeof(id_t)]));
uint32_t tmp_oppsite_const_id=ntohl(* ((uint32_t *)&data[sizeof(id_t)*2])); uint32_t tmp_oppsite_const_id=ntohl(* ((uint32_t *)&data[sizeof(id_t)*2]));
@ -1653,6 +1680,7 @@ int server_on_raw_recv_multi()
conn_info.state.server_current_state = server_ready; conn_info.state.server_current_state = server_ready;
conn_info.oppsite_const_id=tmp_oppsite_const_id; conn_info.oppsite_const_id=tmp_oppsite_const_id;
conn_manager.ready_num++; conn_manager.ready_num++;
conn_manager.const_id_mp[tmp_oppsite_const_id]=&conn_info;
//conn_info.last_state_time=get_current_time(); //dont change this!!!!!!!!!!!!!!!!!!!!!!!!! //conn_info.last_state_time=get_current_time(); //dont change this!!!!!!!!!!!!!!!!!!!!!!!!!
@ -1697,19 +1725,36 @@ int server_on_raw_recv_multi()
{ {
mylog(log_info,"conn_info.last_state_time<ori_conn_info.last_state_time. ignored new handshake\n"); mylog(log_info,"conn_info.last_state_time<ori_conn_info.last_state_time. ignored new handshake\n");
conn_info.state.server_current_state=server_nothing; conn_info.state.server_current_state=server_nothing;
conn_info.oppsite_const_id=0;
return 0; return 0;
} }
conn_info_t *&p_ori=conn_manager.find_insert_p(ori_conn_info.raw_info.recv_info.src_ip,ori_conn_info.raw_info.recv_info.src_port);
conn_info_t *&p=conn_manager.find_insert_p(conn_info.raw_info.recv_info.src_ip,conn_info.raw_info.recv_info.src_port);
conn_info_t *tmp=p;
p=p_ori;
p_ori=tmp;
mylog(log_info,"grabbed a connection\n"); mylog(log_info,"grabbed a connection\n");
//ori_conn_info.anti_replay
ori_conn_info.state.server_current_state=server_ready;
//ori_conn_info.state.server_current_state=server_ready;
ori_conn_info.raw_info=conn_info.raw_info; ori_conn_info.raw_info=conn_info.raw_info;
ori_conn_info.last_state_time=conn_info.last_state_time; ori_conn_info.last_state_time=conn_info.last_state_time;
ori_conn_info.last_hb_recv_time=conn_info.last_hb_recv_time; ori_conn_info.last_hb_recv_time=conn_info.last_hb_recv_time;
ori_conn_info.last_hb_sent_time=conn_info.last_hb_sent_time; ori_conn_info.last_hb_sent_time=conn_info.last_hb_sent_time;
ori_conn_info.my_id=conn_info.my_id; ori_conn_info.my_id=conn_info.my_id;
ori_conn_info.oppsite_id=conn_info.oppsite_id; ori_conn_info.oppsite_id=conn_info.oppsite_id;
send_safer(ori_conn_info, (char *) "h", 1);
ori_conn_info.anti_replay->re_init();
conn_info.state.server_current_state=server_nothing; conn_info.state.server_current_state=server_nothing;
conn_info.oppsite_const_id=0;
} }
else else
{ {
@ -2370,7 +2415,7 @@ int server_event_loop()
int recv_len=recv(fd,buf,buf_len,0); int recv_len=recv(fd,buf,buf_len,0);
mylog(log_debug,"received a packet from udp_fd,len:%d\n",recv_len); mylog(log_trace,"received a packet from udp_fd,len:%d\n",recv_len);
if(recv_len<0) if(recv_len<0)
{ {

View File

@ -514,7 +514,7 @@ int send_raw_tcp(raw_info_t &raw_info,const char * payload, int payloadlen) {
packet_info_t &send_info=raw_info.send_info; packet_info_t &send_info=raw_info.send_info;
packet_info_t &recv_info=raw_info.recv_info; packet_info_t &recv_info=raw_info.recv_info;
mylog(log_debug,"syn %d\n",send_info.syn); //mylog(log_debug,"syn %d\n",send_info.syn);
char send_raw_tcp_buf0[buf_len]; char send_raw_tcp_buf0[buf_len];
char *send_raw_tcp_buf=send_raw_tcp_buf0; char *send_raw_tcp_buf=send_raw_tcp_buf0;