Skip to content

Commit

Permalink
[Fix][Connector-V2] Fix http source can not read streaming (#7703)
Browse files Browse the repository at this point in the history
Co-authored-by: Jia Fan <[email protected]>
  • Loading branch information
CosmosNi and Hisoka-X authored Sep 29, 2024
1 parent 05cf84f commit a0ffa7b
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,13 @@ private void updateRequestParam(PageInfo pageInfo) {
.put(pageInfo.getPageField(), pageInfo.getPageIndex().toString());
}

/**
 * Polls the next batch of records while holding the collector's checkpoint lock.
 *
 * <p>The actual work is delegated to {@link #internalPollNext(Collector)}; this method only
 * wraps that call in a block synchronized on {@code output.getCheckpointLock()}.
 *
 * @param output collector that receives the rows and supplies the checkpoint lock
 * @throws Exception whatever {@code internalPollNext} throws
 */
@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
    final Object checkpointLock = output.getCheckpointLock();
    synchronized (checkpointLock) {
        internalPollNext(output);
    }
}

@Override
public void internalPollNext(Collector<SeaTunnelRow> output) throws Exception {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,25 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<!-- Testcontainers PostgreSQL module, test scope only. -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
<!-- Test-jar of the shared JDBC e2e module, test scope only. -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-jdbc-e2e-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- PostgreSQL JDBC driver, pinned to 42.5.1. -->
<!-- NOTE(review): unlike the two dependencies above, this one declares no test scope, and its
     version differs from the 42.3.3 driver jar that HttpIT downloads into the container -
     confirm both are intentional. -->
<dependency>
<!-- fix CVE-2022-26520 https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-26520 -->
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-http-lemlist</artifactId>
Expand Down Expand Up @@ -104,7 +123,6 @@
<version>5.14.0</version>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@

import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand All @@ -34,6 +36,7 @@
import org.mockserver.model.Format;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.lifecycle.Startables;
Expand All @@ -45,19 +48,30 @@
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.given;
import static org.mockserver.model.HttpRequest.request;

@Slf4j
public class HttpIT extends TestSuiteBase implements TestResource {

private static final String TMP_DIR = "/tmp";
Expand All @@ -70,9 +84,31 @@ public class HttpIT extends TestSuiteBase implements TestResource {

private MockServerClient mockServerClient;

// Schema and table name passed to clearTable(...) when the streaming test cleans up.
private static final String POSTGRESQL_SCHEMA = "public";
private static final String SINK_TABLE_1 = "sink";
// Minimum number of rows the streaming test waits for in the sink table.
private static final Integer MAX_COUNT = 15;
// Row-count query polled by the streaming test; the table name is hard-coded rather than
// derived from SINK_TABLE_1.
private static final String COUNT_QUERY = "select count(*) from sink";

// Docker image used for the PostgreSQL test container.
private static final String PG_IMAGE = "postgres:14-alpine";
// PostgreSQL JDBC driver jar that extendedFactory downloads into the SeaTunnel container.
private static final String PG_DRIVER_JAR =
"https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
// Created and started in startUp(); stopped in tearDown().
private PostgreSQLContainer<?> postgreSQLContainer;

/**
 * Container extension that installs the PostgreSQL JDBC driver for the Jdbc plugin.
 *
 * <p>It runs a bash command inside the given container that creates
 * {@code /tmp/seatunnel/plugins/Jdbc/lib}, changes into it and downloads {@code PG_DRIVER_JAR}
 * with {@code curl -O}. The command must exit with status 0, otherwise the assertion fails.
 */
@TestContainerExtension
private final ContainerExtendedFactory extendedFactory =
        container -> {
            final String installDriverCommand =
                    "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
                            + PG_DRIVER_JAR;
            final Container.ExecResult installResult =
                    container.execInContainer("bash", "-c", installDriverCommand);
            Assertions.assertEquals(0, installResult.getExitCode());
        };

@BeforeAll
@Override
public void startUp() {
public void startUp() throws ClassNotFoundException {
Optional<URL> resource =
Optional.ofNullable(HttpIT.class.getResource(getMockServerConfig()));
this.mockserverContainer =
Expand Down Expand Up @@ -100,6 +136,22 @@ public void startUp() {
Startables.deepStart(Stream.of(mockserverContainer)).join();
mockServerClient = new MockServerClient("127.0.0.1", 1080);
fillMockRecords();

postgreSQLContainer =
new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
.withNetwork(TestSuiteBase.NETWORK)
.withNetworkAliases("postgresql")
.withLogConsumer(
new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE)));
Startables.deepStart(Stream.of(postgreSQLContainer)).join();
log.info("PostgreSQL container started");
Class.forName(postgreSQLContainer.getDriverClassName());
given().ignoreExceptions()
.await()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(this::initializeJdbcTable);
}

private static void fillMockRecords() {
Expand Down Expand Up @@ -149,6 +201,71 @@ public void tearDown() {
if (mockServerClient != null) {
mockServerClient.close();
}
if (postgreSQLContainer != null) {
postgreSQLContainer.stop();
}
}

/**
 * Verifies that the HTTP source keeps emitting rows in streaming mode.
 *
 * <p>The job {@code /http_streaming_json_to_postgresql.conf} is submitted on a background
 * thread, because the test thread has to poll the sink while the job is running. The test then
 * waits up to 60 seconds for the sink table to contain at least {@code MAX_COUNT} rows. The
 * sink table is truncated afterwards regardless of the outcome.
 *
 * @param container the engine container the job is submitted to
 */
@TestTemplate
public void testStreamingSourceToPostgresqlSink(TestContainer container) {
    try {
        // Fire-and-forget: the result of the job is never read, so a Runnable is sufficient.
        CompletableFuture.runAsync(
                () -> {
                    try {
                        container.executeJob("/http_streaming_json_to_postgresql.conf");
                    } catch (Exception e) {
                        // Pass the exception itself so the stack trace ends up in the log;
                        // nobody joins this future, so the log is the only place it surfaces.
                        log.error("Commit task exception :" + e.getMessage(), e);
                        throw new RuntimeException(e);
                    }
                });
        await().atMost(60000, TimeUnit.MILLISECONDS)
                .untilAsserted(
                        () -> {
                            Long count = queryCount(COUNT_QUERY);
                            Assertions.assertTrue(
                                    count >= MAX_COUNT,
                                    "Actual value should be greater than expected value");
                        });
    } finally {
        log.info("clear schema:{}", SINK_TABLE_1);
        clearTable(POSTGRESQL_SCHEMA, SINK_TABLE_1);
    }
}

/**
 * Executes a query and returns the first column of its first row as a {@code Long}.
 *
 * <p>The connection, statement and result set are all managed by try-with-resources so each
 * one is closed explicitly, even when the query fails.
 *
 * @param sql query whose first column is numeric, e.g. a {@code select count(*)}
 * @return the value of the first column of the first row, or {@code 0} when there is no row
 * @throws RuntimeException wrapping any {@link SQLException} raised while querying
 */
private Long queryCount(String sql) {
    try (Connection connection = getJdbcConnection();
            Statement statement = connection.createStatement();
            ResultSet resultSet = statement.executeQuery(sql)) {
        return resultSet.next() ? resultSet.getLong(1) : 0L;
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }
}

/**
 * Opens a new JDBC connection using the URL, user name and password reported by
 * {@code postgreSQLContainer}.
 *
 * @return a fresh connection; the caller is responsible for closing it
 * @throws SQLException if {@link DriverManager} cannot establish the connection
 */
private Connection getJdbcConnection() throws SQLException {
    final String jdbcUrl = postgreSQLContainer.getJdbcUrl();
    final String user = postgreSQLContainer.getUsername();
    final String password = postgreSQLContainer.getPassword();
    return DriverManager.getConnection(jdbcUrl, user, password);
}

/**
 * Executes a single SQL statement against the PostgreSQL test container.
 *
 * <p>The session's default {@code search_path} is left untouched. The previous version switched
 * it to a schema named {@code inventory}, which this test never creates; that would make any
 * unqualified table name unresolvable.
 *
 * @param sql the statement to execute
 * @throws RuntimeException wrapping any {@link SQLException} raised while executing
 */
private void executeSql(String sql) {
    try (Connection connection = getJdbcConnection();
            Statement statement = connection.createStatement()) {
        statement.execute(sql);
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }
}

/**
 * Truncates the table addressed as {@code database.tableName}.
 *
 * @param database qualifier placed before the dot (the visible caller passes a schema name)
 * @param tableName name of the table to truncate
 */
private void clearTable(String database, String tableName) {
    final String truncateSql = String.format("truncate table %s.%s", database, tableName);
    executeSql(truncateSql);
}

@TestTemplate
Expand Down Expand Up @@ -259,6 +376,24 @@ public void testMultiTableHttp(TestContainer container)
Assertions.assertIterableEquals(records, recordResponse);
}

/**
 * Creates the {@code sink} table that the streaming job writes into.
 *
 * <p>The connection comes from the shared {@link #getJdbcConnection()} helper, and both the
 * connection and the statement are closed by try-with-resources.
 *
 * @throws RuntimeException wrapping any {@link SQLException} raised while creating the table
 */
private void initializeJdbcTable() {
    final String createSinkTable =
            "create table sink(\n"
                    + "c_String varchar(255) NOT NULL PRIMARY KEY,\n"
                    + "c_int INT\n"
                    + ")";
    try (Connection connection = getJdbcConnection();
            Statement statement = connection.createStatement()) {
        statement.execute(createSinkTable);
    } catch (SQLException e) {
        throw new RuntimeException("Initializing PostgreSql table failed!", e);
    }
}

@Getter
@Setter
@EqualsAndHashCode
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Job-level settings: a single parallel task running in streaming mode.
env {
parallelism = 1
job.mode = "STREAMING"
# presumably milliseconds - confirm against the engine's checkpoint option
checkpoint.interval = 5000
}

# HTTP source: GET requests against the mock server, response parsed as JSON into
# (c_string, c_int) rows and published as table "fake".
source {
Http {
result_table_name = "fake"
url = "http://mockserver:1080/example/http"
method = "GET"
format = "json"
date_format="yyyy-MM-dd"
datetime_format="yyyy-MM-dd'T'HH:mm:ss"
time_format="HH:mm:ss"
# Interval between polls of the endpoint, in milliseconds.
poll_interval_millis = 5000
schema = {
fields {
c_string = string
c_int = int
}
}
}
}

# Appends a random number (cast to string) to c_string and publishes the result as "fake1".
# NOTE(review): presumably this makes the primary key differ between polls so the sink row
# count keeps growing - confirm.
transform {
Sql {
source_table_name = "fake"
result_table_name = "fake1"
query = "select CONCAT(c_string, CAST(RAND() AS STRING)) as c_string, c_int from fake"
}
}

# JDBC sink: writes "fake1" into public.sink of database "test" on host "postgresql",
# with generated SQL, c_string as primary key, and a batch size of 1.
sink {
Jdbc {
source_table_name = "fake1"
driver = org.postgresql.Driver
url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
# NOTE(review): credentials presumably match the defaults of the test container - confirm.
user = test
password = test
generate_sink_sql = true
database = test
table = "public.sink"
primary_keys = ["c_string"]
support_upsert_by_query_primary_key_exist = true
batch_size = 1
}
}

0 comments on commit a0ffa7b

Please sign in to comment.