[Feature][Connector-v2] Support schema evolution for Oracle connector
dailai committed Oct 30, 2024
1 parent 88be4fd commit ab3a30e
Showing 40 changed files with 3,051 additions and 267 deletions.
99 changes: 96 additions & 3 deletions docs/en/concept/schema-evolution.md
@@ -1,19 +1,25 @@
# Schema evolution
Schema evolution means that the schema of a data table can change, and the data synchronization task automatically adapts to the new table structure without any additional operations.
Currently we only support the `add column`, `drop column`, `rename column` and `modify column` operations on tables in the CDC source. This feature is only supported by the Zeta engine at the moment.


## Supported connectors

### Source
[Mysql-CDC](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/MySQL-CDC.md)
[Oracle-CDC](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Oracle-CDC.md)

### Sink
[Jdbc-Mysql](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)

Note: Schema evolution does not support transforms at the moment. Schema evolution across different database types (Oracle-CDC -> Jdbc-Mysql) currently does not support column default values in DDL.

When you use Oracle-CDC, you cannot use the `SYS` or `SYSTEM` user to modify the table schema, otherwise the DDL events will be filtered out, which can cause schema evolution to stop working.
In addition, if your table name starts with `ORA_TEMP_`, it will have the same problem.

## Enable schema evolution
Schema evolution is disabled by default in the CDC source. You need to configure `debezium.include.schema.changes = true`, which is only supported by CDC connectors, to enable it.

## Examples

@@ -56,3 +62,90 @@ sink {
}
}
```

### Oracle-CDC -> Jdbc-Oracle
```
env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  # This is an example source plugin **only for testing and demonstrating the feature source plugin**
  Oracle-CDC {
    result_table_name = "customers"
    username = "dbzuser"
    password = "dbz"
    database-names = ["ORCLCDB"]
    schema-names = ["DEBEZIUM"]
    table-names = ["ORCLCDB.DEBEZIUM.FULL_TYPES"]
    base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
    source.reader.close.timeout = 120000
    connection.pool.size = 1
    debezium {
      include.schema.changes = true
    }
  }
}

sink {
  Jdbc {
    source_table_name = "customers"
    driver = "oracle.jdbc.driver.OracleDriver"
    url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
    user = "dbzuser"
    password = "dbz"
    generate_sink_sql = true
    database = "ORCLCDB"
    table = "DEBEZIUM.FULL_TYPES_SINK"
    batch_size = 1
    primary_keys = ["ID"]
    connection.pool.size = 1
  }
}
```

### Oracle-CDC -> Jdbc-Mysql
```
env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  # This is an example source plugin **only for testing and demonstrating the feature source plugin**
  Oracle-CDC {
    result_table_name = "customers"
    username = "dbzuser"
    password = "dbz"
    database-names = ["ORCLCDB"]
    schema-names = ["DEBEZIUM"]
    table-names = ["ORCLCDB.DEBEZIUM.FULL_TYPES"]
    base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
    source.reader.close.timeout = 120000
    connection.pool.size = 1
    debezium {
      include.schema.changes = true
    }
  }
}

sink {
  jdbc {
    source_table_name = "customers"
    url = "jdbc:mysql://oracle-host:3306/oracle_sink"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "st_user_sink"
    password = "mysqlpw"
    generate_sink_sql = true
    # You need to configure both database and table
    database = oracle_sink
    table = oracle_cdc_2_mysql_sink_table
    primary_keys = ["ID"]
  }
}
```
93 changes: 92 additions & 1 deletion docs/zh/concept/schema-evolution.md
@@ -6,12 +6,16 @@

### Source
[Mysql-CDC](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/MySQL-CDC.md)
[Oracle-CDC](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Oracle-CDC.md)

### Sink
[Jdbc-Mysql](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)

Note: Schema evolution does not support transforms at the moment. Schema evolution across different database types (Oracle-CDC -> Jdbc-Mysql) currently does not support column default values in DDL.

When you use Oracle-CDC, you cannot use the `SYS` or `SYSTEM` user to modify the table schema, otherwise the DDL events will be filtered out, which can cause schema evolution to stop working.
In addition, if your table name starts with `ORA_TEMP_`, it will have the same problem.

## Enable schema evolution
Schema evolution is disabled by default in the CDC source connector. You need to configure `debezium.include.schema.changes = true` in the CDC connector to enable it.
@@ -57,3 +61,90 @@ sink {
}
}
```

### Oracle-CDC -> Jdbc-Oracle
```
env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  # This is an example source plugin **only for testing and demonstrating the feature source plugin**
  Oracle-CDC {
    result_table_name = "customers"
    username = "dbzuser"
    password = "dbz"
    database-names = ["ORCLCDB"]
    schema-names = ["DEBEZIUM"]
    table-names = ["ORCLCDB.DEBEZIUM.FULL_TYPES"]
    base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
    source.reader.close.timeout = 120000
    connection.pool.size = 1
    debezium {
      include.schema.changes = true
    }
  }
}

sink {
  Jdbc {
    source_table_name = "customers"
    driver = "oracle.jdbc.driver.OracleDriver"
    url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
    user = "dbzuser"
    password = "dbz"
    generate_sink_sql = true
    database = "ORCLCDB"
    table = "DEBEZIUM.FULL_TYPES_SINK"
    batch_size = 1
    primary_keys = ["ID"]
    connection.pool.size = 1
  }
}
```

### Oracle-CDC -> Jdbc-Mysql
```
env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  # This is an example source plugin **only for testing and demonstrating the feature source plugin**
  Oracle-CDC {
    result_table_name = "customers"
    username = "dbzuser"
    password = "dbz"
    database-names = ["ORCLCDB"]
    schema-names = ["DEBEZIUM"]
    table-names = ["ORCLCDB.DEBEZIUM.FULL_TYPES"]
    base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
    source.reader.close.timeout = 120000
    connection.pool.size = 1
    debezium {
      include.schema.changes = true
    }
  }
}

sink {
  jdbc {
    source_table_name = "customers"
    url = "jdbc:mysql://oracle-host:3306/oracle_sink"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "st_user_sink"
    password = "mysqlpw"
    generate_sink_sql = true
    # You need to configure both database and table
    database = oracle_sink
    table = oracle_cdc_2_mysql_sink_table
    primary_keys = ["ID"]
  }
}
```
@@ -154,10 +154,18 @@ private SeaTunnelRowType applyChangeColumn(
        String oldColumn = changeColumnEvent.getOldColumn();
        int oldColumnIndex = dataType.indexOf(oldColumn);

        // Oracle's rename-column event only carries the old and new column names,
        // so fill in the data type, which is the same as the old column's.
        SeaTunnelDataType<?> fieldType = dataType.getFieldType(oldColumnIndex);
        Column column = changeColumnEvent.getColumn();
        if (column.getDataType() == null) {
            column = column.copy(fieldType);
        }

        return applyModifyColumn(
                dataType,
                oldColumnIndex,
                column,
                changeColumnEvent.isFirst(),
                changeColumnEvent.getAfterColumn());
    }
@@ -17,6 +17,12 @@

package org.apache.seatunnel.connectors.cdc.base.schema;

import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.event.AlterTableColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;

@@ -25,17 +31,24 @@
import org.apache.kafka.connect.source.SourceRecord;

import com.google.common.collect.Lists;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.HistoryRecord;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Objects;

@Slf4j
public abstract class AbstractSchemaChangeResolver implements SchemaChangeResolver {

    protected static final List<String> SUPPORT_DDL = Lists.newArrayList("ALTER TABLE");

    protected final JdbcSourceConfig jdbcSourceConfig;
    @Setter protected transient DdlParser ddlParser;
    @Setter protected transient Tables tables;
    @Setter protected String sourceDialectName;

    public AbstractSchemaChangeResolver(JdbcSourceConfig jdbcSourceConfig) {
        this.jdbcSourceConfig = jdbcSourceConfig;
Expand All @@ -55,4 +68,39 @@ public boolean support(SourceRecord record) {
                .map(String::toUpperCase)
                .anyMatch(prefix -> ddl.toUpperCase().contains(prefix));
    }

    @Override
    public SchemaChangeEvent resolve(SourceRecord record, SeaTunnelDataType dataType) {
        TablePath tablePath = SourceRecordUtils.getTablePath(record);
        String ddl = SourceRecordUtils.getDdl(record);
        if (Objects.isNull(ddlParser)) {
            this.ddlParser = createDdlParser(tablePath);
        }
        if (Objects.isNull(tables)) {
            this.tables = new Tables();
        }
        ddlParser.setCurrentDatabase(tablePath.getDatabaseName());
        ddlParser.setCurrentSchema(tablePath.getSchemaName());
        // Parse DDL statement using Debezium's Antlr parser
        ddlParser.parse(ddl, tables);
        List<AlterTableColumnEvent> parsedEvents = getAndClearParsedEvents();
        parsedEvents.forEach(e -> e.setSourceDialectName(getSourceDialectName()));
        AlterTableColumnsEvent alterTableColumnsEvent =
                new AlterTableColumnsEvent(
                        TableIdentifier.of(
                                StringUtils.EMPTY,
                                tablePath.getDatabaseName(),
                                tablePath.getSchemaName(),
                                tablePath.getTableName()),
                        parsedEvents);
        alterTableColumnsEvent.setStatement(ddl);
        alterTableColumnsEvent.setSourceDialectName(getSourceDialectName());
        return parsedEvents.isEmpty() ? null : alterTableColumnsEvent;
    }

    protected abstract DdlParser createDdlParser(TablePath tablePath);

    protected abstract List<AlterTableColumnEvent> getAndClearParsedEvents();

    protected abstract String getSourceDialectName();
}
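
The `resolve()` flow above is a template method: a dialect-specific subclass supplies the Debezium `DdlParser`, hands back the `AlterTableColumnEvent`s collected during parsing, and reports its dialect name, while the base class wraps them into a single `AlterTableColumnsEvent`. The following is a minimal sketch of such a subclass; it is not part of this commit, and `MyDialectSchemaChangeResolver`, `MyDialectAntlrDdlParser`, and the `"MyDialect"` string are hypothetical placeholders for a concrete connector's own classes.

```java
// Illustrative only - not part of this commit. MyDialectSchemaChangeResolver,
// MyDialectAntlrDdlParser and "MyDialect" are hypothetical placeholder names.
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.event.AlterTableColumnEvent;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.schema.AbstractSchemaChangeResolver;

import io.debezium.relational.ddl.DdlParser;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class MyDialectSchemaChangeResolver extends AbstractSchemaChangeResolver {

    // Column-change events appended by the dialect-specific DDL parser while parse() runs.
    private final List<AlterTableColumnEvent> parsedEvents = new LinkedList<>();

    public MyDialectSchemaChangeResolver(JdbcSourceConfig jdbcSourceConfig) {
        super(jdbcSourceConfig);
    }

    @Override
    protected DdlParser createDdlParser(TablePath tablePath) {
        // Hypothetical Antlr-based parser for this dialect; it is expected to hand every
        // column change it finds in the DDL to parsedEvents::add while parsing.
        return new MyDialectAntlrDdlParser(tablePath, parsedEvents::add);
    }

    @Override
    protected List<AlterTableColumnEvent> getAndClearParsedEvents() {
        // Return the events produced by the last parse() call and reset the buffer,
        // so each DDL statement yields exactly one batch of events.
        List<AlterTableColumnEvent> events = new ArrayList<>(parsedEvents);
        parsedEvents.clear();
        return events;
    }

    @Override
    protected String getSourceDialectName() {
        return "MyDialect";
    }
}
```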
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.cdc.base.source.parser;

import org.apache.seatunnel.api.table.catalog.TableIdentifier;

import org.apache.commons.lang3.StringUtils;

import io.debezium.relational.Column;
import io.debezium.relational.TableId;

public interface SeatunnelDDLParser {

    /**
     * @param column The column to convert
     * @return The converted column in SeaTunnel format which has full type information
     */
    default org.apache.seatunnel.api.table.catalog.Column toSeatunnelColumnWithFullTypeInfo(
            Column column) {
        org.apache.seatunnel.api.table.catalog.Column seatunnelColumn = toSeatunnelColumn(column);
        String sourceColumnType = getSourceColumnTypeWithLengthScale(column);
        return seatunnelColumn.reSourceType(sourceColumnType);
    }

    /**
     * @param column The column to convert
     * @return The converted column in SeaTunnel format
     */
    org.apache.seatunnel.api.table.catalog.Column toSeatunnelColumn(Column column);

    /**
     * @param column The column to convert
     * @return The type with length and scale
     */
    default String getSourceColumnTypeWithLengthScale(Column column) {
        StringBuilder sb = new StringBuilder(column.typeName());
        if (column.length() >= 0) {
            sb.append('(').append(column.length());
            if (column.scale().isPresent()) {
                sb.append(", ").append(column.scale().get());
            }
            sb.append(')');
        }
        return sb.toString();
    }

    default TableIdentifier toTableIdentifier(TableId tableId) {
        return new TableIdentifier(
                StringUtils.EMPTY, tableId.catalog(), tableId.schema(), tableId.table());
    }
}
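
To illustrate the `getSourceColumnTypeWithLengthScale` default method above: given a numeric column with length 10 and scale 2, it reconstructs the source type string `NUMBER(10, 2)`. The snippet below is only a sketch for this page; it assumes Debezium's `Column.editor()` builder API and relies on `toSeatunnelColumn` being the interface's only abstract method, so a throwaway lambda can stand in for a real implementation.

```java
import org.apache.seatunnel.connectors.cdc.base.source.parser.SeatunnelDDLParser;

import io.debezium.relational.Column;

import java.sql.Types;

public class SourceTypeExample {
    public static void main(String[] args) {
        // A NUMBER(10, 2) column described the way Debezium would (assumes the Column.editor() builder).
        Column column =
                Column.editor()
                        .name("AMOUNT")
                        .type("NUMBER")
                        .jdbcType(Types.NUMERIC)
                        .length(10)
                        .scale(2)
                        .create();

        // toSeatunnelColumn is the interface's only abstract method, so a throwaway lambda suffices here.
        SeatunnelDDLParser parser = c -> null;

        // Prints "NUMBER(10, 2)" - the source type string stored back on the SeaTunnel column.
        System.out.println(parser.getSourceColumnTypeWithLengthScale(column));
    }
}
```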
