Add systemd socket activation capability

Add support for a systemd.socket activation. Socket it passed from
systemd via file descriptor (see systemd.socket doc)
Also, added a few features for convenience:
  * --conn/conv-timeout options to control conversation and connection
    timeouts
  * --shutdown option to shutdown service after all connections are
    timeouted. This is for convent usage with systemd.socket
  * test script which allows to smoke test udp speeder and its
    socket activation capability
  * example systemd unit files
This commit is contained in:
Mykola Karpets 2025-03-10 18:16:26 +02:00
parent 17694ecaa9
commit 6632b716b8
13 changed files with 333 additions and 8 deletions

View File

@ -854,7 +854,7 @@ int new_listen_socket2(int &fd, address_t &addr) {
setnonblocking(fd);
set_buf_size(fd, socket_buf_size);
mylog(log_debug, "local_listen_fd=%d\n", fd);
mylog(log_debug, "[%s]local_listen_fd=%d\n", addr.get_str(), fd);
return 0;
}

View File

@ -107,7 +107,7 @@ const int default_mtu = 1250;
// const u32_t timer_interval=400;
////const u32_t conv_timeout=180000;
// const u32_t conv_timeout=40000;//for test
const u32_t conv_timeout = 180000;
// const u32_t conv_timeout = 180000;
const int max_conv_num = 10000;
const int max_conn_num = 200;
@ -143,7 +143,7 @@ const u32_t client_conn_timeout = 10000;
const u32_t client_conn_uplink_timeout = client_conn_timeout + 2000;
// const uint32_t server_conn_timeout=conv_timeout+60000;//this should be 60s+ longer than conv_timeout,so that conv_manager can destruct convs gradually,to avoid latency glicth
const u32_t server_conn_timeout = conv_timeout + 20000; // for test
// const u32_t server_conn_timeout = conv_timeout + 20000; // for test
extern int about_to_exit;

View File

@ -12,6 +12,8 @@
const int disable_conn_clear = 0; // a raw connection is called conn.
int report_interval = 0;
u32_t server_conn_timeout_s = 60; // default connection timeout in seconds
u32_t conv_timeout_s = 30; // default conversation timeout in seconds
void server_clear_function(u64_t u64) // used in conv_manager in server mode.for server we have to use one udp fd for one conv(udp connection),
// so we have to close the fd when conv expires
@ -114,7 +116,7 @@ int conn_manager_t::clear_inactive0() {
if (it->second->conv_manager.s.get_size() > 0) {
// mylog(log_info,"[%s:%d]size %d \n",my_ntoa(get_u64_h(it->first)),get_u64_l(it->first),(int)it->second->conv_manager.get_size());
it++;
} else if (current_time < it->second->last_active_time + server_conn_timeout) {
} else if (current_time < it->second->last_active_time + server_conn_timeout_s * 1000) {
it++;
} else {
address_t tmp_addr = it->first; // avoid making get_str() const;
@ -128,3 +130,7 @@ int conn_manager_t::clear_inactive0() {
clear_it = it;
return 0;
}
bool conn_manager_t::has_active_connections() {
return !mp.empty();
}

View File

@ -9,6 +9,8 @@
#define CONNECTION_H_
extern int disable_anti_replay;
extern unsigned int server_conn_timeout_s;
extern unsigned int conv_timeout_s;
#include "connection.h"
#include "common.h"
@ -146,7 +148,7 @@ struct conv_manager_t // manage the udp connections
u32_t conv;
my_time_t ts = lru.peek_back(conv);
if (current_time - ts < conv_timeout) break;
if (current_time - ts < conv_timeout_s * 1000) break;
erase_conv(conv);
if (info == 0) {
@ -318,6 +320,7 @@ struct conn_manager_t // manager for connections. for client,we dont need conn_
int erase(unordered_map<address_t, conn_info_t *>::iterator erase_it);
int clear_inactive();
int clear_inactive0();
bool has_active_connections();
};
extern conn_manager_t conn_manager;

View File

@ -0,0 +1,9 @@
[Unit]
Description=UDP Speeder Service
After=network.target
[Service]
ExecStart=/home/mkarpets/repos/UDPspeeder/speederv2 -s -l0.0.0.0:4096 -r127.0.0.1:7777 -f20:10 --log-level 4 --conv-timeout 10 --conn-timeout 20 --shutdown
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,9 @@
[Unit]
Description=UDP Speeder Socket
[Socket]
ListenDatagram=0.0.0.0:4096
FreeBind=true
[Install]
WantedBy=sockets.target

View File

@ -50,6 +50,9 @@ static void print_help() {
printf(" --disable-obscure <number> disable obscure, to save a bit bandwidth and cpu\n");
printf(" --disable-checksum <number> disable checksum to save a bit bandwdith and cpu\n");
// printf(" --disable-xor <number> disable xor\n");
printf(" --conn-timeout <number> connection timeout in seconds, default: 60\n");
printf(" --conv-timeout <number> conversation timeout in seconds, default: 30\n");
printf(" --shutdown shut down after all connections are disconnected\n");
printf("developer options:\n");
printf(" --fifo <string> use a fifo(named pipe) for sending commands to the running program, so that you\n");

View File

@ -26,7 +26,7 @@ export STAGING_DIR=/tmp/ #just for supress warning of staging_dir not define
# targets for nativei (non-cross) compile
all:git_version
rm -f ${NAME}
${cc_local} -o ${NAME} -I. ${SOURCES} ${FLAGS} -lrt -ggdb -static -O2
${cc_local} -DSYSTEMD_SOCKET_ACTIVATION -o ${NAME} -I. ${SOURCES} ${FLAGS} -lrt -lsystemd -ggdb -O2
freebsd:git_version
rm -f ${NAME}

View File

@ -54,6 +54,8 @@ int mssfix = default_mtu;
int manual_set_tun = 0;
int persist_tun = 0;
bool shutdown_if_all_disconnected = false;
char rs_par_str[rs_str_len] = "20:10";
int from_normal_to_fec(conn_info_t &conn_info, char *data, int len, int &out_n, char **&out_arr, int *&out_len, my_time_t *&out_delay) {
@ -225,6 +227,7 @@ int print_parameter() {
jitter_min / 1000, jitter_max / 1000, output_interval_min / 1000, output_interval_max / 1000, g_fec_par.timeout / 1000, g_fec_par.mtu, g_fec_par.queue_len, g_fec_par.mode);
mylog(log_info, "fec_str=%s\n", rs_par_str);
mylog(log_info, "fec_inner_parameter=%s\n", g_fec_par.rs_to_str());
mylog(log_info, "conv_timeout=%d conn_timeout=%d\n", conv_timeout_s, server_conn_timeout_s);
return 0;
}
int handle_command(char *s) {
@ -582,6 +585,9 @@ void process_arg(int argc, char *argv[]) {
{"persist-tun", no_argument, 0, 1},
{"manual-set-tun", no_argument, 0, 1},
{"interval", required_argument, 0, 'i'},
{"conn-timeout", required_argument, 0, 1},
{"conv-timeout", required_argument, 0, 1},
{"shutdown", no_argument, 0, 1},
{NULL, 0, 0, 0}};
int option_index = 0;
assert(g_fec_par.rs_from_str(rs_par_str) == 0);
@ -842,6 +848,15 @@ void process_arg(int argc, char *argv[]) {
} else if (strcmp(long_options[option_index].name, "mssfix") == 0) {
sscanf(optarg, "%d", &mssfix);
mylog(log_warn, "mssfix=%d\n", mssfix);
} else if (strcmp(long_options[option_index].name, "conn-timeout") == 0) {
sscanf(optarg, "%u", &server_conn_timeout_s);
mylog(log_warn, "conn_timeout=%d\n", server_conn_timeout_s);
} else if (strcmp(long_options[option_index].name, "conv-timeout") == 0) {
sscanf(optarg, "%u", &conv_timeout_s);
mylog(log_warn, "conv_timeout=%d\n", conv_timeout_s);
} else if (strcmp(long_options[option_index].name, "shutdown") == 0) {
shutdown_if_all_disconnected = true;
mylog(log_warn, "shutdown enabled\n");
} else {
mylog(log_fatal, "unknown option\n");
myexit(-1);

2
misc.h
View File

@ -58,6 +58,8 @@ extern int mssfix;
extern int manual_set_tun;
extern int persist_tun;
extern bool shutdown_if_all_disconnected;
int from_normal_to_fec(conn_info_t &conn_info, char *data, int len, int &out_n, char **&out_arr, int *&out_len, my_time_t *&out_delay);
int from_fec_to_normal(conn_info_t &conn_info, char *data, int len, int &out_n, char **&out_arr, int *&out_len, my_time_t *&out_delay);

62
test/README.md Normal file
View File

@ -0,0 +1,62 @@
Testing UDP speeder
---
### Build
```
export UDP_SPEEDER_REPO=<path to UPDspeeder repo>
make -C ${UDP_SPEEDER_REPO}
```
### Simple test
```
# Start server
python3 ${UDP_SPEEDER_REPO}/test/udp_speeder_test.py \
--speederv2-path ${UDP_SPEEDER_REPO}/speederv2 \
--mode server \
--extra-args "--conn-timeout 20 --conv-timeout 10" \
--log-level info
# Start client
python3 ${UDP_SPEEDER_REPO}/test/udp_speeder_test.py \
--speederv2-path ${UDP_SPEEDER_REPO}/UDPspeeder/speederv2 \
--mode client \
--log-level info
```
### Test with systemd-socket-activate
```
# Start server with --socket-activate
python3 ${UDP_SPEEDER_REPO}/test/udp_speeder_test.py \
--speederv2-path ${UDP_SPEEDER_REPO}/speederv2 \
--mode server \
--socket-activate \
--extra-args "--conn-timeout 20 --conv-timeout 10" \
--log-level info
#Start client
```
### Test with real systemd units
```
# Add unit files
sudo ln -s ${UDP_SPEEDER_REPO}/test/udp_speeder.socket /etc/systemd/system/
sudo ln -s ${UDP_SPEEDER_REPO}/test/udp_speeder.service /etc/systemd/system/
# Enable them to systemd
sudo systemctl daemon-reload
sudo systemctl enable udp_speeder.socket
sudo systemctl enable udp_speeder.socket
# Observe status and logs
sudo systemctl status udp_speeder.socket
journalctl -f -u udp_speeder.socket -u udp_speeder.service
# Start test server without speederv2 (will be started by udp_speeder.socket)
python3 ${UDP_SPEEDER_REPO}/test/udp_speeder_test.py \
--mode server \
--no-udpspeeder \
--log-level info
# Start client
```

133
test/udp_speeder_test.py Normal file
View File

@ -0,0 +1,133 @@
import socket
import subprocess
import sys
import threading
import signal
import time
import argparse
LOG_LEVEL_MAP = {
"never": 0,
"fatal": 1,
"error": 2,
"warn": 3,
"info": 4,
"debug": 5,
"trace": 6
}
class UDPspeederTest:
def __init__(self, speederv2_path, log_level="info", socket_activate=False, extra_udpspeeder_args="", no_udpspeeder=False):
self.speederv2_path = speederv2_path
self.log_level = log_level
self.socket_activate = socket_activate
self.extra_udpspeeder_args = extra_udpspeeder_args
self.no_udpspeeder = no_udpspeeder
def run_speederv2(self, mode, local_port, remote_ip, remote_port):
log_level_num = LOG_LEVEL_MAP.get(self.log_level, 4) # Default to "info" if log_level is not found
cmd = f"{self.speederv2_path} {mode} -l0.0.0.0:{local_port} -r{remote_ip}:{remote_port} -f20:10 --log-level {log_level_num} {self.extra_udpspeeder_args}"
if self.socket_activate:
cmd = f"systemd-socket-activate -l0.0.0.0:{local_port} -d {cmd}"
print(f"UPDspeeder command: {cmd}")
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return process
def start_udpspeeder_server(self, local_port, remote_port):
return self.run_speederv2("-s", local_port, "127.0.0.1", remote_port)
def start_udpspeeder_client(self, local_port, remote_ip, remote_port):
return self.run_speederv2("-c", local_port, remote_ip, remote_port)
def server(self, udp_speeder_port, server_port):
# Start UDPspeeder server if not disabled
if not self.no_udpspeeder:
udpspeeder_process = self.start_udpspeeder_server(udp_speeder_port, server_port)
threading.Thread(target=self.log_udpspeeder_output, args=(udpspeeder_process,)).start()
# Start UDP server
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("0.0.0.0", server_port))
print(f"Server listening on port {server_port}")
def cleanup(signum, frame):
print("Shutting down server...")
sock.close()
if not self.no_udpspeeder:
udpspeeder_process.terminate()
sys.exit(0)
signal.signal(signal.SIGTERM, cleanup)
while True:
data, addr = sock.recvfrom(1024)
print(f"Received message: {data.decode()} from {addr}")
reply = f"Server reply to {data.decode()}"
sock.sendto(reply.encode(), addr)
print(f"Sent reply: {reply} ==============================================================")
def client(self, us_client_port, us_server_port):
# Start UDPspeeder client if not disabled
if not self.no_udpspeeder:
udpspeeder_process = self.start_udpspeeder_client(us_client_port, "127.0.0.1", us_server_port)
threading.Thread(target=self.log_udpspeeder_output, args=(udpspeeder_process,)).start()
# Start UDP client
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_address = ("127.0.0.1", us_client_port)
message = "Hello, Server!"
def cleanup(signum, frame):
print("Shutting down client...")
sock.close()
if not self.no_udpspeeder:
udpspeeder_process.terminate()
sys.exit(0)
signal.signal(signal.SIGTERM, cleanup)
sock.settimeout(1.0)
while True:
print(f"Sending message: {message}")
sent = sock.sendto(message.encode(), server_address)
try:
data, server = sock.recvfrom(1024)
except socket.timeout:
print("Request timed out")
continue
print(f"Received reply: {data.decode()} ==============================================================")
time.sleep(1)
sock.close()
def log_udpspeeder_output(self, process):
for line in process.stdout:
print(line.decode(), end='')
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="UDPspeeder test script")
parser.add_argument("--mode", choices=["server", "client"], help="Mode to run: 'server' or 'client'")
parser.add_argument("--us-server-port", type=int, default=4096, help="UDPspeeder server port")
parser.add_argument("--us-client-port", type=int, default=3333, help="UDPspeeder client port")
parser.add_argument("--server-port", type=int, default=7777, help="Server port behind UDPspeeder")
parser.add_argument("--log-level", choices=LOG_LEVEL_MAP.keys(), default="info", nargs="?", help="Log level: 'never', 'fatal', 'error', 'warn', 'info', 'debug', 'trace'")
parser.add_argument("--socket-activate", action="store_true", help="Test systemd socket activation")
parser.add_argument("--extra-args", help="Extra arguments to pass to UDPspeeder")
parser.add_argument("--no-udpspeeder", action="store_true", help="Do not start UDPspeeder")
parser.add_argument("--speederv2-path", default="speederv2", help="Path to the speederv2 binary")
args = parser.parse_args()
udpspeeder_test = UDPspeederTest(args.speederv2_path, args.log_level, args.socket_activate, args.extra_args, args.no_udpspeeder)
if args.mode == "server":
udpspeeder_test.server(args.us_server_port, args.server_port)
elif args.mode == "client":
udpspeeder_test.client(args.us_client_port, args.us_server_port)
else:
print("Invalid mode. Use 'server' or 'client'.")
sys.exit(1)

View File

@ -6,6 +6,10 @@
*/
#include "tunnel.h"
#ifdef SYSTEMD_SOCKET_ACTIVATION
#include <systemd/sd-daemon.h>
#endif
#include <arpa/inet.h>
static void conn_timer_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents);
static void fec_encode_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents);
@ -15,6 +19,8 @@ enum tmp_mode_t { is_from_remote = 0,
is_fec_timeout,
is_conn_timer };
bool first_connection_established = false;
void data_from_remote_or_fec_timeout_or_conn_timer(conn_info_t &conn_info, fd64_t fd64, tmp_mode_t mode) {
int ret;
@ -193,6 +199,7 @@ static void local_listen_cb(struct ev_loop *loop, struct ev_io *watcher, int rev
conn_info.fec_encode_manager.set_loop_and_cb(loop, fec_encode_cb);
mylog(log_info, "new connection from %s\n", addr.get_str());
first_connection_established = true;
}
conn_info_t &conn_info = conn_manager.find_insert(addr);
@ -316,9 +323,61 @@ static void global_timer_cb(struct ev_loop *loop, struct ev_timer *watcher, int
// uint64_t value;
// read(timer.get_timer_fd(), &value, 8);
conn_manager.clear_inactive();
if (shutdown_if_all_disconnected && first_connection_established && !conn_manager.has_active_connections()) {
mylog(log_info, "No active connections, exiting...\n");
ev_break(loop, EVBREAK_ALL);
}
mylog(log_trace, "events[idx].data.u64==(u64_t)timer.get_timer_fd()\n");
}
void print_socket_info(int fd) {
struct sockaddr_storage addr;
socklen_t addr_len = sizeof(addr);
if (getsockname(fd, (struct sockaddr *)&addr, &addr_len) == -1) {
mylog(log_error, "getsockname failed: %s\n", strerror(errno));
return;
}
char ip_str[INET6_ADDRSTRLEN];
void *ip_addr;
int port;
const char *family_str;
if (addr.ss_family == AF_INET) {
struct sockaddr_in *s = (struct sockaddr_in *)&addr;
ip_addr = &(s->sin_addr);
port = ntohs(s->sin_port);
family_str = "AF_INET";
} else {
struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr;
ip_addr = &(s->sin6_addr);
port = ntohs(s->sin6_port);
family_str = "AF_INET6";
}
inet_ntop(addr.ss_family, ip_addr, ip_str, sizeof(ip_str));
int sock_type;
socklen_t optlen = sizeof(sock_type);
if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &sock_type, &optlen) == -1) {
mylog(log_error, "getsockopt failed: %s\n", strerror(errno));
return;
}
const char *type_str;
if (sock_type == SOCK_DGRAM) {
type_str = "SOCK_DGRAM";
} else if (sock_type == SOCK_STREAM) {
type_str = "SOCK_STREAM";
} else {
type_str = "UNKNOWN";
}
mylog(log_info, "Socket info - Family: %s, Type: %s, IP: %s, Port: %d\n", family_str, type_str, ip_str, port);
}
int tunnel_server_event_loop() {
int i, j, k;
int ret;
@ -326,8 +385,32 @@ int tunnel_server_event_loop() {
// int epoll_fd;
// int remote_fd;
int local_listen_fd;
new_listen_socket2(local_listen_fd, local_addr);
int local_listen_fd = -1;
#ifdef SYSTEMD_SOCKET_ACTIVATION
int n = sd_listen_fds(0);
if (sd_listen_fds(0) > 0) {
if (n != 1) {
mylog(log_fatal, "expect exactly 1 socket passed from systemd, but got %d\n", n);
myexit(-1);
}
int fd = SD_LISTEN_FDS_START;
// print_socket_info(fd);
int ret = sd_is_socket_inet(fd, local_addr.get_type(), SOCK_DGRAM, -1, local_addr.get_port());
mylog(log_info, "sd_is_socket_inet returned: %d\n", ret);
if (ret <= 0) {
mylog(log_fatal, "socket is not UDP\n");
myexit(-1);
}
local_listen_fd = fd;
mylog(log_info, "Using socket passed from systemd\n");
}
#endif
if (local_listen_fd < 0) {
new_listen_socket2(local_listen_fd, local_addr);
}
// epoll_fd = epoll_create1(0);
// assert(epoll_fd>0);