Skip to content

Commit

Permalink
[Feature][Connector-V2] Jdbc DB2 support upsert SQL (#7879)
Browse files Browse the repository at this point in the history
  • Loading branch information
shashwatsai authored Oct 21, 2024
1 parent c0f27c2 commit 1399193
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;

public class DB2Dialect implements JdbcDialect {

Expand All @@ -44,6 +46,56 @@ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
/**
 * Builds a DB2 {@code MERGE} statement that upserts a single row supplied through JDBC
 * parameter markers.
 *
 * <p>The generated statement has the shape:
 *
 * <pre>
 * MERGE INTO db.table AS target
 * USING (VALUES (?, ?, ...)) AS source (col1, col2, ...)
 * ON target.key = source.key [AND ...]
 * WHEN MATCHED AND (target.col1 &lt;&gt; source.col1 OR ...) THEN UPDATE SET ...
 * WHEN NOT MATCHED THEN INSERT (...) VALUES (...);
 * </pre>
 *
 * <p>One {@code ?} marker is emitted per entry of {@code fieldNames}, in the same order.
 *
 * @param database qualifier placed before the table name in {@code MERGE INTO}
 * @param tableName name of the target table
 * @param fieldNames all columns written by the statement
 * @param uniqueKeyFields columns used to match an incoming row against an existing row
 * @return the MERGE statement; this implementation always returns a present value
 */
@Override
public Optional<String> getUpsertStatement(
        String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
    // Column list shared by the USING alias and the INSERT clause.
    // NOTE(review): identifiers are concatenated as-is (no quoting/escaping) - confirm that
    // callers only pass trusted, already-valid column and table names.
    String fieldList = String.join(", ", fieldNames);

    // One parameter marker per column for the VALUES row.
    String placeholderList =
            Arrays.stream(fieldNames).map(field -> "?").collect(Collectors.joining(", "));

    // ON clause: a target row matches when every unique key column is equal.
    String onClause =
            Arrays.stream(uniqueKeyFields)
                    .map(field -> "target." + field + " = source." + field)
                    .collect(Collectors.joining(" AND "));

    // Extra WHEN MATCHED condition: only update when at least one column differs, so an
    // unchanged row is not rewritten.
    // NOTE(review): "<>" is not NULL-safe in SQL; a column switching to or from NULL would
    // presumably not satisfy this condition - confirm whether that is acceptable.
    String whenMatchedClause =
            Arrays.stream(fieldNames)
                    .map(field -> "target." + field + " <> source." + field)
                    .collect(Collectors.joining(" OR "));

    // UPDATE SET clause: copy every column from the incoming row.
    String updateSetClause =
            Arrays.stream(fieldNames)
                    .map(field -> "target." + field + " = source." + field)
                    .collect(Collectors.joining(", "));

    // WHEN NOT MATCHED clause: insert the incoming row.
    String insertClause =
            "INSERT ("
                    + fieldList
                    + ") VALUES ("
                    + Arrays.stream(fieldNames)
                            .map(field -> "source." + field)
                            .collect(Collectors.joining(", "))
                    + ")";

    // Combine all parts to form the final SQL statement.
    String mergeStatement =
            String.format(
                    "MERGE INTO %s.%s AS target USING (VALUES (%s)) AS source (%s) ON %s "
                            + "WHEN MATCHED AND (%s) THEN UPDATE SET %s "
                            + "WHEN NOT MATCHED THEN %s;",
                    database,
                    tableName,
                    placeholderList,
                    fieldList,
                    onClause,
                    whenMatchedClause,
                    updateSetClause,
                    insertClause);

    return Optional.of(mergeStatement);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,17 @@ protected void createNeededTables() {
jdbcCase.getSourceTable()));
statement.execute(createSource);

if (jdbcCase.getAdditionalSqlOnSource() != null) {
String additionalSql =
String.format(
jdbcCase.getAdditionalSqlOnSource(),
buildTableInfoWithSchema(
jdbcCase.getDatabase(),
jdbcCase.getSchema(),
jdbcCase.getSourceTable()));
statement.execute(additionalSql);
}

if (!jdbcCase.isUseSaveModeCreateTable()) {
if (jdbcCase.getSinkCreateSql() != null) {
createTemplate = jdbcCase.getSinkCreateSql();
Expand All @@ -223,6 +234,17 @@ protected void createNeededTables() {
statement.execute(createSink);
}

if (jdbcCase.getAdditionalSqlOnSink() != null) {
String additionalSql =
String.format(
jdbcCase.getAdditionalSqlOnSink(),
buildTableInfoWithSchema(
jdbcCase.getDatabase(),
jdbcCase.getSchema(),
jdbcCase.getSinkTable()));
statement.execute(additionalSql);
}

connection.commit();
} catch (Exception exception) {
log.error(ExceptionUtils.getMessage(exception));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class JdbcCase {
private String jdbcUrl;
private String createSql;
private String sinkCreateSql;
private String additionalSqlOnSource;
private String additionalSqlOnSink;
private String insertSql;
private List<String> configFile;
private Pair<String[], List<SeaTunnelRow>> testData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public class JdbcDb2IT extends AbstractJdbcIT {

private static final String DB2_CONTAINER_HOST = "db2-e2e";

private static final String DB2_DATABASE = "E2E";
private static final String DB2_SOURCE = "SOURCE";
private static final String DB2_SINK = "SINK";
protected static final String DB2_DATABASE = "E2E";
protected static final String DB2_SOURCE = "SOURCE";
protected static final String DB2_SINK = "SINK";

private static final String DB2_URL = "jdbc:db2://" + HOST + ":%s/%s";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc;

import org.apache.seatunnel.e2e.common.container.TestContainer;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;

import com.google.common.collect.Lists;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

/**
 * End-to-end test for the DB2 upsert path of the JDBC sink.
 *
 * <p>The sink table carries a {@code BEFORE UPDATE} trigger that stamps {@code C_UPDATED_AT}
 * whenever a row is updated. The same job is run twice with the same source data; if the second
 * run does not update any row, the timestamps read before and after it are identical.
 */
public class JdbcDb2UpsertIT extends JdbcDb2IT {

    private static final String CREATE_SQL_SINK =
            "create table %s\n"
                    + "(\n"
                    + " C_BOOLEAN BOOLEAN,\n"
                    + " C_SMALLINT SMALLINT,\n"
                    + " C_INT INTEGER NOT NULL PRIMARY KEY,\n"
                    + " C_INTEGER INTEGER,\n"
                    + " C_BIGINT BIGINT,\n"
                    + " C_DECIMAL DECIMAL(5),\n"
                    + " C_DEC DECIMAL(5),\n"
                    + " C_NUMERIC DECIMAL(5),\n"
                    + " C_NUM DECIMAL(5),\n"
                    + " C_REAL REAL,\n"
                    + " C_FLOAT DOUBLE,\n"
                    + " C_DOUBLE DOUBLE,\n"
                    + " C_DOUBLE_PRECISION DOUBLE,\n"
                    + " C_CHAR CHARACTER(1),\n"
                    + " C_VARCHAR VARCHAR(255),\n"
                    + " C_BINARY BINARY(1),\n"
                    + " C_VARBINARY VARBINARY(2048),\n"
                    + " C_DATE DATE,\n"
                    + " C_UPDATED_AT TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n"
                    + ");\n";

    // create a trigger to update the timestamp when the row is updated.
    // if no changes are made to the row, the timestamp should not be updated.
    private static final String CREATE_TRIGGER_SQL =
            "CREATE TRIGGER c_updated_at_trigger\n"
                    + " BEFORE UPDATE ON %s\n"
                    + " REFERENCING NEW AS new_row\n"
                    + " FOR EACH ROW\n"
                    + "BEGIN ATOMIC\n"
                    + "SET new_row.c_updated_at = CURRENT_TIMESTAMP;\n"
                    + "END;";

    // Single definition of the job config path, shared by the JdbcCase and the test itself.
    private static final String UPSERT_CONFIG_FILE = "/jdbc_db2_source_and_sink_upsert.conf";

    private static final List<String> CONFIG_FILE = Lists.newArrayList(UPSERT_CONFIG_FILE);

    // Ordered by the primary key so that two snapshots can be compared row by row; without
    // ORDER BY the database may return the rows in a different order on each query.
    private static final String SELECT_UPDATED_AT_SQL =
            "SELECT C_UPDATED_AT FROM %s ORDER BY C_INT";

    /**
     * Reuses the parent's case and overrides the sink DDL, the job config and the extra sink SQL
     * (the trigger) for the upsert scenario.
     */
    @Override
    JdbcCase getJdbcCase() {
        jdbcCase = super.getJdbcCase();
        jdbcCase.setSinkCreateSql(CREATE_SQL_SINK);
        jdbcCase.setConfigFile(CONFIG_FILE);
        jdbcCase.setAdditionalSqlOnSink(CREATE_TRIGGER_SQL);
        return jdbcCase;
    }

    /**
     * Runs the upsert job twice and verifies that the second run leaves every {@code
     * C_UPDATED_AT} value untouched. The sink table is cleared afterwards in all cases.
     */
    @TestTemplate
    public void testDb2UpsertE2e(TestContainer container)
            throws IOException, InterruptedException, SQLException {
        try {
            // step 1: run the job to migrate data from source to sink.
            Container.ExecResult execResult = container.executeJob(UPSERT_CONFIG_FILE);
            Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
            // NOTE(review): if the sink were empty here the comparison below would pass
            // vacuously - consider also asserting that this snapshot is not empty.
            List<List<Object>> updatedAtTimestampsBeforeUpdate = queryUpdatedAtTimestamps();

            // step 2: run the job to update the data in the sink.
            // expected: timestamps should not be updated as the data is not changed.
            execResult = container.executeJob(UPSERT_CONFIG_FILE);
            Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
            List<List<Object>> updatedAtTimestampsAfterUpdate = queryUpdatedAtTimestamps();

            Assertions.assertIterableEquals(
                    updatedAtTimestampsBeforeUpdate, updatedAtTimestampsAfterUpdate);
        } finally {
            clearTable(DB2_DATABASE, DB2_SINK);
        }
    }

    /** Reads the {@code C_UPDATED_AT} column of the sink table in primary-key order. */
    private List<List<Object>> queryUpdatedAtTimestamps() {
        return query(
                String.format(
                        SELECT_UPDATED_AT_SQL, buildTableInfoWithSchema(DB2_DATABASE, DB2_SINK)));
    }

    /**
     * Executes {@code sql} on the shared connection and returns every row as a list of the
     * columns' string values, then commits.
     *
     * @param sql the query to execute
     * @return one inner list per row, holding {@code getString} of each column in order
     * @throws RuntimeException wrapping any {@link SQLException} raised while querying
     */
    private List<List<Object>> query(String sql) {
        try (Statement statement = connection.createStatement();
                ResultSet resultSet = statement.executeQuery(sql)) {
            List<List<Object>> result = new ArrayList<>();
            int columnCount = resultSet.getMetaData().getColumnCount();
            while (resultSet.next()) {
                List<Object> row = new ArrayList<>(columnCount);
                // JDBC column indexes are 1-based.
                for (int i = 1; i <= columnCount; i++) {
                    row.add(resultSet.getString(i));
                }
                result.add(row);
                log.debug(String.format("Print query, sql: %s, data: %s", sql, row));
            }
            connection.commit();
            return result;
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"
}

source {
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
Jdbc {
driver = com.ibm.db2.jcc.DB2Driver
url = "jdbc:db2://db2-e2e:50000/E2E"
user = "db2inst1"
password = "123456"
query = """
select * from "E2E".SOURCE;
"""
}

# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
}

sink {
Jdbc {
driver = com.ibm.db2.jcc.DB2Driver
url = "jdbc:db2://db2-e2e:50000/E2E"
user = "db2inst1"
password = "123456"
database = "E2E"
table = "SINK"
enable_upsert = true
# primary_keys (below) lists the primary key columns of the table; they are used to generate the upsert SQL
generate_sink_sql = true
primary_keys = [
C_INT
]
}

# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
}

0 comments on commit 1399193

Please sign in to comment.