diff --git a/CHANGES.md b/CHANGES.md index 0ddf928..af1a7ae 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,10 @@ ## Unreleased - DynamoDB/Testing: Use CrateDB nightly again +- DynamoDB: Use `ON CONFLICT DO NOTHING` clause on CDC operations + of type `INSERT`, to mitigate errors when events are relayed + redundantly from retries after partially failed batches on the + Lambda processor. ## 2024/10/09 v0.0.21 - MongoDB: Fixed BSON decoding of `{"$date": 1180690093000}` timestamps diff --git a/src/commons_codec/transform/dynamodb.py b/src/commons_codec/transform/dynamodb.py index 2814b78..7bd8408 100644 --- a/src/commons_codec/transform/dynamodb.py +++ b/src/commons_codec/transform/dynamodb.py @@ -201,7 +201,8 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation: f") VALUES (" f":pk, " f":typed, " - f":untyped);" + f":untyped) " + f"ON CONFLICT DO NOTHING;" ) parameters = record.to_dict() diff --git a/tests/transform/test_dynamodb_cdc.py b/tests/transform/test_dynamodb_cdc.py index 92d1c37..153cdc1 100644 --- a/tests/transform/test_dynamodb_cdc.py +++ b/tests/transform/test_dynamodb_cdc.py @@ -200,7 +200,7 @@ def test_decode_cdc_unknown_event(dynamodb_cdc_translator_foo): def test_decode_cdc_insert_basic(dynamodb_cdc_translator_foo): assert dynamodb_cdc_translator_foo.to_sql(MSG_INSERT_BASIC) == SQLOperation( - statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped);", + statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped) ON CONFLICT DO NOTHING;", parameters={ "pk": { "id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266", @@ -221,7 +221,7 @@ def test_decode_cdc_insert_basic(dynamodb_cdc_translator_foo): def test_decode_cdc_insert_nested(dynamodb_cdc_translator_foo): assert dynamodb_cdc_translator_foo.to_sql(MSG_INSERT_NESTED) == SQLOperation( - statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped);", + statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped) ON CONFLICT DO NOTHING;", parameters={ "pk": { "id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266", @@ -349,7 +349,7 @@ def test_deserialize_list_objects(): @pytest.mark.integration def test_to_sql_cratedb(caplog, cratedb, dynamodb_full_translator_foo): """ - Verify converging NewImage CDC INSERT event to CrateDB. + Verify converging NewImage CDC INSERT events to CrateDB. """ # Use the full-load translator only for running the SQL DDL. @@ -362,7 +362,10 @@ def test_to_sql_cratedb(caplog, cratedb, dynamodb_full_translator_foo): translator = DynamoDBCDCTranslator(table_name="from.dynamodb") operation = translator.to_sql(MSG_INSERT_NESTED) - # Insert into CrateDB. + # Insert into CrateDB. Running the INSERT operation twice proves it + # does not raise any `DuplicateKeyException`, because CDC INSERT + # statements use `ON CONFLICT DO NOTHING`. + cratedb.database.run_sql(operation.statement, operation.parameters) cratedb.database.run_sql(operation.statement, operation.parameters) # Verify data in target database.