[Bug] [Postgres-CDC] java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Struct.get(String)" because "struct" is null #7905

Open
2 of 3 tasks
cobolbaby opened this issue Oct 24, 2024 · 0 comments

Search before asking

  • I have searched the issues and found no similar issues.

What happened

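A streaming Postgres-CDC to JDBC job fails while deserializing a change record. The exception is thrown from SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow; Flink then marks the task FAILED and moves the job to RESTARTING:

java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Struct.get(String)" because "struct" is null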

SeaTunnel Version

2.3.8

SeaTunnel Config

env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 60000
  read_limit.bytes_per_second=50000000
  read_limit.rows_per_second=10000
}

source {
  Postgres-CDC {
    # Source database JDBC URL
    base-url = "jdbc:postgresql://***/bdc"
    username = "***"
    password = "***"
    database-names = ["bdc"]
    schema-names = ["dw"]
    table-names = ["bdc.dw.fact_cpu_sn", "bdc.dw.dim_cpu_dn"]
    result_table_name = "SQT_PG_BDC_CDC_dw_cpu"
  }
}

transform {

}

sink {
  jdbc {
    # https://seatunnel.apache.org/docs/2.3.8/connector-v2/sink/Jdbc
    source_table_name = "SQT_PG_BDC_CDC_dw_cpu"
    url = "jdbc:postgresql://***/bdc_test"
    driver = "org.postgresql.Driver"
    user = "***"
    password = "***"

    # You need to configure both database and table
    database = "bdc_test"
    table = "dw_sqt.${table_name}"
    primary_keys = ["${primary_key}"]
    schema_save_mode = "IGNORE"
    generate_sink_sql = "true"
  }
}

Running Command

./bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/pgcdc2pg_bdc_dw_cpu.conf

Error Exception

==> flink--standalonesession-0-0489b20f21c4.log <==
2024-10-24 18:24:50,728 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Postgres-CDC-Source -> MultiTableSink-Sink: Writer (1/1) (8c6dbb40f39dfb76d50b9c9bdc1362e4_cbc357ccb763df2852fee8c4fc7d55f2_0_5) switched from RUNNING to FAILED on 172.17.232.66:40409-cf2488 @ 0489b20f21c4 (dataPort=41559).
java.lang.NullPointerException: null
	at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:223) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:188) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:198) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:150) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:101) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:61) ~[?:?]
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[?:?]
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:114) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[?:?]
	at org.apache.seatunnel.translation.flink.source.FlinkSourceReader.pollNext(FlinkSourceReader.java:80) ~[?:?]
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.18.1.jar:1.18.1]
	at java.lang.Thread.run(Unknown Source) ~[?:?]
2024-10-24 18:24:50,729 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager [] - Clearing resource requirements of job 515026a838b438877c64bef449244e53
2024-10-24 18:24:50,729 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 0 (#5) of source Source: Postgres-CDC-Source.
2024-10-24 18:24:50,729 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - 1 tasks will be restarted to recover the failed task 8c6dbb40f39dfb76d50b9c9bdc1362e4_cbc357ccb763df2852fee8c4fc7d55f2_0_5.
2024-10-24 18:24:50,729 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job SeaTunnel (515026a838b438877c64bef449244e53) switched from state RUNNING to RESTARTING.

==> flink--taskexecutor-0-0489b20f21c4.log <==
2024-10-24 18:24:50,726 INFO  io.debezium.jdbc.JdbcConnection                              [] - Connection gracefully closed
2024-10-24 18:24:50,726 INFO  org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher [] - Split fetcher 0 exited.
2024-10-24 18:24:50,726 INFO  org.apache.seatunnel.api.event.LoggingEventHandler           [] - log event: ReaderCloseEvent(createdTime=1729794290726, jobId=515026a838b438877c64bef449244e53, eventType=LIFECYCLE_READER_CLOSE)
2024-10-24 18:24:50,726 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Postgres-CDC-Source -> MultiTableSink-Sink: Writer (1/1)#5 (8c6dbb40f39dfb76d50b9c9bdc1362e4_cbc357ccb763df2852fee8c4fc7d55f2_0_5) switched from RUNNING to FAILED with failure cause:
java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Struct.get(String)" because "struct" is null
	at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
	at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:223) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
	at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:188) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
	at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:198) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:150) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:101) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:61) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110) ~[blob_p-9113e096f42818c442db749bb2f798c1c2e41d04-20bab250076a2147589ecdb43456d7d8:2.3.8]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:114) ~[blob_p-9113e096f42818c442db749bb2f798c1c2e41d04-20bab250076a2147589ecdb43456d7d8:2.3.8]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8]
	at org.apache.seatunnel.translation.flink.source.FlinkSourceReader.pollNext(FlinkSourceReader.java:80) ~[blob_p-9113e096f42818c442db749bb2f798c1c2e41d04-20bab250076a2147589ecdb43456d7d8:2.3.8]
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) [flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) [flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.18.1.jar:1.18.1]
	at java.lang.Thread.run(Unknown Source) [?:?]
2024-10-24 18:24:50,726 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: Postgres-CDC-Source -> MultiTableSink-Sink: Writer (1/1)#5 (8c6dbb40f39dfb76d50b9c9bdc1362e4_cbc357ccb763df2852fee8c4fc7d55f2_0_5).
2024-10-24 18:24:50,727 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: Postgres-CDC-Source -> MultiTableSink-Sink: Writer (1/1)#5 8c6dbb40f39dfb76d50b9c9bdc1362e4_cbc357ccb763df2852fee8c4fc7d55f2_0_5.
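
The trace shows extractBeforeRow passing a null Struct into the converter. As a minimal sketch of the kind of null guard that would avoid the NPE (this is not SeaTunnel's actual code: the class BeforeRowGuard and the use of a Map as a stand-in for the Kafka Connect Struct are hypothetical, chosen so the example runs without extra dependencies), the "before" image could be checked before conversion. One possible cause, which this report does not confirm, is an update or delete event emitted without a "before" image, for example when the source table's REPLICA IDENTITY is not FULL.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration only: a Map stands in for the Debezium/Kafka Connect
// Struct so that the example has no external dependencies.
public class BeforeRowGuard {

    // Returns the "before" image if the change event carries one, otherwise empty.
    // The caller decides how to handle a missing image (skip, log, or fail clearly).
    public static Optional<Map<String, Object>> extractBeforeRow(Map<String, Object> eventValue) {
        Object before = eventValue.get("before");
        if (before == null) {
            // Assumption: the "before" field can be absent or null for some events.
            return Optional.empty();
        }
        @SuppressWarnings("unchecked")
        Map<String, Object> row = (Map<String, Object>) before;
        return Optional.of(row);
    }

    public static void main(String[] args) {
        // An update event that has an "after" image but no "before" image.
        Map<String, Object> after = new HashMap<>();
        after.put("id", 1);
        Map<String, Object> eventValue = new HashMap<>();
        eventValue.put("after", after);

        Optional<Map<String, Object>> before = extractBeforeRow(eventValue);
        System.out.println("before image present: " + before.isPresent());
    }
}
```

Whether a missing "before" image should be skipped, logged, or reported as a configuration error is a design decision for the maintainers; the sketch only shows where the check would sit.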

Zeta or Flink or Spark Version

Flink: 1.18.1

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

cobolbaby added the bug label Oct 24, 2024