vessel/lab/prod-ready.c
Arija A. 68aced9a7f
misc: Add "prod ready" socket server
Signed-off-by: Arija A. <ari@ari.lt>
2025-06-14 01:53:23 +03:00

292 lines
6.7 KiB
C

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#ifdef WITH_EPOLL
# include <sys/epoll.h>
#endif
/*
 * Compile-time tuning knobs for the server.
 *
 * Declared as enumeration constants so they are typed as int and visible
 * to a debugger; every use in this file is an ordinary integer constant
 * expression (array sizes, function arguments).
 */
enum {
    MAX_CLIENTS      = 1024, /* not referenced anywhere else in this file */
    THREAD_POOL_SIZE = 128,  /* number of worker threads started by main() */
    BACKLOG          = 128,  /* backlog argument passed to listen() */
    PORT             = 8080, /* TCP port the listening socket is bound to */
    BUFFER_SIZE      = 1024, /* size in bytes of each worker's receive buffer */
    MAX_EVENTS       = 128,  /* capacity of the epoll_wait() result array */
    JOB_QUEUE_LIMIT  = 2048  /* job_queue_push() rejects jobs at this depth */
};
/*
 * One pending unit of work: an accepted client socket waiting for a
 * worker. Nodes form a singly linked list inside the job queue.
 */
struct client_job_t {
    int client_fd;             /* accepted socket descriptor to serve */
    struct client_job_t *next; /* next queued job, or NULL at the tail */
};

typedef struct client_job_t client_job_t;
/*
 * FIFO of pending client jobs shared between the accepting thread
 * (producer) and the worker threads (consumers).
 *
 * The list pointers and `size` are modified only while `lock` is held;
 * `cond` is signalled each time a job is appended.
 */
struct job_queue_t {
    struct client_job_t *front; /* oldest job (next to pop), NULL when empty */
    struct client_job_t *rear;  /* newest job (last pushed), NULL when empty */
    int size;                   /* number of jobs currently linked */
    pthread_mutex_t lock;       /* guards front, rear and size */
    pthread_cond_t cond;        /* consumers wait here for a job to arrive */
};

typedef struct job_queue_t job_queue_t;

/* The single process-wide queue used by main() and the worker threads. */
static job_queue_t job_queue;
/*
 * Connection statistics shared between threads.
 *
 * total_connections  - incremented by the accept loop for every client
 *                      that was successfully queued.
 * active_connections - incremented/decremented by each worker thread
 *                      around the handling of one client.
 *
 * Both are read by the logger thread. They are _Atomic rather than
 * volatile: volatile does not make `++`/`--` an indivisible
 * read-modify-write, so concurrent workers could lose updates. On an
 * _Atomic object the increment and decrement operators are atomic, and
 * a plain read yields an ordinary long/int value, so existing uses
 * (`++x`, `--x`, passing to fprintf) keep working unchanged.
 */
static _Atomic long total_connections = 0;
static _Atomic int active_connections = 0;
/*
 * Put a queue into its empty state and create its synchronisation
 * objects with default attributes.
 *
 * q: queue to initialise; must be called before any push or pop.
 *
 * The return values of the pthread init calls are not inspected.
 */
static void job_queue_init(job_queue_t *q)
{
    q->size = 0;
    q->rear = NULL;
    q->front = NULL;

    pthread_cond_init(&q->cond, NULL);
    pthread_mutex_init(&q->lock, NULL);
}
/*
 * Append a client descriptor to the tail of the queue and wake one
 * waiting consumer.
 *
 * q:         queue to append to.
 * client_fd: descriptor to hand over; on success the queue node owns it
 *            until a consumer pops it.
 *
 * Returns 0 on success, or -1 if the queue already holds
 * JOB_QUEUE_LIMIT jobs or the node could not be allocated. On failure
 * the descriptor is left untouched, so the caller still owns it.
 */
static int job_queue_push(job_queue_t *q, int client_fd)
{
    int rc = -1;

    pthread_mutex_lock(&q->lock);

    if (q->size < JOB_QUEUE_LIMIT) {
        client_job_t *node = malloc(sizeof *node);

        if (node != NULL) {
            node->client_fd = client_fd;
            node->next = NULL;

            /* Link after the current tail, or become the only element. */
            if (q->rear != NULL) {
                q->rear->next = node;
            } else {
                q->front = node;
            }
            q->rear = node;

            q->size += 1;
            pthread_cond_signal(&q->cond);
            rc = 0;
        }
    }

    pthread_mutex_unlock(&q->lock);
    return rc;
}
/*
 * Remove and return the oldest queued client descriptor, blocking on
 * the queue's condition variable for as long as the queue is empty.
 *
 * q: queue to take from.
 *
 * Returns the descriptor that was stored in the head node; the node
 * itself is freed here.
 */
static int job_queue_pop(job_queue_t *q)
{
    pthread_mutex_lock(&q->lock);

    /* Re-test after every wake-up: a wait may return with nothing queued. */
    while (!q->front) {
        pthread_cond_wait(&q->cond, &q->lock);
    }

    client_job_t *head = q->front;
    int fd = head->client_fd;

    q->front = head->next;
    if (!q->front) {
        /* That was the last element, so the tail pointer is stale too. */
        q->rear = NULL;
    }
    q->size -= 1;

    pthread_mutex_unlock(&q->lock);

    /* The node is unlinked, so it can be released outside the lock. */
    free(head);
    return fd;
}
/*
 * Hook called on every accepted client socket. It is currently a no-op:
 * the fcntl() implementation below is compiled out with `#if 0`, so the
 * descriptor keeps whatever blocking mode it already had.
 *
 * fd: descriptor that would be switched to O_NONBLOCK.
 *
 * NOTE(review): presumably disabled because worker_thread() issues a
 * single recv() and expects it to wait for data; confirm before
 * re-enabling.
 */
static void set_nonblocking(int fd)
{
#if 0
    const int current = fcntl(fd, F_GETFL, 0);

    if (current >= 0) {
        fcntl(fd, F_SETFL, current | O_NONBLOCK);
    }
#else
    (void)fd;
#endif
}
/*
 * Flags for every send() issued by a worker. Where the platform offers
 * MSG_NOSIGNAL it is used so that writing to a connection the peer has
 * already closed yields an EPIPE error instead of a SIGPIPE signal,
 * whose default action terminates the whole process.
 */
#ifdef MSG_NOSIGNAL
# define WORKER_SEND_FLAGS MSG_NOSIGNAL
#else
# define WORKER_SEND_FLAGS 0
#endif

/*
 * Write exactly `len` bytes from `data` to socket `fd`.
 *
 * send() may accept fewer bytes than requested, so keep going until
 * everything is written. A call interrupted by a signal is retried.
 *
 * Returns 0 once all bytes were sent, -1 on any other error (errno is
 * left as set by send()).
 */
static int send_all(int fd, const char *data, size_t len)
{
    while (len > 0) {
        ssize_t sent = send(fd, data, len, WORKER_SEND_FLAGS);

        if (sent < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        if (sent == 0) {
            /* No progress is possible; avoid spinning forever. */
            return -1;
        }
        data += sent;
        len -= (size_t)sent;
    }
    return 0;
}

/*
 * Worker thread entry point: forever take one client descriptor from
 * the global job queue, read a single chunk from it and echo that chunk
 * back as the body of a minimal HTTP/1.0 text/plain response, then
 * close the connection.
 *
 * arg: unused.
 *
 * Never returns in practice; the trailing `return NULL` only satisfies
 * the pthread start-routine signature.
 */
static void *worker_thread(void *arg)
{
    (void)arg;
    char buffer[BUFFER_SIZE];

    for (;;) {
        int client_fd = job_queue_pop(&job_queue);
        if (client_fd < 0) {
            continue;
        }

        /*
         * Shared with other threads; whether this update is atomic
         * depends on how the counter is declared.
         */
        ++active_connections;

        /* One read only; retry solely when a signal interrupted it. */
        ssize_t len;
        do {
            len = recv(client_fd, buffer, sizeof(buffer) - 1, 0);
        } while (len < 0 && errno == EINTR);

        if (len > 0) {
            /* Keep the payload usable as a C string; the echo uses len. */
            buffer[len] = '\0';

            char header[256];
            int header_len = snprintf(
                header,
                sizeof(header),
                "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Length: %zd\r\n\r\n",
                len);

            /*
             * Send the body only if the header was formatted without
             * error or truncation and went out completely. A failure
             * here just ends this connection.
             */
            if (header_len > 0 && (size_t)header_len < sizeof(header) &&
                send_all(client_fd, header, (size_t)header_len) == 0) {
                (void)send_all(client_fd, buffer, (size_t)len);
            }
        }

        shutdown(client_fd, SHUT_RDWR);
        close(client_fd);
        --active_connections;
    }

    return NULL;
}
/*
 * Statistics thread entry point: once per second rewrite a single
 * status line on stderr (carriage return, no newline) showing the
 * total accepted connections, the connections currently being served
 * and the current queue depth.
 *
 * arg: unused.
 *
 * Never returns in practice; the trailing `return NULL` only satisfies
 * the pthread start-routine signature.
 */
static void *logger_thread(void *arg)
{
    (void)arg;

    for (;;) {
        /*
         * The queue depth is modified by push/pop while they hold the
         * queue mutex, so take the same mutex for a consistent read
         * instead of racing with them.
         */
        pthread_mutex_lock(&job_queue.lock);
        int queued = job_queue.size;
        pthread_mutex_unlock(&job_queue.lock);

        /* Copy the counters once so the printed values match the types
           expected by the format string. */
        long total = total_connections;
        int active = active_connections;

        fprintf(stderr,
                "\rTotal: %ld | Active: %d | Queue: %d ",
                total,
                active,
                queued);
        fflush(stderr);
        sleep(1);
    }

    return NULL;
}
int main(void) {
int server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd < 0) {
perror("socket");
exit(EXIT_FAILURE);
}
int opt = 1;
setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
int buf_size = 1 << 20;
setsockopt(server_fd, SOL_SOCKET, SO_RCVBUF, &buf_size, sizeof(buf_size));
setsockopt(server_fd, SOL_SOCKET, SO_SNDBUF, &buf_size, sizeof(buf_size));
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(PORT);
if (bind(server_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
perror("bind");
exit(EXIT_FAILURE);
}
if (listen(server_fd, BACKLOG) < 0) {
perror("listen");
exit(EXIT_FAILURE);
}
job_queue_init(&job_queue);
pthread_t workers[THREAD_POOL_SIZE];
for (int idx = 0; idx < THREAD_POOL_SIZE; ++idx) {
if (pthread_create(&workers[idx], NULL, worker_thread, NULL) != 0) {
perror("pthread_create");
exit(EXIT_FAILURE);
}
}
printf("Server listening on 127.0.0.1:%d\n", PORT);
pthread_t logger;
pthread_create(&logger, NULL, logger_thread, NULL);
pthread_detach(logger);
#ifdef WITH_EPOLL
int epoll_fd = epoll_create1(0);
if (epoll_fd < 0) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}
struct epoll_event ev, events[MAX_EVENTS];
ev.events = EPOLLIN;
ev.data.fd = server_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) < 0) {
perror("epoll_ctl");
exit(EXIT_FAILURE);
}
while (1) {
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (nfds < 0) {
if (errno == EINTR) {
continue;
}
perror("epoll_wait");
break;
}
for (int idx = 0; idx < nfds; ++idx) {
if (events[idx].data.fd == server_fd) {
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
int client_fd = accept(server_fd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd < 0) {
continue;
}
set_nonblocking(client_fd);
if (job_queue_push(&job_queue, client_fd) < 0) {
shutdown(client_fd, SHUT_RDWR);
close(client_fd);
} else {
++total_connections;
}
}
}
}
#else
while (1) {
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
int client_fd = accept(server_fd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd < 0) {
if (errno == EINTR || errno == EAGAIN) {
continue;
}
perror("accept");
continue;
}
set_nonblocking(client_fd);
if (job_queue_push(&job_queue, client_fd) < 0) {
shutdown(client_fd, SHUT_RDWR);
close(client_fd);
} else {
++total_connections;
}
}
#endif
shutdown(server_fd, SHUT_RDWR);
close(server_fd);
return 0;
}