Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_syslog: Provide appending source address parameter #7651

Merged
merged 12 commits into from
Jul 24, 2023
5 changes: 5 additions & 0 deletions plugins/in_syslog/syslog.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_syslog, raw_message_key),
"Key where the raw message will be preserved"
},
{
FLB_CONFIG_MAP_STR, "source_address_key", (char *) NULL,
0, FLB_TRUE, offsetof(struct flb_syslog, source_address_key),
"Key where the source address will be injected"
},


/* EOF */
Expand Down
1 change: 1 addition & 0 deletions plugins/in_syslog/syslog.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct flb_syslog {
flb_sds_t parser_name;
struct flb_parser *parser;
flb_sds_t raw_message_key;
flb_sds_t source_address_key;

int dgram_mode_flag;
int collector_id;
Expand Down
5 changes: 1 addition & 4 deletions plugins/in_syslog/syslog_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,11 @@ int syslog_dgram_conn_event(void *data)
struct flb_connection *connection;
int bytes;
struct syslog_conn *conn;
struct flb_syslog *ctx;

connection = (struct flb_connection *) data;

conn = connection->user_data;

ctx = conn->ctx;

bytes = flb_io_net_read(connection,
(void *) &conn->buf_data[conn->buf_len],
conn->buf_size - 1);
Expand All @@ -144,7 +141,7 @@ int syslog_dgram_conn_event(void *data)
conn->buf_data[bytes] = '\0';
conn->buf_len = bytes;

syslog_prot_process_udp(conn->buf_data, conn->buf_len, ctx);
syslog_prot_process_udp(conn);
}
else {
flb_errno();
Expand Down
162 changes: 131 additions & 31 deletions plugins/in_syslog/syslog_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_parser.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_pack.h>

#include "syslog.h"
#include "syslog_conn.h"
Expand All @@ -39,68 +40,123 @@ static int append_raw_message_to_record_data(char **result_buffer,
char *raw_message_buffer,
size_t raw_message_size)
{
int i;
int result;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initialize result to FLB_MAP_NOT_MODIFIED, othwewise if message_key_name is NULL the check in line 87 is accessing garbage which considering that the value of FLB_MAP_EXPAND_SUCCESS is 0 could cause this function to return garbage.

size_t unpacker_offset;
char *modified_data_buffer;
int modified_data_size;
msgpack_object_kv *new_map_entries[1];
msgpack_object_kv raw_message_entry;
msgpack_sbuffer mp_sbuf;
msgpack_packer mp_pck;
msgpack_unpacked unpacked_buffer;
*result_buffer = NULL;
*result_size = 0;
modified_data_buffer = NULL;

unpacker_offset = 0;
msgpack_unpacked_init(&unpacked_buffer);
result = msgpack_unpack_next(&unpacked_buffer,
base_object_buffer,
base_object_size,
&unpacker_offset);
msgpack_sbuffer_init(&mp_sbuf);

if (result != MSGPACK_UNPACK_SUCCESS) {
return -1;
if (raw_message_key_name != NULL) {
new_map_entries[0] = &raw_message_entry;

raw_message_entry.key.type = MSGPACK_OBJECT_STR;
raw_message_entry.key.via.str.size = flb_sds_len(raw_message_key_name);
raw_message_entry.key.via.str.ptr = raw_message_key_name;

raw_message_entry.val.type = MSGPACK_OBJECT_BIN;
raw_message_entry.val.via.bin.size = raw_message_size;
raw_message_entry.val.via.bin.ptr = raw_message_buffer;

result = flb_msgpack_expand_map(base_object_buffer,
base_object_size,
new_map_entries, 1,
&modified_data_buffer,
&modified_data_size);
}

if (unpacker_offset != base_object_size) {
msgpack_unpacked_destroy(&unpacked_buffer);
return -2;
if (modified_data_buffer != NULL) {
msgpack_sbuffer_write(&mp_sbuf, modified_data_buffer, modified_data_size);
}
else {
msgpack_sbuffer_write(&mp_sbuf, base_object_buffer, base_object_size);
}

if (unpacked_buffer.data.type != MSGPACK_OBJECT_MAP) {
msgpack_unpacked_destroy(&unpacked_buffer);
return -3;
*result_buffer = mp_sbuf.data;
*result_size = mp_sbuf.size;

if (modified_data_buffer != NULL) {
flb_free(modified_data_buffer);
}

msgpack_sbuffer_init(&mp_sbuf);
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
return result;
}

static int append_source_address_to_record_data(char **result_buffer,
size_t *result_size,
flb_sds_t source_address_key,
char *source_address,
char *base_object_buffer,
size_t base_object_size)
{
int result;
char *modified_data_buffer;
int modified_data_size;
msgpack_object_kv *new_map_entries[1];
msgpack_object_kv source_address_entry;
msgpack_sbuffer mp_sbuf;
*result_buffer = NULL;
*result_size = 0;
modified_data_buffer = NULL;

msgpack_pack_map(&mp_pck, unpacked_buffer.data.via.map.size + 1);
msgpack_sbuffer_init(&mp_sbuf);

for (i = 0; i < unpacked_buffer.data.via.map.size; i++) {
msgpack_pack_object(&mp_pck, unpacked_buffer.data.via.map.ptr[i].key);
msgpack_pack_object(&mp_pck, unpacked_buffer.data.via.map.ptr[i].val);
if (source_address_key != NULL) {
new_map_entries[0] = &source_address_entry;

source_address_entry.key.type = MSGPACK_OBJECT_STR;
source_address_entry.key.via.str.size = strlen(source_address_key);
source_address_entry.key.via.str.ptr = source_address_key;

source_address_entry.val.type = MSGPACK_OBJECT_STR;
source_address_entry.val.via.bin.size = strlen(source_address);
source_address_entry.val.via.bin.ptr = source_address;

result = flb_msgpack_expand_map(base_object_buffer,
base_object_size,
new_map_entries, 1,
&modified_data_buffer,
&modified_data_size);
}

msgpack_pack_str(&mp_pck, flb_sds_len(raw_message_key_name));
msgpack_pack_str_body(&mp_pck, raw_message_key_name, flb_sds_len(raw_message_key_name));
msgpack_pack_str(&mp_pck, raw_message_size);
msgpack_pack_str_body(&mp_pck, raw_message_buffer, raw_message_size);
if (modified_data_buffer != NULL) {
msgpack_sbuffer_write(&mp_sbuf, modified_data_buffer, modified_data_size);
}
else {
msgpack_sbuffer_write(&mp_sbuf, base_object_buffer, base_object_size);
}

*result_buffer = mp_sbuf.data;
*result_size = mp_sbuf.size;

msgpack_unpacked_destroy(&unpacked_buffer);
if (modified_data_buffer != NULL) {
flb_free(modified_data_buffer);
}

return result;
}

static inline int pack_line(struct flb_syslog *ctx,
struct flb_time *time,
struct flb_connection *connection,
char *data, size_t data_size,
char *raw_data, size_t raw_data_size)
{
char *modified_data_buffer;
size_t modified_data_size;
char *appended_address_buffer;
size_t appended_address_size;
int result;
char *source_address;

source_address = NULL;
modified_data_buffer = NULL;
appended_address_buffer = NULL;

if (ctx->raw_message_key != NULL) {
result = append_raw_message_to_record_data(&modified_data_buffer,
Expand All @@ -116,14 +172,44 @@ static inline int pack_line(struct flb_syslog *ctx,
}
}

if (ctx->source_address_key != NULL) {
source_address = flb_connection_get_remote_address(connection);
if (source_address != NULL) {
if (modified_data_buffer != NULL) {
result = append_source_address_to_record_data(&appended_address_buffer,
&appended_address_size,
ctx->source_address_key,
source_address,
modified_data_buffer,
modified_data_size);
}
else {
result = append_source_address_to_record_data(&appended_address_buffer,
&appended_address_size,
ctx->source_address_key,
source_address,
data,
data_size);
}

if (result != 0) {
flb_plg_debug(ctx->ins, "error appending source_address : %d", result);
}
}
}

result = flb_log_event_encoder_begin_record(ctx->log_encoder);

if (result == FLB_EVENT_ENCODER_SUCCESS) {
result = flb_log_event_encoder_set_timestamp(ctx->log_encoder, time);
}

if (result == FLB_EVENT_ENCODER_SUCCESS) {
if (modified_data_buffer != NULL) {
if (appended_address_buffer != NULL) {
result = flb_log_event_encoder_set_body_from_raw_msgpack(
ctx->log_encoder, appended_address_buffer, appended_address_size);
}
else if (modified_data_buffer != NULL) {
result = flb_log_event_encoder_set_body_from_raw_msgpack(
ctx->log_encoder, modified_data_buffer, modified_data_size);
}
Expand Down Expand Up @@ -154,6 +240,9 @@ static inline int pack_line(struct flb_syslog *ctx,
if (modified_data_buffer != NULL) {
flb_free(modified_data_buffer);
}
if (appended_address_buffer != NULL) {
flb_free(appended_address_buffer);
}

return result;
}
Expand Down Expand Up @@ -210,6 +299,7 @@ int syslog_prot_process(struct syslog_conn *conn)
flb_time_get(&out_time);
}
pack_line(ctx, &out_time,
conn->connection,
out_buf, out_size,
p, len);
flb_free(out_buf);
Expand All @@ -235,12 +325,21 @@ int syslog_prot_process(struct syslog_conn *conn)
return 0;
}

int syslog_prot_process_udp(char *buf, size_t size, struct flb_syslog *ctx)
int syslog_prot_process_udp(struct syslog_conn *conn)
{
int ret;
void *out_buf;
size_t out_size;
struct flb_time out_time = {0};
char *buf;
size_t size;
struct flb_syslog *ctx;
struct flb_connection *connection;

buf = conn->buf_data;
size = conn->buf_len;
ctx = conn->ctx;
connection = conn->connection;

ret = flb_parser_do(ctx->parser, buf, size,
&out_buf, &out_size, &out_time);
Expand All @@ -249,6 +348,7 @@ int syslog_prot_process_udp(char *buf, size_t size, struct flb_syslog *ctx)
flb_time_get(&out_time);
}
pack_line(ctx, &out_time,
connection,
out_buf, out_size,
buf, size);
flb_free(out_buf);
Expand Down
2 changes: 1 addition & 1 deletion plugins/in_syslog/syslog_prot.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@
#include "syslog.h"

int syslog_prot_process(struct syslog_conn *conn);
int syslog_prot_process_udp(char *buf, size_t size, struct flb_syslog *ctx);
int syslog_prot_process_udp(struct syslog_conn *conn);

#endif
Loading