changed every send to dest_t style,and it works

This commit is contained in:
wangyu- 2017-09-26 10:51:10 -05:00
parent 507b960ba3
commit 3790eaf67e
4 changed files with 144 additions and 66 deletions

View File

@ -425,3 +425,9 @@ u64_t ip_port_t::to_u64()
{ {
return pack_u64(ip,port); return pack_u64(ip,port);
} }
char * ip_port_t::to_s()
{
static char res[40];
sprintf(res,"%s:%d",my_ntoa(ip),port);
return res;
}

View File

@ -126,7 +126,7 @@ typedef u64_t anti_replay_seq_t;
typedef u64_t fd64_t; typedef u64_t fd64_t;
enum dest_type{none=0,type_ip_port,type_fd64,type_fd}; enum dest_type{none=0,type_ip_port,type_fd64,type_ip_port_conv,type_fd64_conv/*,type_fd*/};
struct ip_port_t struct ip_port_t
@ -135,18 +135,20 @@ struct ip_port_t
int port; int port;
void from_u64(u64_t u64); void from_u64(u64_t u64);
u64_t to_u64(); u64_t to_u64();
char * to_s();
}; };
union inner_t union inner_t
{ {
ip_port_t ip_port; ip_port_t ip_port;
int fd; //int fd;
fd64_t fd64; fd64_t fd64;
}; };
struct dest_t struct dest_t
{ {
dest_type type; dest_type type;
inner_t inner; inner_t inner;
u32_t conv;
}; };
struct fd_info_t struct fd_info_t

130
main.cpp
View File

@ -107,6 +107,7 @@ int client_event_loop()
int yes = 1; int yes = 1;
int epoll_fd; int epoll_fd;
int remote_fd; int remote_fd;
fd64_t remote_fd64;
conn_info_t conn_info; conn_info_t conn_info;
@ -130,10 +131,10 @@ int client_event_loop()
} }
assert(new_connected_socket(remote_fd,remote_ip_uint32,remote_port)==0); assert(new_connected_socket(remote_fd,remote_ip_uint32,remote_port)==0);
remote_fd64=fd_manager.create(remote_fd);
ev.events = EPOLLIN; ev.events = EPOLLIN;
ev.data.u64 = remote_fd; ev.data.u64 = remote_fd64;
ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, remote_fd, &ev); ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, remote_fd, &ev);
if (ret!= 0) { if (ret!= 0) {
@ -160,48 +161,7 @@ int client_event_loop()
} }
int idx; int idx;
for (idx = 0; idx < nfds; ++idx) { for (idx = 0; idx < nfds; ++idx) {
if (events[idx].data.u64 == (u64_t)remote_fd) if (events[idx].data.u64 == (u64_t)local_listen_fd)
{
char data[buf_len];
int data_len =recv(remote_fd,data,max_data_len,0);
mylog(log_trace, "received data from udp fd %d, len=%d\n", remote_fd,data_len);
if(data_len<0)
{
if(errno==ECONNREFUSED)
{
//conn_manager.clear_list.push_back(udp_fd);
mylog(log_debug, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno));
}
mylog(log_warn, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno));
continue;
}
if(data_len>mtu_warn)
{
mylog(log_warn,"huge packet,data len=%d (>%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
}
u32_t conv;
char *new_data;
int new_len;
get_conv(conv,data,data_len,new_data,new_len);
if(!conn_info.conv_manager.is_conv_used(conv))continue;
u64_t u64=conn_info.conv_manager.find_conv_by_u64(conv);
dest_t dest;
dest.inner.ip_port.from_u64(u64);
dest.type=type_ip_port;
my_send(dest,new_data,new_len);
}
/*
else if(events[idx].data.u64 ==(u64_t)timer_fd)
{
u64_t value;
read(timer_fd, &value, 8);
client_on_timer(conn_info);
mylog(log_trace,"epoll_trigger_counter: %d \n",epoll_trigger_counter);
epoll_trigger_counter=0;
}*/
else if (events[idx].data.u64 == (u64_t)local_listen_fd)
{ {
char data[buf_len]; char data[buf_len];
int data_len; int data_len;
@ -222,7 +182,7 @@ int client_event_loop()
ip_port_t ip_port; ip_port_t ip_port;
ip_port.ip=udp_new_addr_in.sin_addr.s_addr; ip_port.ip=udp_new_addr_in.sin_addr.s_addr;
ip_port.port=udp_new_addr_in.sin_port; ip_port.port=ntohs(udp_new_addr_in.sin_port);
u64_t u64=ip_port.to_u64(); u64_t u64=ip_port.to_u64();
u32_t conv; u32_t conv;
@ -243,16 +203,65 @@ int client_event_loop()
} }
conn_info.conv_manager.update_active_time(conv); conn_info.conv_manager.update_active_time(conv);
dest_t dest;
dest.type=type_fd64_conv;
dest.inner.fd64=remote_fd64;
dest.conv=conv;
my_send(dest,data,data_len);
}
else if (events[idx].data.u64 == remote_fd64)
{
char data[buf_len];
if(!fd_manager.exist(remote_fd64)) //fd64 has been closed
{
continue;
}
int fd=fd_manager.to_fd(remote_fd64);
int data_len =recv(fd,data,max_data_len,0);
mylog(log_trace, "received data from udp fd %d, len=%d\n", remote_fd,data_len);
if(data_len<0)
{
if(errno==ECONNREFUSED)
{
//conn_manager.clear_list.push_back(udp_fd);
mylog(log_debug, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno));
}
mylog(log_warn, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno));
continue;
}
if(data_len>mtu_warn)
{
mylog(log_warn,"huge packet,data len=%d (>%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
}
u32_t conv;
char *new_data; char *new_data;
int new_len; int new_len;
put_conv(conv,data,data_len,new_data,new_len); if(get_conv(conv,data,data_len,new_data,new_len)!=0)
continue;
if(!conn_info.conv_manager.is_conv_used(conv))continue;
u64_t u64=conn_info.conv_manager.find_u64_by_conv(conv);
dest_t dest; dest_t dest;
dest.type=type_fd; dest.inner.ip_port.from_u64(u64);
dest.inner.fd=remote_fd; dest.type=type_ip_port;
my_send(dest,new_data,new_len); my_send(dest,new_data,new_len);
//send_fd(remote_fd,new_data,new_len,0); mylog(log_trace,"[%s] send packet\n",dest.inner.ip_port.to_s());
//send_data_safer(conn_info,buf,recv_len,conv); }
///////////////////todo /*
else if(events[idx].data.u64 ==(u64_t)timer_fd)
{
u64_t value;
read(timer_fd, &value, 8);
client_on_timer(conn_info);
mylog(log_trace,"epoll_trigger_counter: %d \n",epoll_trigger_counter);
epoll_trigger_counter=0;
}*/
else if(events[idx].data.u64>u32_t(-1) )
{
assert(!fd_manager.exist(events[idx].data.u64));//this fd64 has been closed
} }
else else
{ {
@ -346,7 +355,7 @@ int server_event_loop()
ip_port_t ip_port; ip_port_t ip_port;
ip_port.ip=udp_new_addr_in.sin_addr.s_addr; ip_port.ip=udp_new_addr_in.sin_addr.s_addr;
ip_port.port=udp_new_addr_in.sin_port; ip_port.port=ntohs(udp_new_addr_in.sin_port);
if(!conn_manager.exist_ip_port(ip_port)) if(!conn_manager.exist_ip_port(ip_port))
{ {
conn_info_t &conn_info=conn_manager.find_insert(ip_port); conn_info_t &conn_info=conn_manager.find_insert(ip_port);
@ -357,7 +366,8 @@ int server_event_loop()
u32_t conv; u32_t conv;
char *new_data; char *new_data;
int new_len; int new_len;
get_conv(conv,data,data_len,new_data,new_len); if(get_conv(conv,data,data_len,new_data,new_len)!=0)
continue;
/* /*
id_t tmp_conv_id; id_t tmp_conv_id;
@ -391,6 +401,8 @@ int server_event_loop()
dest.type=type_fd64; dest.type=type_fd64;
dest.inner.fd64=fd64; dest.inner.fd64=fd64;
my_send(dest,new_data,new_len); my_send(dest,new_data,new_len);
//int fd = int((u64 << 32u) >> 32u); //int fd = int((u64 << 32u) >> 32u);
//////////////////////////////todo //////////////////////////////todo
@ -466,19 +478,19 @@ int server_event_loop()
continue; continue;
} }
if(data_len>=mtu_warn) if(data_len>=mtu_warn)
{ {
mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
} }
char *new_data;
int new_len;
put_conv(conv,data,data_len,new_data,new_len);
dest_t dest; dest_t dest;
dest.type=type_ip_port; dest.type=type_ip_port_conv;
dest.conv=conv;
dest.inner.ip_port=ip_port; dest.inner.ip_port=ip_port;
my_send(dest,new_data,new_len); my_send(dest,data,data_len);
////////todo send data mylog(log_trace,"[%s] send packet\n",ip_port.to_s());
} }
else else
{ {

View File

@ -205,7 +205,6 @@ int sendto_fd_ip_port (int fd,u32_t ip,int port,char * buf, int len,int flags)
memset(&tmp_sockaddr,0,sizeof(tmp_sockaddr)); memset(&tmp_sockaddr,0,sizeof(tmp_sockaddr));
tmp_sockaddr.sin_family = AF_INET; tmp_sockaddr.sin_family = AF_INET;
tmp_sockaddr.sin_addr.s_addr = ip; tmp_sockaddr.sin_addr.s_addr = ip;
tmp_sockaddr.sin_port = htons(uint16_t(port)); tmp_sockaddr.sin_port = htons(uint16_t(port));
return sendto(fd, buf, return sendto(fd, buf,
@ -244,6 +243,14 @@ int my_send(dest_t &dest,char *data,int len)
return sendto_ip_port(dest.inner.ip_port.ip,dest.inner.ip_port.port,data,len,0); return sendto_ip_port(dest.inner.ip_port.ip,dest.inner.ip_port.port,data,len,0);
break; break;
} }
case type_ip_port_conv:
{
char *new_data;
int new_len;
put_conv(dest.conv,data,len,new_data,new_len);
return sendto_ip_port(dest.inner.ip_port.ip,dest.inner.ip_port.port,new_data,new_len,0);
break;
}
case type_fd64: case type_fd64:
{ {
if(!fd_manager.exist(dest.inner.fd64)) return -1; if(!fd_manager.exist(dest.inner.fd64)) return -1;
@ -251,17 +258,53 @@ int my_send(dest_t &dest,char *data,int len)
return send_fd(fd,data,len,0); return send_fd(fd,data,len,0);
break; break;
} }
case type_fd64_conv:
{
char *new_data;
int new_len;
put_conv(dest.conv,data,len,new_data,new_len);
if(!fd_manager.exist(dest.inner.fd64)) return -1;
int fd=fd_manager.to_fd(dest.inner.fd64);
return send_fd(fd,new_data,new_len,0);
}
/*
case type_fd: case type_fd:
{ {
send_fd(dest.inner.fd,data,len,0); send_fd(dest.inner.fd,data,len,0);
break; break;
} }*/
default: default:
assert(0==1); assert(0==1);
} }
return 0; return 0;
} }
/*
* this function comes from http://www.hackersdelight.org/hdcodetxt/crc.c.txt
*/
unsigned int crc32h(unsigned char *message,int len) {
int i, crc;
unsigned int byte, c;
const unsigned int g0 = 0xEDB88320, g1 = g0>>1,
g2 = g0>>2, g3 = g0>>3, g4 = g0>>4, g5 = g0>>5,
g6 = (g0>>6)^g0, g7 = ((g0>>6)^g0)>>1;
i = 0;
crc = 0xFFFFFFFF;
while (i!=len) { // Get next byte.
byte = message[i];
crc = crc ^ byte;
c = ((crc<<31>>31) & g7) ^ ((crc<<30>>31) & g6) ^
((crc<<29>>31) & g5) ^ ((crc<<28>>31) & g4) ^
((crc<<27>>31) & g3) ^ ((crc<<26>>31) & g2) ^
((crc<<25>>31) & g1) ^ ((crc<<24>>31) & g0);
crc = ((unsigned)crc >> 8) ^ c;
i = i + 1;
}
return ~crc;
}
int put_conv(u32_t conv,char * input,int len_in,char *&output,int &len_out) int put_conv(u32_t conv,char * input,int len_in,char *&output,int &len_out)
{ {
static char buf[buf_len]; static char buf[buf_len];
@ -269,7 +312,10 @@ int put_conv(u32_t conv,char * input,int len_in,char *&output,int &len_out)
u32_t n_conv=htonl(conv); u32_t n_conv=htonl(conv);
memcpy(output,&n_conv,sizeof(n_conv)); memcpy(output,&n_conv,sizeof(n_conv));
memcpy(output+sizeof(n_conv),input,len_in); memcpy(output+sizeof(n_conv),input,len_in);
len_out=len_in+(int)(sizeof(n_conv)); u32_t crc32=crc32h((unsigned char *)output,len_in+sizeof(crc32));
u32_t crc32_n=htonl(crc32);
len_out=len_in+(int)(sizeof(n_conv))+(int)sizeof(crc32_n);
memcpy(output+len_in+(int)(sizeof(n_conv)),&crc32_n,sizeof(crc32_n));
return 0; return 0;
} }
int get_conv(u32_t &conv,char *input,int len_in,char *&output,int &len_out ) int get_conv(u32_t &conv,char *input,int len_in,char *&output,int &len_out )
@ -278,7 +324,19 @@ int get_conv(u32_t &conv,char *input,int len_in,char *&output,int &len_out )
memcpy(&n_conv,input,sizeof(n_conv)); memcpy(&n_conv,input,sizeof(n_conv));
conv=ntohl(n_conv); conv=ntohl(n_conv);
output=input+sizeof(n_conv); output=input+sizeof(n_conv);
len_out=len_in-(int)(sizeof(n_conv)); u32_t crc32_n;
if(len_out<0) return -1; len_out=len_in-(int)sizeof(n_conv)-(int)sizeof(crc32_n);
if(len_out<0)
{
mylog(log_debug,"len_out<0\n");
return -1;
}
memcpy(&crc32_n,input+len_in-(int)sizeof(crc32_n),sizeof(crc32_n));
u32_t crc32=ntohl(crc32_n);
if(crc32!=crc32h((unsigned char *)input,len_in-(int)sizeof(crc32_n)))
{
mylog(log_debug,"crc32 check failed\n");
return -1;
}
return 0; return 0;
} }