vessel/core/include/stream.h
2025-06-21 16:26:31 +03:00

156 lines
6.2 KiB
C

#ifndef VESSEL_STREAM_H_
#define VESSEL_STREAM_H_
#include "conf.h"
#include "def.h"
#include "file.h"
#include "thread.h"
#include <sys/time.h>
#ifndef VS_STREAM_BUFSZ
# define VS_STREAM_BUFSZ ((size_t)(1024 * 256)) /* 256 KB */
#endif /* VS_STREAM_BUFSZ */
#define VS_SEC2TO(sec) (suseconds_t)(1000000ULL * (sec))
#define VS_IO_WITH_TEST(expr, now_written, ret, total_written) \
(now_written) = (expr); \
if (0 == vs_Stream_ignore_err((now_written))) { \
return (ret); \
} \
(total_written) += (now_written)
#define VS_SB_RD 0
#define VS_SB_WR 1
#define VS_STREAM_NONE ((uint8_t)(0x0)) /* 0000 0000 */
#define VS_STREAM_WR ((uint8_t)(0x80)) /* 1000 0000 */
#define VS_STREAM_RD ((uint8_t)(0x40)) /* 0100 0000 */
#define VS_STREAM_RDWR ((uint8_t)(VS_STREAM_RD | VS_STREAM_WR)) /* 1100 0000 */
/* clang-format off */
/*
* buf + 0 buf + STREAM_BUFSZ
* |......................*********************** |
* ^ ^ ^
* buf bufpos bufend
*
* In streams, we use a triple pointer approach to handling data. We have 3
* pointers because we need to know 3 states:
*
* - buf: The main buffer that holds the data being read from or written to the stream.
* It is allocated based on the mode of the stream (read, write, or both).
*
* - bufpos: Indicates the current position in the buffer where the next read or write
* operation will occur. It is updated as data is consumed (read)
* or produced (written).
*
* - bufend: This pointer marks the end of the valid data in the buffer. For
* read operations, it helps determine how much data is available to read, while
* for write operations, it indicates how much space is left for writing new
* data.
*
* In between buf and bufpos we have already processed data, between bufpos and
* bufend we have data yet to be processed, and bufend and above until buf+STREAM_BUFSZ
* we have uninitialised, unprocessed, and empty data.
*
* The combination of these three pointers allows for efficient buffering and
* management of data flow within the stream.
*/
/* clang-format on */
typedef struct {
vs_File file; /* The file associated with this stream. */
uint8_t *buf[2]; /* Buffers for reading and writing. */
uint8_t *bufpos[2]; /* Current positions in each buffer. */
uint8_t *bufend[2]; /* End positions for each buffer. */
bool used; /* Indicates whether the stream is currently in use. */
suseconds_t timeout_usec; /* Timeout value for operations (if applicable). */
vs_Lock lock; /* Lock to ensure thread safety during operations. */
uint8_t mode; /* Mode of operation (read, write, or both). */
} vs_Stream;
#define VS_STREAM_ERROR ((size_t)(-1))
size_t vs_Stream_ignore_err(size_t ret);
bool vs_Stream_init(vs_Stream *stream, uint8_t mode);
bool vs_Stream_use(vs_Stream *stream, vs_File *file);
int vs_Stream_poll(vs_Stream *stream);
bool vs_Stream_read1(vs_Stream *stream, void *buf);
size_t vs_Stream_read(vs_Stream *stream, void *buf, size_t count);
size_t vs_Stream_readb(vs_Stream *stream,
void *buf,
size_t buf_size,
const void *boundary,
size_t boundary_size,
bool *found,
bool is_string);
size_t vs_Stream_readbf(vs_Stream *stream,
void *buf,
size_t buf_size,
bool *found,
bool is_string,
const char *boundary_fmt,
...) __attribute__((format(printf, 6, 7)));
size_t vs_Stream_write(vs_Stream *stream, const void *buf, size_t count);
size_t vs_Stream_writef(vs_Stream *stream, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
bool vs_Stream_flush(vs_Stream *stream);
bool vs_Stream_stop(vs_Stream *stream);
bool vs_Stream_drop(vs_Stream *stream);
bool vs_Stream_destroy(vs_Stream *stream);
#define VS_BYTE_STREAM_INIT_CAP ((size_t)512)
#define VS_BYTE_STREAM_CAP_MULTIPLIER 1.33
typedef struct {
uint8_t *_buf;
size_t _cap;
uint8_t *_read_pos;
uint8_t *_write_pos;
vs_Stream *_stream; /* for redirects */
vs_Lock _lock;
} vs_ByteStream;
#define VS_BYTE_STREAM_ERROR ((size_t)(-1))
size_t vs_ByteStream_ignore_err(size_t ret);
bool vs_ByteStream_init(vs_ByteStream *stream);
bool vs_ByteStream_init_redirect(vs_ByteStream *from_bstream, vs_Stream *to_stream);
bool vs_ByteStream_read1(vs_ByteStream *stream, void *buf);
size_t vs_ByteStream_read(vs_ByteStream *stream, void *buf, size_t count);
size_t vs_ByteStream_readb(vs_ByteStream *stream,
void *buf,
size_t buf_size,
const void *boundary,
size_t boundary_size,
bool *found,
bool is_string);
size_t vs_ByteStream_readbf(vs_ByteStream *stream,
void *buf,
size_t buf_size,
bool *found,
bool is_string,
const char *boundary_fmt,
...) __attribute__((format(printf, 6, 7)));
size_t vs_ByteStream_write(vs_ByteStream *stream, const void *buf, size_t count);
size_t vs_ByteStream_writef(vs_ByteStream *stream, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
const void *vs_ByteStream_buf(const vs_ByteStream *stream);
size_t vs_ByteStream_len(const vs_ByteStream *stream);
bool vs_ByteStream_destroy(vs_ByteStream *stream);
#endif /* VESSEL_STREAM_H_ */