diff --git a/cmake/plugins_options.cmake b/cmake/plugins_options.cmake index 4292ef204b5..2d86f69bc9b 100644 --- a/cmake/plugins_options.cmake +++ b/cmake/plugins_options.cmake @@ -111,6 +111,7 @@ DEFINE_OPTION(FLB_OUT_CHRONICLE "Enable Google Chronicle output pl DEFINE_OPTION(FLB_OUT_CLOUDWATCH_LOGS "Enable AWS CloudWatch output plugin" ON) DEFINE_OPTION(FLB_OUT_COUNTER "Enable Counter output plugin" ON) DEFINE_OPTION(FLB_OUT_DATADOG "Enable DataDog output plugin" ON) +DEFINE_OPTION(FLB_OUT_DORIS "Enable Apache Doris output plugin" ON) DEFINE_OPTION(FLB_OUT_ES "Enable Elasticsearch output plugin" ON) DEFINE_OPTION(FLB_OUT_EXIT "Enable Exit output plugin" ON) DEFINE_OPTION(FLB_OUT_FILE "Enable file output plugin" ON) diff --git a/cmake/windows-setup.cmake b/cmake/windows-setup.cmake index b947d6cbfa0..e24909211e5 100644 --- a/cmake/windows-setup.cmake +++ b/cmake/windows-setup.cmake @@ -66,6 +66,7 @@ if(FLB_WINDOWS_DEFAULTS) set(FLB_OUT_COUNTER Yes) set(FLB_OUT_CHRONICLE Yes) set(FLB_OUT_DATADOG Yes) + set(FLB_OUT_DORIS Yes) set(FLB_OUT_ES Yes) set(FLB_OUT_EXIT No) set(FLB_OUT_FORWARD Yes) diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 200a09b449c..7bf3f81af19 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -344,6 +344,7 @@ REGISTER_OUT_PLUGIN("out_prometheus_remote_write") REGISTER_OUT_PLUGIN("out_s3") REGISTER_OUT_PLUGIN("out_vivo_exporter") REGISTER_OUT_PLUGIN("out_chronicle") +REGISTER_OUT_PLUGIN("out_doris") # FILTERS # ======= diff --git a/plugins/out_doris/CMakeLists.txt b/plugins/out_doris/CMakeLists.txt new file mode 100644 index 00000000000..79338fdbb20 --- /dev/null +++ b/plugins/out_doris/CMakeLists.txt @@ -0,0 +1,6 @@ +set(src + doris.c + doris_conf.c + ) + +FLB_PLUGIN(out_doris "${src}" "") \ No newline at end of file diff --git a/plugins/out_doris/doris.c b/plugins/out_doris/doris.c new file mode 100644 index 00000000000..b9025f80c08 --- /dev/null +++ b/plugins/out_doris/doris.c @@ -0,0 +1,461 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "doris.h" +#include "doris_conf.h" + +#include + +#ifdef FLB_SYSTEM_WINDOWS +#include +#endif + +static int cb_doris_init(struct flb_output_instance *ins, + struct flb_config *config, void *data) +{ + struct flb_out_doris *ctx = NULL; + (void) data; + + ctx = flb_doris_conf_create(ins, config); + if (!ctx) { + return -1; + } + + /* Set the plugin context */ + flb_output_set_context(ins, ctx); + + /* + * This plugin instance uses the HTTP client interface, let's register + * it debugging callbacks. + */ + flb_output_set_http_debug_callbacks(ins); + + return 0; +} + +static int http_put(struct flb_out_doris *ctx, + const char *host, int port, + const void *body, size_t body_len, + const char *tag, int tag_len, + const char *label, int label_len) +{ + int ret; + int out_ret = FLB_OK; + size_t b_sent; + void *payload_buf = NULL; + size_t payload_size = 0; + struct flb_upstream *u; + struct flb_connection *u_conn; + struct flb_http_client *c; + struct mk_list *head; + struct flb_config_map_val *mv; + struct flb_slist_entry *key = NULL; + struct flb_slist_entry *val = NULL; + + int i; + int root_type; + char *out_buf; + size_t off = 0; + size_t out_size; + msgpack_unpacked result; + msgpack_object root; + msgpack_object msg_key; + msgpack_object msg_val; + + /* Get upstream context and connection */ + if (strcmp(host, ctx->host) == 0 && port == ctx->port) { + u = ctx->u; + } + else { + // TODO cache + u = flb_upstream_create(ctx->u->base.config, + host, + port, + ctx->u->base.flags, + ctx->u->base.tls_context); + } + u_conn = flb_upstream_conn_get(u); + if (!u_conn) { + flb_plg_error(ctx->ins, "no upstream connections available to %s:%i", + u->tcp_host, u->tcp_port); + return FLB_RETRY; + } + + /* Map payload */ + payload_buf = (void *) body; + payload_size = body_len; + + /* Create HTTP client context */ + c = flb_http_client(u_conn, FLB_HTTP_PUT, ctx->uri, + payload_buf, payload_size, + host, port, + NULL, 0); + + /* + * Direct assignment of the callback context to the HTTP client context. + * This needs to be improved through a more clean API. + */ + c->cb_ctx = ctx->ins->callback; + + /* Append headers */ + flb_http_add_header(c, "format", 6, "json", 4); + flb_http_add_header(c, "Expect", 6, "100-continue", 12); + flb_http_add_header(c, "strip_outer_array", 17, "true", 4); + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + + if (ctx->add_label) { + flb_http_add_header(c, "label", 5, label, label_len); + flb_plg_debug(ctx->ins, "add label: %s", label); + } + + flb_config_map_foreach(head, mv, ctx->headers) { + key = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); + val = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head); + + flb_http_add_header(c, + key->str, flb_sds_len(key->str), + val->str, flb_sds_len(val->str)); + } + + /* Basic Auth headers */ + flb_http_basic_auth(c, ctx->user, ctx->password); + + ret = flb_http_do(c, &b_sent); + if (ret == 0) { + if (ctx->log_request) { + flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i\n%s\n", + host, port, + c->resp.status, c->resp.payload); + } else { + flb_plg_debug(ctx->ins, "%s:%i, HTTP status=%i\n%s\n", + host, port, + c->resp.status, c->resp.payload); + } + + if (c->resp.status == 307) { // redict + // example: Location: http://admin:admin@127.0.0.1:8040/api/d_fb/t_fb/_stream_load? + char* location = strstr(c->resp.data, "Location:"); + char* start = strstr(location, "@") + 1; + char* mid = strstr(start, ":"); + char* end = strstr(mid, "/api"); + char redict_host[50] = {0}; + memcpy(redict_host, start, mid - start); + char redict_port[10] = {0}; + memcpy(redict_port, mid + 1, end - (mid + 1)); + + out_ret = http_put(ctx, redict_host, atoi(redict_port), + body, body_len, tag, tag_len, label, label_len); + } + else if (c->resp.status == 200 && c->resp.payload_size > 0) { + ret = flb_pack_json(c->resp.payload, c->resp.payload_size, + &out_buf, &out_size, &root_type, NULL); + + if (ret == -1) { + out_ret = FLB_RETRY; + } + + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, out_buf, out_size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + out_ret = FLB_RETRY; + } + + root = result.data; + if (root.type != MSGPACK_OBJECT_MAP) { + out_ret = FLB_RETRY; + } + + for (i = 0; i < root.via.map.size; i++) { + msg_key = root.via.map.ptr[i].key; + if (msg_key.type != MSGPACK_OBJECT_STR) { + out_ret = FLB_RETRY; + break; + } + + if (msg_key.via.str.size == 6 && strncmp(msg_key.via.str.ptr, "Status", 6) == 0) { + msg_val = root.via.map.ptr[i].val; + if (msg_val.type != MSGPACK_OBJECT_STR) { + out_ret = FLB_RETRY; + break; + } + + if (msg_val.via.str.size == 7 && strncmp(msg_val.via.str.ptr, "Success", 7) == 0) { + out_ret = FLB_OK; + break; + } + + if (msg_val.via.str.size == 15 && strncmp(msg_val.via.str.ptr, "Publish Timeout", 15) == 0) { + out_ret = FLB_OK; + break; + } + + out_ret = FLB_RETRY; + break; + } + } + + flb_free(out_buf); + msgpack_unpacked_destroy(&result); + } + else { + out_ret = FLB_RETRY; + } + } + else { + flb_plg_error(ctx->ins, "could not flush records to %s:%i (http_do=%i)", + ctx->host, ctx->port, ret); + out_ret = FLB_RETRY; + } + + /* cleanup */ + + /* + * If the payload buffer is different than incoming records in body, means + * we generated a different payload and must be freed. + */ + if (payload_buf != body) { + flb_free(payload_buf); + } + + /* Destroy HTTP client context */ + flb_http_client_destroy(c); + + /* Release the TCP connection */ + flb_upstream_conn_release(u_conn); + + /* Release flb_upstream */ + if (u != ctx->u) { + flb_upstream_destroy(u); + } + + return out_ret; +} + +static int compose_payload(struct flb_out_doris *ctx, + const void *in_body, size_t in_size, + void **out_body, size_t *out_size) +{ + flb_sds_t encoded; + + *out_body = NULL; + *out_size = 0; + + encoded = flb_pack_msgpack_to_json_format(in_body, + in_size, + FLB_PACK_JSON_FORMAT_JSON, + FLB_PACK_JSON_DATE_EPOCH, + ctx->date_key); + if (encoded == NULL) { + flb_plg_error(ctx->ins, "failed to convert json"); + return FLB_ERROR; + } + *out_body = (void*)encoded; + *out_size = flb_sds_len(encoded); + + if (ctx->log_request) { + flb_plg_info(ctx->ins, "http body: %s", (char*) *out_body); + } else { + flb_plg_debug(ctx->ins, "http body: %s", (char*) *out_body); + } + + return FLB_OK; +} + +static void cb_doris_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + int ret = FLB_ERROR; + struct flb_out_doris *ctx = out_context; + void *out_body; + size_t out_size; + (void) i_ins; + + char label[256] = {0}; + int len = 0; + + ret = compose_payload(ctx, event_chunk->data, event_chunk->size, + &out_body, &out_size); + + if (ret != FLB_OK) { + if (ret == FLB_ERROR && ctx->log_progress_interval > 0) { +#ifdef FLB_SYSTEM_WINDOWS + InterlockedAdd(&ctx->reporter->failed_rows, event_chunk->total_events); +#else + __sync_fetch_and_add(&ctx->reporter->failed_rows, event_chunk->total_events); +#endif + } + FLB_OUTPUT_RETURN(ret); + } + + if (ctx->add_label) { + len = snprintf(label, sizeof(label) - 1, "%s_%lu_", ctx->label_prefix, cfl_time_now() / 1000000000L); + flb_utils_uuid_v4_gen(label + len); + len += 36; + } + + ret = http_put(ctx, ctx->host, ctx->port, out_body, out_size, + event_chunk->tag, flb_sds_len(event_chunk->tag), label, len); + flb_sds_destroy(out_body); + + if (ret == FLB_OK && ctx->log_progress_interval > 0) { +#ifdef FLB_SYSTEM_WINDOWS + InterlockedAdd(&ctx->reporter->total_bytes, out_size); + InterlockedAdd(&ctx->reporter->total_rows, event_chunk->total_events); +#else + __sync_fetch_and_add(&ctx->reporter->total_bytes, out_size); + __sync_fetch_and_add(&ctx->reporter->total_rows, event_chunk->total_events); +#endif + } else if (ret == FLB_ERROR && ctx->log_progress_interval > 0) { +#ifdef FLB_SYSTEM_WINDOWS + InterlockedAdd(&ctx->reporter->failed_rows, event_chunk->total_events); +#else + __sync_fetch_and_add(&ctx->reporter->failed_rows, event_chunk->total_events); +#endif + } + FLB_OUTPUT_RETURN(ret); +} + +static int cb_doris_exit(void *data, struct flb_config *config) +{ + struct flb_out_doris *ctx = data; + + flb_doris_conf_destroy(ctx); + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + // user + { + FLB_CONFIG_MAP_STR, "user", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_doris, user), + "Set HTTP auth user" + }, + // password + { + FLB_CONFIG_MAP_STR, "password", "", + 0, FLB_TRUE, offsetof(struct flb_out_doris, password), + "Set HTTP auth password" + }, + // database + { + FLB_CONFIG_MAP_STR, "database", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_doris, database), + "Set database" + }, + // table + { + FLB_CONFIG_MAP_STR, "table", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_doris, table), + "Set table" + }, + // label_prefix + { + FLB_CONFIG_MAP_STR, "label_prefix", "fluentbit", + 0, FLB_TRUE, offsetof(struct flb_out_doris, label_prefix), + "Set label prefix" + }, + // time_key + { + FLB_CONFIG_MAP_STR, "time_key", "date", + 0, FLB_TRUE, offsetof(struct flb_out_doris, time_key), + "Specify the name of the date field in output" + }, + // header + { + FLB_CONFIG_MAP_SLIST_1, "header", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_out_doris, headers), + "Add a doris stream load header key/value pair. Multiple headers can be set" + }, + // log_request + { + FLB_CONFIG_MAP_BOOL, "log_request", "true", + 0, FLB_TRUE, offsetof(struct flb_out_doris, log_request), + "Specify if the doris stream load request and response should be logged or not" + }, + // log_progress_interval + { + FLB_CONFIG_MAP_INT, "log_progress_interval", "10", + 0, FLB_TRUE, offsetof(struct flb_out_doris, log_progress_interval), + "Specify the interval in seconds to log the progress of the doris stream load" + }, + + /* EOF */ + {0} +}; + +static int cb_doris_format_test(struct flb_config *config, + struct flb_input_instance *ins, + void *plugin_context, + void *flush_ctx, + int event_type, + const char *tag, int tag_len, + const void *data, size_t bytes, + void **out_data, size_t *out_size) +{ + struct flb_out_doris *ctx = plugin_context; + int ret; + + ret = compose_payload(ctx, data, bytes, out_data, out_size); + if (ret != FLB_OK) { + flb_error("ret=%d", ret); + return -1; + } + return 0; +} + +/* Plugin reference */ +struct flb_output_plugin out_doris_plugin = { + .name = "doris", + .description = "Doris Output", + .cb_init = cb_doris_init, + .cb_pre_run = NULL, + .cb_flush = cb_doris_flush, + .cb_exit = cb_doris_exit, + .config_map = config_map, + + /* for testing */ + .test_formatter.callback = cb_doris_format_test, + + .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, + .workers = 2 +}; \ No newline at end of file diff --git a/plugins/out_doris/doris.h b/plugins/out_doris/doris.h new file mode 100644 index 00000000000..b18a48bad2a --- /dev/null +++ b/plugins/out_doris/doris.h @@ -0,0 +1,64 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_DORIS_H +#define FLB_OUT_DORIS_H + +#include + +struct flb_doris_progress_reporter { + size_t total_bytes; + size_t total_rows; + size_t failed_rows; +}; + +struct flb_out_doris { + char *host; + int port; + char uri[256]; + + char *user; + char *password; + + flb_sds_t database; + flb_sds_t table; + + flb_sds_t label_prefix; + int add_label; + + flb_sds_t time_key; + flb_sds_t date_key; /* internal use */ + + /* doris stream load headers */ + struct mk_list *headers; + + int log_request; + int log_progress_interval; + + struct flb_doris_progress_reporter *reporter; + pthread_t reporter_thread; + + /* Upstream connection to the backend server */ + struct flb_upstream *u; + + /* Plugin instance */ + struct flb_output_instance *ins; +}; + +#endif diff --git a/plugins/out_doris/doris_conf.c b/plugins/out_doris/doris_conf.c new file mode 100644 index 00000000000..4730a1bd604 --- /dev/null +++ b/plugins/out_doris/doris_conf.c @@ -0,0 +1,207 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include "doris.h" +#include "doris_conf.h" + +void *report(void *c) { + struct flb_out_doris *ctx = (struct flb_out_doris *) c; + + size_t init_time = cfl_time_now() / 1000000000L; + size_t last_time = init_time; + size_t last_bytes = ctx->reporter->total_bytes; + size_t last_rows = ctx->reporter->total_rows; + + size_t cur_time, cur_bytes, cur_rows, total_time, total_speed_mbps, total_speed_rps; + size_t inc_bytes, inc_rows, inc_time, inc_speed_mbps, inc_speed_rps; + + pthread_detach(pthread_self()); + + flb_plg_info(ctx->ins, "Start progress reporter with interval %d", ctx->log_progress_interval); + + while (ctx->log_progress_interval > 0) { + sleep(ctx->log_progress_interval); + + cur_time = cfl_time_now() / 1000000000L; + cur_bytes = ctx->reporter->total_bytes; + cur_rows = ctx->reporter->total_rows; + total_time = cur_time - init_time; + total_speed_mbps = cur_bytes / 1024 / 1024 / total_time; + total_speed_rps = cur_rows / total_time; + + inc_bytes = cur_bytes - last_bytes; + inc_rows = cur_rows - last_rows; + inc_time = cur_time - last_time; + inc_speed_mbps = inc_bytes / 1024 / 1024 / inc_time; + inc_speed_rps = inc_rows / inc_time; + + flb_plg_info(ctx->ins, "total %zu MB %zu ROWS, total speed %zu MB/s %zu R/s, last %zu seconds speed %zu MB/s %zu R/s", + cur_bytes/1024/1024, cur_rows, total_speed_mbps, total_speed_rps, + inc_time, inc_speed_mbps, inc_speed_rps); + + last_time = cur_time; + last_bytes = cur_bytes; + last_rows = cur_rows; + } + + return NULL; +} + +struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins, + struct flb_config *config) +{ + int ret; + int io_flags = 0; + const char *tmp; + struct flb_upstream *upstream; + struct flb_out_doris *ctx = NULL; + struct flb_doris_progress_reporter *reporter = NULL; + + /* Allocate plugin context */ + ctx = flb_calloc(1, sizeof(struct flb_out_doris)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + + ret = flb_output_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + + /* Set default network configuration */ + flb_output_net_default("127.0.0.1", 8030, ins); + + /* Validate */ + if (!ctx->user) { + flb_plg_error(ins, "user is not set"); + } + if (!ctx->database) { + flb_plg_error(ins, "database is not set"); + } + if (!ctx->table) { + flb_plg_error(ins, "table is not set"); + } + + /* Check if SSL/TLS is enabled */ +#ifdef FLB_HAVE_TLS + if (ins->use_tls == FLB_TRUE) { + io_flags = FLB_IO_TLS; + } + else { + io_flags = FLB_IO_TCP; + } +#else + io_flags = FLB_IO_TCP; +#endif + + if (ins->host.ipv6 == FLB_TRUE) { + io_flags |= FLB_IO_IPV6; + } + + /* Prepare an upstream handler */ + upstream = flb_upstream_create(config, + ins->host.name, + ins->host.port, + io_flags, ins->tls); + + if (!upstream) { + flb_free(ctx); + return NULL; + } + + /* url: /api/{database}/{table}/_stream_load */ + snprintf(ctx->uri, sizeof(ctx->uri) - 1, "/api/%s/%s/_stream_load", ctx->database, ctx->table); + + /* label prefix */ + ctx->add_label = 1; + tmp = flb_output_get_property("label_prefix", ins); + if (tmp) { + /* Just check if we have to disable it */ + if (flb_utils_bool(tmp) == FLB_FALSE) { + ctx->add_label = 0; + } + } + + /* Date key */ + ctx->date_key = ctx->time_key; + tmp = flb_output_get_property("time_key", ins); + if (tmp) { + /* Just check if we have to disable it */ + if (flb_utils_bool(tmp) == FLB_FALSE) { + ctx->date_key = NULL; + } + } + + ctx->u = upstream; + ctx->host = ins->host.name; + ctx->port = ins->host.port; + + /* Set instance flags into upstream */ + flb_output_upstream_set(ctx->u, ins); + + /* create and start the progress reporter */ + if (ctx->log_progress_interval > 0) { + reporter = flb_calloc(1, sizeof(struct flb_doris_progress_reporter)); + if (!reporter) { + flb_plg_error(ins, "failed to create progress reporter"); + flb_doris_conf_destroy(ctx); + return NULL; + } + reporter->total_bytes = 0; + reporter->total_rows = 0; + reporter->failed_rows = 0; + ctx->reporter = reporter; + + if(pthread_create(&ctx->reporter_thread, NULL, report, (void *) ctx)) { + flb_plg_error(ins, "failed to create progress reporter"); + flb_doris_conf_destroy(ctx); + return NULL; + } + } + + return ctx; +} + +void flb_doris_conf_destroy(struct flb_out_doris *ctx) +{ + if (!ctx) { + return; + } + + if (ctx->u) { + flb_upstream_destroy(ctx->u); + } + + if (ctx->reporter) { + pthread_cancel(ctx->reporter_thread); + flb_free(ctx->reporter); + } + + flb_free(ctx); +} diff --git a/plugins/out_doris/doris_conf.h b/plugins/out_doris/doris_conf.h new file mode 100644 index 00000000000..5c9eae4e331 --- /dev/null +++ b/plugins/out_doris/doris_conf.h @@ -0,0 +1,32 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_DORIS_CONF_H +#define FLB_OUT_DORIS_CONF_H + +#include +#include + +#include "doris.h" + +struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins, + struct flb_config *config); +void flb_doris_conf_destroy(struct flb_out_doris *ctx); + +#endif diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index e902f7892ff..99323c48817 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -125,6 +125,7 @@ if(FLB_IN_LIB) FLB_RT_TEST(FLB_OUT_S3 "out_s3.c") FLB_RT_TEST(FLB_OUT_TD "out_td.c") FLB_RT_TEST(FLB_OUT_INFLUXDB "out_influxdb.c") + FLB_RT_TEST(FLB_OUT_DORIS "out_doris.c") endif() diff --git a/tests/runtime/out_doris.c b/tests/runtime/out_doris.c new file mode 100644 index 00000000000..d91ab581a2d --- /dev/null +++ b/tests/runtime/out_doris.c @@ -0,0 +1,268 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2022 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include +#include +#include "flb_tests_runtime.h" + +struct test_ctx { + flb_ctx_t *flb; /* Fluent Bit library context */ + int i_ffd; /* Input fd */ + int f_ffd; /* Filter fd (unused) */ + int o_ffd; /* Output fd */ +}; + +pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER; +int num_output = 0; +static int get_output_num() +{ + int ret; + pthread_mutex_lock(&result_mutex); + ret = num_output; + pthread_mutex_unlock(&result_mutex); + + return ret; +} + +static void set_output_num(int num) +{ + pthread_mutex_lock(&result_mutex); + num_output = num; + pthread_mutex_unlock(&result_mutex); +} + +static void clear_output_num() +{ + set_output_num(0); +} + +struct str_list { + size_t size; + char **lists; +}; + +/* Callback to check expected results */ +static void cb_check_str_list(void *ctx, int ffd, int res_ret, + void *res_data, size_t res_size, void *data) +{ + char *p; + flb_sds_t out_line = res_data; + int num = get_output_num(); + size_t i; + struct str_list *l = (struct str_list *)data; + + if (!TEST_CHECK(res_data != NULL)) { + TEST_MSG("res_data is NULL"); + return; + } + + if (!TEST_CHECK(l != NULL)) { + TEST_MSG("l is NULL"); + flb_sds_destroy(out_line); + return; + } + + if(!TEST_CHECK(res_ret == 0)) { + TEST_MSG("callback ret=%d", res_ret); + } + if (!TEST_CHECK(res_data != NULL)) { + TEST_MSG("res_data is NULL"); + flb_sds_destroy(out_line); + return; + } + + for (i=0; isize; i++) { + p = strstr(out_line, l->lists[i]); + if (!TEST_CHECK(p != NULL)) { + TEST_MSG(" Got :%s\n expect:%s", out_line, l->lists[i]); + } + } + set_output_num(num+1); + + flb_sds_destroy(out_line); +} + +static struct test_ctx *test_ctx_create() +{ + int i_ffd; + int o_ffd; + struct test_ctx *ctx = NULL; + + ctx = flb_malloc(sizeof(struct test_ctx)); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("malloc failed"); + flb_errno(); + return NULL; + } + + /* Service config */ + ctx->flb = flb_create(); + flb_service_set(ctx->flb, + "Flush", "0.200000000", + "Grace", "1", + "Log_Level", "error", + NULL); + + /* Input */ + i_ffd = flb_input(ctx->flb, (char *) "lib", NULL); + TEST_CHECK(i_ffd >= 0); + ctx->i_ffd = i_ffd; + + /* Output */ + o_ffd = flb_output(ctx->flb, (char *) "doris", NULL); + ctx->o_ffd = o_ffd; + + return ctx; +} + +static void test_ctx_destroy(struct test_ctx *ctx) +{ + TEST_CHECK(ctx != NULL); + + sleep(1); + flb_stop(ctx->flb); + flb_destroy(ctx->flb); + flb_free(ctx); +} + +void flb_test_json() +{ + struct test_ctx *ctx; + int ret; + int num; + + char *buf1 = "[1, {\"msg\":\"hello world\"}]"; + size_t size1 = strlen(buf1); + char *buf2 = "[2, {\"msg\":\"hello world\"}]"; + size_t size2 = strlen(buf2); + + char *expected_strs[] = {"[{\"date\":1,\"msg\":\"hello world\"},{\"date\":2,\"msg\":\"hello world\"}]"}; + struct str_list expected = { + .size = sizeof(expected_strs)/sizeof(char*), + .lists = &expected_strs[0], + }; + + clear_output_num(); + + ctx = test_ctx_create(); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "user", "admin", + "database", "d_fb", + "table", "t_fb", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set_test(ctx->flb, ctx->o_ffd, + "formatter", cb_check_str_list, + &expected, NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_push(ctx->flb, ctx->i_ffd, (char *) buf1, size1); + TEST_CHECK(ret >= 0); + ret = flb_lib_push(ctx->flb, ctx->i_ffd, (char *) buf2, size2); + TEST_CHECK(ret >= 0); + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + + test_ctx_destroy(ctx); +} + +void flb_test_time_key() +{ + struct test_ctx *ctx; + int ret; + int num; + + char *buf1 = "[1, {\"msg\":\"hello world\"}]"; + size_t size1 = strlen(buf1); + + char *expected_strs[] = {"{\"timestamp\":1,\"msg\":\"hello world\"}"}; + struct str_list expected = { + .size = sizeof(expected_strs)/sizeof(char*), + .lists = &expected_strs[0], + }; + + clear_output_num(); + + ctx = test_ctx_create(); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "user", "admin", + "database", "d_fb", + "table", "t_fb", + "time_key", "timestamp", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set_test(ctx->flb, ctx->o_ffd, + "formatter", cb_check_str_list, + &expected, NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_push(ctx->flb, ctx->i_ffd, (char *) buf1, size1); + TEST_CHECK(ret >= 0); + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + + test_ctx_destroy(ctx); +} + +/* Test list */ +TEST_LIST = { + {"json" , flb_test_json}, + {"time_key" , flb_test_time_key}, + {NULL, NULL} +}; \ No newline at end of file