From 8baa012cedb79036d844008270b0cdf3a2fde604 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Wed, 30 Oct 2024 18:05:01 +0800 Subject: [PATCH] [Improve][Connector-V2] Improve jdbc merge table from path and query when type is decimal (#7917) --- .../dialect/saphana/SapHanaTypeConverter.java | 13 +++-- .../jdbc/utils/JdbcCatalogUtils.java | 4 +- .../saphana/SapHanaTypeConverterTest.java | 6 +- .../jdbc/utils/JdbcCatalogUtilsTest.java | 56 +++++++++++++++++++ 4 files changed, 69 insertions(+), 10 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaTypeConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaTypeConverter.java index 15faaff4a8e..a6f3791a694 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaTypeConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaTypeConverter.java @@ -252,9 +252,9 @@ public Column convert(BasicTypeDefine typeDefine) { ? typeDefine.getLength().intValue() : MAX_PRECISION - 4; if (scale == null) { - builder.dataType(new DecimalType((int) precision, MAX_SCALE)); + builder.dataType(new DecimalType((int) precision, 0)); builder.columnLength(precision); - builder.scale(MAX_SCALE); + builder.scale(0); } else if (scale < 0) { int newPrecision = (int) (precision - scale); if (newPrecision == 1) { @@ -277,16 +277,17 @@ public Column convert(BasicTypeDefine typeDefine) { } break; case HANA_SMALLDECIMAL: + int smallDecimalScale = typeDefine.getScale() != null ? typeDefine.getScale() : 0; if (typeDefine.getPrecision() == null) { - builder.dataType(new DecimalType(DEFAULT_PRECISION, MAX_SMALL_DECIMAL_SCALE)); + builder.dataType(new DecimalType(DEFAULT_PRECISION, smallDecimalScale)); builder.columnLength((long) DEFAULT_PRECISION); - builder.scale(MAX_SMALL_DECIMAL_SCALE); + builder.scale(smallDecimalScale); } else { builder.dataType( new DecimalType( - typeDefine.getPrecision().intValue(), MAX_SMALL_DECIMAL_SCALE)); + typeDefine.getPrecision().intValue(), smallDecimalScale)); builder.columnLength(typeDefine.getPrecision()); - builder.scale(MAX_SMALL_DECIMAL_SCALE); + builder.scale(smallDecimalScale); } break; case HANA_REAL: diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java index 6eabba1edc1..0ab6b58e209 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java @@ -232,10 +232,12 @@ static CatalogTable mergeCatalogTable(CatalogTable tableOfPath, CatalogTable tab && columnsOfPath .get(column.getName()) .getDataType() + .getSqlType() .equals( columnsOfQuery .get(column.getName()) - .getDataType())) + .getDataType() + .getSqlType())) .map(column -> columnsOfPath.get(column.getName())) .collect(Collectors.toList()); boolean schemaIncludeAllColumns = columnsOfMerge.size() == columnKeysOfQuery.size(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaTypeConverterTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaTypeConverterTest.java index 8ff301d71ee..9f672bbeeeb 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaTypeConverterTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaTypeConverterTest.java @@ -126,7 +126,7 @@ public void testConvertSmallDecimal() { .build(); Column column = SapHanaTypeConverter.INSTANCE.convert(typeDefine); Assertions.assertEquals(typeDefine.getName(), column.getName()); - Assertions.assertEquals(new DecimalType(38, 368), column.getDataType()); + Assertions.assertEquals(new DecimalType(38, 0), column.getDataType()); Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); typeDefine = @@ -139,7 +139,7 @@ public void testConvertSmallDecimal() { .build(); column = SapHanaTypeConverter.INSTANCE.convert(typeDefine); Assertions.assertEquals(typeDefine.getName(), column.getName()); - Assertions.assertEquals(new DecimalType(10, 368), column.getDataType()); + Assertions.assertEquals(new DecimalType(10, 5), column.getDataType()); Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); } @@ -153,7 +153,7 @@ public void testConvertDecimal() { .build(); Column column = SapHanaTypeConverter.INSTANCE.convert(typeDefine); Assertions.assertEquals(typeDefine.getName(), column.getName()); - Assertions.assertEquals(new DecimalType(34, 6176), column.getDataType()); + Assertions.assertEquals(new DecimalType(34, 0), column.getDataType()); Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); BasicTypeDefine typeDefine2 = diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java index 4162bce30bb..872dc26f8f0 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -314,4 +315,59 @@ public void testColumnNotIncludeMerge() { tableOfQuery.getTableSchema().getColumns(), mergeTable.getTableSchema().getColumns()); } + + @Test + public void testDecimalColumnMerge() { + CatalogTable tableOfQuery = + CatalogTable.of( + TableIdentifier.of("default", null, null, "default"), + TableSchema.builder() + .column( + PhysicalColumn.of( + "f1", + new DecimalType(10, 1), + null, + true, + null, + null, + null, + false, + false, + null, + null, + null)) + .build(), + Collections.emptyMap(), + Collections.emptyList(), + null); + + CatalogTable tableOfPath = + CatalogTable.of( + TableIdentifier.of("default", null, null, "default"), + TableSchema.builder() + .column( + PhysicalColumn.of( + "f1", + new DecimalType(10, 2), + null, + true, + null, + null, + null, + false, + false, + null, + null, + null)) + .build(), + Collections.emptyMap(), + Collections.emptyList(), + null); + + CatalogTable mergeTable = JdbcCatalogUtils.mergeCatalogTable(tableOfPath, tableOfQuery); + // When column type is decimal, the precision and scale should not affect the merge result + Assertions.assertEquals( + tableOfPath.getTableSchema().getColumns().get(0), + mergeTable.getTableSchema().getColumns().get(0)); + } }