[Feature][Connector-V2]Jdbc chunk split add “snapshot.split.column” params #7794 #7840

Open · wants to merge 26 commits into base: dev

Changes from 16 commits

Commits (26):
fc75772  [Improve][Sls] Add sls sink connector, e2e, doc (Oct 14, 2024)
1576162  [Improve][Sls] Add sls sink connector, e2e, doc (Oct 14, 2024)
1a636d3  Merge branch 'dev' of https://github.com/apache/seatunnel into dev (Oct 14, 2024)
87d749a  [Improve][Sls] Update sls sink e2e code style (Oct 14, 2024)
69bf03c  [Improve][Sls] Add sls sink license header (Oct 14, 2024)
1165447  [Feature][Connector-V2] Jdbc chunk split add “snapshot.split.column” p… (Oct 15, 2024)
0c2676b  Merge branch 'dev' of https://github.com/apache/seatunnel into dev (Oct 15, 2024)
7165bad  [Feature][Connector-V2] Jdbc chunk split add “snapshot.split.column” p… (Oct 15, 2024)
f5c5ee8  [Feature][Connector-V2] Update mysql-cdc doc with params snapshot.spl… (Oct 15, 2024)
8a0b6ba  [Feature][Connector-V2] Update snapshot.split.column type with map and… (Oct 15, 2024)
401723e  [Feature][Connector-V2] Remove local test run println (Oct 15, 2024)
f9c973a  [Feature][Connector-V2] code style (Oct 15, 2024)
83020fd  [Feature][Connector-V2] SourceConfig remove getSplitColumn (Oct 15, 2024)
9a87c8b  [Feature][Connector-V2] uniq_key_f add id (Oct 15, 2024)
7be5d6d  [Feature][Connector-V2] add split column type check (Oct 15, 2024)
afd9cd9  [Feature][Connector-V2] add split column type check (Oct 15, 2024)
2cae910  Update seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/m… (XenosK, Oct 16, 2024)
edc6f70  Update seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/m… (XenosK, Oct 16, 2024)
275520d  Update seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/m… (XenosK, Oct 16, 2024)
fe8f7d3  [Feature][Connector-V2] boolean isUniqueKey (Oct 16, 2024)
768ca66  [Feature][Connector-V2] StringUtils.isNotEmpty (Oct 16, 2024)
1973c18  [Feature][Connector-V2] code style (Oct 16, 2024)
08bd831  [Feature][Connector-V2] replace && xx == false with && !xx (Oct 16, 2024)
047665f  [Feature][Connector-V2] use map to replace properties for split column (Oct 21, 2024)
733d823  [Feature][Connector-V2] update snapshot.split.column location (Oct 23, 2024)
1388c01  [Feature][Connector-V2] update doc (Oct 23, 2024)
3 changes: 2 additions & 1 deletion docs/en/connector-v2/source/MySQL-CDC.md
@@ -169,7 +169,7 @@ When an initial consistent snapshot is made for large databases, your establishe

## Source Options

| Name | Type | Required | Default | Description |
|------------------------------------------------|----------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| base-url | String | Yes | - | The URL of the JDBC connection. Refer to a case: `jdbc:mysql://localhost:3306:3306/test`. |
| username | String | Yes | - | Name of the database to use when connecting to the database server. |
@@ -184,6 +184,7 @@ When an initial consistent snapshot is made for large databases, your establishe
| stop.specific-offset.file | String | No | - | Stop from the specified binlog file name. **Note, This option is required when the `stop.mode` option used `specific`.** |
| stop.specific-offset.pos | Long | No | - | Stop from the specified binlog file position. **Note, This option is required when the `stop.mode` option used `specific`.** |
| snapshot.split.size | Integer | No | 8096 | The split size (number of rows) of table snapshot, captured tables are split into multiple splits when read the snapshot of table. |
| snapshot.split.column | Config | No | - | The split column (must be a unique key) to use for each captured table when splitting the table snapshot into multiple splits, keyed by table name; see the example after this table. |
@hailin0 (Member) commented on Oct 21, 2024:

link #6106

@hailin0 (Member) commented on Oct 22, 2024:

Yes, snapshot.split.column requires binding to a specific table

XenosK (Contributor, author) replied:

> Yes, snapshot.split.column requires binding to a specific table

Now snapshot.split.column is configured like this and is already bound to a specific table. Do you mean moving snapshot.split.column into table-names-config?

[Screenshot 2024-10-22 10:15:16: the snapshot.split.column configuration being discussed; a sketch in this style is shown after the option table below.]

Member replied:

Yes

XenosK (Contributor, author) replied:

> Yes

table-names-config must have primaryKeys, otherwise an error will be reported; for now I only want to configure the table and snapshot.split.column.

| snapshot.fetch.size | Integer | No | 1024 | The maximum fetch size for per poll when read table snapshot. |
| server-id | String | No | - | A numeric ID or a numeric ID range of this database client, The numeric ID syntax is like `5400`, the numeric ID range syntax is like '5400-5408'. <br/> Every ID must be unique across all currently-running database processes in the MySQL cluster. This connector joins the <br/> MySQL cluster as another server (with this unique ID) so it can read the binlog. <br/> By default, a random number is generated between 6500 and 2,148,492,146, though we recommend setting an explicit value. |
| server-time-zone | String | No | UTC | The session time zone in database server. If not set, then ZoneId.systemDefault() is used to determine the server time zone. |
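For reference, a minimal sketch of how the new option could look in a MySQL-CDC source block. It is illustrative only: the database, table, and column names are invented, and the exact map syntax for snapshot.split.column should be checked against the merged docs; the `<database>.<table>` key format follows the lookup added to the chunk splitter in this PR.

```hocon
source {
  MySQL-CDC {
    base-url = "jdbc:mysql://localhost:3306/inventory"
    username = "st_user"
    password = "st_password"
    table-names = ["inventory.orders", "inventory.customers"]

    # Bind a split column to each captured table; keys are "<database>.<table>",
    # values are column names that must be covered by a unique key.
    snapshot.split.column = {
      "inventory.orders" = "order_no"
      "inventory.customers" = "customer_no"
    }

    snapshot.split.size = 8096
  }
}
```

If the configured column is not covered by a unique key, or its type cannot be split evenly, the splitter logs a warning and falls back to the primary key or unique keys, as shown in the chunk-splitter change further down.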
@@ -34,6 +34,7 @@ public abstract class BaseSourceConfig implements SourceConfig {
@Getter protected final StopConfig stopConfig;

@Getter protected final int splitSize;
@Getter protected final Properties splitColumn;

@Getter protected final double distributionFactorUpper;
@Getter protected final double distributionFactorLower;
@@ -50,6 +51,7 @@ public BaseSourceConfig(
StartupConfig startupConfig,
StopConfig stopConfig,
int splitSize,
Properties splitColumn,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
@@ -59,6 +61,7 @@ public BaseSourceConfig(
this.startupConfig = startupConfig;
this.stopConfig = stopConfig;
this.splitSize = splitSize;
this.splitColumn = splitColumn;
this.distributionFactorUpper = distributionFactorUpper;
this.distributionFactorLower = distributionFactorLower;
this.sampleShardingThreshold = sampleShardingThreshold;
@@ -49,6 +49,7 @@ public JdbcSourceConfig(
List<String> databaseList,
List<String> tableList,
int splitSize,
Properties splitColumn,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
@@ -70,6 +71,7 @@ public JdbcSourceConfig(
startupConfig,
stopConfig,
splitSize,
splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
@@ -51,6 +51,7 @@ public abstract class JdbcSourceConfigFactory implements SourceConfig.Factory<Jd
JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD.defaultValue();
protected int inverseSamplingRate = JdbcSourceOptions.INVERSE_SAMPLING_RATE.defaultValue();
protected int splitSize = SourceOptions.SNAPSHOT_SPLIT_SIZE.defaultValue();
protected Properties splitColumn;
protected int fetchSize = SourceOptions.SNAPSHOT_FETCH_SIZE.defaultValue();
protected String serverTimeZone = JdbcSourceOptions.SERVER_TIME_ZONE.defaultValue();
protected long connectTimeoutMillis = JdbcSourceOptions.CONNECT_TIMEOUT_MS.defaultValue();
@@ -65,6 +66,11 @@ public JdbcSourceConfigFactory hostname(String hostname) {
return this;
}

public JdbcSourceConfigFactory splitColumn(Properties splitColumn) {
this.splitColumn = splitColumn;
return this;
}

/** Integer port number of the database server. */
public JdbcSourceConfigFactory port(int port) {
this.port = port;
@@ -239,6 +245,9 @@ public JdbcSourceConfigFactory fromReadonlyConfig(ReadonlyConfig config) {
this.sampleShardingThreshold = config.get(JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD);
this.inverseSamplingRate = config.get(JdbcSourceOptions.INVERSE_SAMPLING_RATE);
this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);
this.splitColumn = new Properties();
config.getOptional(SourceOptions.SNAPSHOT_SPLIT_COLUMN)
.ifPresent(map -> splitColumn.putAll(map));
this.fetchSize = config.get(SourceOptions.SNAPSHOT_FETCH_SIZE);
this.serverTimeZone = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
this.connectTimeoutMillis = config.get(JdbcSourceOptions.CONNECT_TIMEOUT_MS);
@@ -119,6 +119,26 @@ default List<ConstraintKey> getUniqueKeys(JdbcConnection jdbcConnection, TableId
.collect(Collectors.toList());
}

default Boolean isUniqueKey(JdbcConnection jdbcConnection, TableId tableId, String columnName)
throws SQLException {
boolean isUnique = false;
if (null != columnName) {
DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();
ResultSet resultSet =
metaData.getIndexInfo(
tableId.catalog(), tableId.schema(), tableId.table(), false, false);

while (resultSet.next()) {
if (columnName.equalsIgnoreCase(resultSet.getString("COLUMN_NAME"))
&& resultSet.getBoolean("NON_UNIQUE") == false) {
isUnique = true;
break;
}
}
}
return isUnique;
}

default List<ConstraintKey> getConstraintKeys(JdbcConnection jdbcConnection, TableId tableId)
throws SQLException {
DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();
@@ -37,6 +37,13 @@ public class SourceOptions {
.withDescription(
"The split size (number of rows) of table snapshot, captured tables are split into multiple splits when read the snapshot of table.");

public static final Option<Map<String, String>> SNAPSHOT_SPLIT_COLUMN =
Options.key("snapshot.split.column")
.mapType()
.noDefaultValue()
.withDescription(
"The split column of table snapshot, captured tables are split into multiple splits when read the snapshot of table.");

public static final Option<Integer> SNAPSHOT_FETCH_SIZE =
Options.key("snapshot.fetch.size")
.intType()
@@ -111,7 +118,7 @@ public class SourceOptions {
public static OptionRule.Builder getBaseRule() {
return OptionRule.builder()
.optional(FORMAT)
.optional(SNAPSHOT_SPLIT_SIZE, SNAPSHOT_FETCH_SIZE)
.optional(SNAPSHOT_SPLIT_SIZE, SNAPSHOT_FETCH_SIZE, SNAPSHOT_SPLIT_COLUMN)
.optional(INCREMENTAL_PARALLELISM)
.optional(DEBEZIUM_PROPERTIES);
}
@@ -39,6 +39,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;

import static java.math.BigDecimal.ROUND_CEILING;
import static org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils.doubleCompare;
@@ -379,12 +380,38 @@ protected SnapshotSplit createSnapshotSplit(
protected Column getSplitColumn(
JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId tableId)
throws SQLException {
Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
Column splitColumn = null;
Table table = dialect.queryTableSchema(jdbc, tableId).getTable();

// first , compare user defined split column is in the primary key or unique key
Properties splitColumnProperties = new Properties();
Member commented:

Why not just use Map?

XenosK (Contributor, author) replied:

> Why not just use Map?

In org.apache.seatunnel.connectors.cdc.base.config.BaseSourceConfig I see dbzProperties uses Properties, so I did the same here.

Member commented:

ping @XenosK

Member commented:

Please use Map; Properties is used there because it interacts with debezium. I think we should use Map here, just like the other connectors.

XenosK (Contributor, author) replied:

> Please use Map; Properties is used there because it interacts with debezium. I think we should use Map here, just like the other connectors.

ok

try {
splitColumnProperties = sourceConfig.getSplitColumn();
} catch (Exception e) {
log.error("Config snapshot.split.column get exception in {}:{}", tableId, e);
}
String tableSc =
(String) splitColumnProperties.get(tableId.catalog() + "." + tableId.table());
Boolean isUniqueKey = dialect.isUniqueKey(jdbc, tableId, tableSc);
if (isUniqueKey) {
Column column = table.columnWithName(tableSc);
if (isEvenlySplitColumn(column)) {
return column;
} else {
log.warn(
"Config snapshot.split.column type in {} is not TINYINT、SMALLINT、INT、BIGINT、DECIMAL、STRING",
tableId);
}
} else {
log.warn(
"Config snapshot.split.column not exists or not unique key for table {}",
tableId);
}

Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
if (primaryKey.isPresent()) {
List<String> pkColumns = primaryKey.get().getColumnNames();

Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
for (String pkColumn : pkColumns) {
Column column = table.columnWithName(pkColumn);
if (isEvenlySplitColumn(column)) {
@@ -400,7 +427,6 @@ protected Column getSplitColumn(

List<ConstraintKey> uniqueKeys = dialect.getUniqueKeys(jdbc, tableId);
if (!uniqueKeys.isEmpty()) {
Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
for (ConstraintKey uniqueKey : uniqueKeys) {
List<ConstraintKey.ConstraintKeyColumn> uniqueKeyColumns =
uniqueKey.getColumnNames();
@@ -41,6 +41,7 @@ public MySqlSourceConfig(
List<String> databaseList,
List<String> tableList,
int splitSize,
Properties splitColumn,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
@@ -64,6 +65,7 @@ public MySqlSourceConfig(
databaseList,
tableList,
splitSize,
splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
@@ -116,6 +116,7 @@ public MySqlSourceConfig create(int subtaskId) {
databaseList,
tableList,
splitSize,
splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
@@ -49,6 +49,7 @@ public OracleSourceConfig(
List<String> databaseList,
List<String> tableList,
int splitSize,
Properties splitColumn,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
@@ -72,6 +73,7 @@ public OracleSourceConfig(
databaseList,
tableList,
splitSize,
splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
@@ -144,6 +144,7 @@ public OracleSourceConfig create(int subtask) {
databaseList,
tableList,
splitSize,
splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
@@ -36,6 +36,7 @@ public PostgresSourceConfig(
List<String> databaseList,
List<String> tableList,
int splitSize,
Properties splitColumn,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
@@ -59,6 +60,7 @@ public PostgresSourceConfig(
databaseList,
tableList,
splitSize,
splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
@@ -115,6 +115,7 @@ public PostgresSourceConfig create(int subtask) {
databaseList,
tableList,
splitSize,
splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
@@ -41,6 +41,7 @@ public SqlServerSourceConfig(
List<String> databaseList,
List<String> tableList,
int splitSize,
Properties splitColumn,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
@@ -64,6 +65,7 @@ public SqlServerSourceConfig(
databaseList,
tableList,
splitSize,
splitColumn,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,