diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index 82003386d6e..0c51d57eef5 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -48,11 +48,11 @@ struct flb_config { int is_running; /* service running ? */ double flush; /* Flush timeout */ - /* - * Maximum grace time on shutdown. If set to -1, the engine will + /* + * Maximum grace time on shutdown. If set to -1, the engine will * shutdown when all remaining tasks are flushed */ - int grace; + int grace; int grace_count; /* Count of grace shutdown tries */ flb_pipefd_t flush_fd; /* Timer FD associated to flush */ int convert_nan_to_null; /* convert null to nan ? */ diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index 9140b1dc71f..7a2bdf91c2b 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -189,6 +189,8 @@ struct flb_input_instance { /* flag to pause input when storage is full */ int storage_pause_on_chunks_overlimit; + struct flb_input_metrics *input_metrics; + /* * Input network info: * @@ -309,43 +311,6 @@ struct flb_input_instance { /* List of downstreams */ struct mk_list downstreams; - /* - * CMetrics - * -------- - * - * All metrics available for an input plugin instance. - */ - struct cmt *cmt; /* parent context */ - struct cmt_counter *cmt_bytes; /* metric: input_bytes_total */ - struct cmt_counter *cmt_records; /* metric: input_records_total */ - - /* is the input instance overlimit ?: 1 or 0 */ - struct cmt_gauge *cmt_storage_overlimit; - - /* is the input instance paused or not ?: 1 or 0 */ - struct cmt_gauge *cmt_ingestion_paused; - - /* memory bytes used by chunks */ - struct cmt_gauge *cmt_storage_memory_bytes; - - /* total number of chunks */ - struct cmt_gauge *cmt_storage_chunks; - - /* total number of chunks up in memory */ - struct cmt_gauge *cmt_storage_chunks_up; - - /* total number of chunks down */ - struct cmt_gauge *cmt_storage_chunks_down; - - /* number of chunks in a busy state */ - struct cmt_gauge *cmt_storage_chunks_busy; - - /* total bytes used by chunks in a busy state */ - struct cmt_gauge *cmt_storage_chunks_busy_bytes; - - /* memory ring buffer (memrb) metrics */ - struct cmt_counter *cmt_memrb_dropped_chunks; - struct cmt_counter *cmt_memrb_dropped_bytes; /* * Indexes for generated chunks: simple hash tables that keeps the latest @@ -385,6 +350,46 @@ struct flb_input_instance { struct flb_config *config; }; +struct flb_input_metrics { + /* + * CMetrics + * -------- + * + * All metrics available for an input plugin instance. + */ + struct cmt *cmt; /* parent context */ + struct cmt_counter *cmt_bytes; /* metric: input_bytes_total */ + struct cmt_counter *cmt_records; /* metric: input_records_total */ + + /* is the input instance overlimit ?: 1 or 0 */ + struct cmt_gauge *cmt_storage_overlimit; + + /* is the input instance paused or not ?: 1 or 0 */ + struct cmt_gauge *cmt_ingestion_paused; + + /* memory bytes used by chunks */ + struct cmt_gauge *cmt_storage_memory_bytes; + + /* total number of chunks */ + struct cmt_gauge *cmt_storage_chunks; + + /* total number of chunks up in memory */ + struct cmt_gauge *cmt_storage_chunks_up; + + /* total number of chunks down */ + struct cmt_gauge *cmt_storage_chunks_down; + + /* number of chunks in a busy state */ + struct cmt_gauge *cmt_storage_chunks_busy; + + /* total bytes used by chunks in a busy state */ + struct cmt_gauge *cmt_storage_chunks_busy_bytes; + + /* memory ring buffer (memrb) metrics */ + struct cmt_counter *cmt_memrb_dropped_chunks; + struct cmt_counter *cmt_memrb_dropped_bytes; +}; + struct flb_input_collector { struct mk_event event; struct mk_event_loop *evl; /* event loop */ @@ -733,6 +738,8 @@ int flb_input_upstream_set(struct flb_upstream *u, struct flb_input_instance *in int flb_input_downstream_set(struct flb_downstream *stream, struct flb_input_instance *ins); +struct flb_input_metrics *flb_input_metrics_create(); + /* processors */ int flb_input_instance_processors_load(struct flb_input_instance *ins, struct flb_cf_group *processors); diff --git a/plugins/in_fluentbit_metrics/metrics.c b/plugins/in_fluentbit_metrics/metrics.c index 97b9902bdaa..41293b7742b 100644 --- a/plugins/in_fluentbit_metrics/metrics.c +++ b/plugins/in_fluentbit_metrics/metrics.c @@ -138,7 +138,7 @@ static int in_metrics_init(struct flb_input_instance *in, ctx->coll_fd_runtime = ret; /* Internal metrics */ - ctx->c = cmt_counter_create(ctx->ins->cmt, + ctx->c = cmt_counter_create(ctx->ins->input_metrics->cmt, "fluentbit", "input_metrics", "scrapes_total", "Number of total metrics scrapes", 1, (char *[]) {"name"}); diff --git a/plugins/in_podman_metrics/podman_metrics.c b/plugins/in_podman_metrics/podman_metrics.c index 511ec3cee38..7ccf2be4731 100644 --- a/plugins/in_podman_metrics/podman_metrics.c +++ b/plugins/in_podman_metrics/podman_metrics.c @@ -243,7 +243,7 @@ static int create_counter(struct flb_in_metrics *ctx, struct cmt_counter **count /* if counter was not yet created, it means that this function is called for the first time per counter type */ if (*counter == NULL) { flb_plg_debug(ctx->ins, "Creating counter for %s, %s_%s_%s", name, COUNTER_PREFIX, metric_prefix, metric_name); - *counter = cmt_counter_create(ctx->ins->cmt, COUNTER_PREFIX, metric_prefix, metric_name, description, label_count, fields); + *counter = cmt_counter_create(ctx->ins->input_metrics->cmt, COUNTER_PREFIX, metric_prefix, metric_name, description, label_count, fields); } /* Allow setting value that is not grater that current one (if, for example, memory usage stays exactly the same) */ @@ -279,7 +279,7 @@ static int create_gauge(struct flb_in_metrics *ctx, struct cmt_gauge **gauge, fl /* if gauge was not yet created, it means that this function is called for the first time per counter type */ if (*gauge == NULL) { flb_plg_debug(ctx->ins, "Creating gauge for %s, %s_%s_%s", name, COUNTER_PREFIX, metric_prefix, metric_name); - *gauge = cmt_gauge_create(ctx->ins->cmt, COUNTER_PREFIX, metric_prefix, metric_name, description, label_count, fields); + *gauge = cmt_gauge_create(ctx->ins->input_metrics->cmt, COUNTER_PREFIX, metric_prefix, metric_name, description, label_count, fields); } flb_plg_debug(ctx->ins, "Set gauge for %s, %s_%s_%s: %lu", name, COUNTER_PREFIX, metric_prefix, metric_name, value); @@ -386,7 +386,7 @@ static int scrape_metrics(struct flb_config *config, struct flb_in_metrics *ctx) return -1; } - if (flb_input_metrics_append(ctx->ins, NULL, 0, ctx->ins->cmt) == -1) { + if (flb_input_metrics_append(ctx->ins, NULL, 0, ctx->ins->input_metrics->cmt) == -1) { flb_plg_error(ctx->ins, "Could not append metrics"); return -1; } diff --git a/plugins/in_tail/tail_config.c b/plugins/in_tail/tail_config.c index 6f32cce2024..c9d3a176bb9 100644 --- a/plugins/in_tail/tail_config.c +++ b/plugins/in_tail/tail_config.c @@ -396,19 +396,19 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, #endif #ifdef FLB_HAVE_METRICS - ctx->cmt_files_opened = cmt_counter_create(ins->cmt, + ctx->cmt_files_opened = cmt_counter_create(ins->input_metrics->cmt, "fluentbit", "input", "files_opened_total", "Total number of opened files", 1, (char *[]) {"name"}); - ctx->cmt_files_closed = cmt_counter_create(ins->cmt, + ctx->cmt_files_closed = cmt_counter_create(ins->input_metrics->cmt, "fluentbit", "input", "files_closed_total", "Total number of closed files", 1, (char *[]) {"name"}); - ctx->cmt_files_rotated = cmt_counter_create(ins->cmt, + ctx->cmt_files_rotated = cmt_counter_create(ins->input_metrics->cmt, "fluentbit", "input", "files_rotated_total", "Total number of rotated files", diff --git a/src/flb_input.c b/src/flb_input.c index 7b7390b40e8..6699c052f74 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -651,7 +651,7 @@ const char *flb_input_get_property(const char *key, #ifdef FLB_HAVE_METRICS void *flb_input_get_cmt_instance(struct flb_input_instance *ins) { - return (void *)ins->cmt; + return (void *)ins->input_metrics->cmt; } #endif @@ -743,9 +743,10 @@ void flb_input_instance_destroy(struct flb_input_instance *ins) /* Remove metrics */ #ifdef FLB_HAVE_METRICS - if (ins->cmt) { - cmt_destroy(ins->cmt); - } + // TODO Actually destroy input_metrics + // if (ins->input_metrics) { + // cmt_destroy(ins->cmt); + // } if (ins->metrics) { flb_metrics_destroy(ins->metrics); @@ -953,127 +954,46 @@ int flb_input_instance_init(struct flb_input_instance *ins, ts = cfl_time_now(); /* CMetrics */ - ins->cmt = cmt_create(); - if (!ins->cmt) { - flb_error("[input] could not create cmetrics context: %s", - flb_input_name(ins)); - return -1; - } /* - * Register generic input plugin metrics + * Set generic input plugin metrics * ------------------------------------- */ - /* fluentbit_input_bytes_total */ - ins->cmt_bytes = \ - cmt_counter_create(ins->cmt, - "fluentbit", "input", "bytes_total", - "Number of input bytes.", - 1, (char *[]) {"name"}); - cmt_counter_set(ins->cmt_bytes, ts, 0, 1, (char *[]) {name}); - - /* fluentbit_input_records_total */ - ins->cmt_records = \ - cmt_counter_create(ins->cmt, - "fluentbit", "input", "records_total", - "Number of input records.", - 1, (char *[]) {"name"}); - cmt_counter_set(ins->cmt_records, ts, 0, 1, (char *[]) {name}); - - /* fluentbit_input_ingestion_paused */ - ins->cmt_ingestion_paused = \ - cmt_gauge_create(ins->cmt, - "fluentbit", "input", - "ingestion_paused", - "Is the input paused or not?", - 1, (char *[]) {"name"}); - cmt_gauge_set(ins->cmt_ingestion_paused, ts, 0, 1, (char *[]) {name}); + cmt_counter_set(ins->input_metrics->cmt_bytes, ts, 0, 1, (char *[]) {name}); + cmt_counter_set(ins->input_metrics->cmt_records, ts, 0, 1, (char *[]) {name}); + cmt_gauge_set(ins->input_metrics->cmt_ingestion_paused, ts, 0, 1, (char *[]) {name}); /* Storage Metrics */ if (ctx->storage_metrics == FLB_TRUE) { /* fluentbit_input_storage_overlimit */ - ins->cmt_storage_overlimit = \ - cmt_gauge_create(ins->cmt, - "fluentbit", "input", - "storage_overlimit", - "Is the input memory usage overlimit ?.", - 1, (char *[]) {"name"}); - cmt_gauge_set(ins->cmt_storage_overlimit, ts, 0, 1, (char *[]) {name}); + cmt_gauge_set(ins->input_metrics->cmt_storage_overlimit, ts, 0, 1, (char *[]) {name}); /* fluentbit_input_storage_memory_bytes */ - ins->cmt_storage_memory_bytes = \ - cmt_gauge_create(ins->cmt, - "fluentbit", "input", - "storage_memory_bytes", - "Memory bytes used by the chunks.", - 1, (char *[]) {"name"}); - cmt_gauge_set(ins->cmt_storage_memory_bytes, ts, 0, 1, (char *[]) {name}); + cmt_gauge_set(ins->input_metrics->cmt_storage_memory_bytes, ts, 0, 1, (char *[]) {name}); /* fluentbit_input_storage_chunks */ - ins->cmt_storage_chunks = \ - cmt_gauge_create(ins->cmt, - "fluentbit", "input", - "storage_chunks", - "Total number of chunks.", - 1, (char *[]) {"name"}); - cmt_gauge_set(ins->cmt_storage_chunks, ts, 0, 1, (char *[]) {name}); + cmt_gauge_set(ins->input_metrics->cmt_storage_chunks, ts, 0, 1, (char *[]) {name}); /* fluentbit_input_storage_chunks_up */ - ins->cmt_storage_chunks_up = \ - cmt_gauge_create(ins->cmt, - "fluentbit", "input", - "storage_chunks_up", - "Total number of chunks up in memory.", - 1, (char *[]) {"name"}); - cmt_gauge_set(ins->cmt_storage_chunks_up, ts, 0, 1, (char *[]) {name}); + cmt_gauge_set(ins->input_metrics->cmt_storage_chunks_up, ts, 0, 1, (char *[]) {name}); /* fluentbit_input_storage_chunks_down */ - ins->cmt_storage_chunks_down = \ - cmt_gauge_create(ins->cmt, - "fluentbit", "input", - "storage_chunks_down", - "Total number of chunks down.", - 1, (char *[]) {"name"}); - cmt_gauge_set(ins->cmt_storage_chunks_down, ts, 0, 1, (char *[]) {name}); + cmt_gauge_set(ins->input_metrics->cmt_storage_chunks_down, ts, 0, 1, (char *[]) {name}); /* fluentbit_input_storage_chunks_busy */ - ins->cmt_storage_chunks_busy = \ - cmt_gauge_create(ins->cmt, - "fluentbit", "input", - "storage_chunks_busy", - "Total number of chunks in a busy state.", - 1, (char *[]) {"name"}); - cmt_gauge_set(ins->cmt_storage_chunks_busy, ts, 0, 1, (char *[]) {name}); + cmt_gauge_set(ins->input_metrics->cmt_storage_chunks_busy, ts, 0, 1, (char *[]) {name}); /* fluentbit_input_storage_chunks_busy_bytes */ - ins->cmt_storage_chunks_busy_bytes = \ - cmt_gauge_create(ins->cmt, - "fluentbit", "input", - "storage_chunks_busy_bytes", - "Total number of bytes used by chunks in a busy state.", - 1, (char *[]) {"name"}); - cmt_gauge_set(ins->cmt_storage_chunks_busy_bytes, ts, 0, 1, (char *[]) {name}); + cmt_gauge_set(ins->input_metrics->cmt_storage_chunks_busy_bytes, ts, 0, 1, (char *[]) {name}); } if (ins->storage_type == FLB_STORAGE_MEMRB) { /* fluentbit_input_memrb_dropped_chunks */ - ins->cmt_memrb_dropped_chunks = cmt_counter_create(ins->cmt, - "fluentbit", "input", - "memrb_dropped_chunks", - "Number of memrb dropped chunks.", - 1, (char *[]) {"name"}); - cmt_counter_set(ins->cmt_memrb_dropped_chunks, ts, 0, 1, (char *[]) {name}); - + cmt_counter_set(ins->input_metrics->cmt_memrb_dropped_chunks, ts, 0, 1, (char *[]) {name}); /* fluentbit_input_memrb_dropped_bytes */ - ins->cmt_memrb_dropped_bytes = cmt_counter_create(ins->cmt, - "fluentbit", "input", - "memrb_dropped_bytes", - "Number of memrb dropped bytes.", - 1, (char *[]) {"name"}); - - cmt_counter_set(ins->cmt_memrb_dropped_bytes, ts, 0, 1, (char *[]) {name}); + cmt_counter_set(ins->input_metrics->cmt_memrb_dropped_bytes, ts, 0, 1, (char *[]) {name}); } /* OLD Metrics */ @@ -1260,10 +1180,17 @@ int flb_input_init_all(struct flb_config *config) struct mk_list *head; struct flb_input_instance *ins; struct flb_input_plugin *p; + struct flb_input_metrics *metrics; /* Initialize thread-id table */ memset(&config->in_table_id, '\0', sizeof(config->in_table_id)); + metrics = flb_input_metrics_create(); + if (!metrics) { + // TODO something better + return -1; + } + /* Iterate all active input instance plugins */ mk_list_foreach_safe(head, tmp, &config->inputs) { ins = mk_list_entry(head, struct flb_input_instance, _head); @@ -1274,6 +1201,9 @@ int flb_input_init_all(struct flb_config *config) continue; } + /* Set input instance metrics */ + ins->input_metrics = metrics; + /* Initialize instance */ ret = flb_input_instance_init(ins, config); if (ret == -1) { @@ -1702,18 +1632,18 @@ int flb_input_test_pause_resume(struct flb_input_instance *ins, int sleep_second static void flb_input_ingestion_paused(struct flb_input_instance *ins) { - if (ins->cmt_ingestion_paused != NULL) { + if (ins->input_metrics->cmt_ingestion_paused != NULL) { /* cmetrics */ - cmt_gauge_set(ins->cmt_ingestion_paused, cfl_time_now(), 1, + cmt_gauge_set(ins->input_metrics->cmt_ingestion_paused, cfl_time_now(), 1, 1, (char *[]) { (char *) flb_input_name(ins)}); } } static void flb_input_ingestion_resumed(struct flb_input_instance *ins) { - if (ins->cmt_ingestion_paused != NULL) { + if (ins->input_metrics->cmt_ingestion_paused != NULL) { /* cmetrics */ - cmt_gauge_set(ins->cmt_ingestion_paused, cfl_time_now(), 0, + cmt_gauge_set(ins->input_metrics->cmt_ingestion_paused, cfl_time_now(), 0, 1, (char *[]) {(char *) flb_input_name(ins)}); } } @@ -2016,3 +1946,113 @@ int flb_input_downstream_set(struct flb_downstream *stream, return 0; } + +struct flb_input_metrics *flb_input_metrics_create() +{ + struct flb_input_metrics *metrics = flb_calloc(1, sizeof(struct flb_input_metrics)); + + metrics->cmt = cmt_create(); + if (!metrics->cmt) { + flb_error("[input] could not create input plugin cmetrics context"); + return NULL; + } + + /* + * Register generic input plugin metrics + * ------------------------------------- + */ + + /* fluentbit_input_bytes_total */ + metrics->cmt_bytes = \ + cmt_counter_create(metrics->cmt, + "fluentbit", "input", "bytes_total", + "Number of input bytes.", + 1, (char *[]) {"name"}); + + /* fluentbit_input_records_total */ + metrics->cmt_records = \ + cmt_counter_create(metrics->cmt, + "fluentbit", "input", "records_total", + "Number of input records.", + 1, (char *[]) {"name"}); + + /* fluentbit_input_ingestion_paused */ + metrics->cmt_ingestion_paused = \ + cmt_gauge_create(metrics->cmt, + "fluentbit", "input", + "ingestion_paused", + "Is the input paused or not?", + 1, (char *[]) {"name"}); + + /* fluentbit_input_storage_overlimit */ + metrics->cmt_storage_overlimit = \ + cmt_gauge_create(metrics->cmt, + "fluentbit", "input", + "storage_overlimit", + "Is the input memory usage overlimit ?.", + 1, (char *[]) {"name"}); + + /* fluentbit_input_storage_memory_bytes */ + metrics->cmt_storage_memory_bytes = \ + cmt_gauge_create(metrics->cmt, + "fluentbit", "input", + "storage_memory_bytes", + "Memory bytes used by the chunks.", + 1, (char *[]) {"name"}); + + /* fluentbit_input_storage_chunks */ + metrics->cmt_storage_chunks = \ + cmt_gauge_create(metrics->cmt, + "fluentbit", "input", + "storage_chunks", + "Total number of chunks.", + 1, (char *[]) {"name"}); + + /* fluentbit_input_storage_chunks_up */ + metrics->cmt_storage_chunks_up = \ + cmt_gauge_create(metrics->cmt, + "fluentbit", "input", + "storage_chunks_up", + "Total number of chunks up in memory.", + 1, (char *[]) {"name"}); + + /* fluentbit_input_storage_chunks_down */ + metrics->cmt_storage_chunks_down = \ + cmt_gauge_create(metrics->cmt, + "fluentbit", "input", + "storage_chunks_down", + "Total number of chunks down.", + 1, (char *[]) {"name"}); + + /* fluentbit_input_storage_chunks_busy */ + metrics->cmt_storage_chunks_busy = \ + cmt_gauge_create(metrics->cmt, + "fluentbit", "input", + "storage_chunks_busy", + "Total number of chunks in a busy state.", + 1, (char *[]) {"name"}); + + /* fluentbit_input_storage_chunks_busy_bytes */ + metrics->cmt_storage_chunks_busy_bytes = \ + cmt_gauge_create(metrics->cmt, + "fluentbit", "input", + "storage_chunks_busy_bytes", + "Total number of bytes used by chunks in a busy state.", + 1, (char *[]) {"name"}); + + /* fluentbit_input_memrb_dropped_chunks */ + metrics->cmt_memrb_dropped_chunks = cmt_counter_create(metrics->cmt, + "fluentbit", "input", + "memrb_dropped_chunks", + "Number of memrb dropped chunks.", + 1, (char *[]) {"name"}); + + /* fluentbit_input_memrb_dropped_bytes */ + metrics->cmt_memrb_dropped_bytes = cmt_counter_create(metrics->cmt, + "fluentbit", "input", + "memrb_dropped_bytes", + "Number of memrb dropped bytes.", + 1, (char *[]) {"name"}); + + return metrics; +} \ No newline at end of file diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index ca02e6fca68..c016ad25a78 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -724,11 +724,11 @@ struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, ts = cfl_time_now(); /* fluentbit_input_records_total */ - cmt_counter_add(in->cmt_records, ts, ic->total_records, + cmt_counter_add(in->input_metrics->cmt_records, ts, ic->total_records, 1, (char *[]) {(char *) flb_input_name(in)}); /* fluentbit_input_bytes_total */ - cmt_counter_add(in->cmt_bytes, ts, buf_size, + cmt_counter_add(in->input_metrics->cmt_bytes, ts, buf_size, 1, (char *[]) {(char *) flb_input_name(in)}); /* OLD metrics */ @@ -1486,10 +1486,10 @@ static int input_chunk_append_raw(struct flb_input_instance *in, name = (char *) flb_input_name(in); /* update counters */ - cmt_counter_add(in->cmt_memrb_dropped_chunks, ts, + cmt_counter_add(in->input_metrics->cmt_memrb_dropped_chunks, ts, dropped_chunks, 1, (char *[]) {name}); - cmt_counter_add(in->cmt_memrb_dropped_bytes, ts, + cmt_counter_add(in->input_metrics->cmt_memrb_dropped_bytes, ts, dropped_bytes, 1, (char *[]) {name}); } @@ -1626,11 +1626,11 @@ static int input_chunk_append_raw(struct flb_input_instance *in, ts = cfl_time_now(); /* fluentbit_input_records_total */ - cmt_counter_add(in->cmt_records, ts, ic->added_records, + cmt_counter_add(in->input_metrics->cmt_records, ts, ic->added_records, 1, (char *[]) {(char *) flb_input_name(in)}); /* fluentbit_input_bytes_total */ - cmt_counter_add(in->cmt_bytes, ts, buf_size, + cmt_counter_add(in->input_metrics->cmt_bytes, ts, buf_size, 1, (char *[]) {(char *) flb_input_name(in)}); /* OLD api */ diff --git a/src/flb_metrics_exporter.c b/src/flb_metrics_exporter.c index 2e1b98f21a4..fefe0fa7366 100644 --- a/src/flb_metrics_exporter.c +++ b/src/flb_metrics_exporter.c @@ -300,14 +300,16 @@ struct cmt *flb_me_get_cmetrics(struct flb_config *ctx) /* Pipeline metrics: input, filters, outputs */ mk_list_foreach(head, &ctx->inputs) { + // TODO Actually garbage just need the POC done i = mk_list_entry(head, struct flb_input_instance, _head); - ret = cmt_cat(cmt, i->cmt); + ret = cmt_cat(cmt, i->input_metrics->cmt); if (ret == -1) { flb_error("[metrics exporter] could not append metrics from %s", flb_input_name(i)); cmt_destroy(cmt); return NULL; } + break; } mk_list_foreach(head, &ctx->filters) { diff --git a/src/flb_storage.c b/src/flb_storage.c index 4148be66781..7d9d84779bb 100644 --- a/src/flb_storage.c +++ b/src/flb_storage.c @@ -205,7 +205,7 @@ static void metrics_append_input(msgpack_packer *mp_pck, } if (ret == FLB_TRUE) { /* cmetrics */ - cmt_gauge_set(i->cmt_storage_overlimit, ts, 1, + cmt_gauge_set(i->input_metrics->cmt_storage_overlimit, ts, 1, 1, (char *[]) {name}); /* old code */ @@ -213,7 +213,7 @@ static void metrics_append_input(msgpack_packer *mp_pck, } else { /* cmetrics */ - cmt_gauge_set(i->cmt_storage_overlimit, ts, 0, + cmt_gauge_set(i->input_metrics->cmt_storage_overlimit, ts, 0, 1, (char *[]) {name}); /* old code */ @@ -221,7 +221,7 @@ static void metrics_append_input(msgpack_packer *mp_pck, } /* fluentbit_storage_memory_bytes */ - cmt_gauge_set(i->cmt_storage_memory_bytes, ts, i->mem_chunks_size, + cmt_gauge_set(i->input_metrics->cmt_storage_memory_bytes, ts, i->mem_chunks_size, 1, (char *[]) {name}); /* status['mem_size'] */ @@ -251,7 +251,7 @@ static void metrics_append_input(msgpack_packer *mp_pck, */ /* cmetrics */ - cmt_gauge_set(i->cmt_storage_chunks, ts, total_chunks, + cmt_gauge_set(i->input_metrics->cmt_storage_chunks, ts, total_chunks, 1, (char *[]) {name}); @@ -299,7 +299,7 @@ static void metrics_append_input(msgpack_packer *mp_pck, } /* fluentbit_storage_chunks_up */ - cmt_gauge_set(i->cmt_storage_chunks_up, ts, up, + cmt_gauge_set(i->input_metrics->cmt_storage_chunks_up, ts, up, 1, (char *[]) {name}); /* chunks['up'] */ @@ -308,7 +308,7 @@ static void metrics_append_input(msgpack_packer *mp_pck, msgpack_pack_uint64(mp_pck, up); /* fluentbit_storage_chunks_down */ - cmt_gauge_set(i->cmt_storage_chunks_down, ts, down, + cmt_gauge_set(i->input_metrics->cmt_storage_chunks_down, ts, down, 1, (char *[]) {name}); /* chunks['down'] */ @@ -317,7 +317,7 @@ static void metrics_append_input(msgpack_packer *mp_pck, msgpack_pack_uint64(mp_pck, down); /* fluentbit_storage_chunks_busy */ - cmt_gauge_set(i->cmt_storage_chunks_busy, ts, busy, + cmt_gauge_set(i->input_metrics->cmt_storage_chunks_busy, ts, busy, 1, (char *[]) {name}); /* chunks['busy'] */ @@ -326,7 +326,7 @@ static void metrics_append_input(msgpack_packer *mp_pck, msgpack_pack_uint64(mp_pck, busy); /* fluentbit_storage_chunks_busy_size */ - cmt_gauge_set(i->cmt_storage_chunks_busy_bytes, ts, busy_size, + cmt_gauge_set(i->input_metrics->cmt_storage_chunks_busy_bytes, ts, busy_size, 1, (char *[]) {name}); /* chunks['busy_size'] */