Skip to content

Commit

Permalink
add label config
Browse files Browse the repository at this point in the history
  • Loading branch information
joker-star-l committed Oct 30, 2024
1 parent f02d806 commit 9d552fb
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 3 deletions.
27 changes: 24 additions & 3 deletions plugins/out_doris/doris.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ static int cb_doris_init(struct flb_output_instance *ins,
static int http_put(struct flb_out_doris *ctx,
const char *host, int port,
const void *body, size_t body_len,
const char *tag, int tag_len)
const char *tag, int tag_len,
const char *label, int label_len)
{
int ret;
int out_ret = FLB_OK;
Expand Down Expand Up @@ -134,6 +135,11 @@ static int http_put(struct flb_out_doris *ctx,
flb_http_add_header(c, "strip_outer_array", 17, "true", 4);
flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);

if (ctx->add_label) {
flb_http_add_header(c, "label", 5, label, label_len);
flb_plg_debug(ctx->ins, "add label: %s", label);
}

flb_config_map_foreach(head, mv, ctx->headers) {
key = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
val = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);
Expand Down Expand Up @@ -170,7 +176,7 @@ static int http_put(struct flb_out_doris *ctx,
memcpy(redict_port, mid + 1, end - (mid + 1));

out_ret = http_put(ctx, redict_host, atoi(redict_port),
body, body_len, tag, tag_len);
body, body_len, tag, tag_len, label, label_len);
}
else if (c->resp.status == 200 && c->resp.payload_size > 0) {
ret = flb_pack_json(c->resp.payload, c->resp.payload_size,
Expand Down Expand Up @@ -296,6 +302,9 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk,
size_t out_size;
(void) i_ins;

char label[256] = {0};
int len = 0;

ret = compose_payload(ctx, event_chunk->data, event_chunk->size,
&out_body, &out_size);

Expand All @@ -306,8 +315,14 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(ret);
}

if (ctx->add_label) {
len = snprintf(label, sizeof(label) - 1, "%s_%lu_", ctx->label_prefix, cfl_time_now() / 1000000000L);
flb_utils_uuid_v4_gen(label + len);
len += 36;
}

ret = http_put(ctx, ctx->host, ctx->port, out_body, out_size,
event_chunk->tag, flb_sds_len(event_chunk->tag));
event_chunk->tag, flb_sds_len(event_chunk->tag), label, len);
flb_sds_destroy(out_body);

if (ret == FLB_OK) {
Expand Down Expand Up @@ -353,6 +368,12 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_out_doris, table),
"Set table"
},
// label_prefix
{
FLB_CONFIG_MAP_STR, "label_prefix", "flubentbit",
0, FLB_TRUE, offsetof(struct flb_out_doris, label_prefix),
"Set label prefix"
},
// time_key
{
FLB_CONFIG_MAP_STR, "time_key", "date",
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_doris/doris.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ struct flb_out_doris {
flb_sds_t database;
flb_sds_t table;

flb_sds_t label_prefix;
int add_label;

flb_sds_t time_key;
flb_sds_t date_key; /* internal use */

Expand Down
10 changes: 10 additions & 0 deletions plugins/out_doris/doris_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,16 @@ struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins,
/* url: /api/{database}/{table}/_stream_load */
snprintf(ctx->uri, sizeof(ctx->uri) - 1, "/api/%s/%s/_stream_load", ctx->database, ctx->table);

/* label prefix */
ctx->add_label = 1;
tmp = flb_output_get_property("label_prefix", ins);
if (tmp) {
/* Just check if we have to disable it */
if (flb_utils_bool(tmp) == FLB_FALSE) {
ctx->add_label = 0;
}
}

/* Date key */
ctx->date_key = ctx->time_key;
tmp = flb_output_get_property("time_key", ins);
Expand Down

0 comments on commit 9d552fb

Please sign in to comment.