diff --git a/src/sentry/ingest/consumer/processors.py b/src/sentry/ingest/consumer/processors.py
index b4dc48ae05f5c..a386d94ec9240 100644
--- a/src/sentry/ingest/consumer/processors.py
+++ b/src/sentry/ingest/consumer/processors.py
@@ -25,6 +25,7 @@
 from sentry.utils import metrics
 from sentry.utils.cache import cache_key_for_event
 from sentry.utils.dates import to_datetime
+from sentry.utils.event_tracker import EventStageStatus, record_sampled_event_stage_status
 from sentry.utils.sdk import set_current_event_project
 from sentry.utils.snuba import RateLimitExceeded
 
@@ -202,6 +203,8 @@ def process_event(
     else:
         with metrics.timer("ingest_consumer._store_event"):
             cache_key = processing_store.store(data)
+            record_sampled_event_stage_status(data["event_id"], EventStageStatus.REDIS_PUT)
+
     save_attachments(attachments, cache_key)
 
     try:
diff --git a/src/sentry/utils/event_tracker.py b/src/sentry/utils/event_tracker.py
new file mode 100644
index 0000000000000..15c8cfdefdc04
--- /dev/null
+++ b/src/sentry/utils/event_tracker.py
@@ -0,0 +1,44 @@
+import logging
+from enum import IntEnum
+
+from sentry.utils.hashlib import md5_text
+
+
+class EventStageStatus(IntEnum):
+    """
+    Stages an event passes through in the ingestion pipeline, in order:
+    redis_put, save_event_started, save_event_finished, snuba_topic_put,
+    commit_log_topic_put, ppf_topic_put, post_process_started, and
+    post_process_finished (the same as redis_deleted). Only REDIS_PUT is
+    recorded so far; the remaining stages are planned.
+    """
+
+    REDIS_PUT = 1
+
+
+logger = logging.getLogger("EventTracker")
+
+
+def is_sampled_to_track(event_id: str, sample_rate: float) -> bool:
+    """
+    Normalize the 128-bit md5 integer to a float in the range [0, 1).
+    The hash is deterministic, so the same event_id is either sampled at
+    every stage or at none of them (all-or-nothing logging).
+    """
+    hash_float = int(md5_text(event_id).hexdigest(), 16) / 2**128
+    return hash_float < sample_rate
+
+
+def record_sampled_event_stage_status(
+    event_id: str, status: EventStageStatus, sample_rate: float = 0.01
+) -> None:
+    """
+    Record how far an event has made it through the ingestion pipeline.
+    """
+    if is_sampled_to_track(event_id, sample_rate):
+        extra = {"event_id": event_id, "status": status}
+        _do_record(extra)
+
+
+def _do_record(extra: dict) -> None:
+    logger.info("EventTracker.recorded", extra=extra)
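The sampling decision in `event_tracker.py` is a pure function of the event_id, which is what gives the all-or-nothing property across pipeline stages. A minimal standalone sketch of that rule, assuming `sentry.utils.hashlib.md5_text` behaves like a plain md5 over the utf-8 text:

```python
# Standalone sketch of the sampling rule above; hashlib.md5 over the utf-8
# event_id stands in for sentry.utils.hashlib.md5_text (an assumption).
import hashlib


def is_sampled_to_track(event_id: str, sample_rate: float) -> bool:
    # int(hexdigest, 16) lies in [0, 2**128 - 1]; dividing by 2**128
    # normalizes it into [0, 1).
    hash_float = int(hashlib.md5(event_id.encode("utf-8")).hexdigest(), 16) / 2**128
    return hash_float < sample_rate


event_id = "a7d67cf796774551a95be6543cacd459"
# Deterministic: every stage that checks the same event_id gets the same
# answer, so a given event is logged at every stage or at none.
assert is_sampled_to_track(event_id, 0.5) == is_sampled_to_track(event_id, 0.5)
# Boundary behavior: rate 1.0 tracks every event, rate 0.0 tracks none.
assert is_sampled_to_track(event_id, 1.0)
assert not is_sampled_to_track(event_id, 0.0)
```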
+ """ + if is_sampled_to_track(event_id, sample_rate): + extra = {"event_id": event_id, "status": status} + _do_record(extra) + + +def _do_record(extra): + logger.info("EventTracker.recorded", extra=extra) diff --git a/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py b/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py index e4cd69ed814f0..b774f9f8aea65 100644 --- a/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py +++ b/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py @@ -161,6 +161,86 @@ def test_transactions_spawn_save_event_transaction( ) +@django_db_all +def test_transactions_save_event_transaction_is_tracked(default_project): + with patch("sentry.utils.event_tracker.is_sampled_to_track", return_value=True): + + project_id = default_project.id + now = datetime.datetime.now() + event = { + "type": "transaction", + "timestamp": now.isoformat(), + "start_timestamp": now.isoformat(), + "spans": [], + "contexts": { + "trace": { + "parent_span_id": "8988cec7cc0779c1", + "type": "trace", + "op": "foobar", + "trace_id": "a7d67cf796774551a95be6543cacd459", + "span_id": "babaae0d4b7512d9", + "status": "ok", + } + }, + } + payload = get_normalized_event(event, default_project) + event_id = payload["event_id"] + start_time = time.time() - 3600 + with patch("sentry.utils.event_tracker._do_record") as mock_record: + process_event( + ConsumerType.Transactions, + message={ + "payload": orjson.dumps(payload).decode(), + "start_time": start_time, + "event_id": event_id, + "project_id": project_id, + "remote_addr": "127.0.0.1", + }, + project=default_project, + ) + mock_record.assert_called_once() + + +@django_db_all +def test_transactions_save_event_transaction_is_not_tracked(default_project): + with patch("sentry.utils.event_tracker.is_sampled_to_track", return_value=False): + + project_id = default_project.id + now = datetime.datetime.now() + event = { + "type": "transaction", + "timestamp": now.isoformat(), + "start_timestamp": now.isoformat(), + "spans": [], + "contexts": { + "trace": { + "parent_span_id": "8988cec7cc0779c1", + "type": "trace", + "op": "foobar", + "trace_id": "a7d67cf796774551a95be6543cacd459", + "span_id": "babaae0d4b7512d9", + "status": "ok", + } + }, + } + payload = get_normalized_event(event, default_project) + event_id = payload["event_id"] + start_time = time.time() - 3600 + with patch("sentry.utils.event_tracker._do_record") as mock_record: + process_event( + ConsumerType.Transactions, + message={ + "payload": orjson.dumps(payload).decode(), + "start_time": start_time, + "event_id": event_id, + "project_id": project_id, + "remote_addr": "127.0.0.1", + }, + project=default_project, + ) + mock_record.assert_not_called() + + @django_db_all def test_accountant_transaction(default_project): storage: MemoryMessageStorage[KafkaPayload] = MemoryMessageStorage()