Duplicates in the same partition with upsert mode enabled and primary Key given #291

Open
ArkaSarkar19 opened this issue Sep 11, 2024 · 0 comments

Comments

@ArkaSarkar19

Hi Team

We are seeing duplicates within the same partition when we configure primary keys with upsert mode enabled on the table. We have tried the two setups below:

  1. Reading CDC data with cdc-field and upsert mode enabled in the Iceberg connector.
  2. Reading data from Kafka with a primary key.

The issue occurs in both setups.
Ideally we should see only one record per primary key in a partition. However, the results are inconsistent: a small percentage of records are duplicated within the same partition. We would appreciate help understanding why this might be occurring; we suspect a concurrency issue in the commit coordinator.
The connector config is attached below:

{
 "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
 "iceberg.catalog.table-override.write.data.path": "<path>",
 "iceberg.catalog.table-override.write.parquet.compression-codec": "snappy",
 "errors.log.include.messages": "true",
 "iceberg.catalog.s3.region": "us-east-1",
 "iceberg.catalog.client.region": "us-east-1",
 "iceberg.catalog.table-override.write.metadata.path": "<path>",
 "errors.log.enable": "true",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "consumer.override.bootstrap.servers": "<url>",
 "value.converter.schema.registry.url": "<url>",

 "name": "test_connector",
 "iceberg.table.db.table.partition-by": "date,hour",
 "iceberg.tables.evolve-schema-enabled": "true",
 "iceberg.catalog.table-override.write.metadata.previous-versions-max": "100",
 "tasks.max": "1",
 "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
 "iceberg.catalog.s3.sse.key": "AES256",
 "iceberg.tables.upsert-mode-enabled": "true",
 "iceberg.tables.auto-create-enabled": "true",
 "iceberg.tables": "db.table",
 "value.converter": "io.confluent.connect.avro.AvroConverter",
 "iceberg.catalog.s3.sse.type": "s3",
 "iceberg.catalog.table-override.write.metadata.delete-after-commit.enabled": "true",
 "iceberg.table.db.table.id-columns": "id",
 "topics": "<topic>",
 "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
 "iceberg.control.commit.interval-ms": "900000",
 "iceberg.catalog.uri": "<uri>",
 "key.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
 "iceberg.catalog": "spark_catalog",
 "consumer.override.auto.offset.reset": "latest",
 "iceberg.tables.schema-case-insensitive": "true",
 "iceberg.catalog.warehouse": "<warehouse>",
 "iceberg.control.topic": "<topic>",
 "key.converter.schema.registry.url": "<url>",
 "iceberg.catalog.type": "hive",
 "iceberg.catalog.s3.path-style-access": "true"
}
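
For anyone trying to reproduce or quantify this, here is a minimal sketch of how the duplicate rate could be measured. It assumes rows have been exported from the table as plain dictionaries with `date`, `hour`, and `id` fields, matching the `partition-by` and `id-columns` settings in the config above. The function names and the sample rows are hypothetical and only illustrate the check; they are not part of the connector.

```python
from collections import Counter


def find_duplicate_keys(rows, partition_cols=("date", "hour"), id_cols=("id",)):
    """Return {(partition values, id values): count} for keys seen more than once."""
    counts = Counter(
        (tuple(row[c] for c in partition_cols), tuple(row[c] for c in id_cols))
        for row in rows
    )
    return {key: n for key, n in counts.items() if n > 1}


def duplicate_rate(rows, **kwargs):
    """Fraction of rows that are surplus copies of an already-seen key."""
    rows = list(rows)
    if not rows:
        return 0.0
    dupes = find_duplicate_keys(rows, **kwargs)
    surplus = sum(n - 1 for n in dupes.values())
    return surplus / len(rows)


# Hypothetical sample data: one key appears twice in the same partition.
sample_rows = [
    {"date": "2024-09-10", "hour": 1, "id": 1},
    {"date": "2024-09-10", "hour": 1, "id": 1},  # duplicate within the same partition
    {"date": "2024-09-10", "hour": 2, "id": 1},  # same id, different partition (not a duplicate)
    {"date": "2024-09-10", "hour": 1, "id": 2},
]

if __name__ == "__main__":
    print(find_duplicate_keys(sample_rows))
    print(duplicate_rate(sample_rows))
```

Running the same check after each commit interval might help show whether the duplicates line up with particular commits, which would be consistent with (though not proof of) the suspected coordinator issue.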