diff --git a/docs/en/connector-v2/sink/TDengine.md b/docs/en/connector-v2/sink/TDengine.md index 455e0effa20..d2fa9ff3092 100644 --- a/docs/en/connector-v2/sink/TDengine.md +++ b/docs/en/connector-v2/sink/TDengine.md @@ -6,6 +6,10 @@ Used to write data to TDengine. You need to create stable before running seatunnel task +***attention please*** ++ you don't need to config sub-table name, first column read is used as sub-table name ++ we will use last n columns as table tags, n is count of tags + ## Key features - [x] [exactly-once](../../concept/connector-v2-features.md) @@ -13,13 +17,14 @@ Used to write data to TDengine. You need to create stable before running seatunn ## Options -| name | type | required | default value | +| name | type | required | default value | |----------|--------|----------|---------------| | url | string | yes | - | | username | string | yes | - | | password | string | yes | - | | database | string | yes | | | stable | string | yes | - | +| fields | list | no | - | | timezone | string | no | UTC | ### url [string] @@ -54,18 +59,51 @@ the timeznoe of the TDengine sever, it's important to the ts field ## Example -### sink +### write 3 columns into table power.meter + +power.meter schema information: + +```sql +CREATE STABLE `meter` (`ts` TIMESTAMP, `latitude` DOUBLE, `longtitude` DOUBLE) TAGS (`tenant_id` INT) +``` +seatunnel config file ```hocon +source { + # This is a example input plugin **only for test and demonstrate the feature input plugin** + FakeSource { + row.num = 1 + schema = { + fields { + table_name = int + ts = timestamp + latitude = double + tenant_id = int + } + } + result_table_name = "fake" + } +} + sink { - TDengine { - url : "jdbc:TAOS-RS://localhost:6041/" - username : "root" - password : "taosdata" - database : "power2" - stable : "meters2" - timezone: UTC - } + TDengine { + source_table_name='fake' + url : "jdbc:TAOS-RS://localhost:6041/" + username : "root" + password : "taosdata" + database : "test" + fields: ['ts','latitude'] + stable : "meter" + timezone: UTC + } } ``` +execute query and print table data +```sql +taos> select * from meter; + ts | latitude | longtitude | tenant_id | +================================================================================================ + 2024-03-08 03:47:54.000 | 9.594743198982019e+306 | NULL | 1215981593 | + 2024-08-18 07:34:56.000 | 1.712320739538011e+308 | NULL | 1965348204 | + 2024-07-05 21:59:04.000 | 4.499682197436227e+307 | NULL | 203469706 | diff --git a/docs/en/connector-v2/source/TDengine.md b/docs/en/connector-v2/source/TDengine.md index a24744d5c17..a4e00beb62a 100644 --- a/docs/en/connector-v2/source/TDengine.md +++ b/docs/en/connector-v2/source/TDengine.md @@ -20,13 +20,14 @@ supports query SQL and can achieve projection effect. ## Options -| name | type | required | default value | +| name | type | required | default value | |-------------|--------|----------|---------------| | url | string | yes | - | | username | string | yes | - | | password | string | yes | - | | database | string | yes | | | stable | string | yes | - | +| fields | list | no | - | | lower_bound | long | yes | - | | upper_bound | long | yes | - | @@ -56,6 +57,10 @@ the database of the TDengine when you select the stable of the TDengine when you select +### fields [list] + +table columns selected from source table, such as: [ts,longtitude] + ### lower_bound [long] the lower_bound of the migration period @@ -76,6 +81,7 @@ source { password : "taosdata" database : "power" stable : "meters" + fields : [ts,longtitude] lower_bound : "2018-10-03 14:38:05.000" upper_bound : "2018-10-03 14:38:16.800" result_table_name = "tdengine_result" diff --git a/docs/zh/connector-v2/sink/TDengine.md b/docs/zh/connector-v2/sink/TDengine.md new file mode 100644 index 00000000000..4f57b03dad3 --- /dev/null +++ b/docs/zh/connector-v2/sink/TDengine.md @@ -0,0 +1,82 @@ +# TDengine + +> TDengine(Taos)数据接收器 + +## Description + +连接TDengine(Taos)数据库并将数据写入数据库 + +***需要注意的点*** + ++ 写入tdengine时无需指定子表名称,会将reader中读取出来的第一个字段当作子表名称,假如数据源是一个jdbc任务,query语句是select device_id,ts,power,tenant_id from xxx,那么写入的子表名称将会是device_id的值。 ++ 同理,写入时会将读出来的最后几列当作tag,假设写入的超级表指定了tag是tenant_id,那么在写入时会将tenant_id的值当作tag + +## Key features + +- [x] [批处理](../../concept/connector-v2-features.md) +- [ ] [流处理](../../concept/connector-v2-features.md) +- [x] [精确一次](../../concept/connector-v2-features.md) +- [x] [并行度](../../concept/connector-v2-features.md) + +## 源选项 + +| 名称 | 类型 | 是否必须 | 默认值 | 描述 | +|----------|--------|-------|---------------|----------------------------------------------------| +| url | string | 是 | - | 连接数据库使用的jdbc url,比如: jdbc:TAOS-RS://localhost:6041 | +| username | string | 是 | - | 连接数据库使用的用户名 | +| password | string | 是 | - | 连接数据库使用的密码 | +| database | string | 是 | | 数据写入的数据库名称 | +| stable | string | 是 | - | 数据写入的数据表名称 | +| fields | list | 否 | - | 写入的数据表字段 | +| timezone | string | 否 | - | 客户端时区 | + +## 任务示例 + +### 往power.meters表中写入ts和longtitude两个字段的数据 + +power.meters表结构: + +```sql +CREATE STABLE `meter` (`ts` TIMESTAMP, `latitude` DOUBLE, `longtitude` DOUBLE) TAGS (`tenant_id` INT) +``` +seatunnel 配置信息 +```hocon +source { + # This is a example input plugin **only for test and demonstrate the feature input plugin** + FakeSource { + row.num = 1 + schema = { + fields { + table_name = int + ts = timestamp + latitude = double + tenant_id = int + } + } + result_table_name = "fake" + } +} + +sink { + TDengine { + source_table_name='fake' + url : "jdbc:TAOS-RS://localhost:6041/" + username : "root" + password : "taosdata" + database : "test" + fields: ['ts','latitude'] + stable : "meter" + timezone: UTC + } +} +``` +查询tdengine数据库查看最新数据 + +```sql +taos> select * from meter; + ts | latitude | longtitude | tenant_id | +================================================================================================ + 2024-03-08 03:47:54.000 | 9.594743198982019e+306 | NULL | 1215981593 | + 2024-08-18 07:34:56.000 | 1.712320739538011e+308 | NULL | 1965348204 | + 2024-07-05 21:59:04.000 | 4.499682197436227e+307 | NULL | 203469706 | +``` diff --git a/docs/zh/connector-v2/source/TDengine.md b/docs/zh/connector-v2/source/TDengine.md new file mode 100644 index 00000000000..dca8ec6dade --- /dev/null +++ b/docs/zh/connector-v2/source/TDengine.md @@ -0,0 +1,48 @@ +# TDengine + +> TDengine(Taos)数据源连接器 + +## Description + +从TDengine(Taos)数据源读取数据,支持全表读取、指定列读取。 + +## Key features + +- [x] [批处理](../../concept/connector-v2-features.md) +- [ ] [流处理](../../concept/connector-v2-features.md) +- [x] [精确一次](../../concept/connector-v2-features.md) +- [x] [并行度](../../concept/connector-v2-features.md) + +## 源选项 + +| 名称 | 类型 | 是否必须 | 默认值 | 描述 | +|-------------|--------|-------|---------------|------------------------------------------------------------------------| +| url | string | 是 | - | 连接数据库使用的jdbc url,比如: jdbc:TAOS-RS://localhost:6041 | +| username | string | 是 | - | 连接数据库使用的用户名 | +| password | string | 是 | - | 连接数据库使用的密码 | +| database | string | 是 | | 数据源表所在的数据库名称 | +| stable | string | 是 | - | 数据源表的名称 | +| fields | list | 否 | - | 需要从数据源表中读取的字段 | +| lower_bound | long | 是 | - | 过滤条件。从数据源中读取数据时,数据的最早时间 | +| upper_bound | long | 是 | - | 过滤条件。从数据源中读取数据时,数据的最晚时间 | + +## 任务示例 + +### 从power.meters表中读取ts和longtitude两个字段的数据,指定的数据时间范围是[2018-10-03 14:38:05.000,2018-10-03 14:38:16.800) + +```hocon +source { + TDengine { + url : "jdbc:TAOS-RS://localhost:6041/" + username : "root" + password : "taosdata" + database : "power" + stable : "meters" + fields : [ts,longtitude] + lower_bound : "2018-10-03 14:38:05.000" + upper_bound : "2018-10-03 14:38:16.800" + result_table_name = "tdengine_result" + } +} +``` + diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java index 4eabb754cf0..76da911712d 100644 --- a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java @@ -22,9 +22,11 @@ import lombok.Data; import java.io.Serializable; +import java.util.ArrayList; import java.util.List; import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE; +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.FIELDS; import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.LOWER_BOUND; import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD; import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE; @@ -71,6 +73,10 @@ public static TDengineSourceConfig buildSourceConfig(Config pluginConfig) { pluginConfig.hasPath(LOWER_BOUND) ? pluginConfig.getString(LOWER_BOUND) : null); tdengineSourceConfig.setTimezone( pluginConfig.hasPath(TIMEZONE) ? pluginConfig.getString(TIMEZONE) : "UTC"); + tdengineSourceConfig.setFields( + pluginConfig.hasPath(FIELDS) + ? pluginConfig.getStringList(FIELDS) + : new ArrayList<>()); return tdengineSourceConfig; } @@ -85,5 +91,6 @@ public static class ConfigNames { public static String TIMEZONE = "timezone"; public static String LOWER_BOUND = "lower_bound"; public static String UPPER_BOUND = "upper_bound"; + public static String FIELDS = "fields"; } } diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorErrorCode.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorErrorCode.java index 7d4c64e63ba..0e69cc0a277 100644 --- a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorErrorCode.java +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorErrorCode.java @@ -20,7 +20,8 @@ import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; public enum TDengineConnectorErrorCode implements SeaTunnelErrorCode { - LOAD_DRIVER_FAILED("TDengine-01", "Fail to create driver of class"); + LOAD_DRIVER_FAILED("TDengine-01", "Fail to create driver of class"), + FIELD_NOT_EXIST("TDengine-02", "Selected fields does not exist!"); private final String code; private final String description; diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java index ed05e64937e..93e8adbe4ad 100644 --- a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig; import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; @@ -105,6 +106,17 @@ public void write(SeaTunnelRow element) { config.getStable(), tagValues, StringUtils.join(convertDataType(metrics), ",")); + if (CollectionUtils.isNotEmpty(config.getFields())) { + sql = + String.format( + "INSERT INTO %s using %s tags ( %s ) (%s) VALUES ( %s );", + element.getField(0), + config.getStable(), + tagValues, + String.join(",", config.getFields()), + StringUtils.join(convertDataType(metrics), ",")); + } + log.debug("sql content: {}", sql); final int rowCount = statement.executeUpdate(sql); if (rowCount == 0) { Throwables.propagateIfPossible( diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java index e72773781ab..4e5f6de2089 100644 --- a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java @@ -37,6 +37,7 @@ import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState; import org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper.TDengineTypeMapper; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.ArrayUtils; import com.google.auto.service.AutoService; @@ -128,6 +129,7 @@ private StableMetadata getStableMetadata(TDengineSourceConfig config) throws SQL List subTableNames = new ArrayList<>(); List fieldNames = new ArrayList<>(); List> fieldTypes = new ArrayList<>(); + List fields = config.getFields(); String jdbcUrl = String.join("", config.getUrl(), config.getDatabase()); @@ -152,8 +154,16 @@ private StableMetadata getStableMetadata(TDengineSourceConfig config) throws SQL if (timestampFieldName == null) { timestampFieldName = metaResultSet.getString(1); } - fieldNames.add(metaResultSet.getString(1)); - fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2))); + String fieldName = metaResultSet.getString(1); + if (CollectionUtils.isNotEmpty(fields)) { + if (fields.contains(fieldName)) { + fieldNames.add(fieldName); + fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2))); + } + } else { + fieldNames.add(fieldName); + fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2))); + } } while (subTableNameResultSet.next()) {