Skip to content

Commit

Permalink
Add "retention" feature allowing idle metrics to expire
Browse files Browse the repository at this point in the history
Signed-off-by: Philipp Hossner <[email protected]>
  • Loading branch information
phihos committed Aug 1, 2022
1 parent ef53c83 commit aa589ce
Show file tree
Hide file tree
Showing 7 changed files with 335 additions and 26 deletions.
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ For details of each metric type, see [Prometheus documentation](http://prometheu
- `type`: metric type (required)
- `desc`: description of this metric (required)
- `key`: key name of record for instrumentation (**optional**)
- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention)
- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention)
- `<labels>`: additional labels for this metric (optional). See [Labels](#labels)

If key is empty, the metric values is treated as 1, so the counter increments by 1 on each record regardless of contents of the record.
Expand All @@ -310,6 +312,8 @@ If key is empty, the metric values is treated as 1, so the counter increments by
- `type`: metric type (required)
- `desc`: description of metric (required)
- `key`: key name of record for instrumentation (required)
- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention)
- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention)
- `<labels>`: additional labels for this metric (optional). See [Labels](#labels)

### summary type
Expand All @@ -332,6 +336,8 @@ If key is empty, the metric values is treated as 1, so the counter increments by
- `type`: metric type (required)
- `desc`: description of metric (required)
- `key`: key name of record for instrumentation (required)
- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention)
- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention)
- `<labels>`: additional labels for this metric (optional). See [Labels](#labels)

### histogram type
Expand All @@ -356,6 +362,8 @@ If key is empty, the metric values is treated as 1, so the counter increments by
- `desc`: description of metric (required)
- `key`: key name of record for instrumentation (required)
- `buckets`: buckets of record for instrumentation (optional)
- `retention`: time in seconds to remove a metric after not being updated (optional). See [Retention](#retention)
- `retention_check_interval`: time in seconds to check for expired metrics (optional). Has no effect when `retention` not set. See [Retention](#retention)
- `<labels>`: additional labels for this metric (optional). See [Labels](#labels)

## Labels
Expand Down Expand Up @@ -430,6 +438,33 @@ Prometheus output/filter plugin can have multiple metric section. Top-level labe

In this case, `message_foo_counter` has `tag`, `hostname`, `key` and `data_type` labels.

## Retention

By default metrics with all encountered label combinations are preserved until the next restart of fluentd.
Even if a label combination did not receive any update for a long time.
That behavior is not always desirable e.g. when the contents of of fields change for good and the metric becomes idle.
For these metrics you can set `retention` and `retention_check_interval` like this:

```
<metric>
name message_foo_counter
type counter
desc The total number of foo in message.
key foo
retention 3600 # 1h
retention_check_interval 1800 # 30m
<labels>
bar ${bar}
</labels>
</metric>
```

If `${bar}` was `baz` one time but after that no records with that value were processed, then after one hour the metric
`foo{bar="baz"}` might be removed.
When this actually happens depends on `retention_check_interval` (default 60).
It causes a background thread to check every 30 minutes for expired metrics.
So worst case the metrics are removed 30 minutes after expiration.
You can set this value as low as `1`, but that may put more stress on your CPU.

## Try plugin with nginx

Expand Down
13 changes: 13 additions & 0 deletions lib/fluent/plugin/filter_prometheus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ class PrometheusFilter < Fluent::Plugin::Filter
include Fluent::Plugin::PrometheusLabelParser
include Fluent::Plugin::Prometheus

helpers :thread

def initialize
super
@registry = ::Prometheus::Client.registry
Expand All @@ -22,6 +24,17 @@ def configure(conf)
@metrics = Fluent::Plugin::Prometheus.parse_metrics_elements(conf, @registry, labels)
end

def start
super
Fluent::Plugin::Prometheus.start_retention_threads(
@metrics,
@registry,
method(:thread_create),
method(:thread_current_running?),
@log
)
end

def filter(tag, time, record)
instrument_single(tag, time, record, @metrics)
record
Expand Down
13 changes: 13 additions & 0 deletions lib/fluent/plugin/out_prometheus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ class PrometheusOutput < Fluent::Plugin::Output
include Fluent::Plugin::PrometheusLabelParser
include Fluent::Plugin::Prometheus

helpers :thread

def initialize
super
@registry = ::Prometheus::Client.registry
Expand All @@ -22,6 +24,17 @@ def configure(conf)
@metrics = Fluent::Plugin::Prometheus.parse_metrics_elements(conf, @registry, labels)
end

def start
super
Fluent::Plugin::Prometheus.start_retention_threads(
@metrics,
@registry,
method(:thread_create),
method(:thread_current_running?),
@log
)
end

def process(tag, es)
instrument(tag, es, @metrics)
end
Expand Down
149 changes: 123 additions & 26 deletions lib/fluent/plugin/prometheus.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require 'prometheus/client'
require 'prometheus/client/formats/text'
require 'fluent/plugin/prometheus/placeholder_expander'
require 'fluent/plugin/prometheus/data_store'

module Fluent
module Plugin
Expand Down Expand Up @@ -81,6 +82,17 @@ def self.parse_metrics_elements(conf, registry, labels = {})
metrics
end

def self.start_retention_threads(metrics, registry, thread_create, thread_running, log)
metrics.select { |metric| metric.has_retention? }.each do |metric|
thread_create.call("prometheus_retention_#{metric.name}".to_sym) do
while thread_running.call()
metric.remove_expired_metrics(registry, log)
sleep(metric.retention_check_interval)
end
end
end
end

def self.placeholder_expander(log)
Fluent::Plugin::Prometheus::ExpandBuilder.new(log: log)
end
Expand All @@ -97,6 +109,11 @@ def stringify_keys(hash_to_stringify)
end.to_h
end

def initialize
super
::Prometheus::Client.config.data_store = Fluent::Plugin::Prometheus::DataStore.new
end

def configure(conf)
super
@placeholder_values = {}
Expand Down Expand Up @@ -151,6 +168,8 @@ class Metric
attr_reader :name
attr_reader :key
attr_reader :desc
attr_reader :retention
attr_reader :retention_check_interval

def initialize(element, registry, labels)
['name', 'desc'].each do |key|
Expand All @@ -162,6 +181,11 @@ def initialize(element, registry, labels)
@name = element['name']
@key = element['key']
@desc = element['desc']
@retention = element['retention'].to_i
@retention_check_interval = element.fetch('retention_check_interval', 60).to_i
if has_retention?
@last_modified_store = LastModifiedStore.new
end

@base_labels = Fluent::Plugin::Prometheus.parse_labels_elements(element)
@base_labels = labels.merge(@base_labels)
Expand Down Expand Up @@ -192,6 +216,74 @@ def self.get(registry, name, type, docstring)

metric
end

def set_value?(value)
if value
return true
end
false
end

def instrument(record, expander)
value = self.value(record)
if self.set_value?(value)
labels = labels(record, expander)
set_value(value, labels)
if has_retention?
@last_modified_store.set_last_updated(labels)
end
end
end

def has_retention?
@retention > 0
end

def remove_expired_metrics(registry, log)
if has_retention?
metric = registry.get(@name)

expiration_time = Time.now - @retention
expired_label_sets = @last_modified_store.get_labels_not_modified_since(expiration_time)

expired_label_sets.each { |expired_label_set|
log.debug "Metric #{@name} with labels #{expired_label_set} expired. Removing..."
metric.remove(expired_label_set) # this method is supplied by the require at the top of this method
@last_modified_store.remove(expired_label_set)
}
else
log.warn('remove_expired_metrics should not be called when retention is not set for this metric!')
end
end

class LastModifiedStore
def initialize
@internal_store = Hash.new
@lock = Monitor.new
end

def synchronize
@lock.synchronize { yield }
end

def set_last_updated(labels)
synchronize do
@internal_store[labels] = Time.now
end
end

def remove(labels)
synchronize do
@internal_store.delete(labels)
end
end

def get_labels_not_modified_since(time)
synchronize do
@internal_store.select { |k, v| v < time }.keys
end
end
end
end

class Gauge < Metric
Expand All @@ -208,16 +300,17 @@ def initialize(element, registry, labels)
end
end

def instrument(record, expander)
def value(record)
if @key.is_a?(String)
value = record[@key]
record[@key]
else
value = @key.call(record)
end
if value
@gauge.set(value, labels: labels(record, expander))
@key.call(record)
end
end

def set_value(value, labels)
@gauge.set(value, labels: labels)
end
end

class Counter < Metric
Expand All @@ -230,20 +323,22 @@ def initialize(element, registry, labels)
end
end

def instrument(record, expander)
# use record value of the key if key is specified, otherwise just increment
def value(record)
if @key.nil?
value = 1
1
elsif @key.is_a?(String)
value = record[@key]
record[@key]
else
value = @key.call(record)
@key.call(record)
end
end

# ignore if record value is nil
return if value.nil?
def set_value?(value)
!value.nil?
end

@counter.increment(by: value, labels: labels(record, expander))
def set_value(value, labels)
@counter.increment(by: value, labels: labels)
end
end

Expand All @@ -261,16 +356,17 @@ def initialize(element, registry, labels)
end
end

def instrument(record, expander)
def value(record)
if @key.is_a?(String)
value = record[@key]
record[@key]
else
value = @key.call(record)
end
if value
@summary.observe(value, labels: labels(record, expander))
@key.call(record)
end
end

def set_value(value, labels)
@summary.observe(value, labels: labels)
end
end

class Histogram < Metric
Expand All @@ -294,16 +390,17 @@ def initialize(element, registry, labels)
end
end

def instrument(record, expander)
def value(record)
if @key.is_a?(String)
value = record[@key]
record[@key]
else
value = @key.call(record)
end
if value
@histogram.observe(value, labels: labels(record, expander))
@key.call(record)
end
end

def set_value(value, labels)
@histogram.observe(value, labels: labels)
end
end
end
end
Expand Down
Loading

0 comments on commit aa589ce

Please sign in to comment.