diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java index 051507e8e3f..6194e7c1e0f 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java @@ -153,6 +153,13 @@ private void updateRequestParam(PageInfo pageInfo) { .put(pageInfo.getPageField(), pageInfo.getPageIndex().toString()); } + @Override + public void pollNext(Collector output) throws Exception { + synchronized (output.getCheckpointLock()) { + internalPollNext(output); + } + } + @Override public void internalPollNext(Collector output) throws Exception { try { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml index 69b776da5f0..97e435e092b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml @@ -44,6 +44,25 @@ ${project.version} test + + org.testcontainers + postgresql + ${testcontainer.version} + test + + + org.apache.seatunnel + connector-jdbc-e2e-common + ${project.version} + test-jar + test + + + + org.postgresql + postgresql + 42.5.1 + org.apache.seatunnel connector-http-lemlist @@ -104,7 +123,6 @@ 5.14.0 test - diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java index 
c3d4cf936c9..46349605e11 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java @@ -23,7 +23,9 @@ import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -34,6 +36,7 @@ import org.mockserver.model.Format; import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; import org.testcontainers.lifecycle.Startables; @@ -45,19 +48,30 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import java.io.File; import java.io.IOException; import java.math.BigDecimal; import java.net.URL; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.awaitility.Awaitility.await; +import static org.awaitility.Awaitility.given; import static org.mockserver.model.HttpRequest.request; +@Slf4j public class HttpIT extends TestSuiteBase implements TestResource { private static final String TMP_DIR = "/tmp"; @@ 
-70,9 +84,33 @@ public class HttpIT extends TestSuiteBase implements TestResource {
 
     private MockServerClient mockServerClient;
 
+    private static final String POSTGRESQL_SCHEMA = "public";
+    private static final String SINK_TABLE_1 = "sink";
+    private static final Integer MAX_COUNT = 15;
+    private static final String COUNT_QUERY = "select count(*) from sink";
+
+    private static final String PG_IMAGE = "postgres:14-alpine";
+    private static final String PG_DRIVER_JAR =
+            "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
+    private PostgreSQLContainer<?> postgreSQLContainer;
+
+    // Container hook: downloads the PostgreSQL JDBC driver jar into the Jdbc plugin lib
+    // directory and asserts that the shell command exited with status 0.
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+                                        + PG_DRIVER_JAR);
+                Assertions.assertEquals(0, extraCommands.getExitCode());
+            };
+
     @BeforeAll
     @Override
-    public void startUp() {
+    public void startUp() throws ClassNotFoundException {
         Optional resource =
                 Optional.ofNullable(HttpIT.class.getResource(getMockServerConfig()));
         this.mockserverContainer =
@@ -100,6 +138,24 @@ public void startUp() {
         Startables.deepStart(Stream.of(mockserverContainer)).join();
         mockServerClient = new MockServerClient("127.0.0.1", 1080);
         fillMockRecords();
+
+        // Start PostgreSQL on the shared test network under the alias used by the job config.
+        postgreSQLContainer =
+                new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
+                        .withNetwork(TestSuiteBase.NETWORK)
+                        .withNetworkAliases("postgresql")
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE)));
+        Startables.deepStart(Stream.of(postgreSQLContainer)).join();
+        log.info("PostgreSQL container started");
+        Class.forName(postgreSQLContainer.getDriverClassName());
+        // Retry table creation every 500 ms, ignoring exceptions, for at most 2 minutes.
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(this::initializeJdbcTable);
     }
 
     private static void fillMockRecords() {
@@ -149,6 +205,99 @@ public void tearDown() {
         if (mockServerClient != null) {
             mockServerClient.close();
         }
+        if (postgreSQLContainer != null) {
+            postgreSQLContainer.stop();
+        }
+    }
+
+    /**
+     * Submits the streaming HTTP-to-PostgreSQL job in the background and waits until the sink
+     * table holds at least {@code MAX_COUNT} rows. The sink table is truncated afterwards.
+     *
+     * @param container the test container the job is executed in
+     */
+    @TestTemplate
+    public void testStreamingSourceToPostgresqlSink(TestContainer container) {
+        try {
+            // Submit the job asynchronously; the assertion below polls the sink table meanwhile.
+            CompletableFuture.runAsync(
+                    () -> {
+                        try {
+                            container.executeJob("/http_streaming_json_to_postgresql.conf");
+                        } catch (Exception e) {
+                            // Pass the exception itself so the stack trace is logged as well.
+                            log.error("Commit task exception :" + e.getMessage(), e);
+                            throw new RuntimeException(e);
+                        }
+                    });
+            await().atMost(60000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () -> {
+                                Long count = queryCount(COUNT_QUERY);
+                                Assertions.assertTrue(
+                                        count >= MAX_COUNT,
+                                        "Actual value should be greater than expected value");
+                            });
+        } finally {
+            log.info("clear schema:{}", SINK_TABLE_1);
+            clearTable(POSTGRESQL_SCHEMA, SINK_TABLE_1);
+        }
+    }
+
+    /**
+     * Runs a count query against the PostgreSQL container.
+     *
+     * @param sql query whose first column of the first row is read as the count
+     * @return the value read, or {@code 0} when the query returns no row
+     * @throws RuntimeException wrapping any {@link SQLException}
+     */
+    private Long queryCount(String sql) {
+        // Statement and ResultSet are closed here rather than left to Connection#close().
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
+            return resultSet.next() ? resultSet.getLong(1) : 0L;
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Opens a new JDBC connection to the PostgreSQL container.
+     *
+     * @return an open connection; the caller is responsible for closing it
+     * @throws SQLException if the connection cannot be established
+     */
+    private Connection getJdbcConnection() throws SQLException {
+        return DriverManager.getConnection(
+                postgreSQLContainer.getJdbcUrl(),
+                postgreSQLContainer.getUsername(),
+                postgreSQLContainer.getPassword());
+    }
+
+    /**
+     * Executes one SQL statement against the PostgreSQL container.
+     *
+     * @param sql the statement to execute
+     * @throws RuntimeException wrapping any {@link SQLException}
+     */
+    private void executeSql(String sql) {
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Truncates the table named {@code database.tableName}.
+     *
+     * @param database qualifier placed before the table name (the test passes the schema)
+     * @param tableName table to truncate
+     */
+    private void clearTable(String database, String tableName) {
+        executeSql("truncate table " + database + "." + tableName);
     }
 
     @TestTemplate
@@ -259,6 +408,27 @@ public void testMultiTableHttp(TestContainer container)
         Assertions.assertIterableEquals(records, recordResponse);
     }
 
+    /**
+     * Creates the {@code sink} table in the PostgreSQL container.
+     *
+     * @throws RuntimeException wrapping any {@link SQLException}
+     */
+    private void initializeJdbcTable() {
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            // "if not exists" keeps the statement repeatable, because startUp() retries this
+            // method until it completes without an exception.
+            String sink =
+                    "create table if not exists sink(\n"
+                            + "c_String varchar(255) NOT NULL PRIMARY KEY,\n"
+                            + "c_int INT\n"
+                            + ")";
+            statement.execute(sink);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing PostgreSql table failed!", e);
+        }
+    }
+
     @Getter
     @Setter
     @EqualsAndHashCode
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_streaming_json_to_postgresql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_streaming_json_to_postgresql.conf
new file mode 100644
index 00000000000..ec1aaf0469d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_streaming_json_to_postgresql.conf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  Http {
+    result_table_name = "fake"
+    url = "http://mockserver:1080/example/http"
+    method = "GET"
+    format = "json"
+    date_format="yyyy-MM-dd"
+    datetime_format="yyyy-MM-dd'T'HH:mm:ss"
+    time_format="HH:mm:ss"
+    poll_interval_millis = 5000
+    schema = {
+      fields {
+        c_string = string
+        c_int = int
+      }
+    }
+  }
+}
+
+transform {
+  Sql {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    query = "select CONCAT(c_string, CAST(RAND() AS STRING)) as c_string, c_int from fake"
+  }
+}
+
+sink {
+  Jdbc {
+    source_table_name = "fake1"
+    driver = org.postgresql.Driver
+    url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+    user = test
+    password = test
+    generate_sink_sql = true
+    database = test
+    table = "public.sink"
+    primary_keys = ["c_string"]
+    support_upsert_by_query_primary_key_exist = true
+    batch_size = 1
+  }
+}