Skip to content

Commit

Permalink
[Feature][Connector-V2]Sftp file source support multiple table (#7824)
Browse files Browse the repository at this point in the history
  • Loading branch information
chl-wxp authored Oct 14, 2024
1 parent 511c8af commit cfb8760
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 115 deletions.
40 changes: 40 additions & 0 deletions docs/en/connector-v2/source/SftpFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,4 +264,44 @@ sink {
}
}
```
### Multiple Table

```hocon
SftpFile {
tables_configs = [
{
schema {
table = "student"
fields {
name = string
age = int
}
}
path = "/tmp/seatunnel/sink/text"
host = "192.168.31.48"
port = 21
user = tyrantlucifer
password = tianchao
file_format_type = "parquet"
},
{
schema {
table = "teacher"
fields {
name = string
age = int
}
}
path = "/tmp/seatunnel/sink/text"
host = "192.168.31.48"
port = 21
user = tyrantlucifer
password = tianchao
file_format_type = "parquet"
}
]
}
```

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.sftp.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseMultipleTableFileSourceConfig;

public class MultipleTableSFTPFileSourceConfig extends BaseMultipleTableFileSourceConfig {

public MultipleTableSFTPFileSourceConfig(ReadonlyConfig ossFileSourceRootConfig) {
super(ossFileSourceRootConfig);
}

@Override
public BaseFileSourceConfig getBaseSourceConfig(ReadonlyConfig readonlyConfig) {
return new SFTPFileSourceConfig(readonlyConfig);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.sftp.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;

import lombok.Getter;

@Getter
public class SFTPFileSourceConfig extends BaseFileSourceConfig {

private static final long serialVersionUID = 1L;

@Override
public HadoopConf getHadoopConfig() {
return SftpConf.buildWithConfig(getBaseFileSourceConfig());
}

@Override
public String getPluginName() {
return FileSystemType.SFTP.getFileSystemPluginName();
}

public SFTPFileSourceConfig(ReadonlyConfig readonlyConfig) {
super(readonlyConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,121 +17,18 @@

package org.apache.seatunnel.connectors.seatunnel.file.sftp.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf;
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;

import com.google.auto.service.AutoService;
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.MultipleTableSFTPFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;

import java.io.IOException;
public class SftpFileSource extends BaseMultipleTableFileSource {
public SftpFileSource(ReadonlyConfig config) {
super(new MultipleTableSFTPFileSourceConfig(config));
}

@AutoService(SeaTunnelSource.class)
public class SftpFileSource extends BaseFileSource {
@Override
public String getPluginName() {
return FileSystemType.SFTP.getFileSystemPluginName();
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result =
CheckConfigUtil.checkAllExists(
pluginConfig,
SftpConfigOptions.FILE_PATH.key(),
SftpConfigOptions.FILE_FORMAT_TYPE.key(),
SftpConfigOptions.SFTP_HOST.key(),
SftpConfigOptions.SFTP_PORT.key(),
SftpConfigOptions.SFTP_USER.key(),
SftpConfigOptions.SFTP_PASSWORD.key());
if (!result.isSuccess()) {
throw new FileConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
FileFormat fileFormat =
FileFormat.valueOf(
pluginConfig
.getString(SftpConfigOptions.FILE_FORMAT_TYPE.key())
.toUpperCase());
if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) {
throw new FileConnectorException(
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
"Sftp file source connector only support read [text, csv, json, xml] files");
}
String path = pluginConfig.getString(SftpConfigOptions.FILE_PATH.key());
hadoopConf = SftpConf.buildWithConfig(ReadonlyConfig.fromConfig(pluginConfig));
readStrategy =
ReadStrategyFactory.of(
pluginConfig.getString(SftpConfigOptions.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
readStrategy.init(hadoopConf);
try {
filePaths = readStrategy.getFileNamesByPath(path);
} catch (IOException e) {
String errorMsg = String.format("Get file list from this path [%s] failed", path);
throw new FileConnectorException(
FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
}
// support user-defined schema
// only json csv text type support user-defined schema now
if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
switch (fileFormat) {
case CSV:
case TEXT:
case JSON:
case EXCEL:
case XML:
SeaTunnelRowType userDefinedSchema =
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
break;
case ORC:
case PARQUET:
case BINARY:
throw new FileConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
"SeaTunnel does not support user-defined schema for [parquet, orc, binary] files");
default:
// never got in there
throw new FileConnectorException(
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
"SeaTunnel does not supported this file format");
}
} else {
if (filePaths.isEmpty()) {
// When the directory is empty, distribute default behavior schema
rowType = CatalogTableUtil.buildSimpleTextSchema();
return;
}
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
} catch (FileConnectorException e) {
String errorMsg =
String.format("Get table schema from file [%s] failed", filePaths.get(0));
throw new FileConnectorException(
CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfigOptions;

import com.google.auto.service.AutoService;

import java.io.Serializable;
import java.util.Arrays;

@AutoService(Factory.class)
Expand All @@ -41,12 +45,12 @@ public String factoryIdentifier() {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(SftpConfigOptions.FILE_PATH)
.required(SftpConfigOptions.SFTP_HOST)
.required(SftpConfigOptions.SFTP_PORT)
.required(SftpConfigOptions.SFTP_USER)
.required(SftpConfigOptions.SFTP_PASSWORD)
.required(BaseSourceConfigOptions.FILE_FORMAT_TYPE)
.optional(SftpConfigOptions.FILE_PATH)
.optional(SftpConfigOptions.SFTP_HOST)
.optional(SftpConfigOptions.SFTP_PORT)
.optional(SftpConfigOptions.SFTP_USER)
.optional(SftpConfigOptions.SFTP_PASSWORD)
.optional(BaseSourceConfigOptions.FILE_FORMAT_TYPE)
.conditional(
BaseSourceConfigOptions.FILE_FORMAT_TYPE,
FileFormat.TEXT,
Expand Down Expand Up @@ -75,6 +79,12 @@ public OptionRule optionRule() {
.build();
}

@Override
public <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
return () -> (SeaTunnelSource<T, SplitT, StateT>) new SftpFileSource(context.getOptions());
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return SftpFileSource.class;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ public void testSftpFileReadAndWrite(TestContainer container)
helper.execute("/xml/fake_to_sftp_file_xml.conf");
// test read sftp xml file
helper.execute("/xml/sftp_file_xml_to_assert.conf");
// test sftp source support multipleTable
String homePath = "/home/seatunnel";
String sink01 = "/tmp/multipleSource/seatunnel/json/fake01";
String sink02 = "/tmp/multipleSource/seatunnel/json/fake02";
deleteFileFromContainer(homePath + sink01);
deleteFileFromContainer(homePath + sink02);
helper.execute("/json/sftp_file_json_to_assert_with_multipletable.conf");
Assertions.assertEquals(getFileListFromContainer(homePath + sink01).size(), 1);
Assertions.assertEquals(getFileListFromContainer(homePath + sink02).size(), 1);
}

@TestTemplate
Expand Down
Loading

0 comments on commit cfb8760

Please sign in to comment.