diff --git a/main.cpp b/main.cpp index b7bf51d..51ea54c 100644 --- a/main.cpp +++ b/main.cpp @@ -1,28 +1,7 @@ -#include -#include -#include -#include -#include -#include -#include -#include - -#include -//#include"aes.h" - -#include -#include - -#include -#include -#include -using namespace std; - -#include -#include - -#include +#include "common.h" +#include "log.h" +using namespace std; typedef unsigned long long u64_t; //this works on most platform,avoid using the PRId64 typedef long long i64_t; @@ -30,11 +9,11 @@ typedef long long i64_t; typedef unsigned int u32_t; typedef int i32_t; -const u32_t anti_replay_window_size=1000; +//const u32_t anti_replay_window_size=1000; typedef u64_t anti_replay_seq_t; int disable_anti_replay=0; int dup_num=3; -int dup_delay=5000; //1000 = 1ms +int dup_delay=900; //ms int iv_min=2; int iv_max=30;//< 256; int random_number_fd=-1; @@ -42,72 +21,18 @@ int random_number_fd=-1; int remote_fd=-1; int local_fd=-1; int is_client = 0, is_server = 0; +int local_listen_fd=-1; +int disable_conv_clear=0; + +u32_t remote_address_uint32=0; + +char local_address[100], remote_address[100]; +int local_port = -1, remote_port = -1; +int multi_process_mode=0; int VVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVV; -void setnonblocking(int sock) { - int opts; - opts = fcntl(sock, F_GETFL); - if (opts < 0) { - perror("fcntl(sock,GETFL)"); - exit(1); - } - - opts = opts | O_NONBLOCK; - if (fcntl(sock, F_SETFL, opts) < 0) { - perror("fcntl(sock,SETFL,opts)"); - exit(1); - } -} -void init_random_number_fd() -{ - - random_number_fd=open("/dev/urandom",O_RDONLY); - - if(random_number_fd==-1) - { - printf("error open /dev/urandom\n"); - } - setnonblocking(random_number_fd); -} -void get_true_random_chars(char * s,int len) -{ - int size=read(random_number_fd,s,len); - if(size!=len) - { - printf("get random number failed\n"); - exit(-1); - } -} -u32_t get_true_random_number() -{ - u32_t ret; - int size=read(random_number_fd,&ret,sizeof(ret)); - if(size!=sizeof(ret)) - { - printf("get random number failed %d\n",size); - exit(-1); - } - return ret; -} -u64_t ntoh64(u64_t a) -{ - if(__BYTE_ORDER == __LITTLE_ENDIAN) - { - return __bswap_64( a); - } - else return a; - -} -u64_t hton64(u64_t a) -{ - if(__BYTE_ORDER == __LITTLE_ENDIAN) - { - return __bswap_64( a); - } - else return a; -} struct anti_replay_t { u64_t max_packet_received; @@ -169,8 +94,157 @@ struct anti_replay_t return 0; //for complier check } }anti_replay; +struct conn_manager_t //TODO change map to unordered map +{ + //typedef hash_map map; + unordered_map u64_to_fd; //conv and u64 are both supposed to be uniq + unordered_map fd_to_u64; -struct my_time:timespec + unordered_map fd_last_active_time; + + unordered_map::iterator clear_it; + + unordered_map::iterator it; + unordered_map::iterator old_it; + + //void (*clear_function)(uint64_t u64) ; + + long long last_clear_time; + + conn_manager_t() + { + clear_it=fd_last_active_time.begin(); + long long last_clear_time=0; + //clear_function=0; + } + ~conn_manager_t() + { + clear(); + } + int get_size() + { + return fd_to_u64.size(); + } + void reserve() + { + u64_to_fd.reserve(10007); + fd_to_u64.reserve(10007); + fd_last_active_time.reserve(10007); + } + void clear() + { + if(disable_conv_clear) return ; + + for(it=fd_to_u64.begin();it!=fd_to_u64.end();it++) + { + //int fd=int((it->second<<32u)>>32u); + close( it->first); + } + u64_to_fd.clear(); + fd_to_u64.clear(); + fd_last_active_time.clear(); + + clear_it=fd_last_active_time.begin(); + + } + int exist_fd(u32_t fd) + { + return fd_to_u64.find(fd)!=fd_to_u64.end(); + } + int exist_u64(u64_t u64) + { + return u64_to_fd.find(u64)!=u64_to_fd.end(); + } + u32_t find_fd_by_u64(u64_t u64) + { + return u64_to_fd[u64]; + } + u64_t find_u64_by_fd(u32_t fd) + { + return fd_to_u64[fd]; + } + int update_active_time(u32_t fd) + { + return fd_last_active_time[fd]=get_current_time(); + } + int insert_fd(u32_t fd,u64_t u64) + { + u64_to_fd[u64]=fd; + fd_to_u64[fd]=u64; + fd_last_active_time[fd]=get_current_time(); + return 0; + } + int erase_fd(u32_t fd) + { + if(disable_conv_clear) return 0; + u64_t u64=fd_to_u64[fd]; + + close(fd); + + fd_to_u64.erase(fd); + u64_to_fd.erase(u64); + fd_last_active_time.erase(fd); + return 0; + } + int clear_inactive(char * ip_port=0) + { + if(get_current_time()-last_clear_time>conv_clear_interval) + { + last_clear_time=get_current_time(); + return clear_inactive0(ip_port); + } + return 0; + } + int clear_inactive0(char * ip_port) + { + if(disable_conv_clear) return 0; + + + //map::iterator it; + int cnt=0; + it=clear_it; + int size=fd_last_active_time.size(); + int num_to_clean=size/conv_clear_ratio+conv_clear_min; //clear 1/10 each time,to avoid latency glitch + + u64_t current_time=get_current_time(); + for(;;) + { + if(cnt>=num_to_clean) break; + if(fd_last_active_time.begin()==fd_last_active_time.end()) break; + + if(it==fd_last_active_time.end()) + { + it=fd_last_active_time.begin(); + } + + if( current_time -it->second >conv_timeout ) + { + //mylog(log_info,"inactive conv %u cleared \n",it->first); + old_it=it; + it++; + u32_t fd= old_it->first; + erase_fd(old_it->first); + if(ip_port==0) + { + mylog(log_info,"fd %x cleared\n",fd); + } + else + { + mylog(log_info,"[%s]fd %x cleared\n",ip_port,fd); + } + } + else + { + it++; + } + cnt++; + } + return 0; + } +}conn_manager; + +typedef u64_t my_time; +/*struct my_time:timespec { bool operator <(const my_time& other)const { @@ -183,7 +257,7 @@ struct my_time:timespec if(tv_sec==other.tv_sec&&tv_nsec==other.tv_nsec) return true; return false; } -}; +};*/ struct delay_data { int fd; @@ -191,20 +265,17 @@ struct delay_data char * data; int len; }; -int timer_fd; +int delay_timer_fd; multimap delay_mp; my_time time_after_delay(my_time time) { - time.tv_nsec+=dup_delay*1000ll; //8ms - if(time.tv_nsec>=1000*1000*1000ll ) - { - time.tv_nsec-=1000*1000*1000ll; - time.tv_sec+=1; - } + time+=dup_delay*1000; return time; } + + int add_to_delay_mp(int fd,int times_left,char * buf,int len) { delay_data tmp; @@ -213,8 +284,8 @@ int add_to_delay_mp(int fd,int times_left,char * buf,int len) tmp.times_left = times_left; tmp.len = len; - my_time tmp_time; - clock_gettime(CLOCK_MONOTONIC, &tmp_time); + my_time tmp_time=get_current_time_us(); + //clock_gettime(CLOCK_MONOTONIC, &tmp_time); tmp_time=time_after_delay(tmp_time); delay_mp.insert(make_pair(tmp_time,tmp)); return 0; @@ -227,13 +298,9 @@ int add_and_new(int fd,int times_left,char * buf,int len) return 0; } -char local_address[100], remote_address[100]; -int local_port = -1, remote_port = -1; -//char keya[100], keyb[100]; -//int dup_a = 1, dup_b = 1; -//char iv[100]; +multimap new_delay_mp; + -const int buf_len = 20480; void handler(int num) { int status; @@ -352,31 +419,79 @@ void check_delay_map() int ret; it=delay_mp.begin(); if(it==delay_mp.end()) break; - ret=clock_gettime(CLOCK_MONOTONIC, ¤t_time); - if(ret!=0) - { - printf("unknown error\n"); - exit(1); - } + + current_time=get_current_time_us(); if(it->first < current_time||it->first ==current_time) { //send packet printf("<%d>",it->second.len); - if( (is_client &&it->second.fd==remote_fd ) || (is_server &&it->second.fd==local_fd ) ) + if(multi_process_mode) { - char new_data[buf_len];int new_len; - do_obscure(it->second.data,it->second.len,new_data,new_len); - ret = send(it->second.fd, new_data, new_len, 0); + if ((is_client && it->second.fd == remote_fd) + || (is_server && it->second.fd == local_fd)) { + char new_data[buf_len]; + int new_len; + do_obscure(it->second.data, it->second.len, new_data, + new_len); + ret = send(it->second.fd, new_data, new_len, 0); + } else { + ret = send(it->second.fd, it->second.data, + it->second.len, 0); + } + + if (ret < 0) { + printf("send return %d at @300", ret); + exit(1); + } } else { - ret = send(it->second.fd, it->second.data, it->second.len, 0); + if(is_client) + { + char new_data[buf_len]; + int new_len; + do_obscure(it->second.data, it->second.len, new_data, + new_len); + ret = send(it->second.fd, new_data, new_len, 0); + } + else + { + + if(conn_manager.exist_fd(it->second.fd)) + { + u64_t u64=conn_manager.find_u64_by_fd(it->second.fd); + + sockaddr_in tmp_sockaddr; + + memset(&tmp_sockaddr,0,sizeof(tmp_sockaddr)); + tmp_sockaddr.sin_family = AF_INET; + tmp_sockaddr.sin_addr.s_addr = (u64 >> 32u); + + tmp_sockaddr.sin_port = htons(uint16_t((u64 << 32u) >> 32u)); + + + char new_data[buf_len]; + int new_len; + do_obscure(it->second.data, it->second.len, new_data, + new_len); + + ret = sendto(local_listen_fd, new_data, + new_len , 0, + (struct sockaddr *) &tmp_sockaddr, + sizeof(tmp_sockaddr)); + //ret = send(it->second.fd, it->second.data, + // it->second.len, 0); + } + else + { + it->second.times_left=0; + } + } + if (ret < 0) { + printf("send return %d at @300", ret); + } } - if (ret < 0) { - printf("send return %d at @300", ret); - exit(1); - } if(it->second.times_left>1) { //delay_mp.insert(pair(current_time)); @@ -398,40 +513,346 @@ void check_delay_map() { itimerspec its; memset(&its.it_interval,0,sizeof(its.it_interval)); - its.it_value=delay_mp.begin()->first; - timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0); + its.it_value.tv_sec=delay_mp.begin()->first/1000000llu; + its.it_value.tv_nsec=(delay_mp.begin()->first%1000000llu)*1000llu; + timerfd_settime(delay_timer_fd,TFD_TIMER_ABSTIME,&its,0); } } //printf("end"); } -int set_buf_size(int fd) +int create_new_udp(int &new_udp_fd) { - int socket_buf_size=1024*1024; - if(setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &socket_buf_size, sizeof(socket_buf_size))<0) - //if(setsockopt(fd, SOL_SOCKET, SO_SNDBUFFORCE, &socket_buf_size, sizeof(socket_buf_size))<0) - { - printf("set SO_SNDBUF fail\n"); - exit(1); - } - //if(setsockopt(fd, SOL_SOCKET, SO_RCVBUFFORCE, &socket_buf_size, sizeof(socket_buf_size))<0) - if(setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &socket_buf_size, sizeof(socket_buf_size))<0) - { - printf("set SO_RCVBUF fail\n"); - exit(1); - } + struct sockaddr_in remote_addr_in; + + socklen_t slen = sizeof(sockaddr_in); + memset(&remote_addr_in, 0, sizeof(remote_addr_in)); + remote_addr_in.sin_family = AF_INET; + remote_addr_in.sin_port = htons(remote_port); + remote_addr_in.sin_addr.s_addr = remote_address_uint32; + + new_udp_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (new_udp_fd < 0) { + mylog(log_warn, "create udp_fd error\n"); + return -1; + } + setnonblocking(new_udp_fd); + set_buf_size(new_udp_fd); + + mylog(log_debug, "created new udp_fd %d\n", new_udp_fd); + int ret = connect(new_udp_fd, (struct sockaddr *) &remote_addr_in, slen); + if (ret != 0) { + mylog(log_warn, "udp fd connect fail\n"); + close(new_udp_fd); + return -1; + } return 0; } -int main(int argc, char *argv[]) +int set_timer(int epollfd,int &timer_fd) { - dup2(1, 2); //redirect stderr to stdout + int ret; + epoll_event ev; + + itimerspec its; + memset(&its,0,sizeof(its)); + + if((timer_fd=timerfd_create(CLOCK_MONOTONIC,TFD_NONBLOCK)) < 0) + { + mylog(log_fatal,"timer_fd create error\n"); + myexit(1); + } + its.it_interval.tv_sec=(timer_interval/1000); + its.it_interval.tv_nsec=(timer_interval%1000)*1000ll*1000ll; + its.it_value.tv_nsec=1; //imidiately + timerfd_settime(timer_fd,0,&its,0); - init_random_number_fd(); + ev.events = EPOLLIN; + ev.data.u64 = timer_fd; + ret=epoll_ctl(epollfd, EPOLL_CTL_ADD, timer_fd, &ev); + if (ret < 0) { + mylog(log_fatal,"epoll_ctl return %d\n", ret); + myexit(-1); + } + return 0; +} +int event_loop() +{ + struct sockaddr_in local_me, local_other; + local_listen_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + int yes = 1; + //setsockopt(local_listen_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); + set_buf_size(local_listen_fd); + setnonblocking(local_listen_fd); + + //char data[buf_len]; + //char *data=data0; + socklen_t slen = sizeof(sockaddr_in); + memset(&local_me, 0, sizeof(local_me)); + local_me.sin_family = AF_INET; + local_me.sin_port = htons(local_port); + local_me.sin_addr.s_addr = inet_addr(local_address); + if (bind(local_listen_fd, (struct sockaddr*) &local_me, slen) == -1) + { + mylog(log_fatal,"socket bind error"); + exit(1); + } + + int epollfd = epoll_create1(0); + const int max_events = 4096; + struct epoll_event ev, events[max_events]; + if (epollfd < 0) + { + mylog(log_fatal,"epoll created return %d\n", epollfd); + exit(-1); + } + ev.events = EPOLLIN; + ev.data.fd = local_listen_fd; + int ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, local_listen_fd, &ev); + + if(ret!=0) + { + mylog(log_fatal,"epoll created return %d\n", epollfd); + exit(-1); + } + int clear_timer_fd=-1; + set_timer(epollfd,clear_timer_fd); + + + + if ((delay_timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0) + { + printf("timer_fd create error"); + exit(1); + } + ev.events = EPOLLIN; + ev.data.fd = delay_timer_fd; + + itimerspec zero_its; + memset(&zero_its, 0, sizeof(zero_its)); + + timerfd_settime(delay_timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0); + epoll_ctl(epollfd, EPOLL_CTL_ADD, delay_timer_fd, &ev); + if (ret < 0) + { + printf("epoll_ctl return %d\n", ret); + exit(-1); + } + + for (;;) + { + int nfds = epoll_wait(epollfd, events, max_events, 180 * 1000); //3mins + if (nfds < 0) + { + mylog(log_fatal,"epoll_wait return %d\n", nfds); + exit(-1); + } + int n; + for (n = 0; n < nfds; ++n) + { + if (events[n].data.fd == local_listen_fd) //data income from local end + { + char data[buf_len]; + int data_len; + if ((data_len = recvfrom(local_listen_fd, data, buf_len, 0, + (struct sockaddr *) &local_other, &slen)) == -1) //<--first packet from a new ip:port turple + { + printf("recv_from error"); + exit(1); + } + data[data_len] = 0; //for easier debug + u64_t u64=pack_u64(local_other.sin_addr.s_addr,ntohs(local_other.sin_port)); + + if(!conn_manager.exist_u64(u64)) + { + int new_udp_fd; + if(create_new_udp(new_udp_fd)!=0) + { + continue; + } + struct epoll_event ev; + + mylog(log_trace, "u64: %lld\n", u64); + ev.events = EPOLLIN; + + ev.data.fd = new_udp_fd; + + ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, new_udp_fd, &ev); + if (ret != 0) { + mylog(log_warn, "add udp_fd error \n"); + //perror("why?"); + close(new_udp_fd); + continue; + } + mylog(log_info,"created new udp\n"); + conn_manager.insert_fd(new_udp_fd,u64); + } + + + int new_udp_fd=conn_manager.find_fd_by_u64(u64); + conn_manager.update_active_time(new_udp_fd); + int ret; + if(is_client) + { + add_seq(data,data_len); + char new_data[buf_len]; + int new_len; + do_obscure(data, data_len, new_data, new_len); + ret = send(new_udp_fd, new_data,new_len, 0); + + if(dup_num>1) + { + add_and_new(new_udp_fd, dup_num - 1, data, data_len); + } + } + else + { + char new_data[buf_len]; + int new_len; + if (de_obscure(data, data_len, new_data, new_len) != 0) { + printf("error at line %d ,data_len=%d \n", __LINE__,data_len); + continue; + } + + if (remove_seq(new_data, new_len) != 0) { + printf("error at line %d\n", __LINE__); + continue; + } + + ret = send(new_udp_fd, new_data,new_len, 0); + } + + if (ret < 0) { + mylog(log_warn, "send returned %d\n", ret); + //perror("what happened????"); + } + } + else if(events[n].data.fd == clear_timer_fd) + { + u64_t value; + read(clear_timer_fd, &value, 8); + mylog(log_debug, "timer!\n"); + conn_manager.clear_inactive(); + } + else if (events[n].data.fd == delay_timer_fd) + { + uint64_t value; + read(delay_timer_fd, &value, 8); + //printf("",delay_mp.size()); + //fflush(stdout); + } + else + { + int udp_fd=events[n].data.fd; + if(!conn_manager.exist_fd(udp_fd)) continue; + + char data[buf_len]; + int data_len =recv(udp_fd,data,buf_len,0); + if(data_len<0) + { + mylog(log_warn, "recv failed %d\n", data_len); + continue; + } + assert(conn_manager.exist_fd(udp_fd)); + + conn_manager.update_active_time(udp_fd); + + u64_t u64=conn_manager.find_u64_by_fd(udp_fd); + + sockaddr_in tmp_sockaddr; + + memset(&tmp_sockaddr,0,sizeof(tmp_sockaddr)); + tmp_sockaddr.sin_family = AF_INET; + tmp_sockaddr.sin_addr.s_addr = (u64 >> 32u); + + tmp_sockaddr.sin_port = htons(uint16_t((u64 << 32u) >> 32u)); + + if(is_client) + { + char new_data[buf_len]; + int new_len; + if (de_obscure(data, data_len, new_data, new_len) != 0) { + printf("error at line %d ,data_len=%d \n", __LINE__,data_len); + continue; + } + + if (remove_seq(new_data, new_len) != 0) { + printf("error at line %d\n", __LINE__); + continue; + } + + ret = sendto(local_listen_fd, new_data, + new_len , 0, + (struct sockaddr *) &tmp_sockaddr, + sizeof(tmp_sockaddr)); + } + else + { + add_seq(data,data_len); + char new_data[buf_len]; + int new_len; + do_obscure(data, data_len, new_data, new_len); + + if(dup_num>1) + { + add_and_new(udp_fd, dup_num - 1, data, data_len); + } + + ret = sendto(local_listen_fd, new_data, + new_len , 0, + (struct sockaddr *) &tmp_sockaddr, + sizeof(tmp_sockaddr)); + } + if (ret < 0) { + mylog(log_warn, "sento returned %d\n", ret); + //perror("ret<0"); + } + mylog(log_trace, "%s :%d\n", inet_ntoa(tmp_sockaddr.sin_addr), + ntohs(tmp_sockaddr.sin_port)); + mylog(log_trace, "%d byte sent\n", ret); + + } + } + check_delay_map(); + } + exit(0); + return 0; +} +void process_arg(int argc, char *argv[]) +{ int i, j, k; int opt; - signal(SIGCHLD, handler); + static struct option long_options[] = + { + {"log-level", required_argument, 0, 1}, + {"log-position", no_argument, 0, 1}, + {"disable-color", no_argument, 0, 1}, + {NULL, 0, 0, 0} + }; + int option_index = 0; + for (i = 0; i < argc; i++) + { + if(strcmp(argv[i],"--log-level")==0) + { + if(i", optopt); } @@ -527,9 +964,11 @@ int main(int argc, char *argv[]) printf("-s -c cant be both set\n"); exit(-1); } - +} +int multi_process() +{ struct sockaddr_in local_me, local_other; - int local_listen_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + local_listen_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); int yes = 1; setsockopt(local_listen_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); set_buf_size(local_listen_fd); @@ -670,19 +1109,19 @@ int main(int argc, char *argv[]) exit(-1); } - if ((timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0) + if ((delay_timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0) { printf("timer_fd create error"); exit(1); } ev.events = EPOLLIN; - ev.data.fd = timer_fd; + ev.data.fd = delay_timer_fd; itimerspec zero_its; memset(&zero_its, 0, sizeof(zero_its)); - timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0); - epoll_ctl(epollfd, EPOLL_CTL_ADD, timer_fd, &ev); + timerfd_settime(delay_timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0); + epoll_ctl(epollfd, EPOLL_CTL_ADD, delay_timer_fd, &ev); if (ret < 0) { printf("epoll_ctl return %d\n", ret); @@ -788,10 +1227,10 @@ int main(int argc, char *argv[]) exit(1); } } - else if (events[n].data.fd == timer_fd) + else if (events[n].data.fd == delay_timer_fd) { uint64_t value; - read(timer_fd, &value, 8); + read(delay_timer_fd, &value, 8); //printf("",delay_mp.size()); //fflush(stdout); } @@ -806,5 +1245,40 @@ int main(int argc, char *argv[]) } } //while(1)end +} +int main(int argc, char *argv[]) +{ + //printf("%lld\n",get_current_time_us()); + + //printf("%lld\n",get_current_time_us()); + + //printf("%lld\n",get_current_time_us()); + + //printf("%lld\n",get_current_time()); + dup2(1, 2); //redirect stderr to stdout + int i, j, k; + process_arg(argc,argv); + + + init_random_number_fd(); + + signal(SIGCHLD, handler); + + mylog(log_info,"test\n"); + + remote_address_uint32=inet_addr(remote_address); + + + if(!multi_process_mode) + { + event_loop(); + } + else + { + multi_process(); + } + + return 0; } + diff --git a/makefile b/makefile index 77732c8..686e394 100644 --- a/makefile +++ b/makefile @@ -1,6 +1,6 @@ ccarm=mips-openwrt-linux-g++ all: - g++ main.cpp -o dupd -static -lrt + g++ main.cpp common.cpp log.cpp -I. -o dupd -static -lrt -std=c++11 release: g++ main.cpp -o dupd_amd64 -static -lrt g++ main.cpp -o dupd_x86 -static -lrt -m32