Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_syslog: Provide appending source address parameter #7651

Merged
merged 12 commits into from
Jul 24, 2023
5 changes: 5 additions & 0 deletions plugins/in_syslog/syslog.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_syslog, raw_message_key),
"Key where the raw message will be preserved"
},
{
FLB_CONFIG_MAP_STR, "source_address_key", (char *) NULL,
0, FLB_TRUE, offsetof(struct flb_syslog, source_address_key),
"Key where the source address will be injected"
},


/* EOF */
Expand Down
1 change: 1 addition & 0 deletions plugins/in_syslog/syslog.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct flb_syslog {
flb_sds_t parser_name;
struct flb_parser *parser;
flb_sds_t raw_message_key;
flb_sds_t source_address_key;

int dgram_mode_flag;
int collector_id;
Expand Down
5 changes: 1 addition & 4 deletions plugins/in_syslog/syslog_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,11 @@ int syslog_dgram_conn_event(void *data)
struct flb_connection *connection;
int bytes;
struct syslog_conn *conn;
struct flb_syslog *ctx;

connection = (struct flb_connection *) data;

conn = connection->user_data;

ctx = conn->ctx;

bytes = flb_io_net_read(connection,
(void *) &conn->buf_data[conn->buf_len],
conn->buf_size - 1);
Expand All @@ -144,7 +141,7 @@ int syslog_dgram_conn_event(void *data)
conn->buf_data[bytes] = '\0';
conn->buf_len = bytes;

syslog_prot_process_udp(conn->buf_data, conn->buf_len, ctx);
syslog_prot_process_udp(conn);
}
else {
flb_errno();
Expand Down
170 changes: 115 additions & 55 deletions plugins/in_syslog/syslog_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_parser.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_pack.h>

#include "syslog.h"
#include "syslog_conn.h"
#include "syslog_prot.h"

#include <string.h>

Expand All @@ -31,88 +33,128 @@ static inline void consume_bytes(char *buf, int bytes, int length)
memmove(buf, buf + bytes, length - bytes);
}

static int append_raw_message_to_record_data(char **result_buffer,
size_t *result_size,
flb_sds_t raw_message_key_name,
char *base_object_buffer,
size_t base_object_size,
char *raw_message_buffer,
size_t raw_message_size)
static int append_message_to_record_data(char **result_buffer,
size_t *result_size,
flb_sds_t message_key_name,
char *base_object_buffer,
size_t base_object_size,
char *message_buffer,
size_t message_size,
int message_type)
{
int i;
int result;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initialize result to FLB_MAP_NOT_MODIFIED, othwewise if message_key_name is NULL the check in line 87 is accessing garbage which considering that the value of FLB_MAP_EXPAND_SUCCESS is 0 could cause this function to return garbage.

size_t unpacker_offset;
msgpack_sbuffer mp_sbuf;
msgpack_packer mp_pck;
msgpack_unpacked unpacked_buffer;
char *modified_data_buffer;
int modified_data_size;
msgpack_object_kv *new_map_entries[1];
msgpack_object_kv message_entry;
*result_buffer = NULL;
*result_size = 0;
modified_data_buffer = NULL;

unpacker_offset = 0;
msgpack_unpacked_init(&unpacked_buffer);
result = msgpack_unpack_next(&unpacked_buffer,
base_object_buffer,
base_object_size,
&unpacker_offset);
if (message_key_name != NULL) {
new_map_entries[0] = &message_entry;

if (result != MSGPACK_UNPACK_SUCCESS) {
return -1;
}
message_entry.key.type = MSGPACK_OBJECT_STR;
message_entry.key.via.str.size = flb_sds_len(message_key_name);
message_entry.key.via.str.ptr = message_key_name;

if (unpacker_offset != base_object_size) {
msgpack_unpacked_destroy(&unpacked_buffer);
return -2;
}
if (message_type == MSGPACK_OBJECT_BIN) {
message_entry.val.type = MSGPACK_OBJECT_BIN;
message_entry.val.via.bin.size = message_size;
message_entry.val.via.bin.ptr = message_buffer;
}
else if (message_type == MSGPACK_OBJECT_STR) {
message_entry.val.type = MSGPACK_OBJECT_BIN;
cosmo0920 marked this conversation as resolved.
Show resolved Hide resolved
message_entry.val.via.str.size = message_size;
message_entry.val.via.str.ptr = message_buffer;
}
else {
result = FLB_MAP_NOT_MODIFIED;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a new constant named FLB_MAP_EXPANSION_INVALID_VALUE_TYPE and use it in this line, trying to use a type that's not acceptable is an error, we don't want to miss it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Will do.


if (unpacked_buffer.data.type != MSGPACK_OBJECT_MAP) {
msgpack_unpacked_destroy(&unpacked_buffer);
return -3;
}
return result;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this return and wrap the code in lines 77-84 in a conditional that only executes if result equals FLB_MAP_NOT_MODIFIED

}

msgpack_sbuffer_init(&mp_sbuf);
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
result = flb_msgpack_expand_map(base_object_buffer,
base_object_size,
new_map_entries, 1,
&modified_data_buffer,
&modified_data_size);
if (result == 0) {
result = FLB_MAP_EXPAND_SUCCESS;
}
}

msgpack_pack_map(&mp_pck, unpacked_buffer.data.via.map.size + 1);
if (result != FLB_MAP_EXPAND_SUCCESS) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per the comment in line 42 you need to invert the clause in this conditional block and remove the result override.

Then you need to add an else block to the clause in line 82 so for any other values returned by flb_msgpack_expand_map result is set to FLB_MAP_EXPANSION_ERROR.

result = FLB_MAP_EXPANSION_ERROR;

for (i = 0; i < unpacked_buffer.data.via.map.size; i++) {
msgpack_pack_object(&mp_pck, unpacked_buffer.data.via.map.ptr[i].key);
msgpack_pack_object(&mp_pck, unpacked_buffer.data.via.map.ptr[i].val);
return result;
cosmo0920 marked this conversation as resolved.
Show resolved Hide resolved
}

msgpack_pack_str(&mp_pck, flb_sds_len(raw_message_key_name));
msgpack_pack_str_body(&mp_pck, raw_message_key_name, flb_sds_len(raw_message_key_name));
msgpack_pack_str(&mp_pck, raw_message_size);
msgpack_pack_str_body(&mp_pck, raw_message_buffer, raw_message_size);

*result_buffer = mp_sbuf.data;
*result_size = mp_sbuf.size;
*result_buffer = modified_data_buffer;
*result_size = modified_data_size;

msgpack_unpacked_destroy(&unpacked_buffer);
return result;
}

static inline int pack_line(struct flb_syslog *ctx,
struct flb_time *time,
struct flb_connection *connection,
char *data, size_t data_size,
char *raw_data, size_t raw_data_size)
{
char *modified_data_buffer;
size_t modified_data_size;
char *appended_address_buffer;
size_t appended_address_size;
int result;
char *source_address;

source_address = NULL;
modified_data_buffer = NULL;
appended_address_buffer = NULL;

if (ctx->raw_message_key != NULL) {
result = append_raw_message_to_record_data(&modified_data_buffer,
&modified_data_size,
ctx->raw_message_key,
data,
data_size,
raw_data,
raw_data_size);

if (result != 0) {
flb_plg_debug(ctx->ins, "error appending raw message : %d", result);
result = append_message_to_record_data(&modified_data_buffer,
&modified_data_size,
ctx->raw_message_key,
data,
data_size,
raw_data,
raw_data_size,
MSGPACK_OBJECT_BIN);

if (result == FLB_MAP_EXPANSION_ERROR) {
flb_plg_debug(ctx->ins, "error expanding raw message : %d", result);
}
}

if (ctx->source_address_key != NULL) {
source_address = flb_connection_get_remote_address(connection);
if (source_address != NULL) {
if (modified_data_buffer != NULL) {
result = append_message_to_record_data(&appended_address_buffer,
&appended_address_size,
ctx->source_address_key,
modified_data_buffer,
modified_data_size,
source_address,
strlen(source_address),
MSGPACK_OBJECT_STR);
}
else {
result = append_message_to_record_data(&appended_address_buffer,
&appended_address_size,
ctx->source_address_key,
data,
data_size,
source_address,
strlen(source_address),
MSGPACK_OBJECT_STR);
}

if (result == FLB_MAP_EXPANSION_ERROR) {
flb_plg_debug(ctx->ins, "error expanding source_address : %d", result);
}
}
}

Expand All @@ -123,7 +165,11 @@ static inline int pack_line(struct flb_syslog *ctx,
}

if (result == FLB_EVENT_ENCODER_SUCCESS) {
if (modified_data_buffer != NULL) {
if (appended_address_buffer != NULL) {
result = flb_log_event_encoder_set_body_from_raw_msgpack(
ctx->log_encoder, appended_address_buffer, appended_address_size);
}
else if (modified_data_buffer != NULL) {
result = flb_log_event_encoder_set_body_from_raw_msgpack(
ctx->log_encoder, modified_data_buffer, modified_data_size);
}
Expand Down Expand Up @@ -154,6 +200,9 @@ static inline int pack_line(struct flb_syslog *ctx,
if (modified_data_buffer != NULL) {
flb_free(modified_data_buffer);
}
if (appended_address_buffer != NULL) {
flb_free(appended_address_buffer);
}

return result;
}
Expand Down Expand Up @@ -210,6 +259,7 @@ int syslog_prot_process(struct syslog_conn *conn)
flb_time_get(&out_time);
}
pack_line(ctx, &out_time,
conn->connection,
out_buf, out_size,
p, len);
flb_free(out_buf);
Expand All @@ -235,12 +285,21 @@ int syslog_prot_process(struct syslog_conn *conn)
return 0;
}

int syslog_prot_process_udp(char *buf, size_t size, struct flb_syslog *ctx)
int syslog_prot_process_udp(struct syslog_conn *conn)
{
int ret;
void *out_buf;
size_t out_size;
struct flb_time out_time = {0};
char *buf;
size_t size;
struct flb_syslog *ctx;
struct flb_connection *connection;

buf = conn->buf_data;
size = conn->buf_len;
ctx = conn->ctx;
connection = conn->connection;

ret = flb_parser_do(ctx->parser, buf, size,
&out_buf, &out_size, &out_time);
Expand All @@ -249,6 +308,7 @@ int syslog_prot_process_udp(char *buf, size_t size, struct flb_syslog *ctx)
flb_time_get(&out_time);
}
pack_line(ctx, &out_time,
connection,
out_buf, out_size,
buf, size);
flb_free(out_buf);
Expand Down
6 changes: 5 additions & 1 deletion plugins/in_syslog/syslog_prot.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@

#include "syslog.h"

#define FLB_MAP_EXPAND_SUCCESS 0
#define FLB_MAP_NOT_MODIFIED -1
#define FLB_MAP_EXPANSION_ERROR -2

int syslog_prot_process(struct syslog_conn *conn);
int syslog_prot_process_udp(char *buf, size_t size, struct flb_syslog *ctx);
int syslog_prot_process_udp(struct syslog_conn *conn);

#endif
Loading