deployd/server/server.c
Arija A. d1d0aec439
Add dproto documentation
Signed-off-by: Arija A. <ari@ari.lt>
2025-07-24 23:55:42 +03:00

624 lines
19 KiB
C

#define _POSIX_C_SOURCE 200112L
#include "include/conf.h"
#include <poll.h>
#include <errno.h>
#include <netdb.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <stdbool.h>
#include <sys/types.h>
#include <sqlite3.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include "include/pow.h"
#include "include/proto.h"
#include "include/server.h"
/*
 * Server greeting: the first thing a worker writes to a client once it
 * picks the connection up from the queue.
 *
 * Layout: one version byte (0x00), one length byte (0x33 = 51, the byte
 * count of the text that follows), then the text itself.  The header is
 * kept in its own string literal so the "\x33" escape cannot absorb the
 * leading 'D' of the text as another hex digit.
 *
 * The array also holds the literal's terminating NUL; the worker sends
 * sizeof(dp_server_info) - 1 bytes, so the NUL never goes on the wire.
 */
static uint8_t dp_server_info[] =
"\x00\x33" /* version, length */
"DeployD continuous deployment daemon: Hello, World!";
/*
 * Set to 1 by dp_deployd_server() once the process-wide signal handlers
 * have been installed, so a later call does not install them again.
 */
static volatile sig_atomic_t dp_signaled = 0;
/*
 * Write `len` bytes of `buf` to standard output using only write(2),
 * retrying on short writes and giving up silently on error.  Safe to call
 * from a signal handler.
 */
static void dp_signal_safe_write(const char *buf, size_t len) {
    while (len > 0) {
        ssize_t written = write(STDOUT_FILENO, buf, len);

        if (written <= 0) {
            return;
        }
        buf += written;
        len -= (size_t)written;
    }
}

static void dp_handle_shutdown(int signo) __attribute__((noreturn));

/*
 * Signal handler for SIGINT/SIGTERM/SIGQUIT: announce the shutdown on
 * standard output and terminate the process immediately with status 0.
 *
 * Only async-signal-safe calls (write, _exit) are used.  stdio functions
 * such as printf()/fflush() must not be called here: if the signal arrives
 * while another thread (or the interrupted code itself) holds the stdio
 * lock, the handler would deadlock.
 *
 * @param signo  number of the signal being handled; printed in decimal.
 */
static void dp_handle_shutdown(int signo) {
    static const char prefix[] = "\nShutting server down on signal ";
    static const char suffix[] = "...\n";
    char digits[12]; /* enough for a sign and 10 decimal digits */
    size_t pos = sizeof(digits);
    unsigned int value =
        signo < 0 ? 0u - (unsigned int)signo : (unsigned int)signo;

    /* Render the number right-to-left; snprintf is not signal-safe. */
    do {
        digits[--pos] = (char)('0' + (value % 10u));
        value /= 10u;
    } while (value != 0u);
    if (signo < 0) {
        digits[--pos] = '-';
    }

    dp_signal_safe_write(prefix, sizeof(prefix) - 1);
    dp_signal_safe_write(digits + pos, sizeof(digits) - pos);
    dp_signal_safe_write(suffix, sizeof(suffix) - 1);

    _exit(0);
}
/*
 * One accepted client connection waiting in the work queue.
 *
 * Nodes are created by dp_SocketQueue_push() and released by the worker
 * thread that pops them (or by dp_SocketQueue_destroy() for leftovers).
 */
typedef struct dp_SocketNode {
/* TLS session for this client. */
SSL *ssl;
/* Socket descriptor the TLS session runs over. */
int client_fd;
/* SQLite handle the worker hands on to dp_proto_command(). */
sqlite3 *database;
/* Heap-allocated textual client address; released with DP_FREE. */
char *ipaddr;
/* Per-connection logger: inherited from the server logger, with its
 * target set to `ipaddr`. */
dp_Logger logger;
/* Next node in the queue, or NULL for the last one. */
struct dp_SocketNode *next;
} dp_SocketNode;
/*
 * FIFO of pending client connections shared between the accepting thread
 * (producer) and the worker threads (consumers).
 */
typedef struct {
/* Oldest node; the next one to be popped. NULL when the queue is empty. */
dp_SocketNode *head;
/* Newest node; new nodes are linked after it. NULL when empty. */
dp_SocketNode *tail;
/* Number of queued nodes. */
size_t size;
/* Held while head/tail/size are read or modified by push/pop. */
pthread_mutex_t lock;
/* Signalled by push; pop waits on it while the queue is empty. */
pthread_cond_t not_empty;
} dp_SocketQueue;
/*
 * Put `queue` into its empty state and create its synchronisation
 * primitives (default attributes).  Must be paired with
 * dp_SocketQueue_destroy().
 */
static void dp_SocketQueue_init(dp_SocketQueue *queue) {
    /* No nodes yet. */
    queue->size = 0;
    queue->head = queue->tail = NULL;

    pthread_mutex_init(&queue->lock, NULL);
    pthread_cond_init(&queue->not_empty, NULL);
}
/*
 * Append an accepted connection to the back of `queue` and wake one
 * waiting worker.
 *
 * Ownership of `ssl`, `client_fd` and `ipaddr` passes to the queue: the
 * caller must not touch them after this call.  If the node cannot be
 * allocated the connection is dropped here (TLS session freed, socket
 * closed, `ipaddr` released) and an error is logged, so the caller's
 * contract is the same on success and on failure.
 *
 * @param queue      queue to append to.
 * @param ssl        established TLS session for the client.
 * @param client_fd  socket descriptor underlying `ssl`.
 * @param database   SQLite handle stored in the node for the worker.
 * @param ipaddr     heap-allocated client address string.
 * @param logger     server logger; the node's logger inherits from it and
 *                   gets `ipaddr` as its target.
 */
static void dp_SocketQueue_push(dp_SocketQueue *queue,
                                SSL *ssl,
                                int client_fd,
                                sqlite3 *database,
                                char *ipaddr,
                                dp_Logger *logger) {
    dp_SocketNode *node = DP_MALLOC(sizeof(*node));

    if (!node) {
        /* Allocation can fail (other call sites check for it too); drop
         * the client instead of dereferencing NULL. */
        dp_logf(logger, DP_LOG_ERROR, DP_SERVER_LOG,
                "Failed to allocate a queue node for client %d. "
                "Dropping connection.",
                client_fd);
        DP_FREE(ipaddr);
        SSL_shutdown(ssl);
        SSL_free(ssl);
        shutdown(client_fd, SHUT_RDWR);
        close(client_fd);
        return;
    }

    node->ssl = ssl;
    node->client_fd = client_fd;
    node->database = database;
    node->next = NULL;
    node->ipaddr = ipaddr;

    dp_log_inherit(logger, &node->logger);
    dp_log_set_target(&node->logger, node->ipaddr);

    pthread_mutex_lock(&queue->lock);

    if (queue->tail) {
        queue->tail->next = node;
    } else {
        /* Empty queue: the new node is both head and tail. */
        queue->head = node;
    }
    queue->tail = node;
    ++queue->size;

    pthread_cond_signal(&queue->not_empty);
    pthread_mutex_unlock(&queue->lock);
}
/*
 * Remove and return the oldest node of `queue`, blocking until one is
 * available.  The caller becomes the owner of the returned node and of
 * everything it points to.
 */
static dp_SocketNode *dp_SocketQueue_pop_node(dp_SocketQueue *queue) {
    dp_SocketNode *front = NULL;

    pthread_mutex_lock(&queue->lock);

    /* Re-check after every wakeup: the queue may be empty again by the
     * time this thread gets the lock back. */
    while (!queue->head) {
        pthread_cond_wait(&queue->not_empty, &queue->lock);
    }

    front = queue->head;
    queue->head = front->next;
    if (!queue->head) {
        /* That was the last node; the queue is empty again. */
        queue->tail = NULL;
    }
    queue->size -= 1;

    pthread_mutex_unlock(&queue->lock);

    return front;
}
/*
 * Release everything still held by `queue` and destroy its mutex and
 * condition variable.
 *
 * Connections that were queued but never picked up by a worker are owned
 * by the queue, so their TLS session and socket are closed here along with
 * the address string and the node itself.  No locking is done: the caller
 * must make sure no other thread is using the queue any more.
 */
static void dp_SocketQueue_destroy(dp_SocketQueue *queue) {
    dp_SocketNode *cur = queue->head;

    while (cur) {
        dp_SocketNode *next = cur->next;

        /* SSL_free(NULL) is a no-op, so no guard is needed. */
        SSL_free(cur->ssl);
        if (cur->client_fd >= 0) {
            shutdown(cur->client_fd, SHUT_RDWR);
            close(cur->client_fd);
        }
        DP_FREE(cur->ipaddr);
        DP_FREE(cur);

        cur = next;
    }

    /* Leave no dangling pointers behind. */
    queue->head = NULL;
    queue->tail = NULL;
    queue->size = 0;

    pthread_mutex_destroy(&queue->lock);
    pthread_cond_destroy(&queue->not_empty);
}
/* End a connection: close the TLS session, then the socket beneath it. */
static void dp_worker_hangup(SSL *ssl, int client_fd) {
    SSL_shutdown(ssl);
    SSL_free(ssl);
    shutdown(client_fd, SHUT_RDWR);
    close(client_fd);
}

/*
 * Worker thread entry point.
 *
 * Forever: take one connection from the queue passed in `arg`, send the
 * server greeting, run the proof-of-work gate, then serve up to 64 packets
 * before sending EXIT and closing the connection.
 *
 * @param arg  the shared dp_SocketQueue.
 * @return     NULL (only formally: the loop has no exit).
 */
static void *dp_worker_thread(void *arg) {
    dp_SocketQueue *jobs = (dp_SocketQueue *)arg;

    for (;;) {
        dp_SocketNode *job = dp_SocketQueue_pop_node(jobs);

        /* Copy everything out of the node so it can be released at once. */
        SSL *ssl = job->ssl;
        int client_fd = job->client_fd;
        sqlite3 *database = job->database;
        char *ipaddr = job->ipaddr;
        dp_Logger logger = job->logger;

        uint8_t packet[DP_PACKET_SIZE] = {0};
        bool running = true;
        size_t handled = 0;
        int greeted = 0;

        DP_FREE(job);

        /* Step 1: greeting.  Anything but a complete write is fatal. */
        greeted = SSL_write(ssl, dp_server_info, sizeof(dp_server_info) - 1);
        if ((size_t)greeted != sizeof(dp_server_info) - 1) {
            dp_logf(&logger, DP_LOG_ERROR, DP_WORKER_LOG,
                    "SSL connection %d did not write our server info. "
                    "Closing.",
                    client_fd);
            DP_FREE(ipaddr);
            dp_worker_hangup(ssl, client_fd);
            continue;
        }

        /* Step 2: proof-of-work gate. */
        if (!dp_pow_protect(ssl, &logger)) {
            dp_logf(&logger, DP_LOG_ERROR, DP_WORKER_LOG,
                    "SSL connection %d did not solve PoW. Closed.", client_fd);
            DP_FREE(ipaddr);
            dp_worker_hangup(ssl, client_fd);
            continue;
        }

        /* Step 3: packet loop, capped at 64 packets per connection. */
        for (handled = 0; running && handled < 64; ++handled) {
            bool ok = false;
            int bytes = SSL_read(ssl, packet, sizeof(packet));

            if (bytes < 1) {
                break;
            }

            switch ((dp_PacketType)packet[0]) {
                case dp_PacketType_command:
                    /* Payload is everything after the type byte. */
                    ok = dp_proto_command(ssl, packet + 1, bytes - 1, database,
                                          &logger);
                    break;

                case dp_PacketType_ping:
                    ok = dp_proto_ping_reply(ssl);
                    break;

                case dp_PacketType_exit:
                    ok = (bytes == 1);
                    if (ok) {
                        running = false;
                    } else {
                        dp_proto_error(
                            ssl, dp_PacketError_proto_packet_too_long,
                            "EXIT packets must be exactly 1 byte long",
                            &logger);
                    }
                    break;

                default:
                    /* Unknown type: keep the session only if the packet
                     * could be skipped. */
                    ok = false;
                    running = dp_proto_skip(ssl, packet[0]);
                    if (!running) {
                        dp_proto_error(ssl, dp_PacketError_proto_packet_invalid,
                                       "The supplied disallowed packet type is "
                                       "invalid to skip",
                                       &logger);
                    } else {
                        dp_proto_error(ssl, dp_PacketError_type,
                                       "Unknown/disallowed packet type",
                                       &logger);
                    }
                    break;
            }

            if (!ok) {
                dp_log(&logger, DP_LOG_ERROR, DP_WORKER_LOG,
                       "Command failed to execute");
            }
        }

        /* Step 4: say goodbye and tear the connection down. */
        if (!dp_proto_exit(ssl)) {
            dp_log(&logger, DP_LOG_ERROR, DP_WORKER_LOG,
                   "Failed to send an EXIT packet to client");
        }

        dp_worker_hangup(ssl, client_fd);

        /* Log before releasing ipaddr: the logger's target points at it. */
        dp_logf(&logger, DP_LOG_INFO, DP_WORKER_LOG,
                "SSL connection %d closed.", client_fd);
        DP_FREE(ipaddr);
    }

    return NULL;
}
/*
 * Copy `src_len` bytes of `src` into `dst` (capacity `dst_len`) and
 * NUL-terminate.  Returns false, leaving `dst` untouched, when the text
 * plus terminator does not fit.
 */
static bool dp_parse_copy_field(char *dst,
                                size_t dst_len,
                                const char *src,
                                size_t src_len) {
    if (src_len >= dst_len) {
        return false;
    }

    memcpy(dst, src, src_len);
    dst[src_len] = '\0';

    return true;
}

/*
 * Split a listen specification into its host and port parts.
 *
 * Accepted forms:
 *   "*:PORT"       -> host "*"    (wildcard marker, copied as-is)
 *   "[*]:PORT"     -> host "[*]"  (bracketed wildcard marker, copied as-is)
 *   "[ADDR]:PORT"  -> host ADDR   (brackets stripped)
 *   "HOST:PORT"    -> host HOST   (split at the LAST colon)
 *
 * A specification without a port separator (including a bare "*") is
 * rejected.  A host or port that does not fit its buffer is rejected
 * rather than truncated, because a shortened port or address would make
 * the server listen somewhere other than where it was told to.
 *
 * @param hostport     NUL-terminated specification; may be NULL.
 * @param hostbuf      receives the NUL-terminated host part.
 * @param hostbuf_len  capacity of `hostbuf` in bytes.
 * @param portbuf      receives the NUL-terminated port part.
 * @param portbuf_len  capacity of `portbuf` in bytes.
 * @return true when both parts were extracted; false on NULL/zero-sized
 *         arguments, malformed input, or a part too long for its buffer.
 *         On false the buffer contents are unspecified.
 */
static bool dp_parse_host_port(const char *hostport,
                               char *hostbuf,
                               size_t hostbuf_len,
                               char *portbuf,
                               size_t portbuf_len) {
    const char *host = NULL;
    const char *port = NULL;
    size_t host_len = 0;

    /* A zero capacity would make "len - 1" wrap around. */
    if (!hostport || !hostbuf || !portbuf || hostbuf_len == 0 ||
        portbuf_len == 0) {
        return false;
    }

    if (strncmp(hostport, "*:", 2) == 0) {
        host = "*";
        host_len = 1;
        port = hostport + 2;
    } else if (strncmp(hostport, "[*]:", 4) == 0) {
        /* Brackets are kept so the caller can tell the two wildcards
         * apart. */
        host = "[*]";
        host_len = 3;
        port = hostport + 4;
    } else if (hostport[0] == '[') {
        const char *rbracket = strchr(hostport, ']');

        if (!rbracket || rbracket[1] != ':') {
            return false;
        }
        host = hostport + 1;
        host_len = (size_t)(rbracket - host);
        port = rbracket + 2;
    } else {
        /* Also covers a bare "*": it has no colon and is rejected here. */
        const char *colon = strrchr(hostport, ':');

        if (!colon) {
            return false;
        }
        host = hostport;
        host_len = (size_t)(colon - hostport);
        port = colon + 1;
    }

    return dp_parse_copy_field(hostbuf, hostbuf_len, host, host_len) &&
           dp_parse_copy_field(portbuf, portbuf_len, port, strlen(port));
}
/*
 * Build the server-side TLS context from a PEM certificate and a PEM
 * private key.
 *
 * Every failure is reported both through `logger` and by dumping the
 * OpenSSL error queue to stderr.  (OpenSSL does not report its failures
 * through errno, so perror() would print an unrelated message.)
 *
 * @param cert_file  path to the PEM certificate.
 * @param key_file   path to the PEM private key.
 * @param logger     logger used for error messages.
 * @return a new context owned by the caller (release with SSL_CTX_free),
 *         or NULL on failure.
 */
static SSL_CTX *dp_create_ssl_ctx(const char *cert_file,
                                  const char *key_file,
                                  dp_Logger *logger) {
    SSL_CTX *ctx = NULL;

    SSL_library_init();
    OpenSSL_add_all_algorithms();
    SSL_load_error_strings();

    ctx = SSL_CTX_new(TLS_server_method());
    if (!ctx) {
        dp_logf(logger, DP_LOG_ERROR, DP_SSL_LOG,
                "Failed to create the SSL context");
        ERR_print_errors_fp(stderr);
        return NULL;
    }

    if (SSL_CTX_use_certificate_file(ctx, cert_file, SSL_FILETYPE_PEM) <= 0) {
        dp_logf(logger, DP_LOG_ERROR, DP_SSL_LOG,
                "Failed to load certificate file: %s",
                cert_file ? cert_file : "(null)");
        ERR_print_errors_fp(stderr);
        SSL_CTX_free(ctx);
        return NULL;
    }

    if (SSL_CTX_use_PrivateKey_file(ctx, key_file, SSL_FILETYPE_PEM) <= 0) {
        dp_logf(logger, DP_LOG_ERROR, DP_SSL_LOG,
                "Failed to load private key file: %s",
                key_file ? key_file : "(null)");
        ERR_print_errors_fp(stderr);
        SSL_CTX_free(ctx);
        return NULL;
    }

    if (!SSL_CTX_check_private_key(ctx)) {
        dp_logf(logger, DP_LOG_ERROR, DP_SSL_LOG,
                "Private key does not match certificate public key");
        SSL_CTX_free(ctx);
        return NULL;
    }

    return ctx;
}
bool dp_deployd_server(const char *hosts[],
uint8_t hosts_size,
uint16_t threads,
const char *cert_file,
const char *key_file,
sqlite3 *database,
dp_Logger *logger) {
int listen_fds[DP_MAX_LISTENS];
struct pollfd pollfds[DP_MAX_LISTENS];
uint8_t listen_fd_count = 0;
uint16_t idx = 0;
dp_SocketQueue queue = {0};
pthread_t *tids = NULL;
if (threads < 1 || hosts_size < 1 || hosts_size > DP_MAX_LISTENS) {
return false;
}
if (!dp_signaled) {
if (signal(SIGINT, dp_handle_shutdown) == SIG_ERR ||
signal(SIGTERM, dp_handle_shutdown) == SIG_ERR ||
signal(SIGQUIT, dp_handle_shutdown) == SIG_ERR ||
signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
return false;
}
dp_signaled = 1;
}
for (idx = 0; idx < hosts_size; ++idx) {
char hostbuf[129] = {0};
char portbuf[16] = {0};
struct addrinfo hints = {0};
struct addrinfo *res = NULL;
struct addrinfo *adi = NULL;
if (!dp_parse_host_port(hosts[idx], hostbuf, sizeof(hostbuf), portbuf,
sizeof(portbuf))) {
dp_logf(logger, DP_LOG_FATAL, DP_SERVER_LOG, "Invalid host: %s",
hosts[idx]);
return false;
}
if (strcmp(hostbuf, "*") == 0) {
hostbuf[0] = '\0';
hints.ai_family = AF_INET;
} else if (strcmp(hostbuf, "[*]") == 0) {
hostbuf[0] = '\0';
hints.ai_family = AF_INET6;
}
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
int gai =
getaddrinfo(hostbuf[0] ? hostbuf : NULL, portbuf, &hints, &res);
if (gai != 0 || !res) {
return false;
}
int listen_fd = -1;
bool bound = false;
for (adi = res; adi != NULL; adi = adi->ai_next) {
listen_fd =
socket(adi->ai_family, adi->ai_socktype, adi->ai_protocol);
if (listen_fd < 0) {
continue;
}
int flags = fcntl(listen_fd, F_GETFD);
if (flags < 0) {
dp_err(logger, DP_LOG_ERROR, DP_SERVER_LOG, "fcntl(F_GETFD)");
close(listen_fd);
listen_fd = -1;
continue;
}
if (fcntl(listen_fd, F_SETFD, flags | FD_CLOEXEC) < 0) {
dp_err(logger, DP_LOG_ERROR, DP_SERVER_LOG, "fcntl(F_SETFD)");
close(listen_fd);
listen_fd = -1;
continue;
}
int opt = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, (void *)&opt,
sizeof(opt));
int buf_size = 131072;
setsockopt(listen_fd, SOL_SOCKET, SO_RCVBUF, (void *)&buf_size,
sizeof(buf_size));
setsockopt(listen_fd, SOL_SOCKET, SO_SNDBUF, (void *)&buf_size,
sizeof(buf_size));
if (bind(listen_fd, adi->ai_addr, adi->ai_addrlen) == 0) {
bound = true;
break;
}
close(listen_fd);
listen_fd = -1;
}
freeaddrinfo(res);
if (!bound || listen_fd < 0) {
return false;
}
if (listen(listen_fd, threads * 2) < 0) {
close(listen_fd);
return false;
}
listen_fds[listen_fd_count] = listen_fd;
pollfds[listen_fd_count].fd = listen_fd;
pollfds[listen_fd_count].events = POLLIN;
++listen_fd_count;
dp_logf(logger, DP_LOG_INFO, DP_SERVER_LOG, "Listening on %s",
hosts[idx]);
}
dp_SocketQueue_init(&queue);
tids = DP_CALLOC(threads, sizeof(pthread_t));
if (!tids) {
dp_log(logger, DP_LOG_FATAL, DP_SERVER_LOG,
"Failed to allocate threads");
return false;
}
for (idx = 0; idx < threads; ++idx) {
pthread_create(&tids[idx], NULL, dp_worker_thread, &queue);
}
SSL_CTX *ctx = dp_create_ssl_ctx(cert_file, key_file, logger);
while (true) {
int n_ready = poll(pollfds, listen_fd_count, -1);
if (n_ready < 0) {
if (errno == EINTR) {
continue;
}
dp_err(logger, DP_LOG_ERROR, DP_SERVER_LOG, "Poll error");
break;
}
for (idx = 0; idx < listen_fd_count; ++idx) {
if (pollfds[idx].revents & POLLIN) {
struct sockaddr_storage client_addr;
socklen_t addrlen = sizeof(client_addr);
int client_fd = accept(
listen_fds[idx], (struct sockaddr *)&client_addr, &addrlen);
if (client_fd >= 0) {
struct timeval timeout;
timeout.tv_sec = 5;
timeout.tv_usec = 0;
if (setsockopt(client_fd, SOL_SOCKET, SO_RCVTIMEO, &timeout,
sizeof(timeout)) < 0) {
dp_log(logger, DP_LOG_WARN, DP_SERVER_LOG,
"Failed to set receive timeout to 5 seconds");
shutdown(client_fd, SHUT_RDWR);
close(client_fd);
continue;
}
if (setsockopt(client_fd, SOL_SOCKET, SO_SNDTIMEO, &timeout,
sizeof(timeout)) < 0) {
dp_log(logger, DP_LOG_WARN, DP_SERVER_LOG,
"Failed to set send timeout to 5 seconds");
shutdown(client_fd, SHUT_RDWR);
close(client_fd);
continue;
}
char *ipstr = DP_MALLOC(129);
if (!ipstr) {
dp_log(logger, DP_LOG_ERROR, DP_SERVER_LOG,
"Failed to allocate IP string representation "
"buffer. Dropping connection.");
shutdown(client_fd, SHUT_RDWR);
close(client_fd);
continue;
}
void *addr = NULL;
if (client_addr.ss_family == AF_INET) {
struct sockaddr_in *sockaddr =
(struct sockaddr_in *)&client_addr;
addr = &(sockaddr->sin_addr);
} else if (client_addr.ss_family == AF_INET6) {
struct sockaddr_in6 *sockaddr =
(struct sockaddr_in6 *)&client_addr;
addr = &(sockaddr->sin6_addr);
}
if (addr) {
inet_ntop(client_addr.ss_family, addr, ipstr, 128);
dp_logf(logger, DP_LOG_INFO, DP_SERVER_LOG,
"Queueing request from client %d (IP: "
"%s)",
client_fd, ipstr);
} else {
DP_FREE(ipstr);
dp_logf(logger, DP_LOG_ERROR, DP_SERVER_LOG,
"NOT queueing request from client %d (IP: "
"<unknown family>)",
client_fd);
shutdown(client_fd, SHUT_RDWR);
close(client_fd);
continue;
}
SSL *ssl = SSL_new(ctx);
if (!ssl) {
dp_log(logger, DP_LOG_ERROR, DP_SSL_LOG,
"SSL_new failed");
DP_FREE(ipstr);
shutdown(client_fd, SHUT_RDWR);
close(client_fd);
continue;
}
SSL_set_fd(ssl, client_fd);
if (SSL_accept(ssl) <= 0) {
dp_log(logger, DP_LOG_ERROR, DP_SSL_LOG,
"SSL errors encountered!");
ERR_print_errors_fp(stderr);
SSL_free(ssl);
DP_FREE(ipstr);
shutdown(client_fd, SHUT_RDWR);
close(client_fd);
continue;
}
dp_SocketQueue_push(&queue, ssl, client_fd, database, ipstr,
logger);
}
}
}
}
for (idx = 0; idx < threads; ++idx) {
pthread_join(tids[idx], NULL);
}
DP_FREE(tids);
for (idx = 0; idx < listen_fd_count; ++idx) {
shutdown(listen_fds[idx], SHUT_RDWR);
close(listen_fds[idx]);
}
dp_SocketQueue_destroy(&queue);
return true;
}