Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

DynamoDB: Use ON CONFLICT DO NOTHING clause on INSERT CDC operations #77

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased
- DynamoDB/Testing: Use CrateDB nightly again
- DynamoDB: Use `ON CONFLICT DO NOTHING` clause on CDC operations
of type `INSERT`, to mitigate errors when events are relayed
redundantly from retries after partially failed batches on the
Lambda processor.

## 2024/10/09 v0.0.21
- MongoDB: Fixed BSON decoding of `{"$date": 1180690093000}` timestamps
Expand Down
3 changes: 2 additions & 1 deletion src/commons_codec/transform/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation:
f") VALUES ("
f":pk, "
f":typed, "
f":untyped);"
f":untyped) "
f"ON CONFLICT DO NOTHING;"
)
parameters = record.to_dict()

Expand Down
39 changes: 36 additions & 3 deletions tests/transform/test_dynamodb_cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pytest

from commons_codec.model import SQLOperation, UniversalRecord
from commons_codec.transform.dynamodb import CrateDBTypeDeserializer
from commons_codec.transform.dynamodb import CrateDBTypeDeserializer, DynamoDBCDCTranslator, DynamoDBFullLoadTranslator

pytestmark = pytest.mark.dynamodb

Expand Down Expand Up @@ -200,7 +200,7 @@ def test_decode_cdc_unknown_event(dynamodb_cdc_translator_foo):

def test_decode_cdc_insert_basic(dynamodb_cdc_translator_foo):
assert dynamodb_cdc_translator_foo.to_sql(MSG_INSERT_BASIC) == SQLOperation(
statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped);",
statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped) ON CONFLICT DO NOTHING;",
parameters={
"pk": {
"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
Expand All @@ -221,7 +221,7 @@ def test_decode_cdc_insert_basic(dynamodb_cdc_translator_foo):

def test_decode_cdc_insert_nested(dynamodb_cdc_translator_foo):
assert dynamodb_cdc_translator_foo.to_sql(MSG_INSERT_NESTED) == SQLOperation(
statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped);",
statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped) ON CONFLICT DO NOTHING;",
parameters={
"pk": {
"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
Expand Down Expand Up @@ -344,3 +344,36 @@ def test_deserialize_list_objects():
]
}
)


@pytest.mark.integration
def test_to_sql_cratedb(caplog, cratedb, dynamodb_full_translator_foo):
    """
    Replay a NewImage CDC INSERT event against CrateDB twice, and
    verify that exactly one record ends up in the target table.
    """
    target_table = "from.dynamodb"
    database = cratedb.database

    # The full-load translator is borrowed solely for its SQL DDL,
    # in order to provision the target table.
    ddl_provider = DynamoDBFullLoadTranslator(
        table_name=target_table,
        primary_key_schema=dynamodb_full_translator_foo.primary_key_schema,
    )
    database.run_sql(ddl_provider.sql_ddl)

    # Translate the DynamoDB record into a CrateDB operation (SQL+parameters).
    cdc_translator = DynamoDBCDCTranslator(table_name=target_table)
    insert_op = cdc_translator.to_sql(MSG_INSERT_NESTED)

    # Submit the very same INSERT two times in a row. The second run must
    # not raise a `DuplicateKeyException`, because CDC INSERT statements
    # carry an `ON CONFLICT DO NOTHING` clause.
    for _ in range(2):
        database.run_sql(insert_op.statement, insert_op.parameters)

    # The table exists, can be refreshed, and holds a single record.
    assert database.table_exists(target_table) is True
    assert database.refresh_table(target_table) is True
    assert database.count_records(target_table) == 1

    # Inspect the content of the stored record.
    rows = database.run_sql('SELECT * FROM "from".dynamodb;', records=True)  # noqa: S608
    first_row = rows[0]
    expected_id = MSG_INSERT_NESTED["dynamodb"]["NewImage"]["id"]["S"]
    assert first_row["pk"]["id"] == expected_id
    assert first_row["data"]["meta"]["timestamp"] == "2024-07-12T01:17:42"
    assert first_row["aux"] == {}