From 86ae9272c42dc0e64b88e02bea397a0974b556f2 Mon Sep 17 00:00:00 2001
From: Jast
Date: Thu, 31 Oct 2024 13:55:23 +0800
Subject: [PATCH] [Fix][Connector-V2] Fix file binary format sync convert directory to file (#7942)

---
 .github/workflows/backend.yml                 |  2 +-
 .../file/hadoop/HadoopFileSystemProxy.java    | 13 +++++
 .../source/reader/BinaryReadStrategy.java     |  2 +-
 .../e2e/connector/file/ftp/FtpFileIT.java     | 28 +++++++++++
 .../resources/text/ftp_to_ftp_for_binary.conf | 47 +++++++++++++++++++
 5 files changed, 90 insertions(+), 2 deletions(-)
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_to_ftp_for_binary.conf

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 39efd998da1..67b2511f107 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -364,7 +364,7 @@ jobs:
       matrix:
         java: [ '8', '11' ]
         os: [ 'ubuntu-latest' ]
-    timeout-minutes: 120
+    timeout-minutes: 180
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK ${{ matrix.java }}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
index d4f1791a4e8..8f8aef05481 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
@@ -64,6 +64,15 @@ public boolean fileExist(@NonNull String filePath) throws IOException {
         return execute(() -> getFileSystem().exists(new Path(filePath)));
     }
 
+    /**
+     * Checks whether {@code filePath} refers to a file on the file system behind this proxy.
+     *
+     * <p>The answer comes from the file status that {@code getFileSystem()} reports for the path.
+     *
+     * @param filePath path to inspect, must not be null
+     * @return {@code true} if the file status of the path reports a file
+     * @throws IOException if the file status cannot be obtained
+     */
+    public boolean isFile(@NonNull String filePath) throws IOException {
+        return execute(() -> getFileSystem().getFileStatus(new Path(filePath)).isFile());
+    }
+
     public void createFile(@NonNull String filePath) throws IOException {
         execute(
                 () -> {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
index 3bbb90c774b..7849415b32d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
@@ -55,7 +55,7 @@ public void read(String path, String tableId, Collector output)
             throws IOException, FileConnectorException {
         try (InputStream inputStream = hadoopFileSystemProxy.getInputStream(path)) {
             String relativePath;
-            if (basePath.isFile()) {
+            if (hadoopFileSystemProxy.isFile(basePath.getAbsolutePath())) {
                 relativePath = basePath.getName();
             } else {
                 relativePath =
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
index 7b73c3ee706..26c32f247cd 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
@@ -187,6 +187,34 @@ public void testFtpFileReadAndWriteForPassive(TestContainer container)
         deleteFileFromContainer(homePath);
     }
 
+    /**
+     * Syncs one file from FTP to FTP in binary format and checks that the sink directory ends up
+     * with exactly one file, {@code e2e.txt}, containing five lines.
+     */
+    @TestTemplate
+    public void testFtpToFtpForBinary(TestContainer container)
+            throws IOException, InterruptedException {
+        String homePath = "/home/vsftpd/seatunnel/uploads/seatunnel";
+        try {
+            Container.ExecResult execResult =
+                    container.executeJob("/text/ftp_to_ftp_for_binary.conf");
+            Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+            // The source path points at a single file, so exactly one file must be written
+            Assertions.assertEquals(1, getFileListFromContainer(homePath).size());
+
+            // Confirm data is written correctly
+            Container.ExecResult resultExecResult =
+                    ftpContainer.execInContainer(
+                            "sh", "-c", "awk 'END {print NR}' " + homePath + "/e2e.txt");
+            Assertions.assertEquals("5", resultExecResult.getStdout().trim());
+        } finally {
+            // Clean up even when an assertion fails, so a failed run does not leave files
+            // behind in the upload directory
+            deleteFileFromContainer(homePath);
+        }
+    }
+
     private void assertJobExecution(TestContainer container, String configPath, List params)
             throws IOException, InterruptedException {
         Container.ExecResult execResult = container.executeJob(configPath, params);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_to_ftp_for_binary.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_to_ftp_for_binary.conf
new file mode 100644
index 00000000000..f8b8b92cd5c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_to_ftp_for_binary.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FtpFile {
+    host = "ftp"
+    port = 21
+    user = seatunnel
+    password = pass
+    path= "/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt"
+    file_format_type= "binary"
+    encoding = "UTF-8"
+  }
+}
+
+
+sink {
+  FtpFile {
+    host = "ftp"
+    port = 21
+    user = seatunnel
+    password = pass
+    tmp_path = "/upload-tmp/seatunnel"
+    path= "/uploads/seatunnel"
+    file_format_type= "binary"
+    encoding="UTF-8"
+  }
+}