From 6632b716b86926d19ed9be2c9c912815a5c4e465 Mon Sep 17 00:00:00 2001 From: Mykola Karpets Date: Mon, 10 Mar 2025 18:16:26 +0200 Subject: [PATCH] Add systemd socket activation capability Add support for a systemd.socket activation. Socket it passed from systemd via file descriptor (see systemd.socket doc) Also, added a few features for convenience: * --conn/conv-timeout options to control conversation and connection timeouts * --shutdown option to shutdown service after all connections are timeouted. This is for convent usage with systemd.socket * test script which allows to smoke test udp speeder and its socket activation capability * example systemd unit files --- common.cpp | 2 +- common.h | 4 +- connection.cpp | 8 ++- connection.h | 5 +- examples/udp_speeder.service | 9 +++ examples/udp_speeder.socket | 9 +++ main.cpp | 3 + makefile | 2 +- misc.cpp | 15 ++++ misc.h | 2 + test/README.md | 62 ++++++++++++++++ test/udp_speeder_test.py | 133 +++++++++++++++++++++++++++++++++++ tunnel_server.cpp | 87 ++++++++++++++++++++++- 13 files changed, 333 insertions(+), 8 deletions(-) create mode 100644 examples/udp_speeder.service create mode 100644 examples/udp_speeder.socket create mode 100644 test/README.md create mode 100644 test/udp_speeder_test.py diff --git a/common.cpp b/common.cpp index 8e61cb8..faf2d2c 100644 --- a/common.cpp +++ b/common.cpp @@ -854,7 +854,7 @@ int new_listen_socket2(int &fd, address_t &addr) { setnonblocking(fd); set_buf_size(fd, socket_buf_size); - mylog(log_debug, "local_listen_fd=%d\n", fd); + mylog(log_debug, "[%s]local_listen_fd=%d\n", addr.get_str(), fd); return 0; } diff --git a/common.h b/common.h index 9f59799..48c3899 100644 --- a/common.h +++ b/common.h @@ -107,7 +107,7 @@ const int default_mtu = 1250; // const u32_t timer_interval=400; ////const u32_t conv_timeout=180000; // const u32_t conv_timeout=40000;//for test -const u32_t conv_timeout = 180000; +// const u32_t conv_timeout = 180000; const int max_conv_num = 10000; const int max_conn_num = 200; @@ -143,7 +143,7 @@ const u32_t client_conn_timeout = 10000; const u32_t client_conn_uplink_timeout = client_conn_timeout + 2000; // const uint32_t server_conn_timeout=conv_timeout+60000;//this should be 60s+ longer than conv_timeout,so that conv_manager can destruct convs gradually,to avoid latency glicth -const u32_t server_conn_timeout = conv_timeout + 20000; // for test +// const u32_t server_conn_timeout = conv_timeout + 20000; // for test extern int about_to_exit; diff --git a/connection.cpp b/connection.cpp index 9a0ee89..deabb5d 100644 --- a/connection.cpp +++ b/connection.cpp @@ -12,6 +12,8 @@ const int disable_conn_clear = 0; // a raw connection is called conn. int report_interval = 0; +u32_t server_conn_timeout_s = 60; // default connection timeout in seconds +u32_t conv_timeout_s = 30; // default conversation timeout in seconds void server_clear_function(u64_t u64) // used in conv_manager in server mode.for server we have to use one udp fd for one conv(udp connection), // so we have to close the fd when conv expires @@ -114,7 +116,7 @@ int conn_manager_t::clear_inactive0() { if (it->second->conv_manager.s.get_size() > 0) { // mylog(log_info,"[%s:%d]size %d \n",my_ntoa(get_u64_h(it->first)),get_u64_l(it->first),(int)it->second->conv_manager.get_size()); it++; - } else if (current_time < it->second->last_active_time + server_conn_timeout) { + } else if (current_time < it->second->last_active_time + server_conn_timeout_s * 1000) { it++; } else { address_t tmp_addr = it->first; // avoid making get_str() const; @@ -128,3 +130,7 @@ int conn_manager_t::clear_inactive0() { clear_it = it; return 0; } + +bool conn_manager_t::has_active_connections() { + return !mp.empty(); +} diff --git a/connection.h b/connection.h index 0b7f8ac..f58d746 100644 --- a/connection.h +++ b/connection.h @@ -9,6 +9,8 @@ #define CONNECTION_H_ extern int disable_anti_replay; +extern unsigned int server_conn_timeout_s; +extern unsigned int conv_timeout_s; #include "connection.h" #include "common.h" @@ -146,7 +148,7 @@ struct conv_manager_t // manage the udp connections u32_t conv; my_time_t ts = lru.peek_back(conv); - if (current_time - ts < conv_timeout) break; + if (current_time - ts < conv_timeout_s * 1000) break; erase_conv(conv); if (info == 0) { @@ -318,6 +320,7 @@ struct conn_manager_t // manager for connections. for client,we dont need conn_ int erase(unordered_map::iterator erase_it); int clear_inactive(); int clear_inactive0(); + bool has_active_connections(); }; extern conn_manager_t conn_manager; diff --git a/examples/udp_speeder.service b/examples/udp_speeder.service new file mode 100644 index 0000000..a3d0b1e --- /dev/null +++ b/examples/udp_speeder.service @@ -0,0 +1,9 @@ +[Unit] +Description=UDP Speeder Service +After=network.target + +[Service] +ExecStart=/home/mkarpets/repos/UDPspeeder/speederv2 -s -l0.0.0.0:4096 -r127.0.0.1:7777 -f20:10 --log-level 4 --conv-timeout 10 --conn-timeout 20 --shutdown + +[Install] +WantedBy=multi-user.target diff --git a/examples/udp_speeder.socket b/examples/udp_speeder.socket new file mode 100644 index 0000000..077d33f --- /dev/null +++ b/examples/udp_speeder.socket @@ -0,0 +1,9 @@ +[Unit] +Description=UDP Speeder Socket + +[Socket] +ListenDatagram=0.0.0.0:4096 +FreeBind=true + +[Install] +WantedBy=sockets.target diff --git a/main.cpp b/main.cpp index 326d9bc..8556f21 100644 --- a/main.cpp +++ b/main.cpp @@ -50,6 +50,9 @@ static void print_help() { printf(" --disable-obscure disable obscure, to save a bit bandwidth and cpu\n"); printf(" --disable-checksum disable checksum to save a bit bandwdith and cpu\n"); // printf(" --disable-xor disable xor\n"); + printf(" --conn-timeout connection timeout in seconds, default: 60\n"); + printf(" --conv-timeout conversation timeout in seconds, default: 30\n"); + printf(" --shutdown shut down after all connections are disconnected\n"); printf("developer options:\n"); printf(" --fifo use a fifo(named pipe) for sending commands to the running program, so that you\n"); diff --git a/makefile b/makefile index 80ad70c..9fef0fe 100755 --- a/makefile +++ b/makefile @@ -26,7 +26,7 @@ export STAGING_DIR=/tmp/ #just for supress warning of staging_dir not define # targets for nativei (non-cross) compile all:git_version rm -f ${NAME} - ${cc_local} -o ${NAME} -I. ${SOURCES} ${FLAGS} -lrt -ggdb -static -O2 + ${cc_local} -DSYSTEMD_SOCKET_ACTIVATION -o ${NAME} -I. ${SOURCES} ${FLAGS} -lrt -lsystemd -ggdb -O2 freebsd:git_version rm -f ${NAME} diff --git a/misc.cpp b/misc.cpp index 5918e57..bdc5187 100644 --- a/misc.cpp +++ b/misc.cpp @@ -54,6 +54,8 @@ int mssfix = default_mtu; int manual_set_tun = 0; int persist_tun = 0; +bool shutdown_if_all_disconnected = false; + char rs_par_str[rs_str_len] = "20:10"; int from_normal_to_fec(conn_info_t &conn_info, char *data, int len, int &out_n, char **&out_arr, int *&out_len, my_time_t *&out_delay) { @@ -225,6 +227,7 @@ int print_parameter() { jitter_min / 1000, jitter_max / 1000, output_interval_min / 1000, output_interval_max / 1000, g_fec_par.timeout / 1000, g_fec_par.mtu, g_fec_par.queue_len, g_fec_par.mode); mylog(log_info, "fec_str=%s\n", rs_par_str); mylog(log_info, "fec_inner_parameter=%s\n", g_fec_par.rs_to_str()); + mylog(log_info, "conv_timeout=%d conn_timeout=%d\n", conv_timeout_s, server_conn_timeout_s); return 0; } int handle_command(char *s) { @@ -582,6 +585,9 @@ void process_arg(int argc, char *argv[]) { {"persist-tun", no_argument, 0, 1}, {"manual-set-tun", no_argument, 0, 1}, {"interval", required_argument, 0, 'i'}, + {"conn-timeout", required_argument, 0, 1}, + {"conv-timeout", required_argument, 0, 1}, + {"shutdown", no_argument, 0, 1}, {NULL, 0, 0, 0}}; int option_index = 0; assert(g_fec_par.rs_from_str(rs_par_str) == 0); @@ -842,6 +848,15 @@ void process_arg(int argc, char *argv[]) { } else if (strcmp(long_options[option_index].name, "mssfix") == 0) { sscanf(optarg, "%d", &mssfix); mylog(log_warn, "mssfix=%d\n", mssfix); + } else if (strcmp(long_options[option_index].name, "conn-timeout") == 0) { + sscanf(optarg, "%u", &server_conn_timeout_s); + mylog(log_warn, "conn_timeout=%d\n", server_conn_timeout_s); + } else if (strcmp(long_options[option_index].name, "conv-timeout") == 0) { + sscanf(optarg, "%u", &conv_timeout_s); + mylog(log_warn, "conv_timeout=%d\n", conv_timeout_s); + } else if (strcmp(long_options[option_index].name, "shutdown") == 0) { + shutdown_if_all_disconnected = true; + mylog(log_warn, "shutdown enabled\n"); } else { mylog(log_fatal, "unknown option\n"); myexit(-1); diff --git a/misc.h b/misc.h index 6ed637e..c1fe1ca 100644 --- a/misc.h +++ b/misc.h @@ -58,6 +58,8 @@ extern int mssfix; extern int manual_set_tun; extern int persist_tun; +extern bool shutdown_if_all_disconnected; + int from_normal_to_fec(conn_info_t &conn_info, char *data, int len, int &out_n, char **&out_arr, int *&out_len, my_time_t *&out_delay); int from_fec_to_normal(conn_info_t &conn_info, char *data, int len, int &out_n, char **&out_arr, int *&out_len, my_time_t *&out_delay); diff --git a/test/README.md b/test/README.md new file mode 100644 index 0000000..2668a96 --- /dev/null +++ b/test/README.md @@ -0,0 +1,62 @@ +Testing UDP speeder +--- + +### Build +``` +export UDP_SPEEDER_REPO= +make -C ${UDP_SPEEDER_REPO} +``` + +### Simple test +``` +# Start server +python3 ${UDP_SPEEDER_REPO}/test/udp_speeder_test.py \ + --speederv2-path ${UDP_SPEEDER_REPO}/speederv2 \ + --mode server \ + --extra-args "--conn-timeout 20 --conv-timeout 10" \ + --log-level info + +# Start client +python3 ${UDP_SPEEDER_REPO}/test/udp_speeder_test.py \ + --speederv2-path ${UDP_SPEEDER_REPO}/UDPspeeder/speederv2 \ + --mode client \ + --log-level info +``` + +### Test with systemd-socket-activate +``` +# Start server with --socket-activate +python3 ${UDP_SPEEDER_REPO}/test/udp_speeder_test.py \ + --speederv2-path ${UDP_SPEEDER_REPO}/speederv2 \ + --mode server \ + --socket-activate \ + --extra-args "--conn-timeout 20 --conv-timeout 10" \ + --log-level info + +#Start client +``` + +### Test with real systemd units +``` +# Add unit files +sudo ln -s ${UDP_SPEEDER_REPO}/test/udp_speeder.socket /etc/systemd/system/ +sudo ln -s ${UDP_SPEEDER_REPO}/test/udp_speeder.service /etc/systemd/system/ + +# Enable them to systemd +sudo systemctl daemon-reload +sudo systemctl enable udp_speeder.socket +sudo systemctl enable udp_speeder.socket + +# Observe status and logs +sudo systemctl status udp_speeder.socket +journalctl -f -u udp_speeder.socket -u udp_speeder.service + +# Start test server without speederv2 (will be started by udp_speeder.socket) +python3 ${UDP_SPEEDER_REPO}/test/udp_speeder_test.py \ + --mode server \ + --no-udpspeeder \ + --log-level info + +# Start client +``` + diff --git a/test/udp_speeder_test.py b/test/udp_speeder_test.py new file mode 100644 index 0000000..dd62f9b --- /dev/null +++ b/test/udp_speeder_test.py @@ -0,0 +1,133 @@ +import socket +import subprocess +import sys +import threading +import signal +import time +import argparse + +LOG_LEVEL_MAP = { + "never": 0, + "fatal": 1, + "error": 2, + "warn": 3, + "info": 4, + "debug": 5, + "trace": 6 +} + +class UDPspeederTest: + def __init__(self, speederv2_path, log_level="info", socket_activate=False, extra_udpspeeder_args="", no_udpspeeder=False): + self.speederv2_path = speederv2_path + self.log_level = log_level + self.socket_activate = socket_activate + self.extra_udpspeeder_args = extra_udpspeeder_args + self.no_udpspeeder = no_udpspeeder + + def run_speederv2(self, mode, local_port, remote_ip, remote_port): + log_level_num = LOG_LEVEL_MAP.get(self.log_level, 4) # Default to "info" if log_level is not found + cmd = f"{self.speederv2_path} {mode} -l0.0.0.0:{local_port} -r{remote_ip}:{remote_port} -f20:10 --log-level {log_level_num} {self.extra_udpspeeder_args}" + if self.socket_activate: + cmd = f"systemd-socket-activate -l0.0.0.0:{local_port} -d {cmd}" + print(f"UPDspeeder command: {cmd}") + + process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + return process + + def start_udpspeeder_server(self, local_port, remote_port): + return self.run_speederv2("-s", local_port, "127.0.0.1", remote_port) + + def start_udpspeeder_client(self, local_port, remote_ip, remote_port): + return self.run_speederv2("-c", local_port, remote_ip, remote_port) + + def server(self, udp_speeder_port, server_port): + # Start UDPspeeder server if not disabled + if not self.no_udpspeeder: + udpspeeder_process = self.start_udpspeeder_server(udp_speeder_port, server_port) + threading.Thread(target=self.log_udpspeeder_output, args=(udpspeeder_process,)).start() + + # Start UDP server + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.bind(("0.0.0.0", server_port)) + print(f"Server listening on port {server_port}") + + def cleanup(signum, frame): + print("Shutting down server...") + sock.close() + if not self.no_udpspeeder: + udpspeeder_process.terminate() + sys.exit(0) + + signal.signal(signal.SIGTERM, cleanup) + + while True: + data, addr = sock.recvfrom(1024) + print(f"Received message: {data.decode()} from {addr}") + reply = f"Server reply to {data.decode()}" + sock.sendto(reply.encode(), addr) + print(f"Sent reply: {reply} ==============================================================") + + def client(self, us_client_port, us_server_port): + # Start UDPspeeder client if not disabled + if not self.no_udpspeeder: + udpspeeder_process = self.start_udpspeeder_client(us_client_port, "127.0.0.1", us_server_port) + threading.Thread(target=self.log_udpspeeder_output, args=(udpspeeder_process,)).start() + + # Start UDP client + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + server_address = ("127.0.0.1", us_client_port) + message = "Hello, Server!" + + def cleanup(signum, frame): + print("Shutting down client...") + sock.close() + if not self.no_udpspeeder: + udpspeeder_process.terminate() + sys.exit(0) + + signal.signal(signal.SIGTERM, cleanup) + sock.settimeout(1.0) + + while True: + print(f"Sending message: {message}") + sent = sock.sendto(message.encode(), server_address) + + try: + data, server = sock.recvfrom(1024) + except socket.timeout: + print("Request timed out") + continue + + print(f"Received reply: {data.decode()} ==============================================================") + time.sleep(1) + + sock.close() + + def log_udpspeeder_output(self, process): + for line in process.stdout: + print(line.decode(), end='') + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="UDPspeeder test script") + parser.add_argument("--mode", choices=["server", "client"], help="Mode to run: 'server' or 'client'") + parser.add_argument("--us-server-port", type=int, default=4096, help="UDPspeeder server port") + parser.add_argument("--us-client-port", type=int, default=3333, help="UDPspeeder client port") + parser.add_argument("--server-port", type=int, default=7777, help="Server port behind UDPspeeder") + parser.add_argument("--log-level", choices=LOG_LEVEL_MAP.keys(), default="info", nargs="?", help="Log level: 'never', 'fatal', 'error', 'warn', 'info', 'debug', 'trace'") + parser.add_argument("--socket-activate", action="store_true", help="Test systemd socket activation") + parser.add_argument("--extra-args", help="Extra arguments to pass to UDPspeeder") + parser.add_argument("--no-udpspeeder", action="store_true", help="Do not start UDPspeeder") + parser.add_argument("--speederv2-path", default="speederv2", help="Path to the speederv2 binary") + + + args = parser.parse_args() + + udpspeeder_test = UDPspeederTest(args.speederv2_path, args.log_level, args.socket_activate, args.extra_args, args.no_udpspeeder) + + if args.mode == "server": + udpspeeder_test.server(args.us_server_port, args.server_port) + elif args.mode == "client": + udpspeeder_test.client(args.us_client_port, args.us_server_port) + else: + print("Invalid mode. Use 'server' or 'client'.") + sys.exit(1) \ No newline at end of file diff --git a/tunnel_server.cpp b/tunnel_server.cpp index aaa82c7..76d7af8 100644 --- a/tunnel_server.cpp +++ b/tunnel_server.cpp @@ -6,6 +6,10 @@ */ #include "tunnel.h" +#ifdef SYSTEMD_SOCKET_ACTIVATION +#include +#endif +#include static void conn_timer_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents); static void fec_encode_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents); @@ -15,6 +19,8 @@ enum tmp_mode_t { is_from_remote = 0, is_fec_timeout, is_conn_timer }; +bool first_connection_established = false; + void data_from_remote_or_fec_timeout_or_conn_timer(conn_info_t &conn_info, fd64_t fd64, tmp_mode_t mode) { int ret; @@ -193,6 +199,7 @@ static void local_listen_cb(struct ev_loop *loop, struct ev_io *watcher, int rev conn_info.fec_encode_manager.set_loop_and_cb(loop, fec_encode_cb); mylog(log_info, "new connection from %s\n", addr.get_str()); + first_connection_established = true; } conn_info_t &conn_info = conn_manager.find_insert(addr); @@ -316,9 +323,61 @@ static void global_timer_cb(struct ev_loop *loop, struct ev_timer *watcher, int // uint64_t value; // read(timer.get_timer_fd(), &value, 8); conn_manager.clear_inactive(); + + if (shutdown_if_all_disconnected && first_connection_established && !conn_manager.has_active_connections()) { + mylog(log_info, "No active connections, exiting...\n"); + ev_break(loop, EVBREAK_ALL); + } + mylog(log_trace, "events[idx].data.u64==(u64_t)timer.get_timer_fd()\n"); } +void print_socket_info(int fd) { + struct sockaddr_storage addr; + socklen_t addr_len = sizeof(addr); + if (getsockname(fd, (struct sockaddr *)&addr, &addr_len) == -1) { + mylog(log_error, "getsockname failed: %s\n", strerror(errno)); + return; + } + + char ip_str[INET6_ADDRSTRLEN]; + void *ip_addr; + int port; + const char *family_str; + + if (addr.ss_family == AF_INET) { + struct sockaddr_in *s = (struct sockaddr_in *)&addr; + ip_addr = &(s->sin_addr); + port = ntohs(s->sin_port); + family_str = "AF_INET"; + } else { + struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr; + ip_addr = &(s->sin6_addr); + port = ntohs(s->sin6_port); + family_str = "AF_INET6"; + } + + inet_ntop(addr.ss_family, ip_addr, ip_str, sizeof(ip_str)); + + int sock_type; + socklen_t optlen = sizeof(sock_type); + if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &sock_type, &optlen) == -1) { + mylog(log_error, "getsockopt failed: %s\n", strerror(errno)); + return; + } + + const char *type_str; + if (sock_type == SOCK_DGRAM) { + type_str = "SOCK_DGRAM"; + } else if (sock_type == SOCK_STREAM) { + type_str = "SOCK_STREAM"; + } else { + type_str = "UNKNOWN"; + } + + mylog(log_info, "Socket info - Family: %s, Type: %s, IP: %s, Port: %d\n", family_str, type_str, ip_str, port); +} + int tunnel_server_event_loop() { int i, j, k; int ret; @@ -326,8 +385,32 @@ int tunnel_server_event_loop() { // int epoll_fd; // int remote_fd; - int local_listen_fd; - new_listen_socket2(local_listen_fd, local_addr); + int local_listen_fd = -1; +#ifdef SYSTEMD_SOCKET_ACTIVATION + int n = sd_listen_fds(0); + if (sd_listen_fds(0) > 0) { + if (n != 1) { + mylog(log_fatal, "expect exactly 1 socket passed from systemd, but got %d\n", n); + myexit(-1); + } + + int fd = SD_LISTEN_FDS_START; + // print_socket_info(fd); + + int ret = sd_is_socket_inet(fd, local_addr.get_type(), SOCK_DGRAM, -1, local_addr.get_port()); + mylog(log_info, "sd_is_socket_inet returned: %d\n", ret); + if (ret <= 0) { + mylog(log_fatal, "socket is not UDP\n"); + myexit(-1); + } + + local_listen_fd = fd; + mylog(log_info, "Using socket passed from systemd\n"); + } +#endif + if (local_listen_fd < 0) { + new_listen_socket2(local_listen_fd, local_addr); + } // epoll_fd = epoll_create1(0); // assert(epoll_fd>0);