diff --git a/.github/workflows/labeler/label-scope-conf.yml b/.github/workflows/labeler/label-scope-conf.yml index b417d53e72a..e201b2499da 100644 --- a/.github/workflows/labeler/label-scope-conf.yml +++ b/.github/workflows/labeler/label-scope-conf.yml @@ -264,6 +264,12 @@ qdrant: - any-glob-to-any-file: seatunnel-connectors-v2/connector-qdrant/** - all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(qdrant)/**' +tencentvectordb: + - all: + - changed-files: + - any-glob-to-any-file: seatunnel-connectors-v2/connector-tencent-vectordb/** + - all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(tencent-vectordb)/**' + typesense: - all: - changed-files: diff --git a/config/plugin_config b/config/plugin_config index 317b41480e1..a886e5c6e85 100644 --- a/config/plugin_config +++ b/config/plugin_config @@ -92,3 +92,4 @@ connector-sls connector-qdrant connector-typesense connector-cdc-opengauss +connector-tencent-vectordb diff --git a/docs/en/connector-v2/source/TencentVectorDB.md b/docs/en/connector-v2/source/TencentVectorDB.md new file mode 100644 index 00000000000..131b10fecc1 --- /dev/null +++ b/docs/en/connector-v2/source/TencentVectorDB.md @@ -0,0 +1,46 @@ +# Tencent VectorDB + +> Tencent VectorDB source connector + +## Description + +This Tencent VectorDB source connector reads data from Tencent VectorDB. +## Key Features + +- [x] [batch](../../concept/connector-v2-features.md) +- [x] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [column projection](../../concept/connector-v2-features.md) + +## Data Type Mapping + +| Tencent VectorDB Data Type | SeaTunnel Data Type | +|---------------------|---------------------| +| FLOAT_VECTOR | FLOAT_VECTOR | +| SPARSE_FLOAT_VECTOR | SPARSE_FLOAT_VECTOR | + +## Source Options + +| Name | Type | Required | Default | Description | +|------------|---------|----------|---------|-----------------------------| +| url | String | Yes | - | endpoint | +| user_name | String | Yes | - | user name | +| api_key | 
String | Yes | - | api key for authentication. | +| database | String | Yes | - | database | +| collection | String | Yes | - | collection name | +| batch_size | Integer | No | 100 | Batch size to read | + +## Task Example + +```bash +source { + TencentVectorDB { + url = "*****" + user_name = "root" + api_key = "*****" + database = "default" + collection = "movie_reviews" + } +} +``` + +## Changelog diff --git a/plugin-mapping.properties b/plugin-mapping.properties index e314ef86613..fa1a5ba5dae 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -139,6 +139,7 @@ seatunnel.sink.Sls = connector-sls seatunnel.source.Typesense = connector-typesense seatunnel.sink.Typesense = connector-typesense seatunnel.source.Opengauss-CDC = connector-cdc-opengauss +seatunnel.source.TencentVectorDB = connector-tencent-vectordb seatunnel.transform.Sql = seatunnel-transforms-v2 seatunnel.transform.FieldMapper = seatunnel-transforms-v2 diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/pom.xml b/seatunnel-connectors-v2/connector-tencent-vectordb/pom.xml new file mode 100644 index 00000000000..950bcc49e85 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tencent-vectordb/pom.xml @@ -0,0 +1,60 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-connectors-v2 + ${revision} + + + connector-tencent-vectordb + SeaTunnel : Connectors V2 : TencentVectorDB + + + + com.google.code.gson + gson + 2.10.1 + + + + + + com.tencent.tcvectordb + vectordatabase-sdk-java + 2.0.4 + + + com.google.code.gson + gson + + + + + + com.google.code.gson + gson + 2.10.1 + + + + diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/config/TencentVectorDBSourceConfig.java b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/config/TencentVectorDBSourceConfig.java new file mode 100644 index 00000000000..da9c55b79ec --- /dev/null +++ 
b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/config/TencentVectorDBSourceConfig.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.tencent.vectordb.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +public class TencentVectorDBSourceConfig { + public static final String CONNECTOR_IDENTITY = "TencentVectorDB"; + + public static final Option URL = + Options.key("url").stringType().noDefaultValue().withDescription("url"); + + public static final Option USER_NAME = + Options.key("user_name") + .stringType() + .noDefaultValue() + .withDescription("user name for authentication"); + + public static final Option API_KEY = + Options.key("api_key") + .stringType() + .noDefaultValue() + .withDescription("token for authentication"); + + public static final Option DATABASE = + Options.key("database") + .stringType() + .noDefaultValue() + .withDescription("Tencent Vector DB database name"); + + public static final Option COLLECTION = + Options.key("collection") + .stringType() + .noDefaultValue() + .withDescription("Tencent 
Vector DB collection name"); + public static final Option BATCH_SIZE = + Options.key("batch_size") + .intType() + .defaultValue(100) + .withDescription("Tencent Vector DB reader batch size"); +} diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/exception/TencentVectorDBConnectorErrorCode.java b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/exception/TencentVectorDBConnectorErrorCode.java new file mode 100644 index 00000000000..0bad6ba9ebc --- /dev/null +++ b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/exception/TencentVectorDBConnectorErrorCode.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.tencent.vectordb.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; + +import lombok.Getter; + +@Getter +public enum TencentVectorDBConnectorErrorCode implements SeaTunnelErrorCode { + SOURCE_TABLE_SCHEMA_IS_NULL("TC-VECTORDB-01", "Source table schema is null"), + READ_DATA_FAIL("TC-VECTORDB-02", "Read data fail"); + + private final String code; + private final String description; + + TencentVectorDBConnectorErrorCode(String code, String description) { + this.code = code; + this.description = description; + } +} diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/exception/TencentVectorDBConnectorException.java b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/exception/TencentVectorDBConnectorException.java new file mode 100644 index 00000000000..d1cda8fcfdf --- /dev/null +++ b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/exception/TencentVectorDBConnectorException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.tencent.vectordb.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class TencentVectorDBConnectorException extends SeaTunnelRuntimeException { + public TencentVectorDBConnectorException(SeaTunnelErrorCode seaTunnelErrorCode) { + super(seaTunnelErrorCode, seaTunnelErrorCode.getErrorMessage()); + } + + public TencentVectorDBConnectorException( + SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { + super(seaTunnelErrorCode, seaTunnelErrorCode.getErrorMessage(), cause); + } +} diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSource.java b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSource.java new file mode 100644 index 00000000000..ab61f815c41 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSource.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
package org.apache.seatunnel.connectors.tencent.vectordb.source;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.tencent.vectordb.config.TencentVectorDBSourceConfig;
import org.apache.seatunnel.connectors.tencent.vectordb.utils.ConnectorUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Bounded SeaTunnel source that reads documents from Tencent VectorDB.
 *
 * <p>The set of source tables is resolved once, in the constructor, through {@link
 * ConnectorUtils#getSourceTables()} and then handed to every reader and enumerator this source
 * creates.
 */
public class TencentVectorDBSource
        implements SeaTunnelSource<
                        SeaTunnelRow, TencentVectorDBSourceSplit, TencentVectorDBSourceState>,
                SupportParallelism,
                SupportColumnProjection {
    private final ReadonlyConfig config;

    // NOTE(review): assumes ConnectorUtils#getSourceTables() returns Map<TablePath, CatalogTable>;
    // the reader looks tables up by TablePath and calls CatalogTable#getTableSchema() on the value.
    private final Map<TablePath, CatalogTable> sourceTables;

    /**
     * @param config connector options; also passed on to the reader and the enumerator
     */
    public TencentVectorDBSource(ReadonlyConfig config) {
        this.config = config;
        ConnectorUtils connectorUtils = new ConnectorUtils(config);
        this.sourceTables = connectorUtils.getSourceTables();
    }

    /**
     * Get the boundedness of this source.
     *
     * @return always {@link Boundedness#BOUNDED}.
     */
    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    /**
     * Create source reader, used to produce data.
     *
     * @param readerContext reader context.
     * @return source reader.
     * @throws Exception when create reader failed.
     */
    @Override
    public SourceReader<SeaTunnelRow, TencentVectorDBSourceSplit> createReader(
            SourceReader.Context readerContext) throws Exception {
        return new TencentVectorDBSourceReader(readerContext, config, sourceTables);
    }

    /**
     * @return a new list holding the catalog tables resolved in the constructor.
     */
    @Override
    public List<CatalogTable> getProducedCatalogTables() {
        return new ArrayList<>(sourceTables.values());
    }

    /**
     * Create source split enumerator, used to generate splits. This method will be called only once
     * when start a source.
     *
     * @param enumeratorContext enumerator context.
     * @return source split enumerator, created without any restored state.
     * @throws Exception when create enumerator failed.
     */
    @Override
    public SourceSplitEnumerator<TencentVectorDBSourceSplit, TencentVectorDBSourceState>
            createEnumerator(
                    SourceSplitEnumerator.Context<TencentVectorDBSourceSplit> enumeratorContext)
                    throws Exception {
        return new TencentVectorDBSourceSplitEnumertor(
                enumeratorContext, config, sourceTables, null);
    }

    /**
     * Create source split enumerator, used to generate splits. This method will be called when
     * restore from checkpoint.
     *
     * @param enumeratorContext enumerator context.
     * @param checkpointState checkpoint state.
     * @return source split enumerator seeded from {@code checkpointState}.
     * @throws Exception when create enumerator failed.
     */
    @Override
    public SourceSplitEnumerator<TencentVectorDBSourceSplit, TencentVectorDBSourceState>
            restoreEnumerator(
                    SourceSplitEnumerator.Context<TencentVectorDBSourceSplit> enumeratorContext,
                    TencentVectorDBSourceState checkpointState)
                    throws Exception {
        return new TencentVectorDBSourceSplitEnumertor(
                enumeratorContext, config, sourceTables, checkpointState);
    }

    /**
     * Returns a unique identifier among same factory interfaces.
     *
     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
     * kafka}). If multiple factories exist for different versions, a version should be appended
     * using "-" (e.g. {@code elasticsearch-7}).
     */
    @Override
    public String getPluginName() {
        return TencentVectorDBSourceConfig.CONNECTOR_IDENTITY;
    }
}
package org.apache.seatunnel.connectors.tencent.vectordb.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.tencent.vectordb.config.TencentVectorDBSourceConfig;

import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;

/**
 * Factory that exposes {@link TencentVectorDBSource} under the {@code TencentVectorDB}
 * identifier.
 */
@Slf4j
@AutoService(Factory.class)
public class TencentVectorDBSourceFactory implements TableSourceFactory {

    /**
     * Builds a {@link TableSource} whose supplier creates a {@link TencentVectorDBSource} from the
     * options carried by {@code context}.
     *
     * @param context factory context providing the connector options
     * @return a table source backed by a new {@link TencentVectorDBSource}
     */
    @Override
    public <T, SplitT extends SourceSplit, StateT extends Serializable>
            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
        return () ->
                (SeaTunnelSource<T, SplitT, StateT>)
                        new TencentVectorDBSource(context.getOptions());
    }

    /**
     * Declares which options are required and which are optional.
     *
     * <p>{@code url}, {@code user_name}, {@code api_key}, {@code database} and {@code collection}
     * are all required: the connector documentation lists them as such, and the reader reads
     * {@code url} and {@code user_name} unconditionally when it opens the client. Declaring them
     * here makes a missing option fail at validation time instead of at connect time. {@code
     * batch_size} stays optional because it has a default.
     */
    @Override
    public OptionRule optionRule() {
        return OptionRule.builder()
                .required(
                        TencentVectorDBSourceConfig.URL,
                        TencentVectorDBSourceConfig.USER_NAME,
                        TencentVectorDBSourceConfig.API_KEY,
                        TencentVectorDBSourceConfig.DATABASE,
                        TencentVectorDBSourceConfig.COLLECTION)
                .optional(TencentVectorDBSourceConfig.BATCH_SIZE)
                .build();
    }

    /** @return the source implementation class created by this factory. */
    @Override
    public Class<? extends SeaTunnelSource> getSourceClass() {
        return TencentVectorDBSource.class;
    }

    /** @return the connector identity, {@code TencentVectorDB}. */
    @Override
    public String factoryIdentifier() {
        return TencentVectorDBSourceConfig.CONNECTOR_IDENTITY;
    }
}
package org.apache.seatunnel.connectors.tencent.vectordb.source;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.tencent.vectordb.exception.TencentVectorDBConnectorErrorCode;
import org.apache.seatunnel.connectors.tencent.vectordb.exception.TencentVectorDBConnectorException;
import org.apache.seatunnel.connectors.tencent.vectordb.utils.ConverterUtils;

import com.tencent.tcvectordb.client.RPCVectorDBClient;
import com.tencent.tcvectordb.client.VectorDBClient;
import com.tencent.tcvectordb.model.Collection;
import com.tencent.tcvectordb.model.Database;
import com.tencent.tcvectordb.model.Document;
import com.tencent.tcvectordb.model.param.database.ConnectParam;
import com.tencent.tcvectordb.model.param.dml.QueryParam;
import com.tencent.tcvectordb.model.param.enums.ReadConsistencyEnum;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.seatunnel.connectors.tencent.vectordb.config.TencentVectorDBSourceConfig.API_KEY;
import static org.apache.seatunnel.connectors.tencent.vectordb.config.TencentVectorDBSourceConfig.BATCH_SIZE;
import static org.apache.seatunnel.connectors.tencent.vectordb.config.TencentVectorDBSourceConfig.URL;
import static org.apache.seatunnel.connectors.tencent.vectordb.config.TencentVectorDBSourceConfig.USER_NAME;

/**
 * Source reader that pages through a Tencent VectorDB collection and emits one {@link
 * SeaTunnelRow} per document.
 *
 * <p>Each split names one table. A split is read completely, page by page using limit/offset
 * queries, inside a single {@link #pollNext(Collector)} call while the checkpoint lock is held.
 */
@Slf4j
public class TencentVectorDBSourceReader
        implements SourceReader<SeaTunnelRow, TencentVectorDBSourceSplit> {
    private final Deque<TencentVectorDBSourceSplit> pendingSplits = new ConcurrentLinkedDeque<>();
    private final ReadonlyConfig config;
    private final Context context;
    /** Offset of the next page within the split currently being read; reset for every split. */
    private final AtomicLong offSet = new AtomicLong(0);

    private final Map<TablePath, CatalogTable> sourceTables;
    private VectorDBClient client;
    private volatile boolean noMoreSplit;

    /**
     * @param readerContext reader context used to query boundedness and signal end of input
     * @param config connector options (url, credentials, batch size)
     * @param sourceTables catalog tables keyed by table path, used to look up each split's schema
     */
    public TencentVectorDBSourceReader(
            Context readerContext,
            ReadonlyConfig config,
            Map<TablePath, CatalogTable> sourceTables) {
        this.context = readerContext;
        this.config = config;
        this.sourceTables = sourceTables;
    }

    /** Open the source reader: builds the RPC client from url, user name and API key. */
    @Override
    public void open() throws Exception {
        ConnectParam connectParam =
                ConnectParam.newBuilder()
                        .withUrl(config.get(URL))
                        .withUsername(config.get(USER_NAME))
                        .withKey(config.get(API_KEY))
                        .withTimeout(30)
                        .build();
        client = new RPCVectorDBClient(connectParam, ReadConsistencyEnum.EVENTUAL_CONSISTENCY);
    }

    /**
     * Called to close the reader, in case it holds on to any resources, like threads or network
     * connections.
     *
     * <p>NOTE(review): the client created in {@link #open()} is not released here. Confirm whether
     * the SDK version in use exposes a close/shutdown method on the client and call it if so.
     */
    @Override
    public void close() throws IOException {}

    /**
     * Generate the next batch of records.
     *
     * <p>Takes at most one pending split and reads it to the end. Once the enumerator has
     * announced that no more splits will arrive and nothing is pending, a bounded reader signals
     * end of input. Every call ends with a one-second sleep.
     *
     * @param output output collector.
     * @throws Exception if error occurs.
     */
    @Override
    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        synchronized (output.getCheckpointLock()) {
            TencentVectorDBSourceSplit split = pendingSplits.poll();
            if (split != null) {
                readSplit(split, output);
            } else if (!noMoreSplit) {
                log.info("Tencent VDB source wait split!");
            }
        }
        if (noMoreSplit
                && pendingSplits.isEmpty()
                && Boundedness.BOUNDED.equals(context.getBoundedness())) {
            // signal to the source that we have reached the end of the data.
            log.info("Closed the bounded Tencent VDB source");
            context.signalNoMoreElement();
        }
        Thread.sleep(1000L);
    }

    /**
     * Reads every document of the table referenced by {@code split} and emits it as a row.
     *
     * @throws TencentVectorDBConnectorException with {@code SOURCE_TABLE_SCHEMA_IS_NULL} when no
     *     schema is known for the split's table, or {@code READ_DATA_FAIL} wrapping any other
     *     failure
     */
    private void readSplit(TencentVectorDBSourceSplit split, Collector<SeaTunnelRow> output) {
        try {
            log.info("Begin to read data from split: {}", split);
            TablePath tablePath = split.getTablePath();
            // A missing table is reported the same way as a missing schema, not as an NPE.
            CatalogTable catalogTable = sourceTables.get(tablePath);
            TableSchema tableSchema = catalogTable == null ? null : catalogTable.getTableSchema();
            log.info("begin to read data from tencent vdb, table schema: {}", tableSchema);
            if (tableSchema == null) {
                throw new TencentVectorDBConnectorException(
                        TencentVectorDBConnectorErrorCode.SOURCE_TABLE_SCHEMA_IS_NULL);
            }
            Database database = client.database(tablePath.getDatabaseName());
            Collection collection = database.collection(tablePath.getTableName());

            // Paging starts from the beginning for every split; without this reset a second
            // split would begin at the previous split's final offset and skip documents.
            offSet.set(0);
            long batchSize = config.get(BATCH_SIZE);
            while (true) {
                QueryParam queryParam =
                        QueryParam.newBuilder()
                                .withRetrieveVector(true)
                                .withLimit(batchSize)
                                .withOffset(offSet.get())
                                .build();
                List<Document> documents = collection.query(queryParam);
                if (documents.isEmpty()) {
                    break;
                }
                offSet.addAndGet(documents.size());
                for (Document document : documents) {
                    SeaTunnelRow row = ConverterUtils.convertToSeatunnelRow(tableSchema, document);
                    row.setTableId(tablePath.getFullName());
                    output.collect(row);
                }
            }
        } catch (TencentVectorDBConnectorException e) {
            // Keep the connector's own error code instead of re-wrapping it as READ_DATA_FAIL.
            log.error("Read data from split: {} failed", split, e);
            throw e;
        } catch (Exception e) {
            log.error("Read data from split: {} failed", split, e);
            throw new TencentVectorDBConnectorException(
                    TencentVectorDBConnectorErrorCode.READ_DATA_FAIL, e);
        }
    }

    /**
     * Get the current split checkpoint state by checkpointId.
     *
     * <p>If the source is bounded, checkpoint is not triggered.
     *
     * @param checkpointId checkpoint Id.
     * @return a copy of the splits that have been assigned but not yet read.
     * @throws Exception if error occurs.
     */
    @Override
    public List<TencentVectorDBSourceSplit> snapshotState(long checkpointId) throws Exception {
        return new ArrayList<>(pendingSplits);
    }

    /**
     * Add the split checkpoint state to reader.
     *
     * @param splits split checkpoint state.
     */
    @Override
    public void addSplits(List<TencentVectorDBSourceSplit> splits) {
        log.info("Adding Tencent VDB splits to reader: {}", splits);
        pendingSplits.addAll(splits);
    }

    /**
     * This method is called when the reader is notified that it will not receive any further
     * splits.
     *
     * <p>It is triggered when the enumerator calls {@link
     * SourceSplitEnumerator.Context#signalNoMoreSplits(int)} with the reader's parallel subtask.
     */
    @Override
    public void handleNoMoreSplits() {
        log.info("receive no more splits message, this Tencent VDB reader will not add new split.");
        noMoreSplit = true;
    }

    /** No-op: this reader keeps no state that depends on checkpoint completion. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
}
+ */ + +package org.apache.seatunnel.connectors.tencent.vectordb.source; + +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.catalog.TablePath; + +import lombok.Data; +import lombok.experimental.SuperBuilder; + +@Data +@SuperBuilder +public class TencentVectorDBSourceSplit implements SourceSplit { + private TablePath tablePath; + private String splitId; + private String partitionName; + + /** + * Get the split id of this source split. + * + * @return id of this source split. + */ + @Override + public String splitId() { + return splitId; + } +} diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSourceSplitEnumertor.java b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSourceSplitEnumertor.java new file mode 100644 index 00000000000..4feb498374c --- /dev/null +++ b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/source/TencentVectorDBSourceSplitEnumertor.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.seatunnel.connectors.tencent.vectordb.source;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Split enumerator for the Tencent VectorDB source.
 *
 * <p>Every source table becomes exactly one split. Splits are mapped to a reader by hashing the
 * split id and are handed out while {@link #run()} drains the pending-table queue. All access to
 * {@code pendingSplits} (a plain {@link HashMap}) happens under {@code stateLock}.
 */
@Slf4j
public class TencentVectorDBSourceSplitEnumertor
        implements SourceSplitEnumerator<TencentVectorDBSourceSplit, TencentVectorDBSourceState> {
    private final Map<TablePath, CatalogTable> tables;
    private final Context<TencentVectorDBSourceSplit> context;
    /** Tables that have not been turned into splits yet. */
    private final ConcurrentLinkedQueue<TablePath> pendingTables;
    /** Splits waiting to be handed to a reader, keyed by the owning reader's subtask id. */
    private final Map<Integer, List<TencentVectorDBSourceSplit>> pendingSplits;

    private final Object stateLock = new Object();

    private final ReadonlyConfig config;

    /**
     * @param context enumerator context used to discover readers and assign splits
     * @param config connector options
     * @param sourceTables catalog tables keyed by table path
     * @param sourceState state restored from a checkpoint, or {@code null} for a fresh start, in
     *     which case every table in {@code sourceTables} is pending
     */
    public TencentVectorDBSourceSplitEnumertor(
            Context<TencentVectorDBSourceSplit> context,
            ReadonlyConfig config,
            Map<TablePath, CatalogTable> sourceTables,
            TencentVectorDBSourceState sourceState) {
        this.context = context;
        this.tables = sourceTables;
        this.config = config;
        if (sourceState == null) {
            this.pendingTables = new ConcurrentLinkedQueue<>(tables.keySet());
            this.pendingSplits = new HashMap<>();
        } else {
            this.pendingTables = new ConcurrentLinkedQueue<>(sourceState.getPendingTables());
            this.pendingSplits = new HashMap<>(sourceState.getPendingSplits());
        }
    }

    /** Maps a split id to a reader index in {@code [0, numReaders)} using a non-negative hash. */
    private static int getSplitOwner(String tp, int numReaders) {
        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
    }

    @Override
    public void open() {}

    /**
     * The method is executed by the engine only once.
     *
     * <p>Drains the pending tables, generating and assigning the split for each, then tells every
     * registered reader that no more splits will follow.
     */
    @Override
    public void run() throws Exception {
        log.info("Starting Tencent VectorDB split enumerator.");
        Set<Integer> readers = context.registeredReaders();
        while (!pendingTables.isEmpty()) {
            synchronized (stateLock) {
                TablePath tablePath = pendingTables.poll();
                if (tablePath == null) {
                    // Queue was emptied between the isEmpty() check and poll().
                    continue;
                }
                log.info("begin to split table path: {}", tablePath);
                Collection<TencentVectorDBSourceSplit> splits =
                        generateSplits(tables.get(tablePath));
                log.info("end to split table {} into {} splits.", tablePath, splits.size());

                addPendingSplit(splits);
                assignSplit(readers);
            }
        }

        log.info("No more splits to assign. Sending NoMoreSplitsEvent to reader {}.", readers);
        readers.forEach(context::signalNoMoreSplits);
    }

    /** Hands each listed reader the splits queued for it, if any. Callers hold {@code stateLock}. */
    private void assignSplit(Collection<Integer> readers) {
        log.info("Assign pendingSplits to readers {}", readers);

        for (int reader : readers) {
            List<TencentVectorDBSourceSplit> assignmentForReader = pendingSplits.remove(reader);
            if (assignmentForReader != null && !assignmentForReader.isEmpty()) {
                log.debug("Assign splits {} to reader {}", assignmentForReader, reader);
                context.assignSplit(reader, assignmentForReader);
            }
        }
    }

    /** Queues each split under the reader chosen by hashing its split id. */
    private void addPendingSplit(Collection<TencentVectorDBSourceSplit> splits) {
        int readerCount = context.currentParallelism();
        for (TencentVectorDBSourceSplit split : splits) {
            int ownerReader = getSplitOwner(split.splitId(), readerCount);
            log.info("Assigning {} to {} reader.", split, ownerReader);

            pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split);
        }
    }

    /** Produces the single split for a table; the split id is the table name. */
    private Collection<TencentVectorDBSourceSplit> generateSplits(CatalogTable catalogTable) {
        TencentVectorDBSourceSplit tencentVectorDBSourceSplit =
                TencentVectorDBSourceSplit.builder()
                        .tablePath(catalogTable.getTablePath())
                        .splitId(catalogTable.getTablePath().getTableName())
                        .build();

        return Collections.singletonList(tencentVectorDBSourceSplit);
    }

    /**
     * Called to close the enumerator, in case it holds on to any resources, like threads or network
     * connections.
     */
    @Override
    public void close() throws IOException {}

    /**
     * Add a split back to the split enumerator. It will only happen when a {@link SourceReader}
     * fails and there are splits assigned to it after the last successful checkpoint.
     *
     * @param splits The split to add back to the enumerator for reassignment.
     * @param subtaskId The id of the subtask to which the returned splits belong.
     */
    @Override
    public void addSplitsBack(List<TencentVectorDBSourceSplit> splits, int subtaskId) {
        if (!splits.isEmpty()) {
            synchronized (stateLock) {
                addPendingSplit(splits, subtaskId);
                if (context.registeredReaders().contains(subtaskId)) {
                    assignSplit(Collections.singletonList(subtaskId));
                } else {
                    log.warn(
                            "Reader {} is not registered. Pending splits {} are not assigned.",
                            subtaskId,
                            splits);
                }
            }
        }
        log.info("Add back splits {} to TencentVectorDBSourceSplitEnumertor.", splits.size());
    }

    /** Queues the given splits under an explicit owner instead of hashing their ids. */
    private void addPendingSplit(Collection<TencentVectorDBSourceSplit> splits, int ownerReader) {
        pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).addAll(splits);
    }

    /**
     * @return {@code 0} when no table and no split is pending, otherwise {@code 1}; this is a
     *     flag-like value rather than an exact count.
     */
    @Override
    public int currentUnassignedSplitSize() {
        synchronized (stateLock) {
            return pendingTables.isEmpty() && pendingSplits.isEmpty() ? 0 : 1;
        }
    }

    /** No-op: splits are pushed from {@link #run()} rather than requested by readers. */
    @Override
    public void handleSplitRequest(int subtaskId) {}

    /** Assigns any splits already queued for the newly registered reader. */
    @Override
    public void registerReader(int subtaskId) {
        log.info("Register reader {} to TencentVectorDBSourceSplitEnumertor.", subtaskId);
        synchronized (stateLock) {
            if (!pendingSplits.isEmpty()) {
                assignSplit(Collections.singletonList(subtaskId));
            }
        }
    }

    /**
     * If the source is bounded, checkpoint is not triggered.
     *
     * @param checkpointId id of the checkpoint being taken (unused).
     * @return a state object holding copies of the pending tables and the pending splits; the
     *     per-reader lists are copied too, so later assignments do not change the snapshot.
     */
    @Override
    public TencentVectorDBSourceState snapshotState(long checkpointId) throws Exception {
        synchronized (stateLock) {
            Map<Integer, List<TencentVectorDBSourceSplit>> splitsCopy = new HashMap<>();
            pendingSplits.forEach((reader, splits) -> splitsCopy.put(reader, new ArrayList<>(splits)));
            return new TencentVectorDBSourceState(new ArrayList<>(pendingTables), splitsCopy);
        }
    }

    /** No-op: nothing to commit when a checkpoint completes. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
}
+ */ + +package org.apache.seatunnel.connectors.tencent.vectordb.source; + +import org.apache.seatunnel.api.table.catalog.TablePath; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +@Data +@AllArgsConstructor +public class TencentVectorDBSourceState implements Serializable { + private List pendingTables; + private Map> pendingSplits; +} diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/utils/ConnectorUtils.java b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/utils/ConnectorUtils.java new file mode 100644 index 00000000000..76e53051bf4 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/utils/ConnectorUtils.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.tencent.vectordb.utils; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.CommonOptions; + +import com.google.common.collect.Lists; +import com.tencent.tcvectordb.client.RPCVectorDBClient; +import com.tencent.tcvectordb.client.VectorDBClient; +import com.tencent.tcvectordb.model.Collection; +import com.tencent.tcvectordb.model.Database; +import com.tencent.tcvectordb.model.param.collection.IndexField; +import com.tencent.tcvectordb.model.param.database.ConnectParam; +import com.tencent.tcvectordb.model.param.enums.ReadConsistencyEnum; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE; +import static org.apache.seatunnel.api.table.type.VectorType.VECTOR_FLOAT_TYPE; +import static org.apache.seatunnel.api.table.type.VectorType.VECTOR_SPARSE_FLOAT_TYPE; +import static org.apache.seatunnel.connectors.tencent.vectordb.config.TencentVectorDBSourceConfig.API_KEY; +import static org.apache.seatunnel.connectors.tencent.vectordb.config.TencentVectorDBSourceConfig.COLLECTION; +import static org.apache.seatunnel.connectors.tencent.vectordb.config.TencentVectorDBSourceConfig.DATABASE; +import static org.apache.seatunnel.connectors.tencent.vectordb.config.TencentVectorDBSourceConfig.URL; +import static org.apache.seatunnel.connectors.tencent.vectordb.config.TencentVectorDBSourceConfig.USER_NAME; + +public class ConnectorUtils { + Map 
sourceTables; + private ReadonlyConfig config; + + public ConnectorUtils(ReadonlyConfig config) { + this.config = config; + this.sourceTables = new HashMap<>(); + } + + public Map getSourceTables() { + ConnectParam connectParam = + ConnectParam.newBuilder() + .withUrl(config.get(URL)) + .withUsername(config.get(USER_NAME)) + .withKey(config.get(API_KEY)) + .withTimeout(30) + .build(); + VectorDBClient client = + new RPCVectorDBClient(connectParam, ReadConsistencyEnum.EVENTUAL_CONSISTENCY); + Database database = client.database(config.get(DATABASE)); + Collection collection = database.describeCollection(config.get(COLLECTION)); + TablePath tablePath = TablePath.of(config.get(DATABASE), config.get(COLLECTION)); + + List columns = new ArrayList<>(); + String primaryKey = "id"; + for (IndexField indexField : collection.getIndexes()) { + if (indexField.isPrimaryKey()) { + columns.add( + PhysicalColumn.builder() + .name(indexField.getFieldName()) + .dataType(STRING_TYPE) + .build()); + primaryKey = indexField.getFieldName(); + } else if (indexField.isVectorField()) { + columns.add( + PhysicalColumn.builder() + .name(indexField.getFieldName()) + .dataType(VECTOR_FLOAT_TYPE) + .scale(indexField.getDimension()) + .build()); + } else if (indexField.isSparseVectorField()) { + columns.add( + PhysicalColumn.builder() + .name(indexField.getFieldName()) + .dataType(VECTOR_SPARSE_FLOAT_TYPE) + .build()); + } + } + Map options = new HashMap<>(); + options.put(CommonOptions.METADATA.getName(), true); + PhysicalColumn dynamicColumn = + PhysicalColumn.builder() + .name(CommonOptions.METADATA.getName()) + .dataType(STRING_TYPE) + .options(options) + .build(); + columns.add(dynamicColumn); + + TableSchema tableSchema = + TableSchema.builder() + .primaryKey(PrimaryKey.of(primaryKey, Lists.newArrayList(primaryKey))) + .columns(columns) + .build(); + Map sourceTables = new HashMap<>(); + CatalogTable catalogTable = + CatalogTable.of( + TableIdentifier.of("tencent", tablePath), + 
tableSchema, + new HashMap<>(), + new ArrayList<>(), + ""); + sourceTables.put(tablePath, catalogTable); + return sourceTables; + } +} diff --git a/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/utils/ConverterUtils.java b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/utils/ConverterUtils.java new file mode 100644 index 00000000000..21f7818550c --- /dev/null +++ b/seatunnel-connectors-v2/connector-tencent-vectordb/src/main/java/org/apache/seatunnel/connectors/tencent/vectordb/utils/ConverterUtils.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.tencent.vectordb.utils; + +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.CommonOptions; +import org.apache.seatunnel.api.table.type.RowKind; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.utils.BufferUtils; + +import org.apache.commons.lang3.tuple.Pair; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.tencent.tcvectordb.model.DocField; +import com.tencent.tcvectordb.model.Document; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.seatunnel.api.table.type.VectorType.VECTOR_FLOAT_TYPE; +import static org.apache.seatunnel.api.table.type.VectorType.VECTOR_SPARSE_FLOAT_TYPE; + +public class ConverterUtils { + public static SeaTunnelRow convertToSeatunnelRow(TableSchema tableSchema, Document vector) { + SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType(); + Object[] fields = new Object[typeInfo.getTotalFields()]; + List fieldNames = + Arrays.stream(typeInfo.getFieldNames()).collect(Collectors.toList()); + + for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) { + if (fieldNames.get(fieldIndex).equals("id")) { + fields[fieldIndex] = vector.getId(); + } else if (fieldNames.get(fieldIndex).equals(CommonOptions.METADATA.getName())) { + List meta = vector.getDocFields(); + JsonObject data = new JsonObject(); + for (DocField entry : meta) { + data.add(entry.getName(), convertValueToJsonElement(entry.getValue())); + } + fields[fieldIndex] = data.toString(); + } else if (typeInfo.getFieldType(fieldIndex).equals(VECTOR_FLOAT_TYPE)) { + // Convert each Double to Float + List floats = (List) vector.getVector(); + // Convert List to Float[] + Float[] 
floatArray = floats.stream().map(Double::floatValue).toArray(Float[]::new); + fields[fieldIndex] = BufferUtils.toByteBuffer(floatArray); + } else if (typeInfo.getFieldType(fieldIndex).equals(VECTOR_SPARSE_FLOAT_TYPE)) { + Map sparseMap = new HashMap<>(); + for (Pair pair : vector.getSparseVector()) { + sparseMap.put(pair.getKey(), pair.getValue()); + } + fields[fieldIndex] = sparseMap; + } + } + + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields); + seaTunnelRow.setRowKind(RowKind.INSERT); + return seaTunnelRow; + } + + private static JsonElement convertValueToJsonElement(Object value) { + Gson gson = new Gson(); + return gson.toJsonTree(value); + } +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index cf7314e619a..2fe5c85feb8 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -82,6 +82,7 @@ connector-qdrant connector-sls connector-typesense + connector-tencent-vectordb diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index 553843e0cc5..0f5fcab6dcf 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -598,6 +598,13 @@ provided + + org.apache.seatunnel + connector-tencent-vectordb + ${project.version} + provided + + org.apache.seatunnel connector-activemq