diff --git a/docs/en/connector-v2/source/SftpFile.md b/docs/en/connector-v2/source/SftpFile.md index 6d6ec5ea8db..3eadcd3a69e 100644 --- a/docs/en/connector-v2/source/SftpFile.md +++ b/docs/en/connector-v2/source/SftpFile.md @@ -264,4 +264,44 @@ sink { } } ``` +### Multiple Table + +```hocon + +SftpFile { + tables_configs = [ + { + schema { + table = "student" + fields { + name = string + age = int + } + } + path = "/tmp/seatunnel/sink/text" + host = "192.168.31.48" + port = 21 + user = tyrantlucifer + password = tianchao + file_format_type = "parquet" + }, + { + schema { + table = "teacher" + fields { + name = string + age = int + } + } + path = "/tmp/seatunnel/sink/text" + host = "192.168.31.48" + port = 21 + user = tyrantlucifer + password = tianchao + file_format_type = "parquet" + } + ] +} + +``` diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/MultipleTableSFTPFileSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/MultipleTableSFTPFileSourceConfig.java new file mode 100644 index 00000000000..4ae5f89f8a3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/MultipleTableSFTPFileSourceConfig.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.sftp.config; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseMultipleTableFileSourceConfig; + +public class MultipleTableSFTPFileSourceConfig extends BaseMultipleTableFileSourceConfig { + + public MultipleTableSFTPFileSourceConfig(ReadonlyConfig ossFileSourceRootConfig) { + super(ossFileSourceRootConfig); + } + + @Override + public BaseFileSourceConfig getBaseSourceConfig(ReadonlyConfig readonlyConfig) { + return new SFTPFileSourceConfig(readonlyConfig); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SFTPFileSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SFTPFileSourceConfig.java new file mode 100644 index 00000000000..b2d2c3084ea --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SFTPFileSourceConfig.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.sftp.config; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; + +import lombok.Getter; + +@Getter +public class SFTPFileSourceConfig extends BaseFileSourceConfig { + + private static final long serialVersionUID = 1L; + + @Override + public HadoopConf getHadoopConfig() { + return SftpConf.buildWithConfig(getBaseFileSourceConfig()); + } + + @Override + public String getPluginName() { + return FileSystemType.SFTP.getFileSystemPluginName(); + } + + public SFTPFileSourceConfig(ReadonlyConfig readonlyConfig) { + super(readonlyConfig); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java index 04e6620a757..bc984383a6b 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java @@ -17,121 +17,18 @@ package org.apache.seatunnel.connectors.seatunnel.file.sftp.source; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import org.apache.seatunnel.api.source.SeaTunnelSource; -import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; -import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; -import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; -import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode; -import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; -import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf; -import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfigOptions; -import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource; -import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory; - -import com.google.auto.service.AutoService; +import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.MultipleTableSFTPFileSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource; -import java.io.IOException; +public class SftpFileSource extends BaseMultipleTableFileSource { + public SftpFileSource(ReadonlyConfig config) { + super(new MultipleTableSFTPFileSourceConfig(config)); + } -@AutoService(SeaTunnelSource.class) -public class SftpFileSource extends BaseFileSource { @Override public String getPluginName() { return FileSystemType.SFTP.getFileSystemPluginName(); } - - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { - CheckResult result = - CheckConfigUtil.checkAllExists( - pluginConfig, - SftpConfigOptions.FILE_PATH.key(), - SftpConfigOptions.FILE_FORMAT_TYPE.key(), - SftpConfigOptions.SFTP_HOST.key(), - SftpConfigOptions.SFTP_PORT.key(), - SftpConfigOptions.SFTP_USER.key(), - SftpConfigOptions.SFTP_PASSWORD.key()); - if (!result.isSuccess()) { - throw new FileConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SOURCE, result.getMsg())); - } - FileFormat fileFormat = - FileFormat.valueOf( - pluginConfig - .getString(SftpConfigOptions.FILE_FORMAT_TYPE.key()) - .toUpperCase()); - if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) { - throw new FileConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - "Sftp file source connector only support read [text, csv, json, xml] files"); - } - String path = pluginConfig.getString(SftpConfigOptions.FILE_PATH.key()); - hadoopConf = SftpConf.buildWithConfig(ReadonlyConfig.fromConfig(pluginConfig)); - readStrategy = - ReadStrategyFactory.of( - pluginConfig.getString(SftpConfigOptions.FILE_FORMAT_TYPE.key())); - readStrategy.setPluginConfig(pluginConfig); - readStrategy.init(hadoopConf); - try { - filePaths = readStrategy.getFileNamesByPath(path); - } catch (IOException e) { - String errorMsg = String.format("Get file list from this path [%s] failed", path); - throw new FileConnectorException( - FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e); - } - // support user-defined schema - // only json csv text type support user-defined schema now - if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) { - switch (fileFormat) { - case CSV: - case TEXT: - case JSON: - case EXCEL: - case XML: - SeaTunnelRowType userDefinedSchema = - CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType(); - readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema); - rowType = readStrategy.getActualSeaTunnelRowTypeInfo(); - break; - case ORC: - case PARQUET: - case BINARY: - throw new FileConnectorException( - CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, - "SeaTunnel does not support user-defined schema for [parquet, orc, binary] files"); - default: - // never got in there - throw new FileConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - "SeaTunnel does not supported this file format"); - } - } else { - if (filePaths.isEmpty()) { - // When the directory is empty, distribute default behavior schema - rowType = CatalogTableUtil.buildSimpleTextSchema(); - return; - } - try { - rowType = readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0)); - } catch (FileConnectorException e) { - String errorMsg = - String.format("Get table schema from file [%s] failed", filePaths.get(0)); - throw new FileConnectorException( - CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED, errorMsg, e); - } - } - } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java index c0f6aefda30..b4d1d1c63f5 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java @@ -19,9 +19,12 @@ import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; +import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions; import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; @@ -29,6 +32,7 @@ import com.google.auto.service.AutoService; +import java.io.Serializable; import java.util.Arrays; @AutoService(Factory.class) @@ -41,12 +45,12 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder() - .required(SftpConfigOptions.FILE_PATH) - .required(SftpConfigOptions.SFTP_HOST) - .required(SftpConfigOptions.SFTP_PORT) - .required(SftpConfigOptions.SFTP_USER) - .required(SftpConfigOptions.SFTP_PASSWORD) - .required(BaseSourceConfigOptions.FILE_FORMAT_TYPE) + .optional(SftpConfigOptions.FILE_PATH) + .optional(SftpConfigOptions.SFTP_HOST) + .optional(SftpConfigOptions.SFTP_PORT) + .optional(SftpConfigOptions.SFTP_USER) + .optional(SftpConfigOptions.SFTP_PASSWORD) + .optional(BaseSourceConfigOptions.FILE_FORMAT_TYPE) .conditional( BaseSourceConfigOptions.FILE_FORMAT_TYPE, FileFormat.TEXT, @@ -75,6 +79,12 @@ public OptionRule optionRule() { .build(); } + @Override + public + TableSource createSource(TableSourceFactoryContext context) { + return () -> (SeaTunnelSource) new SftpFileSource(context.getOptions()); + } + @Override public Class getSourceClass() { return SftpFileSource.class; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java index 0f9f7dc61d0..2ac185aabbd 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java @@ -146,6 +146,15 @@ public void testSftpFileReadAndWrite(TestContainer container) helper.execute("/xml/fake_to_sftp_file_xml.conf"); // test read sftp xml file helper.execute("/xml/sftp_file_xml_to_assert.conf"); + // test sftp source support multipleTable + String homePath = "/home/seatunnel"; + String sink01 = "/tmp/multipleSource/seatunnel/json/fake01"; + String sink02 = "/tmp/multipleSource/seatunnel/json/fake02"; + deleteFileFromContainer(homePath + sink01); + deleteFileFromContainer(homePath + sink02); + helper.execute("/json/sftp_file_json_to_assert_with_multipletable.conf"); + Assertions.assertEquals(getFileListFromContainer(homePath + sink01).size(), 1); + Assertions.assertEquals(getFileListFromContainer(homePath + sink02).size(), 1); } @TestTemplate diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_file_json_to_assert_with_multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_file_json_to_assert_with_multipletable.conf new file mode 100644 index 00000000000..ae2bbb6120e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_file_json_to_assert_with_multipletable.conf @@ -0,0 +1,143 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + SftpFile { + tables_configs = [ + { + host = "sftp" + port = 22 + user = seatunnel + password = pass + path = "tmp/seatunnel/read/json" + file_format_type = "json" + schema = { + table = "fake01" + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + C_MAP = "map" + C_ARRAY = "array" + C_STRING = string + C_BOOLEAN = boolean + C_TINYINT = tinyint + C_SMALLINT = smallint + C_INT = int + C_BIGINT = bigint + C_FLOAT = float + C_DOUBLE = double + C_BYTES = bytes + C_DATE = date + C_DECIMAL = "decimal(38, 18)" + C_TIMESTAMP = timestamp + } + } + } + }, + { + host = "sftp" + port = 22 + user = seatunnel + password = pass + path = "tmp/seatunnel/read/json" + file_format_type = "json" + schema = { + table = "fake02" + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + C_MAP = "map" + C_ARRAY = "array" + C_STRING = string + C_BOOLEAN = boolean + C_TINYINT = tinyint + C_SMALLINT = smallint + C_INT = int + C_BIGINT = bigint + C_FLOAT = float + C_DOUBLE = double + C_BYTES = bytes + C_DATE = date + C_DECIMAL = "decimal(38, 18)" + C_TIMESTAMP = timestamp + } + } + } + } + ] + result_table_name = "sftp" + } +} + +sink { + SftpFile { + host = "sftp" + port = 22 + user = seatunnel + password = pass + path = "tmp/multipleSource/seatunnel/json/${table_name}" + source_table_name = "sftp" + row_delimiter = "\n" + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + file_name_expression = "${transactionId}_${now}" + file_format_type = "text" + filename_time_format = "yyyy.MM.dd" + is_enable_transaction = true + compress_codec = "lzo" + "schema_save_mode"="RECREATE_SCHEMA" + "data_save_mode"="DROP_DATA" + } +} \ No newline at end of file