[Improvement][connector-tdengine] support read column data from tdengine table or write column data to tdengine table #7894

Open
wants to merge 4 commits into
base: dev
58 changes: 48 additions & 10 deletions docs/en/connector-v2/sink/TDengine.md
@@ -6,20 +6,25 @@

Used to write data to TDengine. You need to create the stable (super table) before running the SeaTunnel task.

***Attention***
+ You don't need to configure the sub-table name; the first column read is used as the sub-table name.
+ The last n columns are used as table tags, where n is the number of tags defined on the stable.
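
The column convention in the two notes above can be illustrated with a small sketch. `RowLayout` and `split` are hypothetical names used only for this illustration and are not part of the connector; the sketch assumes a row arrives as a plain `Object[]`.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not connector code): splits one row as the notes describe. */
final class RowLayout {
    final Object subTableName;  // first column of the row
    final List<Object> metrics; // columns between the first one and the tags
    final List<Object> tags;    // last tagCount columns of the row

    private RowLayout(Object subTableName, List<Object> metrics, List<Object> tags) {
        this.subTableName = subTableName;
        this.metrics = metrics;
        this.tags = tags;
    }

    static RowLayout split(Object[] row, int tagCount) {
        if (tagCount < 0 || row.length < 1 + tagCount) {
            throw new IllegalArgumentException(
                    "row has " + row.length + " columns, need at least " + (1 + tagCount));
        }
        int tagStart = row.length - tagCount;
        return new RowLayout(
                row[0],
                Arrays.asList(Arrays.copyOfRange(row, 1, tagStart)),
                Arrays.asList(Arrays.copyOfRange(row, tagStart, row.length)));
    }

    public static void main(String[] args) {
        // Sample values are made up: sub-table name, ts, latitude, tenant_id (the only tag).
        Object[] row = {"d1001", "2024-03-08 03:47:54.000", 9.5, 42};
        RowLayout layout = split(row, 1);
        System.out.println(layout.subTableName + " " + layout.metrics + " " + layout.tags);
    }
}
```

With one tag on the stable, a four-column row yields one sub-table name, two metric values, and one tag value.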

## Key features

- [x] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [cdc](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|----------|--------|----------|---------------|
| url | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| database | string | yes | |
| stable | string | yes | - |
| fields | list | no | - |
| timezone | string | no | UTC |

### url [string]
@@ -54,18 +59,51 @@ the timezone of the TDengine server, it's important to the ts field

## Example

-### sink
+### write 3 columns into table power.meter

power.meter schema information:

```sql
CREATE STABLE `meter` (`ts` TIMESTAMP, `latitude` DOUBLE, `longtitude` DOUBLE) TAGS (`tenant_id` INT)
```
SeaTunnel config file:

```hocon
source {
# This is an example input plugin **only for testing and demonstrating the input plugin feature**
FakeSource {
row.num = 1
schema = {
fields {
table_name = int
ts = timestamp
latitude = double
tenant_id = int
}
}
result_table_name = "fake"
}
}

sink {
-TDengine {
-url : "jdbc:TAOS-RS://localhost:6041/"
-username : "root"
-password : "taosdata"
-database : "power2"
-stable : "meters2"
-timezone: UTC
-}
+TDengine {
+source_table_name='fake'
+url : "jdbc:TAOS-RS://localhost:6041/"
+username : "root"
+password : "taosdata"
+database : "test"
+fields: ['ts','latitude']
+stable : "meter"
+timezone: UTC
+}
}
```
Execute a query and print the table data:
```sql
taos> select * from meter;
ts | latitude | longtitude | tenant_id |
================================================================================================
2024-03-08 03:47:54.000 | 9.594743198982019e+306 | NULL | 1215981593 |
2024-08-18 07:34:56.000 | 1.712320739538011e+308 | NULL | 1965348204 |
2024-07-05 21:59:04.000 | 4.499682197436227e+307 | NULL | 203469706 |
```

8 changes: 7 additions & 1 deletion docs/en/connector-v2/source/TDengine.md
@@ -20,13 +20,14 @@ supports query SQL and can achieve projection effect.

## Options

| name | type | required | default value |
|-------------|--------|----------|---------------|
| url | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| database | string | yes | |
| stable | string | yes | - |
| fields | list | no | - |
| lower_bound | long | yes | - |
| upper_bound | long | yes | - |

@@ -56,6 +57,10 @@ the database of the TDengine when you select

the stable of the TDengine when you select

### fields [list]

The columns to read from the source table, for example: [ts,longtitude]. If the option is omitted, all columns are read.
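
To make the projection effect of `fields` concrete, here is a minimal sketch of how a column list could turn into a query. `ProjectionQuery.build` is a hypothetical helper, not the connector's actual query builder, and the half-open time range (lower bound inclusive, upper bound exclusive) is an assumption taken from the range notation used in the Chinese source documentation in this PR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch (not connector code): maps the fields option to a SELECT list. */
final class ProjectionQuery {
    static String build(
            String stable, String tsColumn, List<String> fields, String lowerBound, String upperBound) {
        // An empty or missing list means "read every column".
        String columns = (fields == null || fields.isEmpty()) ? "*" : String.join(",", fields);
        // Assumed range semantics: lowerBound <= ts < upperBound.
        return String.format(
                "SELECT %s FROM %s WHERE %s >= '%s' AND %s < '%s'",
                columns, stable, tsColumn, lowerBound, tsColumn, upperBound);
    }

    public static void main(String[] args) {
        System.out.println(build(
                "meters", "ts", Arrays.asList("ts", "longtitude"),
                "2018-10-03 14:38:05.000", "2018-10-03 14:38:16.800"));
        System.out.println(build(
                "meters", "ts", Collections.<String>emptyList(),
                "2018-10-03 14:38:05.000", "2018-10-03 14:38:16.800"));
    }
}
```

Values are concatenated directly for brevity; real code would need to validate or escape them.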

### lower_bound [long]

the lower_bound of the migration period
@@ -76,6 +81,7 @@ source {
password : "taosdata"
database : "power"
stable : "meters"
fields : [ts,longtitude]
lower_bound : "2018-10-03 14:38:05.000"
upper_bound : "2018-10-03 14:38:16.800"
result_table_name = "tdengine_result"
82 changes: 82 additions & 0 deletions docs/zh/connector-v2/sink/TDengine.md
@@ -0,0 +1,82 @@
# TDengine

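> TDengine (Taos) sink connector

## Description

Connects to a TDengine (Taos) database and writes data into it.

***Points to note***

+ When writing to TDengine you do not need to specify the sub-table name; the first field read by the reader is used as the sub-table name. For example, if the source is a JDBC task whose query is `select device_id,ts,power,tenant_id from xxx`, the sub-table name will be the value of device_id.
+ Likewise, the last few columns read are treated as tags. If the target super table declares tenant_id as its tag, the value of tenant_id is written as the tag.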

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)

## Sink options

| Name     | Type   | Required | Default | Description                                                                           |
|----------|--------|----------|---------|---------------------------------------------------------------------------------------|
| url      | string | yes      | -       | JDBC URL used to connect to the database, for example: jdbc:TAOS-RS://localhost:6041 |
| username | string | yes      | -       | Username used to connect to the database                                              |
| password | string | yes      | -       | Password used to connect to the database                                              |
| database | string | yes      |         | Name of the database the data is written to                                           |
| stable   | string | yes      | -       | Name of the table the data is written to                                              |
| fields   | list   | no       | -       | Table columns to write                                                                |
| timezone | string | no       | -       | Client time zone                                                                      |

## Task example

### Write the ts and latitude columns into the power.meters table

Schema of the power.meters table:

```sql
CREATE STABLE `meter` (`ts` TIMESTAMP, `latitude` DOUBLE, `longtitude` DOUBLE) TAGS (`tenant_id` INT)
```
SeaTunnel configuration:
```hocon
source {
# This is an example input plugin **only for testing and demonstrating the input plugin feature**
FakeSource {
row.num = 1
schema = {
fields {
table_name = int
ts = timestamp
latitude = double
tenant_id = int
}
}
result_table_name = "fake"
}
}

sink {
TDengine {
source_table_name='fake'
url : "jdbc:TAOS-RS://localhost:6041/"
username : "root"
password : "taosdata"
database : "test"
fields: ['ts','latitude']
stable : "meter"
timezone: UTC
}
}
```
Query the TDengine database to view the latest data:

```sql
taos> select * from meter;
ts | latitude | longtitude | tenant_id |
================================================================================================
2024-03-08 03:47:54.000 | 9.594743198982019e+306 | NULL | 1215981593 |
2024-08-18 07:34:56.000 | 1.712320739538011e+308 | NULL | 1965348204 |
2024-07-05 21:59:04.000 | 4.499682197436227e+307 | NULL | 203469706 |
```
48 changes: 48 additions & 0 deletions docs/zh/connector-v2/source/TDengine.md
@@ -0,0 +1,48 @@
# TDengine

> TDengine (Taos) source connector

## Description

Reads data from a TDengine (Taos) data source. Supports reading the full table or only specified columns.

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)

## Source options

| Name        | Type   | Required | Default | Description                                                                           |
|-------------|--------|----------|---------|---------------------------------------------------------------------------------------|
| url         | string | yes      | -       | JDBC URL used to connect to the database, for example: jdbc:TAOS-RS://localhost:6041 |
| username    | string | yes      | -       | Username used to connect to the database                                              |
| password    | string | yes      | -       | Password used to connect to the database                                              |
| database    | string | yes      |         | Name of the database that contains the source table                                   |
| stable      | string | yes      | -       | Name of the source table                                                              |
| fields      | list   | no       | -       | Columns to read from the source table                                                 |
| lower_bound | long   | yes      | -       | Filter condition: the earliest timestamp of the data to read from the source          |
| upper_bound | long   | yes      | -       | Filter condition: the latest timestamp of the data to read from the source            |

## Task example

### Read the ts and longtitude columns from the power.meters table, restricted to the time range [2018-10-03 14:38:05.000, 2018-10-03 14:38:16.800)

```hocon
source {
TDengine {
url : "jdbc:TAOS-RS://localhost:6041/"
username : "root"
password : "taosdata"
database : "power"
stable : "meters"
fields : [ts,longtitude]
lower_bound : "2018-10-03 14:38:05.000"
upper_bound : "2018-10-03 14:38:16.800"
result_table_name = "tdengine_result"
}
}
```

@@ -22,9 +22,11 @@
import lombok.Data;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.LOWER_BOUND;
import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE;
@@ -71,6 +73,10 @@ public static TDengineSourceConfig buildSourceConfig(Config pluginConfig) {
pluginConfig.hasPath(LOWER_BOUND) ? pluginConfig.getString(LOWER_BOUND) : null);
tdengineSourceConfig.setTimezone(
pluginConfig.hasPath(TIMEZONE) ? pluginConfig.getString(TIMEZONE) : "UTC");
tdengineSourceConfig.setFields(
pluginConfig.hasPath(FIELDS)
? pluginConfig.getStringList(FIELDS)
: new ArrayList<>());

return tdengineSourceConfig;
}
@@ -85,5 +91,6 @@ public static class ConfigNames {
public static String TIMEZONE = "timezone";
public static String LOWER_BOUND = "lower_bound";
public static String UPPER_BOUND = "upper_bound";
public static String FIELDS = "fields";
}
}
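
The hunk above reads `fields` as an optional list and falls back to an empty list when the option is absent. The same pattern can be sketched without the real `Config` type; here a plain `Map` stands in for the plugin config, and `OptionalFieldsOption` is a hypothetical name used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in: a plain map plays the role of the plugin config. */
final class OptionalFieldsOption {
    static final String FIELDS = "fields";

    static List<String> readFields(Map<String, List<String>> pluginConfig) {
        // Same idea as hasPath(FIELDS) ? getStringList(FIELDS) : new ArrayList<>()
        List<String> configured = pluginConfig.get(FIELDS);
        return configured == null ? new ArrayList<>() : new ArrayList<>(configured);
    }

    public static void main(String[] args) {
        Map<String, List<String>> config = new HashMap<>();
        System.out.println(readFields(config)); // option absent: empty list
        config.put(FIELDS, Arrays.asList("ts", "longtitude"));
        System.out.println(readFields(config)); // option present: configured columns
    }
}
```

Returning an empty list rather than `null` lets later code test emptiness without a null check, which is what the `CollectionUtils.isNotEmpty` calls in the other hunks rely on.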
@@ -20,7 +20,8 @@
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;

public enum TDengineConnectorErrorCode implements SeaTunnelErrorCode {
-LOAD_DRIVER_FAILED("TDengine-01", "Fail to create driver of class");
+LOAD_DRIVER_FAILED("TDengine-01", "Fail to create driver of class"),
+FIELD_NOT_EXIST("TDengine-02", "Selected fields does not exist!");

private final String code;
private final String description;
@@ -26,6 +26,7 @@
import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;

@@ -105,6 +106,17 @@ public void write(SeaTunnelRow element) {
config.getStable(),
tagValues,
StringUtils.join(convertDataType(metrics), ","));
if (CollectionUtils.isNotEmpty(config.getFields())) {
sql =
String.format(
"INSERT INTO %s using %s tags ( %s ) (%s) VALUES ( %s );",
element.getField(0),
config.getStable(),
tagValues,
String.join(",", config.getFields()),
StringUtils.join(convertDataType(metrics), ","));
}
log.debug("sql content: {}", sql);
final int rowCount = statement.executeUpdate(sql);
if (rowCount == 0) {
Throwables.propagateIfPossible(
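
The statement built in this hunk can be tried in isolation with the sketch below. `InsertSqlSketch` is a hypothetical class. The format string for the column-list branch is copied from the hunk. The branch without a column list is an assumption, since the hunk shows only that statement's arguments. The size check is an addition of this sketch, not something the PR does.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch (not connector code): builds the INSERT with an optional column list. */
final class InsertSqlSketch {
    static String build(
            String subTable, String stable, String tagValues,
            List<String> fields, List<String> metricValues) {
        String values = String.join(",", metricValues);
        if (fields == null || fields.isEmpty()) {
            // Assumed shape of the statement when no column list is configured.
            return String.format(
                    "INSERT INTO %s using %s tags ( %s ) VALUES ( %s );",
                    subTable, stable, tagValues, values);
        }
        // Extra guard added in this sketch: one value per listed column.
        if (fields.size() != metricValues.size()) {
            throw new IllegalArgumentException(
                    fields.size() + " columns listed but " + metricValues.size() + " values given");
        }
        return String.format(
                "INSERT INTO %s using %s tags ( %s ) (%s) VALUES ( %s );",
                subTable, stable, tagValues, String.join(",", fields), values);
    }

    public static void main(String[] args) {
        System.out.println(build(
                "d1", "meter", "1",
                Arrays.asList("ts", "latitude"),
                Arrays.asList("'2024-03-08 03:47:54.000'", "9.5")));
    }
}
```

The guard is worth considering because a column list with a different number of values would otherwise only fail once the statement reaches the database.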
@@ -37,6 +37,7 @@
import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState;
import org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper.TDengineTypeMapper;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;

import com.google.auto.service.AutoService;
@@ -128,6 +129,7 @@ private StableMetadata getStableMetadata(TDengineSourceConfig config) throws SQL
List<String> subTableNames = new ArrayList<>();
List<String> fieldNames = new ArrayList<>();
List<SeaTunnelDataType<?>> fieldTypes = new ArrayList<>();
List<String> fields = config.getFields();

String jdbcUrl = String.join("", config.getUrl(), config.getDatabase());

@@ -152,8 +154,16 @@ private StableMetadata getStableMetadata(TDengineSourceConfig config) throws SQL
if (timestampFieldName == null) {
timestampFieldName = metaResultSet.getString(1);
}
-fieldNames.add(metaResultSet.getString(1));
-fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2)));
+String fieldName = metaResultSet.getString(1);
+if (CollectionUtils.isNotEmpty(fields)) {
+if (fields.contains(fieldName)) {
+fieldNames.add(fieldName);
+fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2)));
+}
+} else {
+fieldNames.add(fieldName);
+fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2)));
+}
}

while (subTableNameResultSet.next()) {
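
The filter in this hunk keeps a metadata column only if it appears in `fields`, so the selected columns follow the table's column order rather than the order written in the config. The hunk does not show where the new `FIELD_NOT_EXIST` error code is raised. The sketch below, with the hypothetical name `FieldSelection`, shows one way the filter and such a check could fit together; it uses `IllegalArgumentException` as a stand-in for the connector's own exception type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch (not connector code): filter table columns and reject unknown names. */
final class FieldSelection {
    static List<String> select(List<String> tableColumns, List<String> requested) {
        if (requested == null || requested.isEmpty()) {
            return new ArrayList<>(tableColumns); // no projection configured: keep everything
        }
        List<String> missing = new ArrayList<>(requested);
        missing.removeAll(tableColumns);
        if (!missing.isEmpty()) {
            // Stand-in for a FIELD_NOT_EXIST style error.
            throw new IllegalArgumentException("Selected fields do not exist: " + missing);
        }
        List<String> selected = new ArrayList<>();
        for (String column : tableColumns) { // iterate in table order, as the hunk does
            if (requested.contains(column)) {
                selected.add(column);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("ts", "latitude", "longtitude");
        System.out.println(select(columns, Arrays.asList("longtitude", "ts")));
    }
}
```

Without a check like this, a misspelled name in `fields` would be dropped silently and the job would read fewer columns than configured.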