Skip to content

Commit

Permalink
[Bug][Connector-v2] MongoDB CDC Set SeatunnelRow's tableId
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 committed Oct 31, 2024
1 parent d9ddc85 commit 8d332e4
Showing 1 changed file with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
Expand All @@ -35,12 +36,15 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;

import com.google.auto.service.AutoService;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;

import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT;

@AutoService(Factory.class)
public class MongodbIncrementalSourceFactory implements TableSourceFactory {
@Override
Expand All @@ -54,7 +58,8 @@ public OptionRule optionRule() {
.required(
MongodbSourceOptions.HOSTS,
MongodbSourceOptions.DATABASE,
MongodbSourceOptions.COLLECTION)
MongodbSourceOptions.COLLECTION,
TableSchemaOptions.SCHEMA)
.optional(
MongodbSourceOptions.USERNAME,
MongodbSourceOptions.PASSWORD,
Expand Down Expand Up @@ -87,6 +92,11 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
List<String> collections = context.getOptions().get(MongodbSourceOptions.COLLECTION);
if (collections.size() != configCatalog.size()) {
throw new MongodbConnectorException(
ILLEGAL_ARGUMENT,
"The number of collections must be equal to the number of schema tables");
}
List<CatalogTable> catalogTables =
IntStream.range(0, configCatalog.size())
.mapToObj(
Expand Down

0 comments on commit 8d332e4

Please sign in to comment.