From a6954ed98ba283f4d82e553c030aabb6d5ed41b5 Mon Sep 17 00:00:00 2001 From: CosmosNi <40288034+CosmosNi@users.noreply.github.com> Date: Sat, 26 Oct 2024 21:19:26 +0800 Subject: [PATCH] [Fix] [Flink/Spark] Fix transform stream choose wrong when not configure source_table_name (#7907) Co-authored-by: njh_cmss --- .../flink/execution/SinkExecuteProcessor.java | 2 +- .../flink/execution/SinkExecuteProcessor.java | 2 +- .../spark/execution/SinkExecuteProcessor.java | 2 +- .../spark/execution/SinkExecuteProcessor.java | 2 +- .../e2e/transform/TestFieldMapperIT.java | 4 + ...mapper_transform_without_result_table.conf | 111 ++++++++++++++++++ 6 files changed, 119 insertions(+), 4 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_mapper_transform_without_result_table.conf diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index 3b0a7db8e89..b6de6d2cad2 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -98,7 +98,7 @@ public List execute(List upstreamDataS throws TaskExecuteException { SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER); - DataStreamTableInfo input = upstreamDataStreams.get(0); + DataStreamTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1); ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); for (int i = 0; i < plugins.size(); i++) { Config sinkConfig = pluginConfigs.get(i); diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index cc167239292..c7d4e1f8800 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -99,7 +99,7 @@ public List execute(List upstreamDataS throws TaskExecuteException { SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER); - DataStreamTableInfo input = upstreamDataStreams.get(0); + DataStreamTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1); ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); for (int i = 0; i < plugins.size(); i++) { Config sinkConfig = pluginConfigs.get(i); diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index d5529d4ba08..6c3aabe691d 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -89,7 +89,7 @@ public List execute(List upstreamDataStreams throws TaskExecuteException { SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(); ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - DatasetTableInfo input = upstreamDataStreams.get(0); + DatasetTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1); for (int i = 0; i < plugins.size(); i++) { Config sinkConfig = pluginConfigs.get(i); DatasetTableInfo datasetTableInfo = diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index 2763e3f949e..4cccedeb79f 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -90,7 +90,7 @@ public List execute(List upstreamDataStreams throws TaskExecuteException { SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(); ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - DatasetTableInfo input = upstreamDataStreams.get(0); + DatasetTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1); for (int i = 0; i < plugins.size(); i++) { Config sinkConfig = pluginConfigs.get(i); DatasetTableInfo datasetTableInfo = diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldMapperIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldMapperIT.java index d1d61c9387c..aeb9ebad685 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldMapperIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldMapperIT.java @@ -31,5 +31,9 @@ public class TestFieldMapperIT extends TestSuiteBase { public void testFieldMapper(TestContainer container) throws IOException, InterruptedException { Container.ExecResult execResult = container.executeJob("/field_mapper_transform.conf"); Assertions.assertEquals(0, execResult.getExitCode()); + + Container.ExecResult execResult1 = + container.executeJob("/field_mapper_transform_without_result_table.conf"); + Assertions.assertEquals(0, execResult1.getExitCode()); } } diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_mapper_transform_without_result_table.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_mapper_transform_without_result_table.conf new file mode 100644 index 00000000000..f73b06737fb --- /dev/null +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/field_mapper_transform_without_result_table.conf @@ -0,0 +1,111 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + job.mode = "BATCH" +} + +source { + FakeSource { + result_table_name = "fake" + row.num = 100 + schema = { + fields { + id = "int" + name = "string" + age = "int" + string1 = "string" + int1 = "int" + c_bigint = "bigint" + c_row = { + c_row = { + c_int = int + } + } + } + } + } +} + +transform { + FieldMapper { + source_table_name = "fake" + result_table_name = "fake1" + field_mapper = { + id = id + age = age_as + int1 = int1_as + name = name + c_row = c_row + } + } +} + +sink { + Assert { + rules = + { + row_rules = [ + { + rule_type = MIN_ROW + rule_value = 100 + } + ], + field_rules = [ + { + field_name = id + field_type = int + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = age_as + field_type = int + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = int1_as + field_type = int + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = name + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +}