Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_syslog: Provide appending source address parameter #7651

Merged
merged 12 commits into from
Jul 24, 2023
5 changes: 5 additions & 0 deletions plugins/in_syslog/syslog.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_syslog, raw_message_key),
"Key where the raw message will be preserved"
},
{
FLB_CONFIG_MAP_STR, "source_address_key", (char *) NULL,
0, FLB_TRUE, offsetof(struct flb_syslog, source_address_key),
"Key where the source address will be injected"
},


/* EOF */
Expand Down
1 change: 1 addition & 0 deletions plugins/in_syslog/syslog.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct flb_syslog {
flb_sds_t parser_name;
struct flb_parser *parser;
flb_sds_t raw_message_key;
flb_sds_t source_address_key;

int dgram_mode_flag;
int collector_id;
Expand Down
5 changes: 1 addition & 4 deletions plugins/in_syslog/syslog_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,11 @@ int syslog_dgram_conn_event(void *data)
struct flb_connection *connection;
int bytes;
struct syslog_conn *conn;
struct flb_syslog *ctx;

connection = (struct flb_connection *) data;

conn = connection->user_data;

ctx = conn->ctx;

bytes = flb_io_net_read(connection,
(void *) &conn->buf_data[conn->buf_len],
conn->buf_size - 1);
Expand All @@ -144,7 +141,7 @@ int syslog_dgram_conn_event(void *data)
conn->buf_data[bytes] = '\0';
conn->buf_len = bytes;

syslog_prot_process_udp(conn->buf_data, conn->buf_len, ctx);
syslog_prot_process_udp(conn);
}
else {
flb_errno();
Expand Down
175 changes: 117 additions & 58 deletions plugins/in_syslog/syslog_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_parser.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_pack.h>

#include "syslog.h"
#include "syslog_conn.h"
#include "syslog_prot.h"

#include <string.h>

Expand All @@ -31,88 +33,127 @@ static inline void consume_bytes(char *buf, int bytes, int length)
memmove(buf, buf + bytes, length - bytes);
}

static int append_raw_message_to_record_data(char **result_buffer,
size_t *result_size,
flb_sds_t raw_message_key_name,
char *base_object_buffer,
size_t base_object_size,
char *raw_message_buffer,
size_t raw_message_size)
static int append_message_to_record_data(char **result_buffer,
size_t *result_size,
flb_sds_t message_key_name,
char *base_object_buffer,
size_t base_object_size,
char *message_buffer,
size_t message_size,
int message_type)
{
int i;
int result;
size_t unpacker_offset;
msgpack_sbuffer mp_sbuf;
msgpack_packer mp_pck;
msgpack_unpacked unpacked_buffer;
int result = FLB_MAP_NOT_MODIFIED;
char *modified_data_buffer;
int modified_data_size;
msgpack_object_kv *new_map_entries[1];
msgpack_object_kv message_entry;
*result_buffer = NULL;
*result_size = 0;
modified_data_buffer = NULL;

unpacker_offset = 0;
msgpack_unpacked_init(&unpacked_buffer);
result = msgpack_unpack_next(&unpacked_buffer,
base_object_buffer,
base_object_size,
&unpacker_offset);
if (message_key_name != NULL) {
new_map_entries[0] = &message_entry;

if (result != MSGPACK_UNPACK_SUCCESS) {
return -1;
}
message_entry.key.type = MSGPACK_OBJECT_STR;
message_entry.key.via.str.size = flb_sds_len(message_key_name);
message_entry.key.via.str.ptr = message_key_name;

if (unpacker_offset != base_object_size) {
msgpack_unpacked_destroy(&unpacked_buffer);
return -2;
}
if (message_type == MSGPACK_OBJECT_BIN) {
message_entry.val.type = MSGPACK_OBJECT_BIN;
message_entry.val.via.bin.size = message_size;
message_entry.val.via.bin.ptr = message_buffer;
}
else if (message_type == MSGPACK_OBJECT_STR) {
message_entry.val.type = MSGPACK_OBJECT_STR;
message_entry.val.via.str.size = message_size;
message_entry.val.via.str.ptr = message_buffer;
}
else {
result = FLB_MAP_EXPANSION_INVALID_VALUE_TYPE;
}

if (unpacked_buffer.data.type != MSGPACK_OBJECT_MAP) {
msgpack_unpacked_destroy(&unpacked_buffer);
return -3;
if (result == FLB_MAP_NOT_MODIFIED) {
result = flb_msgpack_expand_map(base_object_buffer,
base_object_size,
new_map_entries, 1,
&modified_data_buffer,
&modified_data_size);
if (result == 0) {
result = FLB_MAP_EXPAND_SUCCESS;
}
else {
result = FLB_MAP_EXPANSION_ERROR;
}
}
}

msgpack_sbuffer_init(&mp_sbuf);
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);

msgpack_pack_map(&mp_pck, unpacked_buffer.data.via.map.size + 1);

for (i = 0; i < unpacked_buffer.data.via.map.size; i++) {
msgpack_pack_object(&mp_pck, unpacked_buffer.data.via.map.ptr[i].key);
msgpack_pack_object(&mp_pck, unpacked_buffer.data.via.map.ptr[i].val);
if (result == FLB_MAP_EXPAND_SUCCESS) {
*result_buffer = modified_data_buffer;
*result_size = modified_data_size;
}

msgpack_pack_str(&mp_pck, flb_sds_len(raw_message_key_name));
msgpack_pack_str_body(&mp_pck, raw_message_key_name, flb_sds_len(raw_message_key_name));
msgpack_pack_str(&mp_pck, raw_message_size);
msgpack_pack_str_body(&mp_pck, raw_message_buffer, raw_message_size);

*result_buffer = mp_sbuf.data;
*result_size = mp_sbuf.size;

msgpack_unpacked_destroy(&unpacked_buffer);
return result;
}

static inline int pack_line(struct flb_syslog *ctx,
struct flb_time *time,
struct flb_connection *connection,
char *data, size_t data_size,
char *raw_data, size_t raw_data_size)
{
char *modified_data_buffer;
size_t modified_data_size;
char *appended_address_buffer;
size_t appended_address_size;
int result;
char *source_address;

source_address = NULL;
modified_data_buffer = NULL;
appended_address_buffer = NULL;

if (ctx->raw_message_key != NULL) {
result = append_raw_message_to_record_data(&modified_data_buffer,
&modified_data_size,
ctx->raw_message_key,
data,
data_size,
raw_data,
raw_data_size);

if (result != 0) {
flb_plg_debug(ctx->ins, "error appending raw message : %d", result);
result = append_message_to_record_data(&modified_data_buffer,
&modified_data_size,
ctx->raw_message_key,
data,
data_size,
raw_data,
raw_data_size,
MSGPACK_OBJECT_BIN);

if (result == FLB_MAP_EXPANSION_ERROR) {
flb_plg_debug(ctx->ins, "error expanding raw message : %d", result);
}
}

if (ctx->source_address_key != NULL) {
source_address = flb_connection_get_remote_address(connection);
if (source_address != NULL) {
if (modified_data_buffer != NULL) {
result = append_message_to_record_data(&appended_address_buffer,
&appended_address_size,
ctx->source_address_key,
modified_data_buffer,
modified_data_size,
source_address,
strlen(source_address),
MSGPACK_OBJECT_STR);
}
else {
result = append_message_to_record_data(&appended_address_buffer,
&appended_address_size,
ctx->source_address_key,
data,
data_size,
source_address,
strlen(source_address),
MSGPACK_OBJECT_STR);
}

if (result == FLB_MAP_EXPANSION_ERROR) {
flb_plg_debug(ctx->ins, "error expanding source_address : %d", result);
}
}
}

Expand All @@ -123,7 +164,11 @@ static inline int pack_line(struct flb_syslog *ctx,
}

if (result == FLB_EVENT_ENCODER_SUCCESS) {
if (modified_data_buffer != NULL) {
if (appended_address_buffer != NULL) {
result = flb_log_event_encoder_set_body_from_raw_msgpack(
ctx->log_encoder, appended_address_buffer, appended_address_size);
}
else if (modified_data_buffer != NULL) {
result = flb_log_event_encoder_set_body_from_raw_msgpack(
ctx->log_encoder, modified_data_buffer, modified_data_size);
}
Expand Down Expand Up @@ -154,6 +199,9 @@ static inline int pack_line(struct flb_syslog *ctx,
if (modified_data_buffer != NULL) {
flb_free(modified_data_buffer);
}
if (appended_address_buffer != NULL) {
flb_free(appended_address_buffer);
}

return result;
}
Expand Down Expand Up @@ -210,6 +258,7 @@ int syslog_prot_process(struct syslog_conn *conn)
flb_time_get(&out_time);
}
pack_line(ctx, &out_time,
conn->connection,
out_buf, out_size,
p, len);
flb_free(out_buf);
Expand All @@ -235,12 +284,21 @@ int syslog_prot_process(struct syslog_conn *conn)
return 0;
}

int syslog_prot_process_udp(char *buf, size_t size, struct flb_syslog *ctx)
int syslog_prot_process_udp(struct syslog_conn *conn)
{
int ret;
void *out_buf;
size_t out_size;
struct flb_time out_time = {0};
char *buf;
size_t size;
struct flb_syslog *ctx;
struct flb_connection *connection;

buf = conn->buf_data;
size = conn->buf_len;
ctx = conn->ctx;
connection = conn->connection;

ret = flb_parser_do(ctx->parser, buf, size,
&out_buf, &out_size, &out_time);
Expand All @@ -249,6 +307,7 @@ int syslog_prot_process_udp(char *buf, size_t size, struct flb_syslog *ctx)
flb_time_get(&out_time);
}
pack_line(ctx, &out_time,
connection,
out_buf, out_size,
buf, size);
flb_free(out_buf);
Expand Down
7 changes: 6 additions & 1 deletion plugins/in_syslog/syslog_prot.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@

#include "syslog.h"

#define FLB_MAP_EXPAND_SUCCESS 0
#define FLB_MAP_NOT_MODIFIED -1
#define FLB_MAP_EXPANSION_ERROR -2
#define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3

int syslog_prot_process(struct syslog_conn *conn);
int syslog_prot_process_udp(char *buf, size_t size, struct flb_syslog *ctx);
int syslog_prot_process_udp(struct syslog_conn *conn);

#endif
Loading
Loading