Skip to content

Commit

Permalink
in_syslog: Provide appending source address parameter (#7651)
Browse files Browse the repository at this point in the history
* in_syslog: Append source_address into records if needed
  • Loading branch information
cosmo0920 authored and leonardo-albertovich committed Oct 5, 2023
1 parent 5560628 commit 82c3ef1
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 63 deletions.
5 changes: 5 additions & 0 deletions plugins/in_syslog/syslog.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_syslog, raw_message_key),
"Key where the raw message will be preserved"
},
{
FLB_CONFIG_MAP_STR, "source_address_key", (char *) NULL,
0, FLB_TRUE, offsetof(struct flb_syslog, source_address_key),
"Key where the source address will be injected"
},


/* EOF */
Expand Down
1 change: 1 addition & 0 deletions plugins/in_syslog/syslog.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct flb_syslog {
flb_sds_t parser_name;
struct flb_parser *parser;
flb_sds_t raw_message_key;
flb_sds_t source_address_key;

int dgram_mode_flag;
int collector_id;
Expand Down
5 changes: 1 addition & 4 deletions plugins/in_syslog/syslog_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,11 @@ int syslog_dgram_conn_event(void *data)
struct flb_connection *connection;
int bytes;
struct syslog_conn *conn;
struct flb_syslog *ctx;

connection = (struct flb_connection *) data;

conn = connection->user_data;

ctx = conn->ctx;

bytes = flb_io_net_read(connection,
(void *) &conn->buf_data[conn->buf_len],
conn->buf_size - 1);
Expand All @@ -144,7 +141,7 @@ int syslog_dgram_conn_event(void *data)
conn->buf_data[bytes] = '\0';
conn->buf_len = bytes;

syslog_prot_process_udp(conn->buf_data, conn->buf_len, ctx);
syslog_prot_process_udp(conn);
}
else {
flb_errno();
Expand Down
175 changes: 117 additions & 58 deletions plugins/in_syslog/syslog_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_parser.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_pack.h>

#include "syslog.h"
#include "syslog_conn.h"
#include "syslog_prot.h"

#include <string.h>

Expand All @@ -31,88 +33,127 @@ static inline void consume_bytes(char *buf, int bytes, int length)
memmove(buf, buf + bytes, length - bytes);
}

static int append_raw_message_to_record_data(char **result_buffer,
size_t *result_size,
flb_sds_t raw_message_key_name,
char *base_object_buffer,
size_t base_object_size,
char *raw_message_buffer,
size_t raw_message_size)
static int append_message_to_record_data(char **result_buffer,
size_t *result_size,
flb_sds_t message_key_name,
char *base_object_buffer,
size_t base_object_size,
char *message_buffer,
size_t message_size,
int message_type)
{
int i;
int result;
size_t unpacker_offset;
msgpack_sbuffer mp_sbuf;
msgpack_packer mp_pck;
msgpack_unpacked unpacked_buffer;
int result = FLB_MAP_NOT_MODIFIED;
char *modified_data_buffer;
int modified_data_size;
msgpack_object_kv *new_map_entries[1];
msgpack_object_kv message_entry;
*result_buffer = NULL;
*result_size = 0;
modified_data_buffer = NULL;

unpacker_offset = 0;
msgpack_unpacked_init(&unpacked_buffer);
result = msgpack_unpack_next(&unpacked_buffer,
base_object_buffer,
base_object_size,
&unpacker_offset);
if (message_key_name != NULL) {
new_map_entries[0] = &message_entry;

if (result != MSGPACK_UNPACK_SUCCESS) {
return -1;
}
message_entry.key.type = MSGPACK_OBJECT_STR;
message_entry.key.via.str.size = flb_sds_len(message_key_name);
message_entry.key.via.str.ptr = message_key_name;

if (unpacker_offset != base_object_size) {
msgpack_unpacked_destroy(&unpacked_buffer);
return -2;
}
if (message_type == MSGPACK_OBJECT_BIN) {
message_entry.val.type = MSGPACK_OBJECT_BIN;
message_entry.val.via.bin.size = message_size;
message_entry.val.via.bin.ptr = message_buffer;
}
else if (message_type == MSGPACK_OBJECT_STR) {
message_entry.val.type = MSGPACK_OBJECT_STR;
message_entry.val.via.str.size = message_size;
message_entry.val.via.str.ptr = message_buffer;
}
else {
result = FLB_MAP_EXPANSION_INVALID_VALUE_TYPE;
}

if (unpacked_buffer.data.type != MSGPACK_OBJECT_MAP) {
msgpack_unpacked_destroy(&unpacked_buffer);
return -3;
if (result == FLB_MAP_NOT_MODIFIED) {
result = flb_msgpack_expand_map(base_object_buffer,
base_object_size,
new_map_entries, 1,
&modified_data_buffer,
&modified_data_size);
if (result == 0) {
result = FLB_MAP_EXPAND_SUCCESS;
}
else {
result = FLB_MAP_EXPANSION_ERROR;
}
}
}

msgpack_sbuffer_init(&mp_sbuf);
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);

msgpack_pack_map(&mp_pck, unpacked_buffer.data.via.map.size + 1);

for (i = 0; i < unpacked_buffer.data.via.map.size; i++) {
msgpack_pack_object(&mp_pck, unpacked_buffer.data.via.map.ptr[i].key);
msgpack_pack_object(&mp_pck, unpacked_buffer.data.via.map.ptr[i].val);
if (result == FLB_MAP_EXPAND_SUCCESS) {
*result_buffer = modified_data_buffer;
*result_size = modified_data_size;
}

msgpack_pack_str(&mp_pck, flb_sds_len(raw_message_key_name));
msgpack_pack_str_body(&mp_pck, raw_message_key_name, flb_sds_len(raw_message_key_name));
msgpack_pack_str(&mp_pck, raw_message_size);
msgpack_pack_str_body(&mp_pck, raw_message_buffer, raw_message_size);

*result_buffer = mp_sbuf.data;
*result_size = mp_sbuf.size;

msgpack_unpacked_destroy(&unpacked_buffer);
return result;
}

static inline int pack_line(struct flb_syslog *ctx,
struct flb_time *time,
struct flb_connection *connection,
char *data, size_t data_size,
char *raw_data, size_t raw_data_size)
{
char *modified_data_buffer;
size_t modified_data_size;
char *appended_address_buffer;
size_t appended_address_size;
int result;
char *source_address;

source_address = NULL;
modified_data_buffer = NULL;
appended_address_buffer = NULL;

if (ctx->raw_message_key != NULL) {
result = append_raw_message_to_record_data(&modified_data_buffer,
&modified_data_size,
ctx->raw_message_key,
data,
data_size,
raw_data,
raw_data_size);

if (result != 0) {
flb_plg_debug(ctx->ins, "error appending raw message : %d", result);
result = append_message_to_record_data(&modified_data_buffer,
&modified_data_size,
ctx->raw_message_key,
data,
data_size,
raw_data,
raw_data_size,
MSGPACK_OBJECT_BIN);

if (result == FLB_MAP_EXPANSION_ERROR) {
flb_plg_debug(ctx->ins, "error expanding raw message : %d", result);
}
}

if (ctx->source_address_key != NULL) {
source_address = flb_connection_get_remote_address(connection);
if (source_address != NULL) {
if (modified_data_buffer != NULL) {
result = append_message_to_record_data(&appended_address_buffer,
&appended_address_size,
ctx->source_address_key,
modified_data_buffer,
modified_data_size,
source_address,
strlen(source_address),
MSGPACK_OBJECT_STR);
}
else {
result = append_message_to_record_data(&appended_address_buffer,
&appended_address_size,
ctx->source_address_key,
data,
data_size,
source_address,
strlen(source_address),
MSGPACK_OBJECT_STR);
}

if (result == FLB_MAP_EXPANSION_ERROR) {
flb_plg_debug(ctx->ins, "error expanding source_address : %d", result);
}
}
}

Expand All @@ -123,7 +164,11 @@ static inline int pack_line(struct flb_syslog *ctx,
}

if (result == FLB_EVENT_ENCODER_SUCCESS) {
if (modified_data_buffer != NULL) {
if (appended_address_buffer != NULL) {
result = flb_log_event_encoder_set_body_from_raw_msgpack(
ctx->log_encoder, appended_address_buffer, appended_address_size);
}
else if (modified_data_buffer != NULL) {
result = flb_log_event_encoder_set_body_from_raw_msgpack(
ctx->log_encoder, modified_data_buffer, modified_data_size);
}
Expand Down Expand Up @@ -154,6 +199,9 @@ static inline int pack_line(struct flb_syslog *ctx,
if (modified_data_buffer != NULL) {
flb_free(modified_data_buffer);
}
if (appended_address_buffer != NULL) {
flb_free(appended_address_buffer);
}

return result;
}
Expand Down Expand Up @@ -210,6 +258,7 @@ int syslog_prot_process(struct syslog_conn *conn)
flb_time_get(&out_time);
}
pack_line(ctx, &out_time,
conn->connection,
out_buf, out_size,
p, len);
flb_free(out_buf);
Expand All @@ -235,12 +284,21 @@ int syslog_prot_process(struct syslog_conn *conn)
return 0;
}

int syslog_prot_process_udp(char *buf, size_t size, struct flb_syslog *ctx)
int syslog_prot_process_udp(struct syslog_conn *conn)
{
int ret;
void *out_buf;
size_t out_size;
struct flb_time out_time = {0};
char *buf;
size_t size;
struct flb_syslog *ctx;
struct flb_connection *connection;

buf = conn->buf_data;
size = conn->buf_len;
ctx = conn->ctx;
connection = conn->connection;

ret = flb_parser_do(ctx->parser, buf, size,
&out_buf, &out_size, &out_time);
Expand All @@ -249,6 +307,7 @@ int syslog_prot_process_udp(char *buf, size_t size, struct flb_syslog *ctx)
flb_time_get(&out_time);
}
pack_line(ctx, &out_time,
connection,
out_buf, out_size,
buf, size);
flb_free(out_buf);
Expand Down
7 changes: 6 additions & 1 deletion plugins/in_syslog/syslog_prot.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@

#include "syslog.h"

#define FLB_MAP_EXPAND_SUCCESS 0
#define FLB_MAP_NOT_MODIFIED -1
#define FLB_MAP_EXPANSION_ERROR -2
#define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3

int syslog_prot_process(struct syslog_conn *conn);
int syslog_prot_process_udp(char *buf, size_t size, struct flb_syslog *ctx);
int syslog_prot_process_udp(struct syslog_conn *conn);

#endif
Loading

0 comments on commit 82c3ef1

Please sign in to comment.