diff --git a/common.cpp b/common.cpp index 52156bb..46a4d08 100644 --- a/common.cpp +++ b/common.cpp @@ -833,21 +833,6 @@ int create_new_udp(int &new_udp_fd,int remote_address_uint32,int remote_port) } return 0; }*/ -void ip_port_t::from_u64(u64_t u64) -{ - ip=get_u64_h(u64); - port=get_u64_l(u64); -} -u64_t ip_port_t::to_u64() -{ - return pack_u64(ip,port); -} -char * ip_port_t::to_s() -{ - static char res[40]; - sprintf(res,"%s:%d",my_ntoa(ip),port); - return res; -} int round_up_div(int a,int b) { diff --git a/common.h b/common.h index 64a2f82..3d03384 100644 --- a/common.h +++ b/common.h @@ -224,10 +224,11 @@ struct address_t //TODO scope id }; storage_t inner; - address_t() + /*address_t() { clear(); - } + }*/ + void clear() { memset(&inner,0,sizeof(inner)); @@ -440,4 +441,110 @@ int new_connected_socket(int &fd,u32_t ip,int port); int new_listen_socket2(int &fd,address_t &addr); int new_connected_socket2(int &fd,address_t &addr); +struct not_copy_able_t +{ + not_copy_able_t() + { + + } + not_copy_able_t(const not_copy_able_t &other) + { + assert(0==1); + } + const not_copy_able_t & operator=(const not_copy_able_t &other) + { + assert(0==1); + return other; + } +}; + + +template +struct lru_collector_t:not_copy_able_t +{ + //typedef void* key_t; +//#define key_t void* + struct lru_pair_t + { + key_t key; + my_time_t ts; + }; + + unordered_map::iterator> mp; + + list q; + int update(key_t key) + { + assert(mp.find(key)!=mp.end()); + auto it=mp[key]; + q.erase(it); + + my_time_t value=get_current_time(); + if(!q.empty()) + { + assert(value >=q.front().ts); + } + lru_pair_t tmp; tmp.key=key; tmp.ts=value; + q.push_front( tmp); + mp[key]=q.begin(); + + return 0; + } + int new_key(key_t key) + { + assert(mp.find(key)==mp.end()); + + my_time_t value=get_current_time(); + if(!q.empty()) + { + assert(value >=q.front().ts); + } + lru_pair_t tmp; tmp.key=key; tmp.ts=value; + q.push_front( tmp); + mp[key]=q.begin(); + + return 0; + } + int size() + { + return q.size(); + } + int empty() + { + return q.empty(); + } + void clear() + { + mp.clear(); q.clear(); + } + my_time_t ts_of(key_t key) + { + assert(mp.find(key)!=mp.end()); + return mp[key]->ts; + } + + my_time_t peek_back(key_t &key) + { + assert(!q.empty()); + auto it=q.end(); it--; + key=it->key; + return it->ts; + } + void erase(key_t key) + { + assert(mp.find(key)!=mp.end()); + q.erase(mp[key]); + mp.erase(key); + } + /* + void erase_back() + { + assert(!q.empty()); + auto it=q.end(); it--; + key_t key=it->key; + erase(key); + }*/ +}; + + #endif /* COMMON_H_ */ diff --git a/connection.cpp b/connection.cpp index bbbab16..8ad4b9a 100644 --- a/connection.cpp +++ b/connection.cpp @@ -7,7 +7,7 @@ #include "connection.h" -const int disable_conv_clear=0;//a udp connection in the multiplexer is called conversation in this program,conv for short. +//const int disable_conv_clear=0;//a udp connection in the multiplexer is called conversation in this program,conv for short. const int disable_conn_clear=0;//a raw connection is called conn. @@ -20,230 +20,76 @@ void server_clear_function(u64_t u64)//used in conv_manager in server mode.for s assert(fd_manager.exist(fd64)); ev_io &watcher= fd_manager.get_info(fd64).io_watcher; - ip_port_t &ip_port=fd_manager.get_info(fd64).ip_port;// - assert(conn_manager.exist(ip_port));// - ev_loop *loop =conn_manager.find(ip_port).loop; // overkill ? should we just use ev_default_loop(0)? + address_t &addr=fd_manager.get_info(fd64).addr;// + assert(conn_manager.exist(addr));// + ev_loop *loop =conn_manager.find_insert(addr).loop; // overkill ? should we just use ev_default_loop(0)? ev_io_stop(loop,&watcher); fd_manager.fd64_close(fd64); - -} - -//////////////////////////////////////////////////////////////////// - -conv_manager_t::conv_manager_t() -{ - clear_it=conv_last_active_time.begin(); - long long last_clear_time=0; - reserve(); -} -conv_manager_t::~conv_manager_t() -{ - clear(); -} -int conv_manager_t::get_size() -{ - return conv_to_u64.size(); -} -void conv_manager_t::reserve() -{ - u64_to_conv.reserve(10007); - conv_to_u64.reserve(10007); - conv_last_active_time.reserve(10007); -} -void conv_manager_t::clear() -{ - //if(disable_conv_clear) return ;//////what was the purpose of this code? - - if(client_or_server==server_mode) - { - for(auto it=conv_to_u64.begin();it!=conv_to_u64.end();it++) - { - //int fd=int((it->second<<32u)>>32u); - server_clear_function( it->second); - } - } - u64_to_conv.clear(); - conv_to_u64.clear(); - conv_last_active_time.clear(); - - clear_it=conv_last_active_time.begin(); - -} -u32_t conv_manager_t::get_new_conv() -{ - u32_t conv=get_fake_random_number_nz(); - while(conv_to_u64.find(conv)!=conv_to_u64.end()) - { - conv=get_fake_random_number_nz(); - } - return conv; -} -int conv_manager_t::is_conv_used(u32_t conv) -{ - return conv_to_u64.find(conv)!=conv_to_u64.end(); -} -int conv_manager_t::is_u64_used(u64_t u64) -{ - return u64_to_conv.find(u64)!=u64_to_conv.end(); -} -u32_t conv_manager_t::find_conv_by_u64(u64_t u64) -{ - assert(is_u64_used(u64)); - return u64_to_conv[u64]; -} -u64_t conv_manager_t::find_u64_by_conv(u32_t conv) -{ - assert(is_conv_used(conv)); - return conv_to_u64[conv]; -} -int conv_manager_t::update_active_time(u32_t conv) -{ - assert(is_conv_used(conv)); - return conv_last_active_time[conv]=get_current_time(); -} -int conv_manager_t::insert_conv(u32_t conv,u64_t u64)//////todo add capacity ///done at upper level -{ - assert(!is_conv_used(conv)); - int bucket_size_before=conv_last_active_time.bucket_count(); - u64_to_conv[u64]=conv; - conv_to_u64[conv]=u64; - conv_last_active_time[conv]=get_current_time(); - int bucket_size_after=conv_last_active_time.bucket_count(); - if(bucket_size_after!=bucket_size_before) - clear_it=conv_last_active_time.begin(); - return 0; -} -int conv_manager_t::erase_conv(u32_t conv) -{ - //if(disable_conv_clear) return 0; - assert(conv_last_active_time.find(conv)!=conv_last_active_time.end()); - u64_t u64=conv_to_u64[conv]; - if(client_or_server==server_mode) - { - server_clear_function(u64); - } - assert(conv_to_u64.find(conv)!=conv_to_u64.end()); - conv_to_u64.erase(conv); - u64_to_conv.erase(u64); - conv_last_active_time.erase(conv); - return 0; -} -int conv_manager_t::clear_inactive(char * ip_port) -{ - if(get_current_time()-last_clear_time>conv_clear_interval) - { - last_clear_time=get_current_time(); - return clear_inactive0(ip_port); - } - return 0; -} -int conv_manager_t::clear_inactive0(char * ip_port) -{ - if(disable_conv_clear) return 0; - - //map::iterator it; - int cnt=0; - auto it=clear_it; - int size=conv_last_active_time.size(); - int num_to_clean=size/conv_clear_ratio+conv_clear_min; //clear 1/10 each time,to avoid latency glitch - - num_to_clean=min(num_to_clean,size); - - u64_t current_time=get_current_time(); - for(;;) - { - if(cnt>=num_to_clean) break; - if(conv_last_active_time.begin()==conv_last_active_time.end()) break; - - if(it==conv_last_active_time.end()) - { - it=conv_last_active_time.begin(); - } - - if( current_time -it->second >conv_timeout ) - { - //mylog(log_info,"inactive conv %u cleared \n",it->first); - //auto old_it=it; - //it++; - u32_t conv= it->first; - it++; - erase_conv(conv); - if(ip_port==0) - { - mylog(log_info,"conv %x cleared\n",conv); - } - else - { - mylog(log_info,"[%s]conv %x cleared\n",ip_port,conv); - } - } - else - { - it++; - } - cnt++; - } - clear_it=it; - return 0; } //////////////////////////////////////////////////////////////////// - conn_manager_t::conn_manager_t() { - //ready_num=0; - mp.reserve(10007); - //fd64_mp.reserve(100007); - clear_it=mp.begin(); - last_clear_time=0; + mp.reserve(10007); + last_clear_time=0; } -int conn_manager_t::exist(ip_port_t ip_port) +int conn_manager_t::exist(address_t addr) { - u64_t u64=ip_port.to_u64(); - if(mp.find(u64)!=mp.end()) - { - return 1; - } + + if(mp.find(addr)!=mp.end()) + { + return 1; + } + return 0; +} + +conn_info_t *& conn_manager_t::find_insert_p(address_t addr) //be aware,the adress may change after rehash +{ +// u64_t u64=0; + //u64=ip; + //u64<<=32u; + //u64|=port; + unordered_map::iterator it=mp.find(addr); + if(it==mp.end()) + { + mp[addr]=new conn_info_t; + //lru.new_key(addr); + } + else + { + //lru.update(addr); + } + return mp[addr]; +} +conn_info_t & conn_manager_t::find_insert(address_t addr) //be aware,the adress may change after rehash +{ + //u64_t u64=0; + //u64=ip; + //u64<<=32u; + //u64|=port; + unordered_map::iterator it=mp.find(addr); + if(it==mp.end()) + { + mp[addr]=new conn_info_t; + //lru.new_key(addr); + } + else + { + //lru.update(addr); + } + return *mp[addr]; +} +int conn_manager_t::erase(unordered_map::iterator erase_it) +{ + delete(erase_it->second); + mp.erase(erase_it->first); return 0; } - conn_info_t *& conn_manager_t::find_p(ip_port_t ip_port) //todo capacity ///done at upper level - //be aware,the adress may change after rehash - { - assert(exist(ip_port)); - u64_t u64=ip_port.to_u64(); - return mp[u64]; - } - conn_info_t & conn_manager_t::find(ip_port_t ip_port) //be aware,the adress may change after rehash - { - assert(exist(ip_port)); - u64_t u64=ip_port.to_u64(); - return *mp[u64]; - } - int conn_manager_t::insert(ip_port_t ip_port) - { - assert(!exist(ip_port)); - int bucket_size_before=mp.bucket_count(); - mp[ip_port.to_u64()]=new conn_info_t; - int bucket_size_after=mp.bucket_count(); - if(bucket_size_after!=bucket_size_before) - clear_it=mp.begin(); - return 0; - } - int conn_manager_t::erase(unordered_map::iterator erase_it) - { - ////////todo close and erase timer_fd ,check fd64 empty ///dont need - - delete(erase_it->second); - mp.erase(erase_it->first); - - - return 0; - } int conn_manager_t::clear_inactive() { if(get_current_time()-last_clear_time>conn_clear_interval) @@ -253,11 +99,12 @@ int conn_manager_t::clear_inactive() } return 0; } + int conn_manager_t::clear_inactive0() { //mylog(log_info,"called\n"); - unordered_map::iterator it; - unordered_map::iterator old_it; + unordered_map::iterator it; + unordered_map::iterator old_it; if(disable_conn_clear) return 0; @@ -282,7 +129,7 @@ int conn_manager_t::clear_inactive0() it=mp.begin(); } - if(it->second->conv_manager.get_size() >0) + if(it->second->conv_manager.s.get_size() >0) { //mylog(log_info,"[%s:%d]size %d \n",my_ntoa(get_u64_h(it->first)),get_u64_l(it->first),(int)it->second->conv_manager.get_size()); it++; @@ -293,7 +140,8 @@ int conn_manager_t::clear_inactive0() } else { - mylog(log_info,"[%s:%d]inactive conn cleared \n",my_ntoa(get_u64_h(it->first)),get_u64_l(it->first)); + address_t tmp_addr=it->first;// avoid making get_str() const; + mylog(log_info,"{%s} inactive conn cleared \n",tmp_addr.get_str()); old_it=it; it++; erase(old_it); @@ -303,4 +151,3 @@ int conn_manager_t::clear_inactive0() clear_it=it; return 0; } - diff --git a/connection.h b/connection.h index 7ce6ed0..421b119 100644 --- a/connection.h +++ b/connection.h @@ -19,39 +19,189 @@ extern int disable_anti_replay; extern int report_interval; +const int disable_conv_clear=0; + +template struct conv_manager_t // manage the udp connections { //typedef hash_map map; - unordered_map u64_to_conv; //conv and u64 are both supposed to be uniq - unordered_map conv_to_u64; - unordered_map conv_last_active_time; + unordered_map data_to_conv; //conv and u64 are both supposed to be uniq + unordered_map conv_to_data; - unordered_map::iterator clear_it; + lru_collector_t lru; + //unordered_map conv_last_active_time; - //void (*clear_function)(uint64_t u64) ; + //unordered_map::iterator clear_it; + + void (*additional_clear_function)(T data) =0; long long last_clear_time; + conv_manager_t() + { + //clear_it=conv_last_active_time.begin(); + long long last_clear_time=0; + additional_clear_function=0; + } + ~conv_manager_t() + { + clear(); + } + int get_size() + { + return conv_to_data.size(); + } + void reserve() + { + data_to_conv.reserve(10007); + conv_to_data.reserve(10007); + //conv_last_active_time.reserve(10007); + + lru.mp.reserve(10007); + } + void clear() + { + if(disable_conv_clear) return ; + + if(additional_clear_function!=0) + { + for(auto it=conv_to_data.begin();it!=conv_to_data.end();it++) + { + //int fd=int((it->second<<32u)>>32u); + additional_clear_function( it->second); + } + } + data_to_conv.clear(); + conv_to_data.clear(); + + lru.clear(); + //conv_last_active_time.clear(); + + //clear_it=conv_last_active_time.begin(); + + } + u32_t get_new_conv() + { + u32_t conv=get_fake_random_number_nz(); + while(conv_to_data.find(conv)!=conv_to_data.end()) + { + conv=get_fake_random_number_nz(); + } + return conv; + } + int is_conv_used(u32_t conv) + { + return conv_to_data.find(conv)!=conv_to_data.end(); + } + int is_data_used(T data) + { + return data_to_conv.find(data)!=data_to_conv.end(); + } + u32_t find_conv_by_data(T data) + { + return data_to_conv[data]; + } + T find_data_by_conv(u32_t conv) + { + return conv_to_data[conv]; + } + int update_active_time(u32_t conv) + { + //return conv_last_active_time[conv]=get_current_time(); + lru.update(conv); + return 0; + } + int insert_conv(u32_t conv,T data) + { + data_to_conv[data]=conv; + conv_to_data[conv]=data; + //conv_last_active_time[conv]=get_current_time(); + lru.new_key(conv); + return 0; + } + int erase_conv(u32_t conv) + { + if(disable_conv_clear) return 0; + T data=conv_to_data[conv]; + if(additional_clear_function!=0) + { + additional_clear_function(data); + } + conv_to_data.erase(conv); + data_to_conv.erase(data); + //conv_last_active_time.erase(conv); + lru.erase(conv); + return 0; + } + int clear_inactive(char * info=0) + { + if(get_current_time()-last_clear_time>conv_clear_interval) + { + last_clear_time=get_current_time(); + return clear_inactive0(info); + } + return 0; + } + int clear_inactive0(char * info) + { + if(disable_conv_clear) return 0; + + + unordered_map::iterator it; + unordered_map::iterator old_it; + + //map::iterator it; + int cnt=0; + //it=clear_it; + int size=lru.size(); + int num_to_clean=size/conv_clear_ratio+conv_clear_min; //clear 1/10 each time,to avoid latency glitch + + num_to_clean=min(num_to_clean,size); + + my_time_t current_time=get_current_time(); + for(;;) + { + if(cnt>=num_to_clean) break; + if(lru.empty()) break; + + u32_t conv; + my_time_t ts=lru.peek_back(conv); + + if(current_time- ts < conv_timeout) break; + + erase_conv(conv); + if(info==0) + { + mylog(log_info,"conv %x cleared\n",conv); + } + else + { + mylog(log_info,"[%s]conv %x cleared\n",info,conv); + } + cnt++; + } + return 0; + } + + + /* conv_manager_t(); - conv_manager_t(const conv_manager_t &b) - { - assert(0==1); - } ~conv_manager_t(); int get_size(); void reserve(); void clear(); u32_t get_new_conv(); int is_conv_used(u32_t conv); - int is_u64_used(u64_t u64); - u32_t find_conv_by_u64(u64_t u64); - u64_t find_u64_by_conv(u32_t conv); + int is_u64_used(T u64); + u32_t find_conv_by_u64(T u64); + T find_u64_by_conv(u32_t conv); int update_active_time(u32_t conv); - int insert_conv(u32_t conv,u64_t u64); + int insert_conv(u32_t conv,T u64); int erase_conv(u32_t conv); int clear_inactive(char * ip_port=0); - int clear_inactive0(char * ip_port); -}; + int clear_inactive0(char * ip_port);*/ +};//g_conv_manager; + struct inner_stat_t @@ -83,7 +233,7 @@ struct stat_t ); } } - void report_as_server(ip_port_t &ip_port) + void report_as_server(address_t &addr) { if(report_interval!=0 &&get_current_time()-last_report_time>u64_t(report_interval)*1000) { @@ -91,7 +241,7 @@ struct stat_t inner_stat_t &a=fec_to_normal; inner_stat_t &b=normal_to_fec; mylog(log_info,"[report][%s]client-->server:(original:%llu pkt;%llu byte) (fec:%llu pkt;%llu byte) server-->client:(original:%llu pkt;%llu byte) (fec:%llu pkt;%llu byte)\n", - ip_port.to_s(), + addr.get_str(), a.output_packet_num,a.output_packet_size,a.input_packet_num,a.input_packet_size, b.input_packet_num,b.input_packet_size,b.output_packet_num,b.output_packet_size ); @@ -103,7 +253,14 @@ struct stat_t struct conn_info_t //stores info for a raw connection.for client ,there is only one connection,for server there can be thousand of connection since server can //handle multiple clients { - conv_manager_t conv_manager; + struct //conv_manager_t is here to avoid copying when a connection is recovered + //TODO maybe an unconstrained union is better, but struct is okay since conv_manger is small when no data is filled in. + { + conv_manager_t c; + conv_manager_t s; + //avoid templates here and there, avoid pointer and type cast + }conv_manager; + fec_encode_manager_t fec_encode_manager; fec_decode_manager_t fec_decode_manager; ev_timer timer; @@ -139,7 +296,7 @@ struct conn_info_t //stores info for a raw connection.for client ,there is o assert(0==1); } }; - +/* struct conn_manager_t //manager for connections. for client,we dont need conn_manager since there is only one connection.for server we use one conn_manager for all connections { @@ -161,8 +318,30 @@ struct conn_manager_t //manager for connections. for client,we dont need conn_m int clear_inactive(); int clear_inactive0(); +};*/ + +struct conn_manager_t //manager for connections. for client,we dont need conn_manager since there is only one connection.for server we use one conn_manager for all connections +{ + + + unordered_map mp; //put it at end so that it de-consturcts first + unordered_map::iterator clear_it; + + long long last_clear_time; + + conn_manager_t(); + int exist(address_t addr); + conn_info_t *& find_insert_p(address_t addr); //be aware,the adress may change after rehash //not true? + conn_info_t & find_insert(address_t addr) ; //be aware,the adress may change after rehash + + int erase(unordered_map::iterator erase_it); +int clear_inactive(); +int clear_inactive0(); + }; +void server_clear_function(u64_t u64); + extern conn_manager_t conn_manager; diff --git a/tunnel_client.cpp b/tunnel_client.cpp index c1070bd..eb21d04 100644 --- a/tunnel_client.cpp +++ b/tunnel_client.cpp @@ -60,25 +60,25 @@ void data_from_local_or_fec_timeout(conn_info_t & conn_info,int is_time_out) mylog(log_trace,"Received packet from %s, len: %d\n", addr.get_str(),data_len); - u64_t u64=ip_port.to_u64(); + //u64_t u64=ip_port.to_u64(); - if(!conn_info.conv_manager.is_u64_used(u64)) + if(!conn_info.conv_manager.c.is_data_used(addr)) { - if(conn_info.conv_manager.get_size() >=max_conv_num) + if(conn_info.conv_manager.c.get_size() >=max_conv_num) { mylog(log_warn,"ignored new udp connect bc max_conv_num exceed\n"); return; } - conv=conn_info.conv_manager.get_new_conv(); - conn_info.conv_manager.insert_conv(conv,u64); + conv=conn_info.conv_manager.c.get_new_conv(); + conn_info.conv_manager.c.insert_conv(conv,addr); mylog(log_info,"new packet from %s,conv_id=%x\n",addr.get_str(),conv); } else { - conv=conn_info.conv_manager.find_conv_by_u64(u64); + conv=conn_info.conv_manager.c.find_conv_by_data(addr); mylog(log_trace,"conv=%d\n",conv); } - conn_info.conv_manager.update_active_time(conv); + conn_info.conv_manager.c.update_active_time(conv); char * new_data; int new_len; put_conv(conv,data,data_len,new_data,new_len); @@ -160,18 +160,18 @@ static void remote_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) mylog(log_debug,"get_conv(conv,out_arr[i],out_len[i],new_data,new_len)!=0"); continue; } - if(!conn_info.conv_manager.is_conv_used(conv)) + if(!conn_info.conv_manager.c.is_conv_used(conv)) { mylog(log_trace,"!conn_info.conv_manager.is_conv_used(conv)"); continue; } - conn_info.conv_manager.update_active_time(conv); + conn_info.conv_manager.c.update_active_time(conv); - u64_t u64=conn_info.conv_manager.find_u64_by_conv(conv); + address_t addr=conn_info.conv_manager.c.find_data_by_conv(conv); dest_t dest; dest.inner.fd_addr.fd=conn_info.local_listen_fd; - dest.inner.fd_ip_port.ip_port.from_u64(u64); + dest.inner.fd_addr.addr=addr; dest.type=type_fd_addr; delay_send(out_delay[i],dest,new_data,new_len); @@ -224,7 +224,7 @@ static void conn_timer_cb(struct ev_loop *loop, struct ev_timer *watcher, int re conn_info_t & conn_info= *((conn_info_t*)watcher->data); //read(conn_info.timer.get_timer_fd(), &value, 8); - conn_info.conv_manager.clear_inactive(); + conn_info.conv_manager.c.clear_inactive(); mylog(log_trace,"events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()\n"); conn_info.stat.report_as_client(); diff --git a/tunnel_server.cpp b/tunnel_server.cpp index d272c32..98baee3 100644 --- a/tunnel_server.cpp +++ b/tunnel_server.cpp @@ -64,7 +64,7 @@ void data_from_remote_or_fec_timeout_or_conn_timer(conn_info_t & conn_info,fd64_ assert(fd64==0); //uint64_t value; //read(conn_info.timer.get_timer_fd(), &value, 8); - conn_info.conv_manager.clear_inactive(); + conn_info.conv_manager.s.clear_inactive(); if(debug_force_flush_fec) { from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay); @@ -82,10 +82,10 @@ void data_from_remote_or_fec_timeout_or_conn_timer(conn_info_t & conn_info,fd64_ } //fd64_t &fd64 =conn_info.remote_fd64; - assert(conn_info.conv_manager.is_u64_used(fd64)); + assert(conn_info.conv_manager.s.is_data_used(fd64)); - conv=conn_info.conv_manager.find_conv_by_u64(fd64); - conn_info.conv_manager.update_active_time(conv); + conv=conn_info.conv_manager.s.find_conv_by_data(fd64); + conn_info.conv_manager.s.update_active_time(conv); conn_info.update_active_time(); int fd=fd_manager.to_fd(fd64); @@ -168,8 +168,8 @@ static void local_listen_cb(struct ev_loop *loop, struct ev_io *watcher, int rev return; } - conn_manager.insert(addr); - conn_info_t &conn_info=conn_manager.find(addr); + //conn_manager.insert(addr); + conn_info_t &conn_info=conn_manager.find_insert(addr); conn_info.addr=addr; conn_info.loop=ev_default_loop(0); conn_info.local_listen_fd=local_listen_fd; @@ -204,7 +204,7 @@ static void local_listen_cb(struct ev_loop *loop, struct ev_io *watcher, int rev mylog(log_info,"new connection from %s\n",addr.get_str()); } - conn_info_t &conn_info=conn_manager.find(addr); + conn_info_t &conn_info=conn_manager.find_insert(addr); conn_info.update_active_time(); int out_n;char **out_arr;int *out_len;my_time_t *out_delay; @@ -223,9 +223,9 @@ static void local_listen_cb(struct ev_loop *loop, struct ev_io *watcher, int rev } - if (!conn_info.conv_manager.is_conv_used(conv)) + if (!conn_info.conv_manager.s.is_conv_used(conv)) { - if(conn_info.conv_manager.get_size() >=max_conv_num) + if(conn_info.conv_manager.s.get_size() >=max_conv_num) { mylog(log_warn,"ignored new udp connect bc max_conv_num exceed\n"); continue; @@ -244,7 +244,7 @@ static void local_listen_cb(struct ev_loop *loop, struct ev_io *watcher, int rev //ev.data.u64 = fd64; //ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_udp_fd, &ev); - conn_info.conv_manager.insert_conv(conv, fd64); + conn_info.conv_manager.s.insert_conv(conv, fd64); fd_manager.get_info(fd64).addr=addr; ev_io &io_watcher=fd_manager.get_info(fd64).io_watcher; @@ -258,8 +258,8 @@ static void local_listen_cb(struct ev_loop *loop, struct ev_io *watcher, int rev mylog(log_info,"[%s]new conv %x,fd %d created,fd64=%llu\n",addr.get_str(),conv,new_udp_fd,fd64); } - conn_info.conv_manager.update_active_time(conv); - fd64_t fd64= conn_info.conv_manager.find_u64_by_conv(conv); + conn_info.conv_manager.s.update_active_time(conv); + fd64_t fd64= conn_info.conv_manager.s.find_data_by_conv(conv); dest_t dest; dest.type=type_fd64; dest.inner.fd64=fd64;