Skip to content

Commit

Permalink
[Fix] [Flink/Spark] Fix transform stream choose wrong when not config…
Browse files Browse the repository at this point in the history
…ure source_table_name (#7907)

Co-authored-by: njh_cmss <[email protected]>
  • Loading branch information
CosmosNi and njh_cmss authored Oct 26, 2024
1 parent 18dcca3 commit a6954ed
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
throws TaskExecuteException {
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
DataStreamTableInfo input = upstreamDataStreams.get(0);
DataStreamTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1);
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
for (int i = 0; i < plugins.size(); i++) {
Config sinkConfig = pluginConfigs.get(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
throws TaskExecuteException {
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
DataStreamTableInfo input = upstreamDataStreams.get(0);
DataStreamTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1);
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
for (int i = 0; i < plugins.size(); i++) {
Config sinkConfig = pluginConfigs.get(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
throws TaskExecuteException {
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
DatasetTableInfo input = upstreamDataStreams.get(0);
DatasetTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1);
for (int i = 0; i < plugins.size(); i++) {
Config sinkConfig = pluginConfigs.get(i);
DatasetTableInfo datasetTableInfo =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
throws TaskExecuteException {
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
DatasetTableInfo input = upstreamDataStreams.get(0);
DatasetTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1);
for (int i = 0; i < plugins.size(); i++) {
Config sinkConfig = pluginConfigs.get(i);
DatasetTableInfo datasetTableInfo =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,9 @@ public class TestFieldMapperIT extends TestSuiteBase {
public void testFieldMapper(TestContainer container) throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/field_mapper_transform.conf");
Assertions.assertEquals(0, execResult.getExitCode());

Container.ExecResult execResult1 =
container.executeJob("/field_mapper_transform_without_result_table.conf");
Assertions.assertEquals(0, execResult1.getExitCode());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
job.mode = "BATCH"
}

source {
FakeSource {
result_table_name = "fake"
row.num = 100
schema = {
fields {
id = "int"
name = "string"
age = "int"
string1 = "string"
int1 = "int"
c_bigint = "bigint"
c_row = {
c_row = {
c_int = int
}
}
}
}
}
}

transform {
FieldMapper {
source_table_name = "fake"
result_table_name = "fake1"
field_mapper = {
id = id
age = age_as
int1 = int1_as
name = name
c_row = c_row
}
}
}

sink {
Assert {
rules =
{
row_rules = [
{
rule_type = MIN_ROW
rule_value = 100
}
],
field_rules = [
{
field_name = id
field_type = int
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = age_as
field_type = int
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = int1_as
field_type = int
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = name
field_type = string
field_value = [
{
rule_type = NOT_NULL
}
]
}
]
}
}
}

0 comments on commit a6954ed

Please sign in to comment.