Skip to content

Commit

Permalink
out_forward: add new 'add_option' configuration property
Browse files Browse the repository at this point in the history
Forward protocol supports metadata through the 'options' feature. This information
is appended to the whole payload and normally is handled only by the Forward plugin
implementation.

This plugin extends Fluent Bit Forward configuration where now is possible to set arbitrary
options key/value pairs (strings) for very advanced use cases. Usage:

[OUTPUT]
    name        forward
    match       *
    add_option  key1 val1
    add_option  key2 val2

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Sep 5, 2023
1 parent c625ad7 commit 1490882
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 1 deletion.
16 changes: 15 additions & 1 deletion plugins/out_forward/forward.c
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,13 @@ static int config_set_properties(struct flb_upstream_node *node,
fc->send_options = flb_utils_bool(tmp);
}

/* add_option -> extra_options: if the user has defined 'add_option'
* we need to enable the 'send_options' flag
*/
if (fc->extra_options && mk_list_size(fc->extra_options) > 0) {
fc->send_options = FLB_TRUE;
}

/* require ack response (implies send_options) */
tmp = config_get_property("require_ack_response", node, ctx);
if (tmp) {
Expand Down Expand Up @@ -1785,7 +1792,14 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_BOOL, "fluentd_compat", "false",
0, FLB_TRUE, offsetof(struct flb_forward_config, fluentd_compat),
"Send cmetrics and ctreaces with Fluentd compatible format"
"Send metrics and traces with Fluentd compatible format"
},

{
FLB_CONFIG_MAP_SLIST_2, "add_option", NULL,
FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_forward_config, extra_options),
"Set an extra Forward protocol option. This is an advance feature, use it only for "
"very specific use-cases."
},

/* EOF */
Expand Down
4 changes: 4 additions & 0 deletions plugins/out_forward/forward.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ struct flb_forward_config {
int time_as_integer; /* Use backward compatible timestamp ? */
int fluentd_compat; /* Use Fluentd compatible payload for
* metrics and ctraces */

/* add extra options to the Forward payload (advanced) */
struct mk_list *extra_options;

int fwd_retain_metadata; /* Do not drop metadata in forward mode */

/* config */
Expand Down
18 changes: 18 additions & 0 deletions plugins/out_forward/forward_format.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ static int append_options(struct flb_forward *ctx,
char *chunk = NULL;
uint8_t checksum[64];
int result;
struct mk_list *head;
struct flb_config_map_val *mv;
struct flb_mp_map_header mh;
struct flb_slist_entry *eopt_key;
struct flb_slist_entry *eopt_val;

/* options is map, use the dynamic map type */
flb_mp_map_header_init(&mh, mp_pck);
Expand Down Expand Up @@ -152,6 +156,20 @@ static int append_options(struct flb_forward *ctx,
msgpack_pack_str_body(mp_pck, "fluent_signal", 13);
msgpack_pack_int64(mp_pck, event_type);

/* process 'extra_option(s)' */
if (fc->extra_options) {
flb_config_map_foreach(head, mv, fc->extra_options) {
eopt_key = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
eopt_val = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);

flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, flb_sds_len(eopt_key->str));
msgpack_pack_str_body(mp_pck, eopt_key->str, flb_sds_len(eopt_key->str));
msgpack_pack_str(mp_pck, flb_sds_len(eopt_val->str));
msgpack_pack_str_body(mp_pck, eopt_val->str, flb_sds_len(eopt_val->str));
}
}

if (metadata != NULL &&
metadata->type == MSGPACK_OBJECT_MAP &&
metadata->via.map.size > 0) {
Expand Down

0 comments on commit 1490882

Please sign in to comment.