vessel/core/server.c
Arija A. 4f0bf80e5f
refact: Remove mem.h
Signed-off-by: Arija A. <ari@ari.lt>
2025-06-21 23:43:31 +03:00

681 lines
23 KiB
C

#include "include/conf.h"
#include <time.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <sys/time.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <netinet/tcp.h>
#include "include/log.h"
#include "include/hook.h"
#include "include/socket.h"
#include "include/server.h"
#include "include/vessel.h"
typedef struct {
const vs_SockWorkerHook *hooks;
vs_SockWorker *worker;
} vs_SocketState;
static const int vs_sock_true = 1;
static const struct timespec vs_socket_timeout = { 30, 0 }; /* 30 s */
static uint16_t vs_current_ss = 0;
static vs_SockServer *vs_socks[VS_MAX_SOCK_SERVERS] = { 0 };
static void vs_socks_handle(int sig) __attribute__((noreturn));
static void vs_socks_handle(int sig) {
int exit_code = 0;
uint16_t idx = 0;
(void)putchar('\n');
/* TODO: Fix the segfault which soemtimes on exit crashes the program? For
* example, after a stress test. */
for (idx = 0; idx < VS_MAX_SOCK_SERVERS; ++idx) {
if (vs_socks[idx]) {
vs_flog_info(vs_socks[idx]->logger,
"Destroying socket server %ju (index %u) on signal %d...",
vs_File_identity(&vs_socks[idx]->file),
idx,
sig);
if (!vs_SockServer_destroy(vs_socks[idx])) {
exit_code = 1;
vs_flog_error(vs_socks[idx]->logger,
"Failed to destroy socket server %ju: %s",
vs_File_identity(&vs_socks[idx]->file),
strerror(errno));
}
}
}
/* Unsafe call due to atexit() */
exit(exit_code);
}
static void *vs_handle_connection(void *arg) {
vs_SocketState *state = (vs_SocketState *)arg;
if (state->worker->dirty) {
vs_flog_error(state->worker->logger,
"Tried to handle connection %ju with a dirty worker. "
"Dropping connection.",
vs_File_identity(&state->worker->stream.file));
vs_Stream_drop(&state->worker->stream);
VS_FREE(state);
return NULL;
}
if (!vs_Lock_lock(&state->worker->lock)) {
vs_flog_error(state->worker->logger,
"Failed to lock connection %ju.",
vs_File_identity(&state->worker->stream.file));
return NULL;
}
/*
* Poll data.
*
* Most hooks will ignore polling of data and start reading immediately,
* therefore, we wait for the socket to send data using select() before
* passing it to any hooks.
*
* Of course, as all things, this is also hookable.
*/
struct timeval timeout = { vs_socket_timeout.tv_sec, vs_socket_timeout.tv_nsec };
switch (vs_File_poll(&state->worker->stream.file, &timeout)) {
case -1:
vs_flog_error(state->worker->logger,
"Failed to poll connection %ju: %s.",
vs_File_identity(&state->worker->stream.file),
strerror(errno));
break;
case 0:
vs_flog_warn(state->worker->logger,
"Connection %ju timed out.",
vs_File_identity(&state->worker->stream.file));
if (!vs_SockWorkerHook_runall(
state->hooks, vs_SockWorkerHookType_timeout, state->worker, false, NULL)) {
vs_flog_error(state->worker->logger,
"Failed to run timeout hooks for connection %ju.",
vs_File_identity(&state->worker->stream.file));
}
break;
default:
if (!vs_SockWorkerHook_runall(
state->hooks, vs_SockWorkerHookType_post_open, state->worker, false, NULL)) {
vs_flog_warn(state->worker->logger,
"Failed to run post-open hooks for connection %ju.",
vs_File_identity(&state->worker->stream.file));
}
break;
}
/* cleanup */
if (!vs_SockWorkerHook_runall(
state->hooks, vs_SockWorkerHookType_pre_close, state->worker, false, NULL)) {
vs_flog_warn(state->worker->logger,
"Failed to run pre-close hooks for connection %ju.",
vs_File_identity(&state->worker->stream.file));
}
if (!vs_Stream_drop(&state->worker->stream)) {
vs_flog_error(state->worker->logger,
"Failed to drop connection %ju: %s",
vs_File_identity(&state->worker->stream.file),
strerror(errno));
}
if (!vs_SockWorkerHook_runall(
state->hooks, vs_SockWorkerHookType_post_close, state->worker, false, NULL)) {
vs_flog_warn(state->worker->logger,
"Failed to run post-close hooks for connection %ju.",
vs_File_identity(&state->worker->stream.file));
}
state->worker->occupied = false;
vs_Lock_unlock(&state->worker->lock);
VS_FREE(state);
return NULL;
}
VS_DEFINE_HOOKS(vs_SockWorker, vs_SockWorkerHookType, vs_SockWorker *);
VS_DEFINE_HOOKS(vs_SockServer, vs_SockServerHookType, vs_SockServer *);
bool vs_SockServer_create_generic(vs_SockServer *server) {
if (!server || !server->host) {
return false;
}
if (!vs_File_init(&server->file, NULL)) {
return false;
}
if (!vs_Socket_new(&server->file, vs_SocketDomain_IPv4, vs_SocketType_TCP)) {
vs_File_destroy(&server->file);
return false;
}
if (!vs_Socket_setopt(
&server->file, vs_SocketOption_sock_reuseaddr, &vs_sock_true, sizeof(vs_sock_true))) {
goto cleanup;
}
if (!vs_Socket_setopt(
&server->file, vs_SocketOption_tcp_nodelay, &vs_sock_true, sizeof(vs_sock_true))) {
goto cleanup;
}
memset(&server->addr, 0, sizeof(server->addr));
server->addr.sin_family = AF_INET;
server->addr.sin_port = htons(server->port);
if (inet_pton(server->addr.sin_family, server->host, &server->addr.sin_addr) <= 0) {
goto cleanup;
}
if (!vs_Socket_bind(&server->file, (struct sockaddr *)&server->addr, sizeof(server->addr))) {
goto cleanup;
}
return true;
cleanup:
vs_File_destroy(&server->file);
return false;
}
bool vs_SockServer_init(vs_SockServer *server, suseconds_t timeout) {
uint16_t idx = 0;
uint16_t jdx = 0;
if (!server || server->threads == 0) {
return false;
}
server->workers = VS_CALLOC(server->threads, sizeof(*server->workers));
if (!server->workers) {
return false;
}
if (!vs_Lock_init(&server->lock)) {
VS_FREE(server->workers);
return false;
}
if (!vs_Lock_lock(&server->lock)) {
VS_FREE(server->workers);
vs_Lock_destroy(&server->lock);
return false;
}
for (idx = 0; idx < server->threads; ++idx) {
server->workers[idx].occupied = false;
server->workers[idx].dirty = false;
server->workers[idx].logger = server->logger;
if (!vs_Lock_init(&server->workers[idx].lock)) {
for (jdx = 0; jdx < idx; ++jdx) {
vs_Lock_destroy(&server->workers[jdx].lock);
vs_Stream_destroy(&server->workers[jdx].stream);
}
VS_FREE(server->workers);
vs_Lock_unlock(&server->lock);
return false;
}
if (!vs_Stream_init(&server->workers[idx].stream, VS_STREAM_RDWR)) {
for (jdx = 0; jdx <= idx; ++jdx) {
vs_Lock_destroy(&server->workers[jdx].lock);
if (jdx < idx) {
vs_Stream_destroy(&server->workers[jdx].stream);
}
}
VS_FREE(server->workers);
vs_Lock_unlock(&server->lock);
return false;
}
server->workers[idx].stream.timeout_usec = timeout;
}
server->run = true;
return vs_Lock_unlock(&server->lock);
}
bool vs_SockServer_setup_signals(vs_SockServer *server) {
uint16_t idx = 0;
if (!server) {
return false;
}
if (vs_current_ss >= VS_MAX_SOCK_SERVERS) {
return false;
}
for (idx = 0; idx < vs_current_ss; ++idx) {
if (vs_socks[idx] == server) {
return true;
}
}
if (!vs_Lock_lock(&server->lock)) {
return false;
}
vs_socks[vs_current_ss] = server;
if (vs_current_ss++ != 0) {
return true;
}
vs_log_info(server->logger, "Registering the global signal handler...");
if (!vs_Lock_unlock(&server->lock)) {
return false;
}
if (signal(SIGINT, vs_socks_handle) == SIG_ERR || signal(SIGTERM, vs_socks_handle) == SIG_ERR ||
signal(SIGQUIT, vs_socks_handle) == SIG_ERR || signal(SIGHUP, vs_socks_handle) == SIG_ERR) {
return false;
}
/* Handled by MSG_NOSIGNAL */
#if 0
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
return false;
}
#endif
return true;
}
bool vs_SockServer_start(vs_SockServer *server,
const vs_SockWorkerHook *worker_hooks,
const vs_SockServerHook *server_hooks) {
uint16_t idx = 0;
uint16_t jdx = 0;
size_t count = 0;
vs_File client = { 0 };
struct sockaddr_in client_addr;
vs_SockLen client_len = sizeof(client_addr);
if (!server) {
return false;
}
if (!vs_File_init(&client, NULL)) {
return false;
}
if (!vs_Lock_lock(&server->lock)) {
vs_File_destroy(&client);
return false;
}
if (!vs_Socket_listen(&server->file, server->threads * 3)) {
vs_Lock_unlock(&server->lock);
vs_File_destroy(&client);
return false;
}
/* clang-format off */
vs_log_info(server->logger, "");
vs_flog_info(server->logger, " ''''''''' ''''''''' " VS_CLR_RESET);
vs_flog_info(server->logger, " .. ' '%s ,,, " VS_CLR_RESET, VS_CLR_RED);
vs_flog_info(server->logger, " %s.X.%s . l %s ,,, " VS_CLR_RESET, VS_CLR_BOLD, VS_CLR_RESET, VS_CLR_RED);
vs_flog_info(server->logger, " %s.X.%s . e %s ,,, " VS_CLR_RESET, VS_CLR_BOLD, VS_CLR_RESET, VS_CLR_RED);
vs_flog_info(server->logger, " %s.X.%s . s %s ,,, " VS_CLR_RESET, VS_CLR_BOLD, VS_CLR_RESET, VS_CLR_RED);
vs_flog_info(server->logger, " %s.X.%s . s %s ,,, " VS_CLR_RESET, VS_CLR_BOLD, VS_CLR_RESET, VS_CLR_RED);
vs_flog_info(server->logger, " %s.X.%s . s %s ,,, " VS_CLR_RESET, VS_CLR_BOLD, VS_CLR_RESET, VS_CLR_RED);
vs_flog_info(server->logger, " %s.X.%s . e %s ,,, " VS_CLR_RESET, VS_CLR_BOLD, VS_CLR_RESET, VS_CLR_RED);
vs_flog_info(server->logger, " %s.X.%s . v %s ,,, " VS_CLR_RESET, VS_CLR_BOLD, VS_CLR_RESET, VS_CLR_RED);
vs_flog_info(server->logger, " %s.X.%s .' %s ,,, " VS_CLR_RESET, VS_CLR_BOLD, VS_CLR_RESET, VS_CLR_RED);
vs_flog_info(server->logger, " %s.X.%s . %s,,, " VS_CLR_RESET, VS_CLR_BOLD, VS_CLR_RESET, VS_CLR_RED);
vs_flog_info(server->logger, " %s.X.%s %s,,, " VS_CLR_RESET, VS_CLR_BOLD, VS_CLR_RESET, VS_CLR_RED);
vs_flog_info(server->logger, " %s,,, " VS_CLR_RESET, VS_CLR_RED);
vs_flog_info(server->logger, " %s,,, " VS_CLR_RESET, VS_CLR_RED);
vs_flog_info(server->logger, " %s,,, " VS_CLR_RESET, VS_CLR_RED);
vs_flog_info(server->logger, " %s " VS_CLR_RESET, VS_CLR_RED);
vs_log_info(server->logger, "");
vs_log_info(server->logger, " Vessel version " VS_CLR_BOLD VS_VESSEL_HEADER_VERSION VS_CLR_RESET " started.");
vs_flog_info(server->logger, " Application %ju address: " VS_CLR_BOLD "%s:%u" VS_CLR_RESET, vs_File_identity(&server->file), server->host, server->port);
vs_flog_info(server->logger, " With " VS_CLR_BOLD VS_CLR_RED "<3" VS_CLR_RESET " by " VS_CLR_BOLD "%s" VS_CLR_RESET, VS_VESSEL_HEADER_SOURCE);
vs_log_info(server->logger, "");
/* clang-format on */
if (!vs_Lock_unlock(&server->lock)) {
vs_File_destroy(&client);
return false;
}
while (server->run) {
bool assigned = false;
if (!vs_File_clear(&client)) {
break;
}
/* Accept a connection */
for (idx = 0; server->run && idx < VS_MAX_SOCK_RETRIES; ++idx) {
if (vs_Socket_accept(
&server->file, &client, (struct sockaddr *)&client_addr, &client_len)) {
break;
}
/* else */
vs_flog_warn(server->logger,
"Failed to accept a socket connection: %s. Retrying "
"%u/%u...",
strerror(errno),
idx + 1,
VS_MAX_SOCK_RETRIES);
vs_secsleep(0.03);
}
if (!server->run) {
vs_flog_warn(server->logger,
"Abrupt shutdown request detected pre-worker for "
"connection %ju. Exiting.",
vs_File_identity(&client));
vs_File_destroy(&client);
return true;
}
if (!vs_File_isopen(&client)) {
vs_flog_error(server->logger,
"Failed to accept a connection after %u retries: %s. "
"Ignoring connection.",
VS_MAX_SOCK_RETRIES,
strerror(errno));
continue;
}
/* Set timeouts */
if (!vs_Socket_setopt(&client,
vs_SocketOption_sock_readto,
&vs_socket_timeout,
sizeof(vs_socket_timeout))) {
vs_flog_error(server->logger,
"Failed to set socket readtimeout. Dropping "
"connection %ju: %s.",
vs_File_identity(&client),
strerror(errno));
vs_File_close(&client);
continue;
}
if (!vs_Socket_setopt(&client,
vs_SocketOption_sock_writeto,
&vs_socket_timeout,
sizeof(vs_socket_timeout))) {
vs_flog_error(server->logger,
"Failed to set socket write timeout. Dropping "
"connection %ju: %s.",
vs_File_identity(&client),
strerror(errno));
vs_File_close(&client);
continue;
}
/* Run hooks */
void *prev_arg = server->arg;
server->arg = &client;
if (!vs_SockServerHook_runall(
server_hooks, vs_SockServerHookType_pre_worker, server, true, &count)) {
vs_flog_info(server->logger,
"Pre-worker hooks indicated stop for connection %ju. "
"Continuing assuming the worker handled the connection.",
vs_File_identity(&client));
server->arg = prev_arg;
/* Sock_drop(client_fd); */
continue;
}
server->arg = prev_arg;
/* Assign the connection to a worker until an available one is found */
retry:
if (!server->run) {
vs_flog_warn(server->logger,
"Abrupt shutdown request detected pre-(re)try for "
"connection %ju. Exiting.",
vs_File_identity(&client));
vs_File_close(&client);
return true;
}
for (idx = 0; !assigned && idx < VS_MAX_SOCK_RETRIES; ++idx) {
/* Look for a thread */
for (jdx = 0; jdx < server->threads; ++jdx) {
if (server->workers[jdx].occupied || server->workers[jdx].dirty) {
continue;
}
if (!vs_Lock_lock(&server->workers[jdx].lock)) {
continue;
}
if (server->workers[jdx].occupied || server->workers[jdx].dirty) {
vs_Lock_unlock(&server->workers[jdx].lock);
continue;
}
/* Found an available worker */
server->workers[jdx].client_addr = client_addr;
if (!vs_Stream_use(&server->workers[jdx].stream, &client)) {
vs_Lock_unlock(&server->workers[jdx].lock);
continue;
}
vs_SocketState *state = VS_MALLOC(sizeof(*state));
if (!state) {
vs_Lock_unlock(&server->workers[jdx].lock);
continue;
}
state->hooks = worker_hooks;
state->worker = &server->workers[jdx];
state->worker->logger = server->logger;
if (!vs_SockWorkerHook_runall(
state->hooks, vs_SockWorkerHookType_pre_open, state->worker, false, NULL)) {
vs_flog_warn(server->logger,
"Failed to run pre-open hooks for connection %ju.",
vs_File_identity(&state->worker->stream.file));
}
if (vs_Thread_init(
&server->workers[jdx].thread, vs_handle_connection, (void *)state)) {
/* Assigned the worker */
assigned = vs_Thread_detach(&server->workers[jdx].thread);
} else {
/* Failed asining a worker */
vs_flog_error(server->logger,
"Failed to assign connection %ju: %s.",
vs_File_identity(&client),
strerror(errno));
assigned = false; /* For style and clarity */
VS_FREE(state);
}
server->workers[jdx].occupied = assigned;
if (!vs_Lock_unlock(&server->workers[jdx].lock)) {
uint16_t kdx = 0;
bool dirty = true;
vs_flog_error(server->logger,
"Failed to unlock worker's %u mutex. Trying to "
"unlock it...",
jdx);
if (assigned) {
VS_FREE(state);
}
for (kdx = 0; kdx < VS_MAX_SOCK_UNLOCK_RETRIES; ++kdx) {
if (vs_Lock_unlock(&server->workers[jdx].lock)) {
vs_flog_info(server->logger,
"Worker's %u mutex back at a normal "
"state after %u tries.",
jdx,
kdx + 1);
dirty = false;
break;
}
/* else */
vs_flog_error(server->logger,
"Worker's %u mutex was not unlocked. "
"Trying again. (try %u)",
jdx,
kdx + 1);
dirty = true;
sleep(1);
}
if (dirty) {
vs_flog_error(server->logger,
"Worker %u is no longer normal as its mutex was "
"not successfully unlocked. Marking it as dirty.",
jdx);
server->workers[jdx].dirty = true;
}
continue;
}
if (assigned) {
break;
}
}
/* Check status: either be done or retry. */
if (!assigned) {
vs_flog_warn(server->logger,
"Couldn't assign a worker to %ju. Retrying %u/%u...",
vs_File_identity(&client),
idx + 1,
VS_MAX_SOCK_RETRIES);
vs_secsleep(0.03);
}
}
if (!assigned) {
vs_secsleep(0.03);
if (vs_SockServerHook_runall(
server_hooks, vs_SockServerHookType_no_worker, server, true, &count) &&
count != 0) {
vs_flog_info(server->logger,
"Could not assign a worker to connection %d. %zu 'No "
"Worker' hooks handled it - retrying.",
VS_MAX_SOCK_RETRIES,
count);
goto retry;
}
vs_flog_error(server->logger,
"Could not assign a worker to connection %d. Dropping "
"the connection.",
VS_MAX_SOCK_RETRIES);
vs_File_close(&client);
}
}
vs_File_destroy(&client);
return !server->run;
}
bool vs_SockServer_destroy(vs_SockServer *server) {
uint16_t idx = 0;
bool result = true;
if (!server) {
return false;
}
server->run = false; /* Stops the server from running */
if (!vs_Lock_lock(&server->lock)) {
result = false;
}
/* TODO: Connections hog the destruction process. Might need a better way to
* shut it down? */
for (idx = 0; idx < server->threads; ++idx) {
if (!vs_Lock_lock(&server->workers[idx].lock)) {
result = false;
}
if (server->workers[idx].occupied) {
/* Graceful exit. Wait for the thread to be done. */
if (!vs_Thread_attach(&server->workers[idx].thread, NULL)) {
result = false;
}
vs_Stream_drop(&server->workers[idx].stream);
}
vs_Stream_destroy(&server->workers[idx].stream);
if (!vs_Lock_unlock(&server->workers[idx].lock)) {
result = false;
}
if (!vs_Lock_destroy(&server->workers[idx].lock)) {
result = false;
}
}
VS_FREE(server->workers);
vs_File_destroy(&server->file);
if (!vs_Lock_unlock(&server->lock)) {
result = false;
}
if (!vs_Lock_destroy(&server->lock)) {
result = false;
}
return result;
}