[Improvement][connector-tdengine] support read column data from tdengine table or write column data to tdengine table #7894

Open
wants to merge 4 commits into
base: dev

alextinng
Contributor

Purpose of this pull request

close #7886

Does this PR introduce any user-facing change?

no

How was this patch tested?

case 1: column count(reader) < column count(writer)

result: the job fails. The sink config lists 2 columns to write, but the source provides only 1 value column (excluding table_name and the tag), so the generated INSERT names more columns than it has values.

st config:

env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  # This is an example input plugin, **only for testing and demonstrating the input plugin feature**
  FakeSource {
    row.num = 1
    schema = {
      fields {
        table_name = int
        ts = timestamp
        tenant_id = int
      }
    }
    result_table_name = "fake"
  }
}

sink {
  TDengine {
    source_table_name='fake'
    url : "jdbc:TAOS-RS://localhost:6041/"
    username : "root"
    password : "taosdata"
    database : "test"
    fields: ['ts','latitude']
    stable : "meter"
    timezone: UTC
  }
}

test log:

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:213)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.sql.SQLException: TDengine ERROR (216): sql: INSERT INTO 534905432 using meter tags ( 1.9691318559072168E307 ) ('ts','latitude') VALUES ( '2024-09-24 02:27:45.000' );, desc: syntax error near '1.9691318559072168e307 ) ('ts','latitude') values ( '2024-09-24' (invalid int data)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:253)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:66)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:75)
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
        at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:693)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1018)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: TDengine ERROR (216): sql: INSERT INTO 534905432 using meter tags ( 1.9691318559072168E307 ) ('ts','latitude') VALUES ( '2024-09-24 02:27:45.000' );, desc: syntax error near '1.9691318559072168e307 ) ('ts','latitude') values ( '2024-09-24' (invalid int data)
        at com.taosdata.jdbc.TSDBError.createSQLException(TSDBError.java:76)
        at com.taosdata.jdbc.rs.RestfulStatement.execute(RestfulStatement.java:77)
        at com.taosdata.jdbc.rs.RestfulStatement.executeUpdate(RestfulStatement.java:46)
        at org.apache.seatunnel.connectors.seatunnel.tdengine.sink.TDengineSinkWriter.write(TDengineSinkWriter.java:109)
        at org.apache.seatunnel.connectors.seatunnel.tdengine.sink.TDengineSinkWriter.write(TDengineSinkWriter.java:51)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:249)
        ... 17 more

        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:205)
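
A mismatch like the one in case 1 could in principle be rejected before any SQL reaches TDengine. The following is a minimal sketch of such a check, not code from this PR or from the connector. It assumes the row layout that the logged SQL suggests (first upstream field is the sub-table name, the trailing fields are tags, everything in between is a column value). The class name `ColumnCountCheck`, the method `validate`, and the `tagCount` parameter are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical pre-flight check; illustration only, not part of the connector. */
final class ColumnCountCheck {

    /**
     * Assumed layout (inferred from the logged SQL, not confirmed by the source code):
     * upstreamFields[0] is the sub-table name, the last tagCount fields are tags,
     * and the fields in between are the column values to insert.
     */
    static void validate(List<String> upstreamFields, List<String> sinkFields, int tagCount) {
        int valueCount = upstreamFields.size() - 1 - tagCount;
        if (valueCount != sinkFields.size()) {
            throw new IllegalArgumentException(
                    String.format(
                            "sink expects %d column(s) %s but upstream provides %d value column(s)",
                            sinkFields.size(), sinkFields, valueCount));
        }
    }

    public static void main(String[] args) {
        List<String> sinkFields = Arrays.asList("ts", "latitude");

        // Case 2 layout: two value columns upstream, two configured in the sink.
        validate(Arrays.asList("table_name", "ts", "lat", "tenant_id"), sinkFields, 1);

        try {
            // Case 1 layout: only one value column upstream.
            validate(Arrays.asList("table_name", "ts", "tenant_id"), sinkFields, 1);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a check of this kind the job would fail with a message that names the column mismatch instead of a TDengine syntax error.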

case 2: column count(reader) == column count(writer)

result: the job succeeds and the row is written to the database.

st config:

env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  # This is an example input plugin, **only for testing and demonstrating the input plugin feature**
  FakeSource {
    row.num = 1
    schema = {
      fields {
        table_name = int
        ts = timestamp
        lat = double
        tenant_id = int
      }
    }
    result_table_name = "fake"
  }
}

sink {
  TDengine {
    source_table_name='fake'
    url : "jdbc:TAOS-RS://localhost:6041/"
    username : "root"
    password : "taosdata"
    database : "test"
    fields: ['ts','latitude']
    stable : "meter"
    timezone: UTC
  }
}

test log:

2024-10-23 20:07:54,149 DEBUG [o.a.s.c.s.f.s.FakeSourceReader] [hz.main.generic-operation.thread-20] - reader 0 add splits [FakeSourceSplit(tableId=fake, splitId=0, rowNum=1)]
2024-10-23 20:07:54,464 DEBUG [c.h.s.i.o.i.InvocationMonitor ] [hz.main.InvocationMonitorThread] - [localhost]:5801 [seatunnel-868391] [5.1] Invocations:1 timeouts:0 backup-timeouts:0
2024-10-23 20:07:55,098 INFO  [o.a.s.c.s.f.s.FakeSourceReader] [BlockingWorker-TaskGroupLocation{jobId=901441798688210945, pipelineId=1, taskGroupId=30000}] - 1 rows of data have been generated in split(fake_0) for table fake. Generation time: 1729685275089
2024-10-23 20:07:55,098 INFO  [o.a.s.c.s.f.s.FakeSourceReader] [BlockingWorker-TaskGroupLocation{jobId=901441798688210945, pipelineId=1, taskGroupId=30000}] - Closed the bounded fake source
2024-10-23 20:07:55,107 DEBUG [a.s.c.s.t.s.TDengineSinkWriter] [BlockingWorker-TaskGroupLocation{jobId=901441798688210945, pipelineId=1, taskGroupId=30000}] - sql content: INSERT INTO 1173840892 using meter tags ( 1730702775 ) ('ts','latitude') VALUES ( '2024-08-04 16:11:32.000',6.408192514684265E307 );
2024-10-23 20:07:55,109 DEBUG [o.a.h.c.p.RequestAddCookies   ] [BlockingWorker-TaskGroupLocation{jobId=901441798688210945, pipelineId=1, taskGroupId=30000}] - CookieSpec selected: default
2024-10-23 20:07:55,109 DEBUG [o.a.h.c.p.RequestAuthCache    ] [BlockingWorker-TaskGroupLocation{jobId=901441798688210945, pipelineId=1, taskGroupId=30000}] - Auth cache not set in the context
2024-10-23 20:07:55,109 DEBUG [ingHttpClientConnectionManager] [BlockingWorker-TaskGroupLocation{jobId=901441798688210945, pipelineId=1, taskGroupId=30000}] - Connection request: [route: {}->http://10.5.2.26:6041][total available: 1; route allocated: 1 of 20; total allocated: 1 of 200]

case 3: column count(reader) > column count(writer)

result: the job fails. The sink config lists 2 columns to write, but the source provides 3 value columns (excluding table_name and the tag). SeaTunnel currently cannot look up a column value by column name, so all 3 values are emitted in order and the generated SQL is wrong.

st config:

env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  # This is an example input plugin, **only for testing and demonstrating the input plugin feature**
  FakeSource {
    row.num = 1
    schema = {
      fields {
        table_name = int
        ts = timestamp
        test_field = string
        lat = double
        tenant_id = int
      }
    }
    result_table_name = "fake"
  }
}

sink {
  TDengine {
    source_table_name='fake'
    url : "jdbc:TAOS-RS://localhost:6041/"
    username : "root"
    password : "taosdata"
    database : "test"
    fields: ['ts','latitude']
    stable : "meter"
    timezone: UTC
  }
}

test log:

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:213)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.sql.SQLException: TDengine ERROR (216): sql: INSERT INTO 1574017200 using meter tags ( 18329617 ) ('ts','latitude') VALUES ( '2024-07-13 19:04:23.000','douag',1.6443365459008203E308 );, desc: syntax error near 'douag' (illegal double data)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:253)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:66)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:75)
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
        at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:693)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1018)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: TDengine ERROR (216): sql: INSERT INTO 1574017200 using meter tags ( 18329617 ) ('ts','latitude') VALUES ( '2024-07-13 19:04:23.000','douag',1.6443365459008203E308 );, desc: syntax error near 'douag' (illegal double data)
        at com.taosdata.jdbc.TSDBError.createSQLException(TSDBError.java:76)
        at com.taosdata.jdbc.rs.RestfulStatement.execute(RestfulStatement.java:77)
        at com.taosdata.jdbc.rs.RestfulStatement.executeUpdate(RestfulStatement.java:46)
        at org.apache.seatunnel.connectors.seatunnel.tdengine.sink.TDengineSinkWriter.write(TDengineSinkWriter.java:109)
        at org.apache.seatunnel.connectors.seatunnel.tdengine.sink.TDengineSinkWriter.write(TDengineSinkWriter.java:51)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:249)
        ... 17 more

        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:205)
        ... 2 more
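
For comparison, here is a rough sketch of what looking up values by column name could look like, so that an extra upstream field such as test_field is skipped rather than emitted. This is not the connector's implementation. The class `ByNameInsertSketch`, its methods, and the `tagCount` parameter are hypothetical, the row layout (sub-table name first, tags last) is inferred from the logged SQL, and the statement shape simply mirrors the log. It also assumes upstream fields carry the same names as the sink columns, which is not true of the case 3 config above, where the upstream field is `lat` and the sink column is `latitude`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical by-name INSERT builder; illustration only, performs no escaping. */
final class ByNameInsertSketch {

    static String buildInsert(
            String stable,
            List<String> upstreamNames,
            List<Object> row,
            List<String> sinkFields,
            int tagCount) {
        int n = upstreamNames.size();
        if (n != row.size() || tagCount < 0 || n < 1 + tagCount) {
            throw new IllegalArgumentException("row does not match the assumed layout");
        }
        // Assumed layout: index 0 is the sub-table name, the last tagCount values are tags.
        String subTable = String.valueOf(row.get(0));
        List<String> tags = new ArrayList<>();
        for (int i = n - tagCount; i < n; i++) {
            tags.add(literal(row.get(i)));
        }
        List<String> columns = new ArrayList<>();
        List<String> values = new ArrayList<>();
        for (String field : sinkFields) {
            // Look the value up by name instead of relying on its position.
            int idx = upstreamNames.indexOf(field);
            if (idx <= 0 || idx >= n - tagCount) {
                throw new IllegalArgumentException("no upstream value column named '" + field + "'");
            }
            columns.add("'" + field + "'");
            values.add(literal(row.get(idx)));
        }
        return String.format(
                "INSERT INTO %s using %s tags ( %s ) (%s) VALUES ( %s );",
                subTable,
                stable,
                String.join(",", tags),
                String.join(",", columns),
                String.join(",", values));
    }

    /** Numbers are emitted bare; everything else is wrapped in single quotes. */
    private static String literal(Object value) {
        return (value instanceof Number) ? String.valueOf(value) : "'" + value + "'";
    }

    public static void main(String[] args) {
        List<String> names =
                Arrays.asList("table_name", "ts", "test_field", "latitude", "tenant_id");
        List<Object> row =
                Arrays.<Object>asList(1574017200, "2024-07-13 19:04:23.000", "douag", 1.5, 18329617);
        System.out.println(
                buildInsert("meter", names, row, Arrays.asList("ts", "latitude"), 1));
    }
}
```

In this sketch the string value 'douag' never reaches the VALUES list, because test_field is not among the configured sink fields.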

Check list

@alextinng force-pushed the support_query_subtable_and_fields branch from 5c876c1 to 78e56d1 on October 24, 2024 09:29
Member

@Hisoka-X Hisoka-X left a comment


Could you add a test case for this?

@alextinng changed the title from "Support query subtable and fields" to "[Improvement][connector-tdengine] support read column data from tdengine table" on Oct 25, 2024
@alextinng changed the title from "[Improvement][connector-tdengine] support read column data from tdengine table" to "[Improvement][connector-tdengine] support read column data from tdengine table or write column data to tdengine table" on Oct 25, 2024
Successfully merging this pull request may close these issues.

[Improvement][connector-tdengine] support read column data from tdengine table