Skip to content

Commit

Permalink
config: add storage.chunk_max_size
Browse files Browse the repository at this point in the history
Add a configuration value for the storage chunk max size.

Signed-off-by: braydonk <[email protected]>
  • Loading branch information
braydonk committed Oct 1, 2024
1 parent 161e834 commit c2e626a
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 15 deletions.
24 changes: 13 additions & 11 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ struct flb_config {
int is_running; /* service running ? */
double flush; /* Flush timeout */

/*
* Maximum grace time on shutdown. If set to -1, the engine will
/*
* Maximum grace time on shutdown. If set to -1, the engine will
* shutdown when all remaining tasks are flushed
*/
int grace;
int grace;
int grace_count; /* Count of grace shutdown tries */
flb_pipefd_t flush_fd; /* Timer FD associated to flush */
int convert_nan_to_null; /* convert null to nan ? */
Expand Down Expand Up @@ -227,6 +227,7 @@ struct flb_config {
char *storage_bl_mem_limit; /* storage backlog memory limit */
struct flb_storage_metrics *storage_metrics_ctx; /* storage metrics context */
int storage_trim_files; /* enable/disable file trimming */
size_t storage_chunk_max_size; /* The max chunk size */

/* Embedded SQL Database support (SQLite3) */
#ifdef FLB_HAVE_SQLDB
Expand Down Expand Up @@ -354,15 +355,16 @@ enum conf_type {
#define FLB_CONF_DNS_PREFER_IPV6 "dns.prefer_ipv6"

/* Storage / Chunk I/O */
#define FLB_CONF_STORAGE_PATH "storage.path"
#define FLB_CONF_STORAGE_SYNC "storage.sync"
#define FLB_CONF_STORAGE_METRICS "storage.metrics"
#define FLB_CONF_STORAGE_CHECKSUM "storage.checksum"
#define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit"
#define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up"
#define FLB_CONF_STORAGE_PATH "storage.path"
#define FLB_CONF_STORAGE_SYNC "storage.sync"
#define FLB_CONF_STORAGE_METRICS "storage.metrics"
#define FLB_CONF_STORAGE_CHECKSUM "storage.checksum"
#define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit"
#define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up"
#define FLB_CONF_STORAGE_DELETE_IRRECOVERABLE_CHUNKS \
"storage.delete_irrecoverable_chunks"
#define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files"
"storage.delete_irrecoverable_chunks"
#define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files"
#define FLB_CONF_STORAGE_CHUNK_MAX_SIZE "storage.chunk_max_size"

/* Coroutines */
#define FLB_CONF_STR_CORO_STACK_SIZE "Coro_Stack_Size"
Expand Down
5 changes: 5 additions & 0 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <fluent-bit/flb_config_format.h>
#include <fluent-bit/multiline/flb_ml.h>
#include <fluent-bit/flb_bucket_queue.h>
#include <fluent-bit/flb_input_chunk.h>

const char *FLB_CONF_ENV_LOGLEVEL = "FLB_LOG_LEVEL";

Expand Down Expand Up @@ -154,6 +155,9 @@ struct flb_service_config service_configs[] = {
{FLB_CONF_STORAGE_TRIM_FILES,
FLB_CONF_TYPE_BOOL,
offsetof(struct flb_config, storage_trim_files)},
{FLB_CONF_STORAGE_CHUNK_MAX_SIZE,
FLB_CONF_TYPE_INT,
offsetof(struct flb_config, storage_chunk_max_size)},

/* Coroutines */
{FLB_CONF_STR_CORO_STACK_SIZE,
Expand Down Expand Up @@ -278,6 +282,7 @@ struct flb_config *flb_config_init()
config->storage_path = NULL;
config->storage_input_plugin = NULL;
config->storage_metrics = FLB_TRUE;
config->storage_chunk_max_size = FLB_INPUT_CHUNK_FS_MAX_SIZE;

config->sched_cap = FLB_SCHED_CAP;
config->sched_base = FLB_SCHED_BASE;
Expand Down
10 changes: 6 additions & 4 deletions src/flb_input_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ static void buffer_entry_destroy(struct buffer_entry *entry) {
}

static int split_buffer_entry(struct buffer_entry *entry,
struct mk_list *entries)
struct mk_list *entries,
int buf_entry_max_size)
{
int ret;
int encoder_result;
Expand Down Expand Up @@ -114,7 +115,7 @@ static int split_buffer_entry(struct buffer_entry *entry,
continue;
}

if (log_encoder.output_length >= FLB_INPUT_CHUNK_FS_MAX_SIZE) {
if (log_encoder.output_length >= buf_entry_max_size) {
tmp_encoder_buf_size = log_encoder.output_length;
tmp_encoder_buf = log_encoder.output_buffer;
flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder);
Expand Down Expand Up @@ -191,13 +192,14 @@ static int input_log_append(struct flb_input_instance *ins,
if (buf_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) {
mk_list_init(&buffers);
start_buffer = new_buffer_entry(buf, buf_size);
split_buffer_entry(start_buffer, &buffers);
split_buffer_entry(start_buffer, &buffers, ins->config->storage_chunk_max_size);
flb_free(start_buffer);
mk_list_foreach_safe(head, tmp, &buffers) {
iter_buffer = mk_list_entry(head, struct buffer_entry, _head);
records = flb_mp_count(iter_buffer->buf, iter_buffer->buf_size);
ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records,
tag, tag_len, iter_buffer->buf, iter_buffer->buf_size);
tag, tag_len,
iter_buffer->buf, iter_buffer->buf_size);
buffer_entry_destroy(iter_buffer);
}
} else {
Expand Down

0 comments on commit c2e626a

Please sign in to comment.