diff --git a/CHANGES.md b/CHANGES.md index 0ddf928..af1a7ae 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,10 @@ ## Unreleased - DynamoDB/Testing: Use CrateDB nightly again +- DynamoDB: Use `ON CONFLICT DO NOTHING` clause on CDC operations + of type `INSERT`, to mitigate errors when events are relayed + redundantly from retries after partially failed batches on the + Lambda processor. ## 2024/10/09 v0.0.21 - MongoDB: Fixed BSON decoding of `{"$date": 1180690093000}` timestamps diff --git a/src/commons_codec/transform/dynamodb.py b/src/commons_codec/transform/dynamodb.py index 2814b78..7bd8408 100644 --- a/src/commons_codec/transform/dynamodb.py +++ b/src/commons_codec/transform/dynamodb.py @@ -201,7 +201,8 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation: f") VALUES (" f":pk, " f":typed, " - f":untyped);" + f":untyped) " + f"ON CONFLICT DO NOTHING;" ) parameters = record.to_dict() diff --git a/tests/transform/test_dynamodb_cdc.py b/tests/transform/test_dynamodb_cdc.py index a4d7add..153cdc1 100644 --- a/tests/transform/test_dynamodb_cdc.py +++ b/tests/transform/test_dynamodb_cdc.py @@ -4,7 +4,7 @@ import pytest from commons_codec.model import SQLOperation, UniversalRecord -from commons_codec.transform.dynamodb import CrateDBTypeDeserializer +from commons_codec.transform.dynamodb import CrateDBTypeDeserializer, DynamoDBCDCTranslator, DynamoDBFullLoadTranslator pytestmark = pytest.mark.dynamodb @@ -200,7 +200,7 @@ def test_decode_cdc_unknown_event(dynamodb_cdc_translator_foo): def test_decode_cdc_insert_basic(dynamodb_cdc_translator_foo): assert dynamodb_cdc_translator_foo.to_sql(MSG_INSERT_BASIC) == SQLOperation( - statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped);", + statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped) ON CONFLICT DO NOTHING;", parameters={ "pk": { "id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266", @@ -221,7 +221,7 @@ def test_decode_cdc_insert_basic(dynamodb_cdc_translator_foo): def test_decode_cdc_insert_nested(dynamodb_cdc_translator_foo): assert dynamodb_cdc_translator_foo.to_sql(MSG_INSERT_NESTED) == SQLOperation( - statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped);", + statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped) ON CONFLICT DO NOTHING;", parameters={ "pk": { "id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266", @@ -344,3 +344,36 @@ def test_deserialize_list_objects(): ] } ) + + +@pytest.mark.integration +def test_to_sql_cratedb(caplog, cratedb, dynamodb_full_translator_foo): + """ + Verify converging NewImage CDC INSERT events to CrateDB. + """ + + # Use the full-load translator only for running the SQL DDL. + translator = DynamoDBFullLoadTranslator( + table_name="from.dynamodb", primary_key_schema=dynamodb_full_translator_foo.primary_key_schema + ) + cratedb.database.run_sql(translator.sql_ddl) + + # Compute CrateDB operation (SQL+parameters) from DynamoDB record. + translator = DynamoDBCDCTranslator(table_name="from.dynamodb") + operation = translator.to_sql(MSG_INSERT_NESTED) + + # Insert into CrateDB. Running the INSERT operation twice proves it + # does not raise any `DuplicateKeyException`, because CDC INSERT + # statements use `ON CONFLICT DO NOTHING`. + cratedb.database.run_sql(operation.statement, operation.parameters) + cratedb.database.run_sql(operation.statement, operation.parameters) + + # Verify data in target database. + assert cratedb.database.table_exists("from.dynamodb") is True + assert cratedb.database.refresh_table("from.dynamodb") is True + assert cratedb.database.count_records("from.dynamodb") == 1 + + results = cratedb.database.run_sql('SELECT * FROM "from".dynamodb;', records=True) # noqa: S608 + assert results[0]["pk"]["id"] == MSG_INSERT_NESTED["dynamodb"]["NewImage"]["id"]["S"] + assert results[0]["data"]["meta"]["timestamp"] == "2024-07-12T01:17:42" + assert results[0]["aux"] == {}