Skip to content

Commit

Permalink
DynamoDB: Use ON CONFLICT DO NOTHING clause on INSERT CDC operations
Browse files Browse the repository at this point in the history
... to mitigate errors when events are delivered more than once, which
happens when the Lambda processor retries a batch that partially failed.
  • Loading branch information
amotl committed Oct 24, 2024
1 parent b4db410 commit 671b47d
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 5 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased
- DynamoDB/Testing: Use CrateDB nightly again
- DynamoDB: Use `ON CONFLICT DO NOTHING` clause on CDC operations
of type `INSERT`, to mitigate errors when events are delivered more
than once, which happens when the Lambda processor retries a batch
that partially failed.

## 2024/10/09 v0.0.21
- MongoDB: Fixed BSON decoding of `{"$date": 1180690093000}` timestamps
Expand Down
3 changes: 2 additions & 1 deletion src/commons_codec/transform/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation:
f") VALUES ("
f":pk, "
f":typed, "
f":untyped);"
f":untyped) "
f"ON CONFLICT DO NOTHING;"
)
parameters = record.to_dict()

Expand Down
11 changes: 7 additions & 4 deletions tests/transform/test_dynamodb_cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def test_decode_cdc_unknown_event(dynamodb_cdc_translator_foo):

def test_decode_cdc_insert_basic(dynamodb_cdc_translator_foo):
assert dynamodb_cdc_translator_foo.to_sql(MSG_INSERT_BASIC) == SQLOperation(
statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped);",
statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped) ON CONFLICT DO NOTHING;",
parameters={
"pk": {
"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
Expand All @@ -221,7 +221,7 @@ def test_decode_cdc_insert_basic(dynamodb_cdc_translator_foo):

def test_decode_cdc_insert_nested(dynamodb_cdc_translator_foo):
assert dynamodb_cdc_translator_foo.to_sql(MSG_INSERT_NESTED) == SQLOperation(
statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped);",
statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped) ON CONFLICT DO NOTHING;",
parameters={
"pk": {
"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
Expand Down Expand Up @@ -349,7 +349,7 @@ def test_deserialize_list_objects():
@pytest.mark.integration
def test_to_sql_cratedb(caplog, cratedb, dynamodb_full_translator_foo):
"""
Verify converging NewImage CDC INSERT event to CrateDB.
Verify converging NewImage CDC INSERT events to CrateDB.
"""

# Use the full-load translator only for running the SQL DDL.
Expand All @@ -362,7 +362,10 @@ def test_to_sql_cratedb(caplog, cratedb, dynamodb_full_translator_foo):
translator = DynamoDBCDCTranslator(table_name="from.dynamodb")
operation = translator.to_sql(MSG_INSERT_NESTED)

# Insert into CrateDB.
# Insert into CrateDB. Running the INSERT operation twice proves it
# does not raise any `DuplicateKeyException`, because CDC INSERT
# statements use `ON CONFLICT DO NOTHING`.
cratedb.database.run_sql(operation.statement, operation.parameters)
cratedb.database.run_sql(operation.statement, operation.parameters)

# Verify data in target database.
Expand Down

0 comments on commit 671b47d

Please sign in to comment.