977 lines
25 KiB
C
977 lines
25 KiB
C
#include "include/conf.h"
|
|
|
|
#include <stdio.h>
|
|
#include <unistd.h>
|
|
#include <string.h>
|
|
#include <stdarg.h>
|
|
|
|
#include "include/def.h"
|
|
#include "include/file.h"
|
|
#include "include/server.h"
|
|
#include "include/stream.h"
|
|
#include "include/thread.h"
|
|
|
|
#include <sys/select.h>
|
|
|
|
size_t vs_Stream_ignore_err(size_t ret) { return ret == VS_STREAM_ERROR ? 0 : ret; }
|
|
|
|
bool vs_Stream_init(vs_Stream *stream, uint8_t mode) {
|
|
if (!stream || mode == VS_STREAM_NONE || !vs_Lock_init(&stream->lock)) {
|
|
return false;
|
|
}
|
|
|
|
if (!vs_Lock_lock(&stream->lock)) {
|
|
vs_Lock_destroy(&stream->lock);
|
|
return false;
|
|
}
|
|
|
|
stream->mode = mode;
|
|
stream->used = false;
|
|
|
|
if (!vs_File_init(&stream->file, NULL)) {
|
|
goto error;
|
|
}
|
|
|
|
void *buf = NULL;
|
|
|
|
if ((mode & VS_STREAM_RDWR) == VS_STREAM_RDWR) {
|
|
/* Allocate memory for both buffers in one call for efficiency sake. */
|
|
|
|
buf = VS_MALLOC(VS_STREAM_BUFSZ * 2);
|
|
|
|
if (!buf) {
|
|
vs_File_destroy(&stream->file);
|
|
goto error;
|
|
}
|
|
|
|
stream->buf[VS_SB_RD] = buf;
|
|
|
|
stream->buf[VS_SB_WR] = stream->buf[VS_SB_RD] + VS_STREAM_BUFSZ;
|
|
|
|
stream->bufpos[VS_SB_WR] = stream->buf[VS_SB_WR];
|
|
stream->bufend[VS_SB_WR] = stream->buf[VS_SB_WR] + VS_STREAM_BUFSZ;
|
|
|
|
stream->bufpos[VS_SB_RD] = stream->bufend[VS_SB_RD] = stream->buf[VS_SB_RD];
|
|
} else if (mode & VS_STREAM_RD) {
|
|
buf = VS_MALLOC(VS_STREAM_BUFSZ);
|
|
|
|
if (!buf) {
|
|
vs_File_destroy(&stream->file);
|
|
goto error;
|
|
}
|
|
|
|
stream->buf[VS_SB_RD] = buf;
|
|
stream->bufpos[VS_SB_RD] = stream->bufend[VS_SB_RD] = stream->buf[VS_SB_RD];
|
|
} else if (mode & VS_STREAM_WR) {
|
|
buf = VS_MALLOC(VS_STREAM_BUFSZ);
|
|
|
|
if (!buf) {
|
|
vs_File_destroy(&stream->file);
|
|
goto error;
|
|
}
|
|
|
|
stream->buf[VS_SB_WR] = buf;
|
|
stream->bufpos[VS_SB_WR] = stream->buf[VS_SB_WR];
|
|
stream->bufend[VS_SB_WR] = stream->buf[VS_SB_WR] + VS_STREAM_BUFSZ;
|
|
} else {
|
|
/* No valid mode selected */
|
|
vs_File_destroy(&stream->file);
|
|
goto error;
|
|
}
|
|
|
|
if (!vs_Lock_unlock(&stream->lock)) {
|
|
VS_FREE(buf);
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
|
|
error:
|
|
vs_Lock_unlock(&stream->lock);
|
|
vs_Lock_destroy(&stream->lock);
|
|
return false;
|
|
}
|
|
|
|
bool vs_Stream_use(vs_Stream *stream, vs_File *file) {
|
|
if (!stream || !file || stream->used || vs_File_isopen(&stream->file) ||
|
|
!vs_File_isopen(file) || !vs_Lock_lock(&stream->lock)) {
|
|
return false;
|
|
}
|
|
|
|
if (!vs_File_init(&stream->file, file)) {
|
|
vs_Lock_unlock(&stream->lock);
|
|
return false;
|
|
}
|
|
|
|
stream->used = true;
|
|
|
|
stream->bufpos[VS_SB_RD] = stream->bufend[VS_SB_RD] = stream->buf[VS_SB_RD];
|
|
|
|
stream->bufpos[VS_SB_WR] = stream->buf[VS_SB_WR];
|
|
stream->bufend[VS_SB_WR] = stream->bufpos[VS_SB_WR] + VS_STREAM_BUFSZ;
|
|
|
|
return vs_Lock_unlock(&stream->lock);
|
|
}
|
|
|
|
int vs_Stream_poll(vs_Stream *stream) {
|
|
if (!stream || stream->timeout_usec == 0) {
|
|
return -1;
|
|
}
|
|
|
|
const struct timeval timeout = { 0, stream->timeout_usec };
|
|
return vs_File_poll(&stream->file, &timeout);
|
|
}
|
|
|
|
/*
|
|
* Refills the read buffer if needed.
|
|
*
|
|
* true: Refilled/fits.
|
|
* false: Not refilled.
|
|
*/
|
|
static bool vs_refill_read(vs_Stream *stream) {
|
|
if (stream->bufend[VS_SB_RD] > stream->bufpos[VS_SB_RD]) {
|
|
return true;
|
|
}
|
|
|
|
if (stream->timeout_usec > 0) {
|
|
/* Prepare for select to handle streamed input (such as TCP
|
|
* connections) */
|
|
|
|
const struct timeval timeout = { 0, stream->timeout_usec };
|
|
|
|
if (vs_File_poll(&stream->file, &timeout) <= 0) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
/* Refill the read buffer */
|
|
|
|
const size_t now_read = vs_File_read(&stream->file, stream->buf[VS_SB_RD], VS_STREAM_BUFSZ);
|
|
|
|
if (now_read == 0) {
|
|
return false;
|
|
}
|
|
|
|
stream->bufpos[VS_SB_RD] = stream->buf[VS_SB_RD];
|
|
stream->bufend[VS_SB_RD] = stream->buf[VS_SB_RD] + now_read;
|
|
|
|
return true;
|
|
}
|
|
|
|
bool vs_Stream_read1(vs_Stream *stream, void *buf) {
|
|
if (!stream || !stream->used || !(stream->mode & VS_STREAM_RD) ||
|
|
!vs_Lock_lock(&stream->lock)) {
|
|
return false;
|
|
}
|
|
|
|
if (!vs_refill_read(stream)) {
|
|
vs_Lock_unlock(&stream->lock);
|
|
return false;
|
|
}
|
|
|
|
if (buf) {
|
|
*((uint8_t *)buf) = *stream->bufpos[VS_SB_RD];
|
|
}
|
|
++stream->bufpos[VS_SB_RD];
|
|
|
|
return vs_Lock_unlock(&stream->lock);
|
|
}
|
|
|
|
size_t vs_Stream_read(vs_Stream *stream, void *buf, size_t count) {
|
|
size_t total_read = 0;
|
|
|
|
if (!stream || !stream->used) {
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
if (!(stream->mode & VS_STREAM_RD) || count == 0) {
|
|
return 0;
|
|
}
|
|
if (!vs_Lock_lock(&stream->lock)) {
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
|
|
while (total_read < count) {
|
|
if (!vs_refill_read(stream)) {
|
|
return vs_Lock_unlock(&stream->lock) ? total_read : VS_STREAM_ERROR;
|
|
}
|
|
|
|
const size_t bytes_available =
|
|
(size_t)(stream->bufend[VS_SB_RD] - stream->bufpos[VS_SB_RD]);
|
|
const size_t bytes_to_copy = VS_MIN(bytes_available, count - total_read);
|
|
|
|
if (buf) {
|
|
memcpy((uint8_t *)buf + total_read, stream->bufpos[VS_SB_RD], bytes_to_copy);
|
|
}
|
|
|
|
total_read += bytes_to_copy;
|
|
stream->bufpos[VS_SB_RD] += bytes_to_copy;
|
|
}
|
|
|
|
return vs_Lock_unlock(&stream->lock) ? total_read : VS_STREAM_ERROR;
|
|
}
|
|
|
|
size_t vs_Stream_readb(vs_Stream *stream,
|
|
void *buf,
|
|
size_t buf_size,
|
|
const void *boundary,
|
|
size_t boundary_size,
|
|
bool *found,
|
|
bool is_string) {
|
|
if (found) {
|
|
*found = false;
|
|
}
|
|
|
|
if (!stream || !stream->used) {
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
if (!(stream->mode & VS_STREAM_RD) || !boundary || buf_size == 0 || boundary_size == 0) {
|
|
return 0;
|
|
}
|
|
if (!vs_Lock_lock(&stream->lock)) {
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
|
|
size_t total_read = 0;
|
|
size_t matched_bytes = 0;
|
|
|
|
const size_t full_read_size = buf_size - (is_string ? 1 : 0);
|
|
|
|
while (total_read < full_read_size) {
|
|
if (!vs_refill_read(stream)) {
|
|
vs_Lock_unlock(&stream->lock);
|
|
return total_read;
|
|
}
|
|
|
|
/* Process the buffer until we reach its end or the desired read size */
|
|
while (stream->bufpos[VS_SB_RD] < stream->bufend[VS_SB_RD]) {
|
|
const uint8_t byte = *stream->bufpos[VS_SB_RD];
|
|
|
|
if (is_string && byte == '\0') {
|
|
++stream->bufpos[VS_SB_RD]; /* Skip past the NULL */
|
|
vs_Lock_unlock(&stream->lock);
|
|
return total_read;
|
|
}
|
|
|
|
/* Boundary matching */
|
|
if (byte == *((const uint8_t *)boundary + matched_bytes)) {
|
|
++matched_bytes;
|
|
|
|
/* If we've matched the entire boundary */
|
|
if (matched_bytes == boundary_size) {
|
|
/* +1 since matched bytes was incremented */
|
|
const size_t total = total_read - matched_bytes + 1;
|
|
|
|
if (found) {
|
|
*found = true;
|
|
}
|
|
|
|
if (is_string && buf) {
|
|
*((uint8_t *)buf + total) = '\0';
|
|
}
|
|
|
|
++stream->bufpos[VS_SB_RD];
|
|
|
|
return vs_Lock_unlock(&stream->lock) ? total : VS_STREAM_ERROR;
|
|
}
|
|
} else {
|
|
matched_bytes = 0;
|
|
}
|
|
|
|
if (buf) {
|
|
*((uint8_t *)buf + total_read) = byte;
|
|
}
|
|
|
|
++total_read;
|
|
++stream->bufpos[VS_SB_RD];
|
|
|
|
if (total_read >= full_read_size) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (is_string) {
|
|
if (buf) {
|
|
*((uint8_t *)buf + total_read) = '\0';
|
|
}
|
|
|
|
++total_read;
|
|
}
|
|
|
|
return vs_Lock_unlock(&stream->lock) ? total_read : VS_STREAM_ERROR;
|
|
}
|
|
|
|
size_t vs_Stream_readbf(vs_Stream *stream,
|
|
void *buf,
|
|
size_t buf_size,
|
|
bool *found,
|
|
bool is_string,
|
|
const char *boundary_fmt,
|
|
...) {
|
|
if (!stream || !stream->used) {
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
if (!(stream->mode & VS_STREAM_RD) || !buf || !boundary_fmt || buf_size == 0) {
|
|
return 0;
|
|
}
|
|
|
|
va_list args;
|
|
char *temp_buf = NULL;
|
|
size_t size = 0;
|
|
ssize_t needed = 0;
|
|
|
|
va_start(args, boundary_fmt);
|
|
|
|
/* Determine the size needed for the buffer */
|
|
|
|
va_list args_copy;
|
|
va_copy(args_copy, args);
|
|
needed = vsnprintf(NULL, 0, boundary_fmt, args_copy) + 1; /* +1 for null terminator */
|
|
va_end(args_copy);
|
|
|
|
if (needed <= 0) {
|
|
va_end(args);
|
|
return 0;
|
|
}
|
|
|
|
size = (size_t)needed;
|
|
|
|
/* Alocate a dynamic temporary buffer */
|
|
|
|
temp_buf = VS_MALLOC(size);
|
|
|
|
if (!temp_buf) {
|
|
va_end(args);
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
|
|
/* Format the string into the allocated buffer */
|
|
|
|
needed = vsnprintf(temp_buf, size, boundary_fmt, args);
|
|
va_end(args);
|
|
|
|
if (needed < 0) {
|
|
VS_FREE(temp_buf);
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
|
|
size = (size_t)needed;
|
|
|
|
/* Use readb() abstraction */
|
|
|
|
size = vs_Stream_readb(stream, buf, buf_size, temp_buf, size, found, is_string);
|
|
VS_FREE(temp_buf);
|
|
return size;
|
|
}
|
|
|
|
size_t vs_Stream_write(vs_Stream *stream, const void *buf, size_t count) {
|
|
size_t total_written = 0;
|
|
|
|
if (!stream || !stream->used) {
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
if (!(stream->mode & VS_STREAM_WR) || !buf || count == 0) {
|
|
return 0;
|
|
}
|
|
if (!vs_Lock_lock(&stream->lock)) {
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
|
|
while (total_written < count) {
|
|
size_t bytes_available = (size_t)(stream->bufend[VS_SB_WR] - stream->bufpos[VS_SB_WR]);
|
|
|
|
if (bytes_available == 0) {
|
|
if (!vs_Lock_unlock(&stream->lock)) {
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
if (!vs_Stream_flush(stream)) {
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
if (!vs_Lock_lock(&stream->lock)) {
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
|
|
bytes_available = (size_t)(stream->bufend[VS_SB_WR] - stream->bufpos[VS_SB_WR]);
|
|
}
|
|
|
|
const size_t bytes_to_copy = VS_MIN(bytes_available, count - total_written);
|
|
|
|
if (buf) {
|
|
memcpy(stream->bufpos[VS_SB_WR], (const uint8_t *)buf + total_written, bytes_to_copy);
|
|
}
|
|
|
|
total_written += bytes_to_copy;
|
|
stream->bufpos[VS_SB_WR] += bytes_to_copy;
|
|
}
|
|
|
|
return vs_Lock_unlock(&stream->lock) ? total_written : VS_STREAM_ERROR;
|
|
}
|
|
|
|
size_t vs_Stream_writef(vs_Stream *stream, const char *fmt, ...) {
|
|
if (!stream || !stream->used) {
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
if (!(stream->mode & VS_STREAM_WR) || !fmt) {
|
|
return 0;
|
|
}
|
|
|
|
va_list args;
|
|
char *temp_buf = NULL;
|
|
size_t size = 0;
|
|
ssize_t needed = 0;
|
|
|
|
va_start(args, fmt);
|
|
|
|
/* Determine the size needed for the buffer */
|
|
|
|
va_list args_copy;
|
|
va_copy(args_copy, args);
|
|
needed = vsnprintf(NULL, 0, fmt, args_copy) + 1; /* +1 for null terminator */
|
|
va_end(args_copy);
|
|
|
|
if (needed <= 0) {
|
|
va_end(args);
|
|
return 0;
|
|
}
|
|
|
|
size = (size_t)needed;
|
|
|
|
/* Alocate a dynamic temporary buffer */
|
|
|
|
temp_buf = VS_MALLOC(size);
|
|
|
|
if (!temp_buf) {
|
|
va_end(args);
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
|
|
/* Format the string into the allocated buffer */
|
|
|
|
needed = vsnprintf(temp_buf, size, fmt, args);
|
|
|
|
if (needed < 0) {
|
|
VS_FREE(temp_buf);
|
|
va_end(args);
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
|
|
va_end(args);
|
|
|
|
size = (size_t)needed;
|
|
|
|
/* Now, just use the write abstraction */
|
|
|
|
size = vs_Stream_write(stream, temp_buf, size);
|
|
VS_FREE(temp_buf);
|
|
return size;
|
|
}
|
|
|
|
bool vs_Stream_flush(vs_Stream *stream) {
|
|
if (!stream || !stream->used || !(stream->mode & VS_STREAM_WR) ||
|
|
!vs_Lock_lock(&stream->lock)) {
|
|
return false;
|
|
}
|
|
|
|
if (stream->bufpos[VS_SB_WR] > stream->buf[VS_SB_WR]) {
|
|
const size_t write_size = (size_t)(stream->bufpos[VS_SB_WR] - stream->buf[VS_SB_WR]);
|
|
|
|
if (vs_File_write(&stream->file, stream->buf[VS_SB_WR], write_size) != write_size) {
|
|
vs_Lock_unlock(&stream->lock);
|
|
return false;
|
|
}
|
|
|
|
stream->bufpos[VS_SB_WR] = stream->buf[VS_SB_WR];
|
|
}
|
|
|
|
return vs_Lock_unlock(&stream->lock);
|
|
}
|
|
|
|
bool vs_Stream_stop(vs_Stream *stream) {
|
|
if (!stream || !stream->used) {
|
|
return false;
|
|
}
|
|
|
|
vs_Stream_flush(stream);
|
|
|
|
if (!vs_Lock_lock(&stream->lock)) {
|
|
return false;
|
|
}
|
|
|
|
stream->used = false;
|
|
|
|
stream->bufpos[VS_SB_RD] = stream->buf[VS_SB_RD];
|
|
stream->bufpos[VS_SB_WR] = stream->buf[VS_SB_WR];
|
|
|
|
return vs_Lock_unlock(&stream->lock);
|
|
}
|
|
|
|
bool vs_Stream_destroy(vs_Stream *stream) {
|
|
if (!stream) {
|
|
return false;
|
|
}
|
|
|
|
if (!vs_Lock_lock(&stream->lock)) {
|
|
return false;
|
|
}
|
|
|
|
if ((stream->mode & VS_STREAM_RDWR) == VS_STREAM_RDWR) {
|
|
VS_FREE(stream->buf[VS_SB_RD]);
|
|
} else {
|
|
if (stream->mode & VS_STREAM_RD) {
|
|
VS_FREE(stream->buf[VS_SB_RD]);
|
|
}
|
|
|
|
if (stream->mode & VS_STREAM_WR) {
|
|
VS_FREE(stream->buf[VS_SB_WR]);
|
|
}
|
|
}
|
|
|
|
const bool stopped = vs_Stream_stop(stream);
|
|
const bool destroyed = vs_File_destroy(&stream->file);
|
|
const bool unlocked = vs_Lock_unlock(&stream->lock);
|
|
const bool lock_destroyed = vs_Lock_destroy(&stream->lock);
|
|
|
|
return stopped && destroyed && unlocked && lock_destroyed;
|
|
}
|
|
|
|
bool vs_Stream_drop(vs_Stream *stream) {
|
|
if (!stream) {
|
|
return false;
|
|
}
|
|
|
|
const bool stopped = vs_Stream_stop(stream);
|
|
const bool closed = vs_File_close(&stream->file);
|
|
|
|
return stopped && closed;
|
|
}
|
|
|
|
size_t vs_ByteStream_ignore_err(size_t ret) { return ret == VS_BYTE_STREAM_ERROR ? 0 : ret; }
|
|
|
|
bool vs_ByteStream_init(vs_ByteStream *stream) {
|
|
if (!stream) {
|
|
return false;
|
|
}
|
|
|
|
if (!vs_Lock_init(&stream->_lock)) {
|
|
return false;
|
|
}
|
|
|
|
if (!vs_Lock_lock(&stream->_lock)) {
|
|
vs_Lock_destroy(&stream->_lock);
|
|
return false;
|
|
}
|
|
|
|
stream->_cap = VS_BYTE_STREAM_INIT_CAP;
|
|
stream->_buf = VS_MALLOC(VS_BYTE_STREAM_INIT_CAP);
|
|
|
|
if (!stream->_buf) {
|
|
vs_Lock_unlock(&stream->_lock);
|
|
vs_Lock_destroy(&stream->_lock);
|
|
return false;
|
|
}
|
|
|
|
stream->_read_pos = stream->_buf;
|
|
stream->_write_pos = stream->_buf;
|
|
|
|
stream->_stream = NULL;
|
|
|
|
if (!vs_Lock_unlock(&stream->_lock)) {
|
|
VS_FREE(stream->_buf);
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
bool vs_ByteStream_init_redirect(vs_ByteStream *from_bstream, vs_Stream *to_stream) {
|
|
if (!from_bstream || !to_stream) {
|
|
return false;
|
|
}
|
|
|
|
from_bstream->_stream = to_stream;
|
|
return true;
|
|
}
|
|
|
|
bool vs_ByteStream_read1(vs_ByteStream *stream, void *buf) {
|
|
if (!stream) {
|
|
return false;
|
|
}
|
|
|
|
if (stream->_stream) {
|
|
return vs_Stream_read1(stream->_stream, buf);
|
|
}
|
|
|
|
if (!vs_Lock_lock(&stream->_lock)) {
|
|
return false;
|
|
}
|
|
|
|
if (stream->_read_pos >= stream->_write_pos) {
|
|
vs_Lock_unlock(&stream->_lock);
|
|
return false;
|
|
}
|
|
|
|
if (buf) {
|
|
*(uint8_t *)buf = *stream->_read_pos;
|
|
}
|
|
++stream->_read_pos;
|
|
|
|
return vs_Lock_unlock(&stream->_lock);
|
|
}
|
|
|
|
size_t vs_ByteStream_read(vs_ByteStream *stream, void *buf, size_t count) {
|
|
if (!stream) {
|
|
return VS_BYTE_STREAM_ERROR;
|
|
}
|
|
|
|
if (stream->_stream) {
|
|
return vs_Stream_read(stream->_stream, buf, count);
|
|
}
|
|
|
|
if (!vs_Lock_lock(&stream->_lock)) {
|
|
return VS_BYTE_STREAM_ERROR;
|
|
}
|
|
|
|
const size_t available = (size_t)(stream->_write_pos - stream->_read_pos);
|
|
const size_t to_read = VS_MIN(count, available);
|
|
|
|
if (buf && to_read > 0) {
|
|
memcpy(buf, stream->_read_pos, to_read);
|
|
}
|
|
|
|
stream->_read_pos += to_read;
|
|
|
|
/* Shift remaining data to the start */
|
|
const size_t remaining = (size_t)(stream->_write_pos - stream->_read_pos);
|
|
if (remaining > 0) {
|
|
memmove(stream->_buf, stream->_read_pos, remaining);
|
|
}
|
|
stream->_read_pos = stream->_buf;
|
|
stream->_write_pos = stream->_buf + remaining;
|
|
|
|
vs_Lock_unlock(&stream->_lock);
|
|
return to_read;
|
|
}
|
|
|
|
size_t vs_ByteStream_write(vs_ByteStream *stream, const void *buf, size_t count) {
|
|
if (!stream) {
|
|
return VS_BYTE_STREAM_ERROR;
|
|
}
|
|
|
|
if (stream->_stream) {
|
|
return vs_Stream_write(stream->_stream, buf, count);
|
|
}
|
|
|
|
if (!buf || count == 0) {
|
|
return count;
|
|
}
|
|
|
|
if (!vs_Lock_lock(&stream->_lock)) {
|
|
return VS_BYTE_STREAM_ERROR;
|
|
}
|
|
|
|
const size_t used = (size_t)(stream->_write_pos - stream->_buf);
|
|
size_t space = stream->_cap - used;
|
|
|
|
while (space < count) {
|
|
size_t new_cap = (size_t)((double)stream->_cap * VS_BYTE_STREAM_CAP_MULTIPLIER);
|
|
while (new_cap - used < count) {
|
|
new_cap = (size_t)((double)new_cap * VS_BYTE_STREAM_CAP_MULTIPLIER);
|
|
}
|
|
|
|
uint8_t *new_buf = VS_REALLOC(stream->_buf, new_cap);
|
|
if (!new_buf) {
|
|
vs_Lock_unlock(&stream->_lock);
|
|
return VS_BYTE_STREAM_ERROR;
|
|
}
|
|
|
|
/* Update pointers based on new base */
|
|
const size_t read_offset = (size_t)(stream->_read_pos - stream->_buf);
|
|
const size_t write_offset = (size_t)(stream->_write_pos - stream->_buf);
|
|
|
|
stream->_buf = new_buf;
|
|
stream->_cap = new_cap;
|
|
stream->_read_pos = stream->_buf + read_offset;
|
|
stream->_write_pos = stream->_buf + write_offset;
|
|
|
|
space = new_cap - write_offset;
|
|
}
|
|
|
|
memcpy(stream->_write_pos, buf, count);
|
|
stream->_write_pos += count;
|
|
|
|
vs_Lock_unlock(&stream->_lock);
|
|
return count;
|
|
}
|
|
|
|
bool vs_ByteStream_destroy(vs_ByteStream *stream) {
|
|
if (!stream) {
|
|
return false;
|
|
}
|
|
|
|
if (stream->_stream) {
|
|
return vs_Stream_destroy(stream->_stream);
|
|
}
|
|
|
|
if (!vs_Lock_lock(&stream->_lock)) {
|
|
return false;
|
|
}
|
|
|
|
VS_FREE(stream->_buf);
|
|
|
|
stream->_buf = NULL;
|
|
stream->_read_pos = NULL;
|
|
stream->_write_pos = NULL;
|
|
stream->_cap = 0;
|
|
|
|
const bool unlocked = vs_Lock_unlock(&stream->_lock);
|
|
const bool destroyed = vs_Lock_destroy(&stream->_lock);
|
|
|
|
return unlocked && destroyed;
|
|
}
|
|
|
|
size_t vs_ByteStream_readb(vs_ByteStream *stream,
|
|
void *buf,
|
|
size_t buf_size,
|
|
const void *boundary,
|
|
size_t boundary_size,
|
|
bool *found,
|
|
bool is_string) {
|
|
if (!stream) {
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
|
|
if (stream->_stream) {
|
|
return vs_Stream_readb(
|
|
stream->_stream, buf, buf_size, boundary, boundary_size, found, is_string);
|
|
}
|
|
|
|
if (found) {
|
|
*found = false;
|
|
}
|
|
|
|
if (!boundary || boundary_size == 0 || buf_size == 0) {
|
|
return 0;
|
|
}
|
|
|
|
if (!vs_Lock_lock(&stream->_lock)) {
|
|
return VS_BYTE_STREAM_ERROR;
|
|
}
|
|
|
|
const size_t max_size = buf_size - (is_string ? 1 : 0);
|
|
const size_t available = (size_t)(stream->_write_pos - stream->_read_pos);
|
|
|
|
size_t total_read = 0;
|
|
size_t matched = 0;
|
|
|
|
const uint8_t *read_ptr = stream->_read_pos;
|
|
|
|
while (total_read < max_size && total_read < available) {
|
|
uint8_t byte = *read_ptr;
|
|
|
|
if (byte == ((const uint8_t *)boundary)[matched]) {
|
|
++matched;
|
|
|
|
if (is_string && byte == '\0') {
|
|
stream->_read_pos += total_read + 1;
|
|
|
|
size_t remaining = (size_t)(stream->_write_pos - stream->_read_pos);
|
|
if (remaining > 0) {
|
|
memmove(stream->_buf, stream->_read_pos, remaining);
|
|
}
|
|
stream->_read_pos = stream->_buf;
|
|
stream->_write_pos = stream->_buf + remaining;
|
|
|
|
vs_Lock_unlock(&stream->_lock);
|
|
return total_read;
|
|
}
|
|
|
|
if (matched == boundary_size) {
|
|
if (found) {
|
|
*found = true;
|
|
}
|
|
|
|
/* Copy matched boundary if buf is set */
|
|
const size_t copy_len = total_read + 1;
|
|
|
|
if (buf) {
|
|
memcpy(buf, stream->_read_pos, copy_len);
|
|
if (is_string) {
|
|
((uint8_t *)buf)[copy_len] = '\0';
|
|
}
|
|
}
|
|
|
|
/* Move read position past boundary */
|
|
stream->_read_pos += copy_len;
|
|
|
|
/* Shift remaining data to front */
|
|
const size_t remaining = (size_t)(stream->_write_pos - stream->_read_pos);
|
|
if (remaining > 0) {
|
|
memmove(stream->_buf, stream->_read_pos, remaining);
|
|
}
|
|
stream->_read_pos = stream->_buf;
|
|
stream->_write_pos = stream->_buf + remaining;
|
|
|
|
vs_Lock_unlock(&stream->_lock);
|
|
return is_string ? copy_len + 1 : copy_len;
|
|
}
|
|
} else {
|
|
matched = 0;
|
|
}
|
|
|
|
if (buf) {
|
|
((uint8_t *)buf)[total_read] = byte;
|
|
}
|
|
|
|
++read_ptr;
|
|
++total_read;
|
|
}
|
|
|
|
if (buf && total_read > 0) {
|
|
memcpy(buf, stream->_read_pos, total_read);
|
|
if (is_string) {
|
|
((uint8_t *)buf)[total_read] = '\0';
|
|
}
|
|
}
|
|
|
|
stream->_read_pos += total_read;
|
|
|
|
const size_t remaining = (size_t)(stream->_write_pos - stream->_read_pos);
|
|
if (remaining > 0) {
|
|
memmove(stream->_buf, stream->_read_pos, remaining);
|
|
}
|
|
stream->_read_pos = stream->_buf;
|
|
stream->_write_pos = stream->_buf + remaining;
|
|
|
|
vs_Lock_unlock(&stream->_lock);
|
|
return is_string ? total_read + 1 : total_read;
|
|
}
|
|
|
|
size_t vs_ByteStream_readbf(vs_ByteStream *stream,
|
|
void *buf,
|
|
size_t buf_size,
|
|
bool *found,
|
|
bool is_string,
|
|
const char *boundary_fmt,
|
|
...) {
|
|
if (!stream) {
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
if (!buf || !boundary_fmt || buf_size == 0) {
|
|
return 0;
|
|
}
|
|
|
|
va_list args;
|
|
char *temp_buf = NULL;
|
|
size_t size = 0;
|
|
ssize_t needed = 0;
|
|
|
|
va_start(args, boundary_fmt);
|
|
|
|
/* Determine the size needed for the buffer */
|
|
|
|
va_list args_copy;
|
|
va_copy(args_copy, args);
|
|
needed = vsnprintf(NULL, 0, boundary_fmt, args_copy) + 1; /* +1 for null terminator */
|
|
va_end(args_copy);
|
|
|
|
if (needed <= 0) {
|
|
va_end(args);
|
|
return 0;
|
|
}
|
|
|
|
size = (size_t)needed;
|
|
|
|
/* Alocate a dynamic temporary buffer */
|
|
|
|
temp_buf = VS_MALLOC(size);
|
|
|
|
if (!temp_buf) {
|
|
va_end(args);
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
|
|
/* Format the string into the allocated buffer */
|
|
|
|
needed = vsnprintf(temp_buf, size, boundary_fmt, args);
|
|
va_end(args);
|
|
|
|
if (needed < 0) {
|
|
VS_FREE(temp_buf);
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
|
|
size = (size_t)needed;
|
|
|
|
/* Use readb() abstraction */
|
|
|
|
size = vs_ByteStream_readb(stream, buf, buf_size, temp_buf, size, found, is_string);
|
|
VS_FREE(temp_buf);
|
|
return size;
|
|
}
|
|
|
|
size_t vs_ByteStream_writef(vs_ByteStream *stream, const char *fmt, ...) {
|
|
if (!stream) {
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
if (!fmt) {
|
|
return 0;
|
|
}
|
|
|
|
va_list args;
|
|
char *temp_buf = NULL;
|
|
size_t size = 0;
|
|
ssize_t needed = 0;
|
|
|
|
va_start(args, fmt);
|
|
|
|
/* Determine the size needed for the buffer */
|
|
|
|
va_list args_copy;
|
|
va_copy(args_copy, args);
|
|
needed = vsnprintf(NULL, 0, fmt, args_copy) + 1; /* +1 for null terminator */
|
|
va_end(args_copy);
|
|
|
|
if (needed <= 0) {
|
|
va_end(args);
|
|
return 0;
|
|
}
|
|
|
|
size = (size_t)needed;
|
|
|
|
/* Alocate a dynamic temporary buffer */
|
|
|
|
temp_buf = VS_MALLOC(size);
|
|
|
|
if (!temp_buf) {
|
|
va_end(args);
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
|
|
/* Format the string into the allocated buffer */
|
|
|
|
needed = vsnprintf(temp_buf, size, fmt, args);
|
|
|
|
if (needed < 0) {
|
|
VS_FREE(temp_buf);
|
|
va_end(args);
|
|
return VS_STREAM_ERROR;
|
|
}
|
|
|
|
va_end(args);
|
|
|
|
size = (size_t)needed;
|
|
|
|
/* Now, just use the write abstraction */
|
|
|
|
size = vs_ByteStream_write(stream, temp_buf, size);
|
|
VS_FREE(temp_buf);
|
|
return size;
|
|
}
|
|
|
|
const void *vs_ByteStream_buf(const vs_ByteStream *stream) {
|
|
return stream->_stream ? NULL : stream->_buf;
|
|
}
|
|
|
|
size_t vs_ByteStream_len(const vs_ByteStream *stream) {
|
|
return stream->_stream ? 0 : (size_t)(stream->_write_pos - stream->_read_pos);
|
|
}
|