From 25d060002243cb41067f92fbc8aa232c100955b7 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 11 Feb 2020 00:53:13 -0800 Subject: [PATCH] filter_replace_character: Add plugin to replace a generic character in key names --- CMakeLists.txt | 1 + plugins/CMakeLists.txt | 2 +- .../filter_replace_character/CMakeLists.txt | 5 + plugins/filter_replace_character/replace.c | 233 ++++++++++++++++++ plugins/filter_replace_character/replace.h | 28 +++ 5 files changed, 268 insertions(+), 1 deletion(-) create mode 100644 plugins/filter_replace_character/CMakeLists.txt create mode 100644 plugins/filter_replace_character/replace.c create mode 100644 plugins/filter_replace_character/replace.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 00f985ce041..2a67a48f2c2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -155,6 +155,7 @@ option(FLB_OUT_KAFKA "Enable Kafka output plugin" No) option(FLB_OUT_KAFKA_REST "Enable Kafka Rest output plugin" Yes) option(FLB_FILTER_AWS "Enable aws filter" Yes) option(FLB_FILTER_EXPECT "Enable expect filter" Yes) +option(FLB_FILTER_REPLACE_CHAR "Enable replace_character filter" Yes) option(FLB_FILTER_GREP "Enable grep filter" Yes) option(FLB_FILTER_MODIFY "Enable modify filter" Yes) option(FLB_FILTER_STDOUT "Enable stdout filter" Yes) diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index e11393149bf..4af302bd0a5 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -200,6 +200,7 @@ REGISTER_OUT_PLUGIN("out_gelf") # ======= REGISTER_FILTER_PLUGIN("filter_aws") REGISTER_FILTER_PLUGIN("filter_record_modifier") +REGISTER_FILTER_PLUGIN("filter_replace_character") REGISTER_FILTER_PLUGIN("filter_throttle") if(FLB_REGEX) @@ -221,7 +222,6 @@ endif() REGISTER_FILTER_PLUGIN("filter_stdout") - # Register external input and output plugins if(EXT_IN_PLUGINS) string(REPLACE "," ";" plugins ${EXT_IN_PLUGINS}) diff --git a/plugins/filter_replace_character/CMakeLists.txt b/plugins/filter_replace_character/CMakeLists.txt new file 
mode 100644
index 00000000000..551d96e1b84
--- /dev/null
+++ b/plugins/filter_replace_character/CMakeLists.txt
@@ -0,0 +1,5 @@
+set(src
+  replace.c
+  )
+
+FLB_PLUGIN(filter_replace_character "${src}" "")
diff --git a/plugins/filter_replace_character/replace.c b/plugins/filter_replace_character/replace.c
new file mode 100644
index 00000000000..f56d25bef25
--- /dev/null
+++ b/plugins/filter_replace_character/replace.c
@@ -0,0 +1,282 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/*  Fluent Bit
+ *  ==========
+ *  Copyright (C) 2019      The Fluent Bit Authors
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_filter.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_time.h>
+
+#include <monkey/mk_core/mk_list.h>
+#include <msgpack.h>
+
+#include <string.h>
+#include <strings.h>
+
+#include "replace.h"
+
+/*
+ * Filter initialization callback.
+ *
+ * Reads the 'find' and 'replace' properties of the filter instance. Each
+ * value must be exactly one character long and both properties are
+ * required. On success a struct flb_filter_replace_char holding the two
+ * characters is allocated and registered as the filter context.
+ *
+ * Returns 0 on success, -1 on invalid or missing configuration or when the
+ * context cannot be allocated.
+ */
+static int cb_replace_init(struct flb_filter_instance *f_ins,
+                           struct flb_config *config,
+                           void *data)
+{
+    struct flb_filter_replace_char *ctx = NULL;
+    struct mk_list *head;
+    struct flb_kv *kv;
+    char find = '\0';
+    char replace = '\0';
+    int has_find = FLB_FALSE;
+    int has_replace = FLB_FALSE;
+    (void) config;
+    (void) data;
+
+    /* Iterate all filter properties */
+    mk_list_foreach(head, &f_ins->properties) {
+        kv = mk_list_entry(head, struct flb_kv, _head);
+
+        // TODO: better names for these options...
+        if (strcasecmp(kv->key, "find") == 0) {
+            if (!kv->val || strlen(kv->val) != 1) {
+                flb_error("[filter_replace_character] 'find' should be a single"
+                          " character");
+                return -1;
+            }
+            find = kv->val[0];
+            has_find = FLB_TRUE;
+        }
+        else if (strcasecmp(kv->key, "replace") == 0) {
+            if (!kv->val || strlen(kv->val) != 1) {
+                flb_error("[filter_replace_character] 'replace' should be a "
+                          "single character");
+                return -1;
+            }
+            replace = kv->val[0];
+            has_replace = FLB_TRUE;
+        }
+    }
+
+    /* Both options are mandatory: there is no sensible default character */
+    if (has_find == FLB_FALSE || has_replace == FLB_FALSE) {
+        flb_error("[filter_replace_character] both 'find' and 'replace' "
+                  "must be set");
+        return -1;
+    }
+
+    /* Create context */
+    ctx = flb_calloc(1, sizeof(struct flb_filter_replace_char));
+    if (!ctx) {
+        flb_errno();
+        return -1;
+    }
+
+    ctx->find = find;
+    ctx->replace = replace;
+
+    flb_filter_set_context(f_ins, ctx);
+
+    return 0;
+}
+
+/*
+ * Filter callback: rewrite the keys of every record map.
+ *
+ * 'data'/'bytes' is a msgpack stream of [timestamp, map] records. Every
+ * record is re-packed into a new buffer. In each top-level key of type
+ * string or binary, every byte equal to ctx->find is replaced by
+ * ctx->replace; such keys are always written back as msgpack strings.
+ * Keys of other types and all values are copied unchanged. Stream entries
+ * that are not an array of at least two elements, or whose record object
+ * is not a map, are skipped and therefore absent from the output.
+ *
+ * On success the new buffer is handed over through out_buf/out_size and
+ * FLB_FILTER_MODIFIED is returned. If memory cannot be allocated nothing
+ * is handed over and FLB_FILTER_NOTOUCH is returned.
+ */
+static int cb_replace_filter(const void *data, size_t bytes,
+                             const char *tag, int tag_len,
+                             void **out_buf, size_t *out_size,
+                             struct flb_filter_instance *f_ins,
+                             void *context,
+                             struct flb_config *config)
+{
+    struct flb_filter_replace_char *ctx = context;
+    size_t off = 0;
+    size_t j;
+    uint32_t i;
+    struct flb_time tm;
+    msgpack_sbuffer tmp_sbuf;
+    msgpack_packer tmp_pck;
+    msgpack_unpacked result;
+    msgpack_object *obj;
+    msgpack_object *key;
+    msgpack_object_kv *kv;
+    const char *key_str;
+    size_t key_str_size;
+    char *key_buf;
+    char *tmp;
+    size_t key_buf_size = 256;
+    (void) tag;
+    (void) tag_len;
+    (void) f_ins;
+    (void) config;
+
+    /* Scratch buffer for the rewritten key, grown on demand */
+    key_buf = flb_malloc(key_buf_size);
+    if (!key_buf) {
+        flb_errno();
+        return FLB_FILTER_NOTOUCH;
+    }
+
+    /* Create temporary msgpack buffer */
+    msgpack_sbuffer_init(&tmp_sbuf);
+    msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
+
+    /* Iterate over each item */
+    msgpack_unpacked_init(&result);
+    while (msgpack_unpack_next(&result, data, bytes, &off)
+           == MSGPACK_UNPACK_SUCCESS) {
+        /*
+         * Each record is a msgpack array [timestamp, map] of the
+         * timestamp and record map. We 'unpack' each record, and then re-pack
+         * it with the keys modified.
+         */
+        if (result.data.type != MSGPACK_OBJECT_ARRAY ||
+            result.data.via.array.size < 2) {
+            continue;
+        }
+
+        /* unpack the array of [timestamp, map] */
+        obj = NULL;
+        flb_time_pop_from_msgpack(&tm, &result, &obj);
+
+        /* obj should now be the record map */
+        if (!obj || obj->type != MSGPACK_OBJECT_MAP) {
+            continue;
+        }
+
+        /* re-pack the array into a new buffer */
+        msgpack_pack_array(&tmp_pck, 2);
+        flb_time_append_to_msgpack(&tm, &tmp_pck, 0);
+
+        /* keys are only rewritten, never added, so the map keeps its size */
+        msgpack_pack_map(&tmp_pck, obj->via.map.size);
+
+        /* iterate through the old record map and add it to the new buffer */
+        kv = obj->via.map.ptr;
+        for (i = 0; i < obj->via.map.size; i++) {
+            key = &kv[i].key;
+            if (key->type == MSGPACK_OBJECT_BIN) {
+                key_str = key->via.bin.ptr;
+                key_str_size = key->via.bin.size;
+            }
+            else if (key->type == MSGPACK_OBJECT_STR) {
+                key_str = key->via.str.ptr;
+                key_str_size = key->via.str.size;
+            }
+            else {
+                /* not a textual key: copy the pair untouched */
+                msgpack_pack_object(&tmp_pck, kv[i].key);
+                msgpack_pack_object(&tmp_pck, kv[i].val);
+                continue;
+            }
+
+            /* increase key_buf if it is too small */
+            if (key_str_size > key_buf_size) {
+                tmp = flb_realloc(key_buf, key_str_size);
+                if (!tmp) {
+                    flb_errno();
+                    flb_free(key_buf);
+                    msgpack_unpacked_destroy(&result);
+                    msgpack_sbuffer_destroy(&tmp_sbuf);
+                    return FLB_FILTER_NOTOUCH;
+                }
+                key_buf = tmp;
+                key_buf_size = key_str_size;
+            }
+
+            /* copy to temporary buffer and swap the characters there */
+            memcpy(key_buf, key_str, key_str_size);
+            for (j = 0; j < key_str_size; j++) {
+                if (key_buf[j] == ctx->find) {
+                    key_buf[j] = ctx->replace;
+                }
+            }
+
+            /* Append the new key followed by the original value */
+            msgpack_pack_str(&tmp_pck, key_str_size);
+            msgpack_pack_str_body(&tmp_pck, key_buf, key_str_size);
+            msgpack_pack_object(&tmp_pck, kv[i].val);
+        }
+    }
+    msgpack_unpacked_destroy(&result);
+    flb_free(key_buf);
+
+    /* link new buffers */
+    *out_buf = tmp_sbuf.data;
+    *out_size = tmp_sbuf.size;
+    return FLB_FILTER_MODIFIED;
+}
+
+/* Release the filter context; a NULL context is ignored. */
+static void flb_filter_replace_char_destroy(struct flb_filter_replace_char *ctx)
+{
+    if (!ctx) {
+        return;
+    }
+
+    flb_free(ctx);
+}
+
+/* Filter exit callback: frees the context passed in 'data'. */
+static int cb_replace_exit(void *data, struct flb_config *config)
+{
+    struct flb_filter_replace_char *ctx = data;
+    (void) config;
+
+    flb_filter_replace_char_destroy(ctx);
+    return 0;
+}
+
+/*
+ * Plugin descriptor.
+ *
+ * NOTE(review): the symbol was called filter_aws_plugin, which looks like a
+ * copy/paste from the aws filter. It is renamed after the plugin directory;
+ * confirm this is the name the plugin registration expects.
+ */
+struct flb_filter_plugin filter_replace_character_plugin = {
+    .name         = "replace_character",
+    .description  = "Replace characters in key names",
+    .cb_init      = cb_replace_init,
+    .cb_filter    = cb_replace_filter,
+    .cb_exit      = cb_replace_exit,
+    .flags        = 0
+};
diff --git a/plugins/filter_replace_character/replace.h b/plugins/filter_replace_character/replace.h
new file mode 100644
index 00000000000..6cc0c6e1747
--- /dev/null
+++ b/plugins/filter_replace_character/replace.h
@@ -0,0 +1,29 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/*  Fluent Bit
+ *  ==========
+ *  Copyright (C) 2019      The Fluent Bit Authors
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+#ifndef FLB_FILTER_REPLACE_H
+#define FLB_FILTER_REPLACE_H
+
+/* Filter context: the key character to look for and its substitute */
+struct flb_filter_replace_char {
+    char find;      /* character searched for in record keys */
+    char replace;   /* character written in its place */
+};
+
+#endif