From 3790eaf67e840abe0a11e3b343ff59b1f144904c Mon Sep 17 00:00:00 2001 From: wangyu- Date: Tue, 26 Sep 2017 10:51:10 -0500 Subject: [PATCH] changed every send to dest_t style,and it works --- common.cpp | 6 +++ common.h | 6 ++- main.cpp | 130 +++++++++++++++++++++++++++++------------------------ packet.cpp | 68 +++++++++++++++++++++++++--- 4 files changed, 144 insertions(+), 66 deletions(-) diff --git a/common.cpp b/common.cpp index 2322d88..c4d3c6e 100644 --- a/common.cpp +++ b/common.cpp @@ -425,3 +425,9 @@ u64_t ip_port_t::to_u64() { return pack_u64(ip,port); } +char * ip_port_t::to_s() +{ + static char res[40]; + sprintf(res,"%s:%d",my_ntoa(ip),port); + return res; +} diff --git a/common.h b/common.h index aa65d7b..1fce585 100644 --- a/common.h +++ b/common.h @@ -126,7 +126,7 @@ typedef u64_t anti_replay_seq_t; typedef u64_t fd64_t; -enum dest_type{none=0,type_ip_port,type_fd64,type_fd}; +enum dest_type{none=0,type_ip_port,type_fd64,type_ip_port_conv,type_fd64_conv/*,type_fd*/}; struct ip_port_t @@ -135,18 +135,20 @@ struct ip_port_t int port; void from_u64(u64_t u64); u64_t to_u64(); + char * to_s(); }; union inner_t { ip_port_t ip_port; - int fd; + //int fd; fd64_t fd64; }; struct dest_t { dest_type type; inner_t inner; + u32_t conv; }; struct fd_info_t diff --git a/main.cpp b/main.cpp index 76ebdca..2259cd4 100644 --- a/main.cpp +++ b/main.cpp @@ -107,6 +107,7 @@ int client_event_loop() int yes = 1; int epoll_fd; int remote_fd; + fd64_t remote_fd64; conn_info_t conn_info; @@ -130,10 +131,10 @@ int client_event_loop() } assert(new_connected_socket(remote_fd,remote_ip_uint32,remote_port)==0); - + remote_fd64=fd_manager.create(remote_fd); ev.events = EPOLLIN; - ev.data.u64 = remote_fd; + ev.data.u64 = remote_fd64; ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, remote_fd, &ev); if (ret!= 0) { @@ -160,48 +161,7 @@ int client_event_loop() } int idx; for (idx = 0; idx < nfds; ++idx) { - if (events[idx].data.u64 == (u64_t)remote_fd) - { - char data[buf_len]; - int data_len =recv(remote_fd,data,max_data_len,0); - mylog(log_trace, "received data from udp fd %d, len=%d\n", remote_fd,data_len); - if(data_len<0) - { - if(errno==ECONNREFUSED) - { - //conn_manager.clear_list.push_back(udp_fd); - mylog(log_debug, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno)); - } - - mylog(log_warn, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno)); - continue; - } - if(data_len>mtu_warn) - { - mylog(log_warn,"huge packet,data len=%d (>%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); - } - u32_t conv; - char *new_data; - int new_len; - get_conv(conv,data,data_len,new_data,new_len); - if(!conn_info.conv_manager.is_conv_used(conv))continue; - u64_t u64=conn_info.conv_manager.find_conv_by_u64(conv); - dest_t dest; - dest.inner.ip_port.from_u64(u64); - dest.type=type_ip_port; - my_send(dest,new_data,new_len); - } - /* - else if(events[idx].data.u64 ==(u64_t)timer_fd) - { - u64_t value; - read(timer_fd, &value, 8); - client_on_timer(conn_info); - - mylog(log_trace,"epoll_trigger_counter: %d \n",epoll_trigger_counter); - epoll_trigger_counter=0; - }*/ - else if (events[idx].data.u64 == (u64_t)local_listen_fd) + if (events[idx].data.u64 == (u64_t)local_listen_fd) { char data[buf_len]; int data_len; @@ -222,7 +182,7 @@ int client_event_loop() ip_port_t ip_port; ip_port.ip=udp_new_addr_in.sin_addr.s_addr; - ip_port.port=udp_new_addr_in.sin_port; + ip_port.port=ntohs(udp_new_addr_in.sin_port); u64_t u64=ip_port.to_u64(); u32_t conv; @@ -243,16 +203,65 @@ int client_event_loop() } conn_info.conv_manager.update_active_time(conv); + + dest_t dest; + dest.type=type_fd64_conv; + dest.inner.fd64=remote_fd64; + dest.conv=conv; + my_send(dest,data,data_len); + } + else if (events[idx].data.u64 == remote_fd64) + { + char data[buf_len]; + if(!fd_manager.exist(remote_fd64)) //fd64 has been closed + { + continue; + } + int fd=fd_manager.to_fd(remote_fd64); + int data_len =recv(fd,data,max_data_len,0); + mylog(log_trace, "received data from udp fd %d, len=%d\n", remote_fd,data_len); + if(data_len<0) + { + if(errno==ECONNREFUSED) + { + //conn_manager.clear_list.push_back(udp_fd); + mylog(log_debug, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno)); + } + + mylog(log_warn, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno)); + continue; + } + if(data_len>mtu_warn) + { + mylog(log_warn,"huge packet,data len=%d (>%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); + } + u32_t conv; char *new_data; int new_len; - put_conv(conv,data,data_len,new_data,new_len); + if(get_conv(conv,data,data_len,new_data,new_len)!=0) + continue; + if(!conn_info.conv_manager.is_conv_used(conv))continue; + u64_t u64=conn_info.conv_manager.find_u64_by_conv(conv); dest_t dest; - dest.type=type_fd; - dest.inner.fd=remote_fd; + dest.inner.ip_port.from_u64(u64); + dest.type=type_ip_port; my_send(dest,new_data,new_len); - //send_fd(remote_fd,new_data,new_len,0); - //send_data_safer(conn_info,buf,recv_len,conv); - ///////////////////todo + mylog(log_trace,"[%s] send packet\n",dest.inner.ip_port.to_s()); + } + /* + else if(events[idx].data.u64 ==(u64_t)timer_fd) + { + u64_t value; + read(timer_fd, &value, 8); + client_on_timer(conn_info); + + mylog(log_trace,"epoll_trigger_counter: %d \n",epoll_trigger_counter); + epoll_trigger_counter=0; + }*/ + + else if(events[idx].data.u64>u32_t(-1) ) + { + assert(!fd_manager.exist(events[idx].data.u64));//this fd64 has been closed } else { @@ -346,7 +355,7 @@ int server_event_loop() ip_port_t ip_port; ip_port.ip=udp_new_addr_in.sin_addr.s_addr; - ip_port.port=udp_new_addr_in.sin_port; + ip_port.port=ntohs(udp_new_addr_in.sin_port); if(!conn_manager.exist_ip_port(ip_port)) { conn_info_t &conn_info=conn_manager.find_insert(ip_port); @@ -357,7 +366,8 @@ int server_event_loop() u32_t conv; char *new_data; int new_len; - get_conv(conv,data,data_len,new_data,new_len); + if(get_conv(conv,data,data_len,new_data,new_len)!=0) + continue; /* id_t tmp_conv_id; @@ -391,6 +401,8 @@ int server_event_loop() dest.type=type_fd64; dest.inner.fd64=fd64; my_send(dest,new_data,new_len); + + //int fd = int((u64 << 32u) >> 32u); //////////////////////////////todo @@ -466,19 +478,19 @@ int server_event_loop() continue; } + if(data_len>=mtu_warn) { mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); } - char *new_data; - int new_len; - put_conv(conv,data,data_len,new_data,new_len); dest_t dest; - dest.type=type_ip_port; + dest.type=type_ip_port_conv; + dest.conv=conv; dest.inner.ip_port=ip_port; - my_send(dest,new_data,new_len); - ////////todo send data + my_send(dest,data,data_len); + mylog(log_trace,"[%s] send packet\n",ip_port.to_s()); + } else { diff --git a/packet.cpp b/packet.cpp index e912152..2d886d3 100644 --- a/packet.cpp +++ b/packet.cpp @@ -205,7 +205,6 @@ int sendto_fd_ip_port (int fd,u32_t ip,int port,char * buf, int len,int flags) memset(&tmp_sockaddr,0,sizeof(tmp_sockaddr)); tmp_sockaddr.sin_family = AF_INET; tmp_sockaddr.sin_addr.s_addr = ip; - tmp_sockaddr.sin_port = htons(uint16_t(port)); return sendto(fd, buf, @@ -244,6 +243,14 @@ int my_send(dest_t &dest,char *data,int len) return sendto_ip_port(dest.inner.ip_port.ip,dest.inner.ip_port.port,data,len,0); break; } + case type_ip_port_conv: + { + char *new_data; + int new_len; + put_conv(dest.conv,data,len,new_data,new_len); + return sendto_ip_port(dest.inner.ip_port.ip,dest.inner.ip_port.port,new_data,new_len,0); + break; + } case type_fd64: { if(!fd_manager.exist(dest.inner.fd64)) return -1; @@ -251,17 +258,53 @@ int my_send(dest_t &dest,char *data,int len) return send_fd(fd,data,len,0); break; } + case type_fd64_conv: + { + char *new_data; + int new_len; + put_conv(dest.conv,data,len,new_data,new_len); + + if(!fd_manager.exist(dest.inner.fd64)) return -1; + int fd=fd_manager.to_fd(dest.inner.fd64); + return send_fd(fd,new_data,new_len,0); + } + /* case type_fd: { send_fd(dest.inner.fd,data,len,0); break; - } + }*/ default: assert(0==1); } return 0; } +/* + * this function comes from http://www.hackersdelight.org/hdcodetxt/crc.c.txt + */ +unsigned int crc32h(unsigned char *message,int len) { + int i, crc; + unsigned int byte, c; + const unsigned int g0 = 0xEDB88320, g1 = g0>>1, + g2 = g0>>2, g3 = g0>>3, g4 = g0>>4, g5 = g0>>5, + g6 = (g0>>6)^g0, g7 = ((g0>>6)^g0)>>1; + + i = 0; + crc = 0xFFFFFFFF; + while (i!=len) { // Get next byte. + byte = message[i]; + crc = crc ^ byte; + c = ((crc<<31>>31) & g7) ^ ((crc<<30>>31) & g6) ^ + ((crc<<29>>31) & g5) ^ ((crc<<28>>31) & g4) ^ + ((crc<<27>>31) & g3) ^ ((crc<<26>>31) & g2) ^ + ((crc<<25>>31) & g1) ^ ((crc<<24>>31) & g0); + crc = ((unsigned)crc >> 8) ^ c; + i = i + 1; + } + return ~crc; +} + int put_conv(u32_t conv,char * input,int len_in,char *&output,int &len_out) { static char buf[buf_len]; @@ -269,7 +312,10 @@ int put_conv(u32_t conv,char * input,int len_in,char *&output,int &len_out) u32_t n_conv=htonl(conv); memcpy(output,&n_conv,sizeof(n_conv)); memcpy(output+sizeof(n_conv),input,len_in); - len_out=len_in+(int)(sizeof(n_conv)); + u32_t crc32=crc32h((unsigned char *)output,len_in+sizeof(crc32)); + u32_t crc32_n=htonl(crc32); + len_out=len_in+(int)(sizeof(n_conv))+(int)sizeof(crc32_n); + memcpy(output+len_in+(int)(sizeof(n_conv)),&crc32_n,sizeof(crc32_n)); return 0; } int get_conv(u32_t &conv,char *input,int len_in,char *&output,int &len_out ) @@ -278,7 +324,19 @@ int get_conv(u32_t &conv,char *input,int len_in,char *&output,int &len_out ) memcpy(&n_conv,input,sizeof(n_conv)); conv=ntohl(n_conv); output=input+sizeof(n_conv); - len_out=len_in-(int)(sizeof(n_conv)); - if(len_out<0) return -1; + u32_t crc32_n; + len_out=len_in-(int)sizeof(n_conv)-(int)sizeof(crc32_n); + if(len_out<0) + { + mylog(log_debug,"len_out<0\n"); + return -1; + } + memcpy(&crc32_n,input+len_in-(int)sizeof(crc32_n),sizeof(crc32_n)); + u32_t crc32=ntohl(crc32_n); + if(crc32!=crc32h((unsigned char *)input,len_in-(int)sizeof(crc32_n))) + { + mylog(log_debug,"crc32 check failed\n"); + return -1; + } return 0; }