Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] proof of concept for input plugins #9231

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ struct flb_config {
int is_running; /* service running ? */
double flush; /* Flush timeout */

/*
* Maximum grace time on shutdown. If set to -1, the engine will
/*
* Maximum grace time on shutdown. If set to -1, the engine will
* shutdown when all remaining tasks are flushed
*/
int grace;
int grace;
int grace_count; /* Count of grace shutdown tries */
flb_pipefd_t flush_fd; /* Timer FD associated to flush */
int convert_nan_to_null; /* convert null to nan ? */
Expand Down
81 changes: 44 additions & 37 deletions include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ struct flb_input_instance {
/* flag to pause input when storage is full */
int storage_pause_on_chunks_overlimit;

struct flb_input_metrics *input_metrics;

/*
* Input network info:
*
Expand Down Expand Up @@ -309,43 +311,6 @@ struct flb_input_instance {
/* List of downstreams */
struct mk_list downstreams;

/*
* CMetrics
* --------
*
* All metrics available for an input plugin instance.
*/
struct cmt *cmt; /* parent context */
struct cmt_counter *cmt_bytes; /* metric: input_bytes_total */
struct cmt_counter *cmt_records; /* metric: input_records_total */

/* is the input instance overlimit ?: 1 or 0 */
struct cmt_gauge *cmt_storage_overlimit;

/* is the input instance paused or not ?: 1 or 0 */
struct cmt_gauge *cmt_ingestion_paused;

/* memory bytes used by chunks */
struct cmt_gauge *cmt_storage_memory_bytes;

/* total number of chunks */
struct cmt_gauge *cmt_storage_chunks;

/* total number of chunks up in memory */
struct cmt_gauge *cmt_storage_chunks_up;

/* total number of chunks down */
struct cmt_gauge *cmt_storage_chunks_down;

/* number of chunks in a busy state */
struct cmt_gauge *cmt_storage_chunks_busy;

/* total bytes used by chunks in a busy state */
struct cmt_gauge *cmt_storage_chunks_busy_bytes;

/* memory ring buffer (memrb) metrics */
struct cmt_counter *cmt_memrb_dropped_chunks;
struct cmt_counter *cmt_memrb_dropped_bytes;

/*
* Indexes for generated chunks: simple hash tables that keeps the latest
Expand Down Expand Up @@ -385,6 +350,46 @@ struct flb_input_instance {
struct flb_config *config;
};

struct flb_input_metrics {
/*
* CMetrics
* --------
*
* All metrics available for an input plugin instance.
*/
struct cmt *cmt; /* parent context */
struct cmt_counter *cmt_bytes; /* metric: input_bytes_total */
struct cmt_counter *cmt_records; /* metric: input_records_total */

/* is the input instance overlimit ?: 1 or 0 */
struct cmt_gauge *cmt_storage_overlimit;

/* is the input instance paused or not ?: 1 or 0 */
struct cmt_gauge *cmt_ingestion_paused;

/* memory bytes used by chunks */
struct cmt_gauge *cmt_storage_memory_bytes;

/* total number of chunks */
struct cmt_gauge *cmt_storage_chunks;

/* total number of chunks up in memory */
struct cmt_gauge *cmt_storage_chunks_up;

/* total number of chunks down */
struct cmt_gauge *cmt_storage_chunks_down;

/* number of chunks in a busy state */
struct cmt_gauge *cmt_storage_chunks_busy;

/* total bytes used by chunks in a busy state */
struct cmt_gauge *cmt_storage_chunks_busy_bytes;

/* memory ring buffer (memrb) metrics */
struct cmt_counter *cmt_memrb_dropped_chunks;
struct cmt_counter *cmt_memrb_dropped_bytes;
};

struct flb_input_collector {
struct mk_event event;
struct mk_event_loop *evl; /* event loop */
Expand Down Expand Up @@ -733,6 +738,8 @@ int flb_input_upstream_set(struct flb_upstream *u, struct flb_input_instance *in
int flb_input_downstream_set(struct flb_downstream *stream,
struct flb_input_instance *ins);

struct flb_input_metrics *flb_input_metrics_create();

/* processors */
int flb_input_instance_processors_load(struct flb_input_instance *ins, struct flb_cf_group *processors);

Expand Down
2 changes: 1 addition & 1 deletion plugins/in_fluentbit_metrics/metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ static int in_metrics_init(struct flb_input_instance *in,
ctx->coll_fd_runtime = ret;

/* Internal metrics */
ctx->c = cmt_counter_create(ctx->ins->cmt,
ctx->c = cmt_counter_create(ctx->ins->input_metrics->cmt,
"fluentbit", "input_metrics", "scrapes_total",
"Number of total metrics scrapes",
1, (char *[]) {"name"});
Expand Down
6 changes: 3 additions & 3 deletions plugins/in_podman_metrics/podman_metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ static int create_counter(struct flb_in_metrics *ctx, struct cmt_counter **count
/* if counter was not yet created, it means that this function is called for the first time per counter type */
if (*counter == NULL) {
flb_plg_debug(ctx->ins, "Creating counter for %s, %s_%s_%s", name, COUNTER_PREFIX, metric_prefix, metric_name);
*counter = cmt_counter_create(ctx->ins->cmt, COUNTER_PREFIX, metric_prefix, metric_name, description, label_count, fields);
*counter = cmt_counter_create(ctx->ins->input_metrics->cmt, COUNTER_PREFIX, metric_prefix, metric_name, description, label_count, fields);
}

/* Allow setting value that is not grater that current one (if, for example, memory usage stays exactly the same) */
Expand Down Expand Up @@ -279,7 +279,7 @@ static int create_gauge(struct flb_in_metrics *ctx, struct cmt_gauge **gauge, fl
/* if gauge was not yet created, it means that this function is called for the first time per counter type */
if (*gauge == NULL) {
flb_plg_debug(ctx->ins, "Creating gauge for %s, %s_%s_%s", name, COUNTER_PREFIX, metric_prefix, metric_name);
*gauge = cmt_gauge_create(ctx->ins->cmt, COUNTER_PREFIX, metric_prefix, metric_name, description, label_count, fields);
*gauge = cmt_gauge_create(ctx->ins->input_metrics->cmt, COUNTER_PREFIX, metric_prefix, metric_name, description, label_count, fields);
}

flb_plg_debug(ctx->ins, "Set gauge for %s, %s_%s_%s: %lu", name, COUNTER_PREFIX, metric_prefix, metric_name, value);
Expand Down Expand Up @@ -386,7 +386,7 @@ static int scrape_metrics(struct flb_config *config, struct flb_in_metrics *ctx)
return -1;
}

if (flb_input_metrics_append(ctx->ins, NULL, 0, ctx->ins->cmt) == -1) {
if (flb_input_metrics_append(ctx->ins, NULL, 0, ctx->ins->input_metrics->cmt) == -1) {
flb_plg_error(ctx->ins, "Could not append metrics");
return -1;
}
Expand Down
6 changes: 3 additions & 3 deletions plugins/in_tail/tail_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -396,19 +396,19 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
#endif

#ifdef FLB_HAVE_METRICS
ctx->cmt_files_opened = cmt_counter_create(ins->cmt,
ctx->cmt_files_opened = cmt_counter_create(ins->input_metrics->cmt,
"fluentbit", "input",
"files_opened_total",
"Total number of opened files",
1, (char *[]) {"name"});

ctx->cmt_files_closed = cmt_counter_create(ins->cmt,
ctx->cmt_files_closed = cmt_counter_create(ins->input_metrics->cmt,
"fluentbit", "input",
"files_closed_total",
"Total number of closed files",
1, (char *[]) {"name"});

ctx->cmt_files_rotated = cmt_counter_create(ins->cmt,
ctx->cmt_files_rotated = cmt_counter_create(ins->input_metrics->cmt,
"fluentbit", "input",
"files_rotated_total",
"Total number of rotated files",
Expand Down
Loading
Loading