From 36445720bbd59d58fc965770c5b1b3a42712c405 Mon Sep 17 00:00:00 2001 From: wangyu- Date: Mon, 25 Sep 2017 10:40:01 -0500 Subject: [PATCH] just commit3 --- connection.cpp | 345 ++++++++++++++++++++++++++++++++++++++++++++++ connection.h | 114 +++++++++++++++ delay_manager.cpp | 111 +++++++++++++++ delay_manager.h | 51 +++++++ 4 files changed, 621 insertions(+) create mode 100644 connection.cpp create mode 100644 connection.h create mode 100644 delay_manager.cpp create mode 100644 delay_manager.h diff --git a/connection.cpp b/connection.cpp new file mode 100644 index 0000000..e6e8dd8 --- /dev/null +++ b/connection.cpp @@ -0,0 +1,345 @@ +/* + * connection.cpp + * + * Created on: Sep 23, 2017 + * Author: root + */ + +#include "connection.h" + +int disable_anti_replay=0;//if anti_replay windows is diabled + +const int disable_conv_clear=0;//a udp connection in the multiplexer is called conversation in this program,conv for short. + +const int disable_conn_clear=0;//a raw connection is called conn. + +conn_manager_t conn_manager; + +void server_clear_function(u64_t u64); + +conv_manager_t::conv_manager_t() + { + clear_it=conv_last_active_time.begin(); + long long last_clear_time=0; + //clear_function=0; + } +conv_manager_t::~conv_manager_t() + { + clear(); + } + int conv_manager_t::get_size() + { + return conv_to_u64.size(); + } + void conv_manager_t::reserve() + { + u64_to_conv.reserve(10007); + conv_to_u64.reserve(10007); + conv_last_active_time.reserve(10007); + } + void conv_manager_t::clear() + { + if(disable_conv_clear) return ; + + if(program_mode==server_mode) + { + for(it=conv_to_u64.begin();it!=conv_to_u64.end();it++) + { + //int fd=int((it->second<<32u)>>32u); + server_clear_function( it->second); + } + } + u64_to_conv.clear(); + conv_to_u64.clear(); + conv_last_active_time.clear(); + + clear_it=conv_last_active_time.begin(); + + } + u32_t conv_manager_t::get_new_conv() + { + u32_t conv=get_true_random_number_nz(); + while(conv_to_u64.find(conv)!=conv_to_u64.end()) + { + conv=get_true_random_number_nz(); + } + return conv; + } + int conv_manager_t::is_conv_used(u32_t conv) + { + return conv_to_u64.find(conv)!=conv_to_u64.end(); + } + int conv_manager_t::is_u64_used(u64_t u64) + { + return u64_to_conv.find(u64)!=u64_to_conv.end(); + } + u32_t conv_manager_t::find_conv_by_u64(u64_t u64) + { + return u64_to_conv[u64]; + } + u64_t conv_manager_t::find_u64_by_conv(u32_t conv) + { + return conv_to_u64[conv]; + } + int conv_manager_t::update_active_time(u32_t conv) + { + return conv_last_active_time[conv]=get_current_time(); + } + int conv_manager_t::insert_conv(u32_t conv,u64_t u64) + { + u64_to_conv[u64]=conv; + conv_to_u64[conv]=u64; + conv_last_active_time[conv]=get_current_time(); + return 0; + } + int conv_manager_t::erase_conv(u32_t conv) + { + if(disable_conv_clear) return 0; + u64_t u64=conv_to_u64[conv]; + if(program_mode==server_mode) + { + server_clear_function(u64); + } + conv_to_u64.erase(conv); + u64_to_conv.erase(u64); + conv_last_active_time.erase(conv); + return 0; + } + int conv_manager_t::clear_inactive(char * ip_port) + { + if(get_current_time()-last_clear_time>conv_clear_interval) + { + last_clear_time=get_current_time(); + return clear_inactive0(ip_port); + } + return 0; + } + int conv_manager_t::clear_inactive0(char * ip_port) + { + if(disable_conv_clear) return 0; + + + //map::iterator it; + int cnt=0; + it=clear_it; + int size=conv_last_active_time.size(); + int num_to_clean=size/conv_clear_ratio+conv_clear_min; //clear 1/10 each time,to avoid latency glitch + + num_to_clean=min(num_to_clean,size); + + u64_t current_time=get_current_time(); + for(;;) + { + if(cnt>=num_to_clean) break; + if(conv_last_active_time.begin()==conv_last_active_time.end()) break; + + if(it==conv_last_active_time.end()) + { + it=conv_last_active_time.begin(); + } + + if( current_time -it->second >conv_timeout ) + { + //mylog(log_info,"inactive conv %u cleared \n",it->first); + old_it=it; + it++; + u32_t conv= old_it->first; + erase_conv(old_it->first); + if(ip_port==0) + { + mylog(log_info,"conv %x cleared\n",conv); + } + else + { + mylog(log_info,"[%s]conv %x cleared\n",ip_port,conv); + } + } + else + { + it++; + } + cnt++; + } + return 0; + } + + conn_manager_t::conn_manager_t() + { + ready_num=0; + mp.reserve(10007); + clear_it=mp.begin(); + timer_fd_mp.reserve(10007); + const_id_mp.reserve(10007); + udp_fd_mp.reserve(100007); + last_clear_time=0; + //current_ready_ip=0; + // current_ready_port=0; + } + int conn_manager_t::exist(u32_t ip,uint16_t port) + { + u64_t u64=0; + u64=ip; + u64<<=32u; + u64|=port; + if(mp.find(u64)!=mp.end()) + { + return 1; + } + return 0; + } + /* + int insert(uint32_t ip,uint16_t port) + { + uint64_t u64=0; + u64=ip; + u64<<=32u; + u64|=port; + mp[u64]; + return 0; + }*/ + conn_info_t *& conn_manager_t::find_insert_p(u32_t ip,uint16_t port) //be aware,the adress may change after rehash + { + u64_t u64=0; + u64=ip; + u64<<=32u; + u64|=port; + unordered_map::iterator it=mp.find(u64); + if(it==mp.end()) + { + mp[u64]=new conn_info_t; + } + return mp[u64]; + } + conn_info_t & conn_manager_t::find_insert(u32_t ip,uint16_t port) //be aware,the adress may change after rehash + { + u64_t u64=0; + u64=ip; + u64<<=32u; + u64|=port; + unordered_map::iterator it=mp.find(u64); + if(it==mp.end()) + { + mp[u64]=new conn_info_t; + } + return *mp[u64]; + } + int conn_manager_t::erase(unordered_map::iterator erase_it) + { + if(erase_it->second->state.server_current_state==server_ready) + { + ready_num--; + assert(i32_t(ready_num)!=-1); + assert(erase_it->second!=0); + assert(erase_it->second->timer_fd !=0); + assert(erase_it->second->oppsite_const_id!=0); + assert(const_id_mp.find(erase_it->second->oppsite_const_id)!=const_id_mp.end()); + assert(timer_fd_mp.find(erase_it->second->timer_fd)!=timer_fd_mp.end()); + + const_id_mp.erase(erase_it->second->oppsite_const_id); + timer_fd_mp.erase(erase_it->second->timer_fd); + close(erase_it->second->timer_fd);// close will auto delte it from epoll + delete(erase_it->second); + mp.erase(erase_it->first); + } + else + { + assert(erase_it->second->blob==0); + assert(erase_it->second->timer_fd ==0); + assert(erase_it->second->oppsite_const_id==0); + delete(erase_it->second); + mp.erase(erase_it->first); + } + return 0; + } +int conn_manager_t::clear_inactive() +{ + if(get_current_time()-last_clear_time>conn_clear_interval) + { + last_clear_time=get_current_time(); + return clear_inactive0(); + } + return 0; +} +int conn_manager_t::clear_inactive0() +{ + unordered_map::iterator it; + unordered_map::iterator old_it; + + if(disable_conn_clear) return 0; + + //map::iterator it; + int cnt=0; + it=clear_it; + int size=mp.size(); + int num_to_clean=size/conn_clear_ratio+conn_clear_min; //clear 1/10 each time,to avoid latency glitch + + mylog(log_trace,"mp.size() %d\n", size); + + num_to_clean=min(num_to_clean,(int)mp.size()); + u64_t current_time=get_current_time(); + + for(;;) + { + if(cnt>=num_to_clean) break; + if(mp.begin()==mp.end()) break; + + if(it==mp.end()) + { + it=mp.begin(); + } + + if(it->second->state.server_current_state==server_ready &¤t_time - it->second->last_hb_recv_time <=server_conn_timeout) + { + it++; + } + else if(it->second->state.server_current_state!=server_ready&& current_time - it->second->last_state_time <=server_handshake_timeout ) + { + it++; + } + else if(it->second->blob!=0&&it->second->blob->conv_manager.get_size() >0) + { + assert(it->second->state.server_current_state==server_ready); + it++; + } + else + { + mylog(log_info,"[%s:%d]inactive conn cleared \n",my_ntoa(it->second->raw_info.recv_info.src_ip),it->second->raw_info.recv_info.src_port); + old_it=it; + it++; + erase(old_it); + } + cnt++; + } + return 0; +} + + +void server_clear_function(u64_t u64)//used in conv_manager in server mode.for server we have to use one udp fd for one conv(udp connection), +//so we have to close the fd when conv expires +{ + int fd=int(u64); + int ret; + assert(fd!=0); + /* + epoll_event ev; + + ev.events = EPOLLIN; + ev.data.u64 = u64; + + ret = epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev); + if (ret!=0) + { + mylog(log_fatal,"fd:%d epoll delete failed!!!!\n",fd); + myexit(-1); //this shouldnt happen + }*/ //no need + ret= close(fd); //closed fd should be auto removed from epoll + + if (ret!=0) + { + mylog(log_fatal,"close fd %d failed !!!!\n",fd); + myexit(-1); //this shouldnt happen + } + //mylog(log_fatal,"size:%d !!!!\n",conn_manager.udp_fd_mp.size()); + assert(conn_manager.udp_fd_mp.find(fd)!=conn_manager.udp_fd_mp.end()); + conn_manager.udp_fd_mp.erase(fd); +} diff --git a/connection.h b/connection.h new file mode 100644 index 0000000..a4dfea7 --- /dev/null +++ b/connection.h @@ -0,0 +1,114 @@ +/* + * connection.h + * + * Created on: Sep 23, 2017 + * Author: root + */ + +#ifndef CONNECTION_H_ +#define CONNECTION_H_ + +extern int disable_anti_replay; + +#include "connection.h" +#include "common.h" +#include "log.h" +#include "delay_manager.h" + + + +struct anti_replay_t //its for anti replay attack,similar to openvpn/ipsec 's anti replay window +{ + u64_t max_packet_received; + char window[anti_replay_window_size]; + anti_replay_seq_t anti_replay_seq; + anti_replay_seq_t get_new_seq_for_send(); + anti_replay_t(); + void re_init(); + + int is_vaild(u64_t seq); +};//anti_replay; + + +struct conv_manager_t // manage the udp connections +{ + //typedef hash_map map; + unordered_map u64_to_conv; //conv and u64 are both supposed to be uniq + unordered_map conv_to_u64; + + unordered_map conv_last_active_time; + + unordered_map::iterator clear_it; + + unordered_map::iterator it; + unordered_map::iterator old_it; + + //void (*clear_function)(uint64_t u64) ; + + long long last_clear_time; + + conv_manager_t(); + ~conv_manager_t(); + int get_size(); + void reserve(); + void clear(); + u32_t get_new_conv(); + int is_conv_used(u32_t conv); + int is_u64_used(u64_t u64); + u32_t find_conv_by_u64(u64_t u64); + u64_t find_u64_by_conv(u32_t conv); + int update_active_time(u32_t conv); + int insert_conv(u32_t conv,u64_t u64); + int erase_conv(u32_t conv); + int clear_inactive(char * ip_port=0); + int clear_inactive0(char * ip_port); +};//g_conv_manager; + +struct conn_info_t //stores info for a raw connection.for client ,there is only one connection,for server there can be thousand of connection since server can +//handle multiple clients +{ + conv_manager_t conv_manager; + anti_replay_t anti_replay; +};//g_conn_info; + +struct conn_manager_t //manager for connections. for client,we dont need conn_manager since there is only one connection.for server we use one conn_manager for all connections +{ + + u32_t ready_num; + + unordered_map udp_fd_mp; //a bit dirty to used pointer,but can void unordered_map search + unordered_map timer_fd_mp;//we can use pointer here since unordered_map.rehash() uses shallow copy + + unordered_map const_id_mp; + + unordered_map mp; //put it at end so that it de-consturcts first + + unordered_map::iterator clear_it; + + long long last_clear_time; + + conn_manager_t(); + int exist(u32_t ip,uint16_t port); + /* + int insert(uint32_t ip,uint16_t port) + { + uint64_t u64=0; + u64=ip; + u64<<=32u; + u64|=port; + mp[u64]; + return 0; + }*/ + conn_info_t *& find_insert_p(u32_t ip,uint16_t port); //be aware,the adress may change after rehash + conn_info_t & find_insert(u32_t ip,uint16_t port) ; //be aware,the adress may change after rehash + + int erase(unordered_map::iterator erase_it); +int clear_inactive(); +int clear_inactive0(); + +}; + +extern conn_manager_t conn_manager; + + +#endif /* CONNECTION_H_ */ diff --git a/delay_manager.cpp b/delay_manager.cpp new file mode 100644 index 0000000..1a8360e --- /dev/null +++ b/delay_manager.cpp @@ -0,0 +1,111 @@ +/* + * delay_manager.cpp + * + * Created on: Sep 15, 2017 + * Author: root + */ +#include "delay_manager.h" +#include "log.h" +#include "packet.h" + +int delay_data_t::handle() +{ + return my_send(dest,data,len)>=0; +} + + +delay_manager_t::delay_manager_t() +{ + capacity=0; + + if ((timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0) + { + mylog(log_fatal,"timer_fd create error"); + myexit(1); + } + + itimerspec zero_its; + memset(&zero_its, 0, sizeof(zero_its)); + + timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0); + +} +delay_manager_t::~delay_manager_t() +{ + //TODO ,we currently dont need to deconstruct it +} +/* +int delay_manager_t::get_timer_fd() +{ + return delay_timer_fd; +}*/ + +int delay_manager_t::add(my_time_t delay,delay_data_t &delay_data) +{ + if(capacity!=0&&int(delay_mp.size()) >=capacity) + { + mylog(log_warn,"max pending packet reached,ignored\n"); + return -1; + } + if(delay==0) + { + int ret=delay_data.handle(); + if (ret != 0) { + mylog(log_debug, "handle() return %d\n", ret); + } + return 0; + } + + delay_data_t tmp=delay_data; + tmp.data=(char *)malloc(delay_data.len); + + memcpy(tmp.data,delay_data.data,delay_data.len); + + my_time_t tmp_time=get_current_time_us(); + tmp_time+=delay; + + delay_mp.insert(make_pair(tmp_time,tmp)); + + return 0; +} + +int delay_manager_t::check() +{ + if(!delay_mp.empty()) + { + my_time_t current_time; + + multimap::iterator it; + while(1) + { + int ret=0; + it=delay_mp.begin(); + if(it==delay_mp.end()) break; + + current_time=get_current_time_us(); + if(it->first <= current_time) + { + ret=it->second.handle(); + if (ret != 0) { + mylog(log_debug, "handle() return %d\n", ret); + } + free(it->second.data); + delay_mp.erase(it); + } + else + { + break; + } + + } + if(!delay_mp.empty()) + { + itimerspec its; + memset(&its.it_interval,0,sizeof(its.it_interval)); + its.it_value.tv_sec=delay_mp.begin()->first/1000000llu; + its.it_value.tv_nsec=(delay_mp.begin()->first%1000000llu)*1000llu; + timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0); + } + } + return 0; +} diff --git a/delay_manager.h b/delay_manager.h new file mode 100644 index 0000000..93afdcc --- /dev/null +++ b/delay_manager.h @@ -0,0 +1,51 @@ +/* + * delay_manager.h + * + * Created on: Sep 15, 2017 + * Author: root + */ + +#ifndef DELAY_MANAGER_H_ +#define DELAY_MANAGER_H_ + +#include "common.h" +#include "packet.h" + +//enum delay_type_t {none=0,enum_sendto_u64,enum_send_fd,client_to_local,client_to_remote,server_to_local,server_to_remote}; + +/* +struct fd_ip_port_t +{ + int fd; + u32_t ip; + u32_t port; +}; +union dest_t +{ + fd_ip_port_t fd_ip_port; + int fd; + u64_t u64; +}; +*/ +struct delay_data_t +{ + dest_t dest; + //int left_time;// + char * data; + int len; + int handle(); +}; + +struct delay_manager_t +{ + int timer_fd; + int capacity; + multimap delay_mp; //unit us,1 us=0.001ms + delay_manager_t(); + ~delay_manager_t(); + //int get_timer_fd(); + int check(); + int add(my_time_t delay,delay_data_t &delay_data); +}; + +#endif /* DELAY_MANAGER_H_ */