From 844eac0d3d31b82c54fe16685bc0b25be903b88d Mon Sep 17 00:00:00 2001 From: wangyu- Date: Thu, 5 Oct 2017 12:21:06 -0500 Subject: [PATCH] mainly function done --- common.cpp | 46 ++++++++++ common.h | 14 +++ fec_manager.cpp | 201 ++++++++++++++++++++++++++++++++++++++++++ fec_manager.h | 229 ++++++++++++++++++++++++++++++++++++++++++++---- main.cpp | 44 +++++++++- makefile | 2 +- packet.cpp | 95 +------------------- packet.h | 4 +- 8 files changed, 522 insertions(+), 113 deletions(-) diff --git a/common.cpp b/common.cpp index f999390..f23c4c1 100644 --- a/common.cpp +++ b/common.cpp @@ -52,6 +52,47 @@ u32_t get_u64_l(u64_t a) return (a<<32u)>>32u; } +void write_u16(char * p,u16_t w) +{ + *(unsigned char*)(p + 1) = (w & 0xff); + *(unsigned char*)(p + 0) = (w >> 8); +} +u16_t read_u16(char * p) +{ + u16_t res; + res = *(const unsigned char*)(p + 0); + res = *(const unsigned char*)(p + 1) + (res << 8); + return res; +} + +void write_u32(char * p,u32_t l) +{ + *(unsigned char*)(p + 3) = (unsigned char)((l >> 0) & 0xff); + *(unsigned char*)(p + 2) = (unsigned char)((l >> 8) & 0xff); + *(unsigned char*)(p + 1) = (unsigned char)((l >> 16) & 0xff); + *(unsigned char*)(p + 0) = (unsigned char)((l >> 24) & 0xff); +} +u32_t read_u32(char * p) +{ + u32_t res; + res = *(const unsigned char*)(p + 0); + res = *(const unsigned char*)(p + 1) + (res << 8); + res = *(const unsigned char*)(p + 2) + (res << 8); + res = *(const unsigned char*)(p + 3) + (res << 8); + return res; +} + +void write_u64(char * s,u64_t a) +{ + assert(0==1); +} +u64_t read_u64(char * s) +{ + assert(0==1); + return 0; +} + + char * my_ntoa(u32_t ip) { in_addr a; @@ -431,3 +472,8 @@ char * ip_port_t::to_s() sprintf(res,"%s:%d",my_ntoa(ip),port); return res; } + +int round_up_div(int a,int b) +{ + return (a+b-1)/b; +} diff --git a/common.h b/common.h index 1fce585..5661d39 100644 --- a/common.h +++ b/common.h @@ -58,6 +58,9 @@ typedef long long i64_t; typedef unsigned int u32_t; typedef int i32_t; +typedef unsigned short u16_t; +typedef short i16_t; + typedef u64_t my_time_t; const int max_data_len=1600; @@ -165,6 +168,15 @@ u32_t get_u64_h(u64_t a); u32_t get_u64_l(u64_t a); +void write_u16(char *,u16_t a); +u16_t read_u16(char *); + +void write_u32(char *,u32_t a); +u32_t read_u32(char *); + +void write_u64(char *,u64_t a); +u64_t read_uu64(char *); + char * my_ntoa(u32_t ip); void myexit(int a); @@ -195,6 +207,8 @@ int random_between(u32_t a,u32_t b); int set_timer_ms(int epollfd,int &timer_fd,u32_t timer_interval); +int round_up_div(int a,int b); + /* int create_new_udp(int &new_udp_fd,int remote_address_uint32,int remote_port); */ diff --git a/fec_manager.cpp b/fec_manager.cpp index a8947a6..2506573 100644 --- a/fec_manager.cpp +++ b/fec_manager.cpp @@ -5,6 +5,207 @@ * Author: root */ +#include "fec_manager.h" +#include "log.h" +#include "common.h" +#include "lib/rs.h" +u32_t seq=0; +blob_encode_t::blob_encode_t() +{ + clear(); +} +int blob_encode_t::clear() +{ + counter=0; + current_len=(int)sizeof(u32_t); + return 0; +} + +int blob_encode_t::get_num() +{ + return counter; +} +int blob_encode_t::get_shard_len(int n) +{ + return round_up_div(current_len,n); +} + +int blob_encode_t::get_shard_len(int n,int next_packet_len) +{ + return round_up_div(current_len+(int)sizeof(u16_t)+next_packet_len,n); +} + +int blob_encode_t::input(char *s,int len) +{ + assert(current_len+len+sizeof(u16_t) <=256*buf_len); + assert(len<=65535&&len>=0); + counter++; + assert(counter<=max_packet_num); + write_u16(buf+current_len,len); + current_len+=sizeof(u16_t); + memcpy(buf+current_len,s,len); + current_len+=len; + return 0; +} + +int blob_encode_t::output(int n,char ** &s_arr,int & len) +{ + static char *output_arr[256+100]; + len=round_up_div(current_len,n); + write_u32(buf,counter); + for(int i=0;icurrent_len) return -1; + + n=(int)read_u32(buf+parser_pos); + if(n>max_packet_num) {mylog(log_info,"failed 1\n");return -1;} + s_arr=s_buf; + len_arr=len_buf; + + parser_pos+=sizeof(u32_t); + for(int i=0;icurrent_len) {mylog(log_info,"failed2 \n");return -1;} + len_arr[i]=(int)read_u16(buf+parser_pos); + parser_pos+=(int)sizeof(u16_t); + if(parser_pos+len_arr[i]>current_len) {mylog(log_info,"failed 3 %d %d %d\n",parser_pos,len_arr[i],current_len);return -1;} + s_arr[i]=buf+parser_pos; + parser_pos+=len_arr[i]; + } + return 0; +} + +fec_encode_manager_t::fec_encode_manager_t() +{ + re_init(); +} +int fec_encode_manager_t::re_init() +{ + fec_data_num=4; + fec_redundant_num=2; + fec_mtu=1200; + + counter=0; + blob_encode.clear(); + ready_for_output=0; + return 0; +} +int fec_encode_manager_t::input(char *s,int len,int &is_first_packet) +{ + is_first_packet=0; + if(s==0 ||blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) + { + char ** blob_output; + int blob_len; + assert(counter!=0); + blob_encode.output(fec_data_num,blob_output,blob_len); + for(int i=0;i st; + int index; + anti_replay_t() + { + memset(replay_buffer,-1,sizeof(replay_buffer)); + st.rehash(anti_replay_buff_size*10); + index=0; + } + void set_invaild(u32_t seq) + { + if(st.find(seq)!=st.end() ) + { + mylog(log_trace,"seq %llx exist\n",seq); + return; + //return 0; + } + if(replay_buffer[index]!=u64_t(i64_t(-1))) + { + assert(st.find(replay_buffer[index])!=st.end()); + st.erase(replay_buffer[index]); + } + replay_buffer[index]=seq; + st.insert(seq); + index++; + if(index==int(anti_replay_buff_size)) index=0; + //return 1; //for complier check + } + int is_vaild(u32_t seq) + { + return st.find(seq)==st.end(); + } +}; struct blob_encode_t { - int input(char *,int l); - int ready_for_output(); - int output(char ** &,int l,int &n); + char buf[(256+5)*buf_len]; + int current_len; + int counter; + + blob_encode_t(); + + int clear(); + + int get_num(); + int get_shard_len(int n); + int get_shard_len(int n,int next_packet_len); + + int input(char *s,int len); //len=use len=0 for second and following packet + int output(int n,char ** &s_arr,int & len); }; struct blob_decode_t { - int input(char *,int l); - int ready_for_output(); - int output(char ** &,int l,int &n); + char buf[(256+5)*buf_len]; + int current_len; + int last_len; + int counter; + + blob_decode_t(); + int clear(); + int input(char *input,int len); + int output(int &n,char ** &output,int *&len_arr); }; + +struct fec_encode_manager_t +{ + int fec_data_num,fec_redundant_num; + int fec_mtu; + char buf[256+5][buf_len+100]; + char *output_buf[256+5]; + int output_len; + int ready_for_output; + + int counter; + + blob_encode_t blob_encode; + fec_encode_manager_t(); + int re_init(); + int input(char *s,int len,int &is_first_packet); + int output(int &n,char ** &s_arr,int &len); +}; +struct fec_data_t +{ + int used; + u32_t seq; + int data_num; + int redundant_num; + int idx; + int type; + char buf[buf_len]; + int len; +}; +struct fec_decode_manager_t +{ + anti_replay_t anti_replay; + fec_data_t fec_data[fec_buff_size]; + int index; + unordered_map > mp; + blob_decode_t blob_decode; + + fec_decode_manager_t() + { + for(int i=0;i<(int)fec_buff_size;i++) + fec_data[i].used=0; + ready_for_output=0; + } + + int output_n; + char ** output_s_arr; + int * output_len_arr; + + int ready_for_output; + int input(char *s,int len) + { + char *ori_s=s; + u32_t seq=read_u32(s); + s+=sizeof(u32_t); + int data_num=(unsigned char)*(s++); + int redundant_num=(unsigned char)*(s++); + int innder_index=(unsigned char)*(s++); + int type=(unsigned char)*(s++); + len=len-int(s-ori_s); + if(len<0) + { + return -1; + } + + if(!anti_replay.is_vaild(seq)) + { + return 0; + } + if(!mp[seq].empty()) + { + int tmp_idx=mp[seq].begin()->second; + int ok=1; + if(data_num+redundant_num>255) + ok=0; + if(fec_data[tmp_idx].data_num!=data_num||fec_data[tmp_idx].redundant_num!=redundant_num||fec_data[tmp_idx].len!=len) + { + ok=0; + } + if(ok==0) + { + return 0; + } + } + if(fec_data[index].used!=0) + { + int tmp_seq=fec_data[index].seq; + anti_replay.set_invaild(tmp_seq); + if(mp.find(tmp_seq)!=mp.end()) + { + mp.erase(tmp_seq); + } + } + + fec_data[index].used=1; + fec_data[index].seq=seq; + fec_data[index].data_num=data_num; + fec_data[index].redundant_num=redundant_num; + fec_data[index].idx=innder_index; + fec_data[index].type=type; + fec_data[index].len=len; + mp[seq][innder_index]=index; + + + map &inner_mp=mp[seq]; + if((int)inner_mp.size()>=data_num) + { + anti_replay.set_invaild(seq); + char *fec_tmp_arr[256+5]={0}; + for(auto it=inner_mp.begin();it!=inner_mp.end();it++) + { + fec_tmp_arr[it->first]=fec_data[it->second].buf; + } + rs_decode2(data_num,redundant_num,fec_tmp_arr,len); + blob_decode.clear(); + for(int i=0;i",data[i]); } fec_free(code); + + + char arr2[6][100]= + { + "aaa11111","","ccc333333333" + ,"ddd444","eee5555","ff6666" + }; + blob_encode_t blob_encode; + for(int i=0;i<6;i++) + blob_encode.input(arr2[i],strlen(arr2[i])); + + char **output; + int shard_len; + blob_encode.output(7,output,shard_len); + + + printf("",shard_len); + blob_decode_t blob_decode; + for(int i=0;i<7;i++) + { + blob_decode.input(output[i],shard_len); + } + + char **decode_output; + int * len_arr; + int num; + + + ret=blob_decode.output(num,decode_output,len_arr); + + printf("\n",num,ret); + for(int i=0;i",len_arr[i],buf); + } + printf("\n"); + + fec_encode_manager_t fec_encode_manager; + fec_decode_manager_t fec_decode_manager; + return 0; } void print_help() diff --git a/makefile b/makefile index b7b846b..bd35cf2 100755 --- a/makefile +++ b/makefile @@ -8,7 +8,7 @@ cc_arm= /toolchains/arm-2014.05/bin/arm-none-linux-gnueabi-g++ #cc_bcm2708=/home/wangyu/raspberry/tools/arm-bcm2708/gcc-linaro-arm-linux-gnueabihf-raspbian/bin/arm-linux-gnueabihf-g++ FLAGS= -std=c++11 -Wall -Wextra -Wno-unused-variable -Wno-unused-parameter -Wno-missing-field-initializers -SOURCES=main.cpp log.cpp common.cpp lib/fec.c lib/rs.c packet.cpp delay_manager.cpp fd_manager.cpp connection.cpp +SOURCES=main.cpp log.cpp common.cpp lib/fec.c lib/rs.c packet.cpp delay_manager.cpp fd_manager.cpp connection.cpp fec_manager.cpp NAME=speeder TARGETS=amd64 arm mips24kc_be x86 mips24kc_le diff --git a/packet.cpp b/packet.cpp index 771c413..7bd1465 100644 --- a/packet.cpp +++ b/packet.cpp @@ -17,7 +17,6 @@ u64_t dup_packet_send_count=0; u64_t packet_recv_count=0; u64_t dup_packet_recv_count=0; typedef u64_t anti_replay_seq_t; -const u32_t anti_replay_buff_size=10000; int disable_replay_filter=0; int random_drop=0; @@ -27,68 +26,6 @@ char key_string[1000]= "secret key"; int local_listen_fd=-1; - -struct anti_replay_t -{ - u64_t max_packet_received; - - u64_t replay_buffer[anti_replay_buff_size]; - unordered_set st; - u32_t const_id; - u32_t anti_replay_seq; - int index; - anti_replay_seq_t get_new_seq_for_send() - { - if(const_id==0) prepare(); - anti_replay_seq_t res=const_id; - res<<=32u; - anti_replay_seq++; - res|=anti_replay_seq; - const_id=0; - return res; - } - void prepare() - { - anti_replay_seq=get_true_random_number();//random first seq - const_id=get_true_random_number_nz(); - } - anti_replay_t() - { - memset(replay_buffer,0,sizeof(replay_buffer)); - st.rehash(anti_replay_buff_size*10); - max_packet_received=0; - index=0; - } - - int is_vaild(u64_t seq) - { - if(const_id==0) prepare(); - //if(disable_replay_filter) return 1; - if(seq==0) - { - mylog(log_debug,"seq=0\n"); - return 0; - } - if(st.find(seq)!=st.end() ) - { - mylog(log_trace,"seq %llx exist\n",seq); - return 0; - } - - if(replay_buffer[index]!=0) - { - assert(st.find(replay_buffer[index])!=st.end()); - st.erase(replay_buffer[index]); - } - replay_buffer[index]=seq; - st.insert(seq); - index++; - if(index==int(anti_replay_buff_size)) index=0; - - return 1; //for complier check - } -}anti_replay; - void encrypt_0(char * input,int &len,char *key) { int i,j; @@ -110,32 +47,6 @@ void decrypt_0(char * input,int &len,char *key) input[i]^=key[j]; } } -int add_seq(char * data,int &data_len ) -{ - if(data_len<0) return -1; - anti_replay_seq_t seq=anti_replay.get_new_seq_for_send(); - seq=hton64(seq); - memcpy(data+data_len,&seq,sizeof(seq)); - data_len+=sizeof(seq); - return 0; -} -int remove_seq(char * data,int &data_len) -{ - anti_replay_seq_t seq; - if(data_len