From 4caab12566060b1e425556025b036e187f2db63a Mon Sep 17 00:00:00 2001 From: Devin Gaffney Date: Mon, 17 Jun 2024 13:32:24 -0700 Subject: [PATCH 01/10] CV2-4719 initial idea saround caching layer in presto --- lib/cache.py | 51 +++++++++++++++++++++++++++++++++++++++++ lib/model/model.py | 7 +++++- lib/schemas.py | 1 + test/lib/test_cache.py | 52 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 lib/cache.py create mode 100644 test/lib/test_cache.py diff --git a/lib/cache.py b/lib/cache.py new file mode 100644 index 0000000..51d7962 --- /dev/null +++ b/lib/cache.py @@ -0,0 +1,51 @@ +import redis +import json +from typing import Any, Optional +from lib.helpers import get_environment_setting + +REDIS_URL = get_environment_setting("REDIS_URL") +DEFAULT_TTL = int(get_environment_setting("CACHE_DEFAULT_TTL") or 24*60*60) +class Cache: + @staticmethod + def get_client() -> redis.Redis: + """ + Get a Redis client instance using the provided REDIS_URL. + + Returns: + redis.Redis: Redis client instance. + """ + return redis.Redis.from_url(REDIS_URL) + + @staticmethod + def get_cached_result(content_hash: str, reset_ttl: bool = True, ttl: int = DEFAULT_TTL) -> Optional[Any]: + """ + Retrieve the cached result for the given content hash. By default, reset the TTL to 24 hours. + + Args: + content_hash (str): The key for the cached content. + reset_ttl (bool): Whether to reset the TTL upon access. Default is True. + ttl (int): Time-to-live for the cache in seconds. Default is 86400 seconds (24 hours). + + Returns: + Optional[Any]: The cached result, or None if the key does not exist. + """ + client = Cache.get_client() + cached_result = client.get(content_hash) + if cached_result is not None: + if reset_ttl: + client.expire(content_hash, ttl) + return json.loads(cached_result) + return None + + @staticmethod + def set_cached_result(content_hash: str, result: Any, ttl: int = DEFAULT_TTL) -> None: + """ + Store the result in the cache with the given content hash and TTL. + + Args: + content_hash (str): The key for the cached content. + result (Any): The result to cache. + ttl (int): Time-to-live for the cache in seconds. Default is 86400 seconds (24 hours). + """ + client = Cache.get_client() + client.setex(content_hash, ttl, json.dumps(result)) diff --git a/lib/model/model.py b/lib/model/model.py index 3c50e0b..9cd9244 100644 --- a/lib/model/model.py +++ b/lib/model/model.py @@ -46,7 +46,12 @@ def respond(self, messages: Union[List[schemas.Message], schemas.Message]) -> Li if not isinstance(messages, list): messages = [messages] for message in messages: - message.body.result = self.process(message) + existing = Cache.get_cached_result(message.body.content_hash) + if existing: + message.body.result = existing + else: + message.body.result = self.process(message) + Cache.set_cached_result(message.body.content_hash, message.body.result) return messages @classmethod diff --git a/lib/schemas.py b/lib/schemas.py index fac654c..a25e682 100644 --- a/lib/schemas.py +++ b/lib/schemas.py @@ -13,6 +13,7 @@ class YakeKeywordsResponse(BaseModel): class GenericItem(BaseModel): id: Union[str, int, float] + content_hash: Optional[str] = None callback_url: Optional[str] = None url: Optional[str] = None text: Optional[str] = None diff --git a/test/lib/test_cache.py b/test/lib/test_cache.py new file mode 100644 index 0000000..b675a1b --- /dev/null +++ b/test/lib/test_cache.py @@ -0,0 +1,52 @@ +import pytest +from unittest.mock import patch, MagicMock +from lib.cache import Cache + +# Mock the Redis client and its methods +@pytest.fixture +def mock_redis_client(): + with patch('lib.cache.redis.Redis') as mock_redis: + yield mock_redis + +def test_set_cached_result(mock_redis_client): + mock_instance = mock_redis_client.from_url.return_value + content_hash = "test_hash" + result = {"data": "example"} + ttl = 3600 + + Cache.set_cached_result(content_hash, result, ttl) + + mock_instance.setex.assert_called_once_with(content_hash, ttl, '{"data": "example"}') + +def test_get_cached_result_exists(mock_redis_client): + mock_instance = mock_redis_client.from_url.return_value + content_hash = "test_hash" + ttl = 3600 + cached_data = '{"data": "example"}' + mock_instance.get.return_value = cached_data + + result = Cache.get_cached_result(content_hash, reset_ttl=True, ttl=ttl) + + assert result == {"data": "example"} + mock_instance.expire.assert_called_once_with(content_hash, ttl) + +def test_get_cached_result_not_exists(mock_redis_client): + mock_instance = mock_redis_client.from_url.return_value + content_hash = "test_hash" + mock_instance.get.return_value = None + + result = Cache.get_cached_result(content_hash) + + assert result is None + mock_instance.expire.assert_not_called() + +def test_get_cached_result_no_ttl_reset(mock_redis_client): + mock_instance = mock_redis_client.from_url.return_value + content_hash = "test_hash" + cached_data = '{"data": "example"}' + mock_instance.get.return_value = cached_data + + result = Cache.get_cached_result(content_hash, reset_ttl=False) + + assert result == {"data": "example"} + mock_instance.expire.assert_not_called() From 4561f6c2e244e5d67c5c511909b38f0c2446a9ac Mon Sep 17 00:00:00 2001 From: Devin Gaffney Date: Mon, 17 Jun 2024 14:03:58 -0700 Subject: [PATCH 02/10] add redis as requirement --- docker-compose.yml | 18 ++++++++++++++++++ requirements.txt | 3 ++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index 1fb4e27..e52e842 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,6 +23,8 @@ services: depends_on: elasticmq: condition: service_healthy + redis: + condition: service_healthy links: - elasticmq volumes: @@ -41,6 +43,16 @@ services: interval: 10s timeout: 5s retries: 10 + redis: + image: redis:latest + hostname: presto-redis + ports: + - "6379:6379" + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 5 image: build: . platform: linux/amd64 @@ -54,6 +66,8 @@ services: depends_on: elasticmq: condition: service_healthy + redis: + condition: service_healthy audio: build: . platform: linux/amd64 @@ -67,6 +81,8 @@ services: depends_on: elasticmq: condition: service_healthy + redis: + condition: service_healthy yake: build: . platform: linux/amd64 @@ -80,3 +96,5 @@ services: depends_on: elasticmq: condition: service_healthy + redis: + condition: service_healthy diff --git a/requirements.txt b/requirements.txt index e4d2f1e..cf42623 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,4 +16,5 @@ sentry-sdk==1.30.0 yake==0.4.8 opentelemetry-api==1.24.0 opentelemetry-exporter-otlp-proto-http==1.24.0 -opentelemetry-sdk==1.24.0 \ No newline at end of file +opentelemetry-sdk==1.24.0 +redis==5.0.6 \ No newline at end of file From eb67d130c761373e7f633203f48abb10bf02a91c Mon Sep 17 00:00:00 2001 From: Devin Gaffney Date: Mon, 17 Jun 2024 14:11:13 -0700 Subject: [PATCH 03/10] more tweaking for env vars --- .env_file.example | 4 +++- .env_file.test | 4 +++- docker-compose.yml | 2 ++ 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/.env_file.example b/.env_file.example index d487f9b..7059e79 100644 --- a/.env_file.example +++ b/.env_file.example @@ -11,4 +11,6 @@ OTEL_SERVICE_NAME=my-service-name OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf OTEL_EXPORTER_OTLP_ENDPOINT="https://api.honeycomb.io" OTEL_EXPORTER_OTLP_HEADERS="x-honeycomb-team=XXX" -HONEYCOMB_API_ENDPOINT="https://api.honeycomb.io" \ No newline at end of file +HONEYCOMB_API_ENDPOINT="https://api.honeycomb.io" +REDIS_URL="redis://redis:6379/0" +CACHE_DEFAULT_TTL=86400 \ No newline at end of file diff --git a/.env_file.test b/.env_file.test index d487f9b..7059e79 100644 --- a/.env_file.test +++ b/.env_file.test @@ -11,4 +11,6 @@ OTEL_SERVICE_NAME=my-service-name OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf OTEL_EXPORTER_OTLP_ENDPOINT="https://api.honeycomb.io" OTEL_EXPORTER_OTLP_HEADERS="x-honeycomb-team=XXX" -HONEYCOMB_API_ENDPOINT="https://api.honeycomb.io" \ No newline at end of file +HONEYCOMB_API_ENDPOINT="https://api.honeycomb.io" +REDIS_URL="redis://redis:6379/0" +CACHE_DEFAULT_TTL=86400 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index e52e842..2922586 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,6 +18,8 @@ services: - OTEL_EXPORTER_OTLP_HEADERS=${OTEL_EXPORTER_OTLP_HEADERS} - HONEYCOMB_API_KEY=${HONEYCOMB_API_KEY} - HONEYCOMB_API_ENDPOINT=${HONEYCOMB_API_ENDPOINT} + - REDIS_URL=${REDIS_URL} + - CACHE_DEFAULT_TTL=${CACHE_DEFAULT_TTL} env_file: - ./.env_file depends_on: From 5dd53eff6e238cbd0c861ce552ad0c8135aaee15 Mon Sep 17 00:00:00 2001 From: Devin Gaffney Date: Tue, 18 Jun 2024 05:45:56 -0700 Subject: [PATCH 04/10] fix broken tests --- lib/model/model.py | 1 + test/lib/model/test_video.py | 5 ++++- test/lib/queue/test_queue.py | 8 ++++---- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/lib/model/model.py b/lib/model/model.py index 9cd9244..61e4308 100644 --- a/lib/model/model.py +++ b/lib/model/model.py @@ -7,6 +7,7 @@ from lib.helpers import get_class from lib import schemas +from lib.cache import Cache class Model(ABC): BATCH_SIZE = 1 def __init__(self): diff --git a/test/lib/model/test_video.py b/test/lib/model/test_video.py index a62e2cc..35c0b60 100644 --- a/test/lib/model/test_video.py +++ b/test/lib/model/test_video.py @@ -56,7 +56,10 @@ def test_respond_with_single_video(self): mock_process.assert_called_once_with(video) self.assertEqual(result, [video]) - def test_respond_with_multiple_videos(self): + @patch('lib.cache.Cache') + def test_respond_with_multiple_videos(self, mock_cache): + mock_cache.get_cached_result.return_value = None + mock_cache.set_cached_result.return_value = True videos = [schemas.parse_message({"body": {"id": "123", "callback_url": "http://blah.com?callback_id=123", "url": "http://example.com/video.mp4"}, "model_name": "video__Model"}), schemas.parse_message({"body": {"id": "123", "callback_url": "http://blah.com?callback_id=123", "url": "http://example.com/video2.mp4"}, "model_name": "video__Model"})] mock_process = MagicMock() self.video_model.process = mock_process diff --git a/test/lib/queue/test_queue.py b/test/lib/queue/test_queue.py index c53df7b..ff1e536 100644 --- a/test/lib/queue/test_queue.py +++ b/test/lib/queue/test_queue.py @@ -147,19 +147,19 @@ def test_delete_messages_from_queue(self, mock_logger): mock_logger.assert_called_with(f"Deleting message: {mock_messages[-1]}") def test_push_message(self): - message_to_push = schemas.parse_message({"body": {"id": 1, "callback_url": "http://example.com", "text": "This is a test"}, "model_name": "mean_tokens__Model"}) + message_to_push = schemas.parse_message({"body": {"id": 1, "content_hash": None, "callback_url": "http://example.com", "text": "This is a test"}, "model_name": "mean_tokens__Model"}) # Call push_message returned_message = self.queue.push_message(self.queue_name_output, message_to_push) # Check if the message was correctly serialized and sent - self.mock_output_queue.send_message.assert_called_once_with(MessageBody='{"body": {"id": 1, "callback_url": "http://example.com", "url": null, "text": "This is a test", "raw": {}, "parameters": {}, "result": {"hash_value": null}}, "model_name": "mean_tokens__Model", "retry_count": 0}') + self.mock_output_queue.send_message.assert_called_once_with(MessageBody='{"body": {"id": 1, "content_hash": null, "callback_url": "http://example.com", "url": null, "text": "This is a test", "raw": {}, "parameters": {}, "result": {"hash_value": null}}, "model_name": "mean_tokens__Model", "retry_count": 0}') self.assertEqual(returned_message, message_to_push) def test_push_to_dead_letter_queue(self): - message_to_push = schemas.parse_message({"body": {"id": 1, "callback_url": "http://example.com", "text": "This is a test"}, "model_name": "mean_tokens__Model"}) + message_to_push = schemas.parse_message({"body": {"id": 1, "content_hash": None, "callback_url": "http://example.com", "text": "This is a test"}, "model_name": "mean_tokens__Model"}) # Call push_to_dead_letter_queue self.queue.push_to_dead_letter_queue(message_to_push) # Check if the message was correctly serialized and sent to the DLQ - self.mock_dlq_queue.send_message.assert_called_once_with(MessageBody='{"body": {"id": 1, "callback_url": "http://example.com", "url": null, "text": "This is a test", "raw": {}, "parameters": {}, "result": {"hash_value": null}}, "model_name": "mean_tokens__Model", "retry_count": 0}') + self.mock_dlq_queue.send_message.assert_called_once_with(MessageBody='{"body": {"id": 1,"content_hash": null, "callback_url": "http://example.com", "url": null, "text": "This is a test", "raw": {}, "parameters": {}, "result": {"hash_value": null}}, "model_name": "mean_tokens__Model", "retry_count": 0}') def test_increment_message_error_counts_exceed_max_retries(self): message_body = { From a6dc8573d616ade4ae3b5089220837320fa2f867 Mon Sep 17 00:00:00 2001 From: Devin Gaffney Date: Tue, 18 Jun 2024 07:01:49 -0700 Subject: [PATCH 05/10] fix tests --- test/lib/model/test_video.py | 15 ++++++++++----- test/lib/queue/test_queue.py | 2 +- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/test/lib/model/test_video.py b/test/lib/model/test_video.py index 35c0b60..e8c802c 100644 --- a/test/lib/model/test_video.py +++ b/test/lib/model/test_video.py @@ -48,7 +48,11 @@ def test_tmk_program_name(self): result = self.video_model.tmk_program_name() self.assertEqual(result, "PrestoVideoEncoder") - def test_respond_with_single_video(self): + @patch('lib.cache.Cache.get_cached_result') + @patch('lib.cache.Cache.set_cached_result') + def test_respond_with_single_video(self, mock_cache_set, mock_cache_get): + mock_cache_get.return_value = None + mock_cache_set.return_value = True video = schemas.parse_message({"body": {"id": "123", "callback_url": "http://blah.com?callback_id=123", "url": "http://example.com/video.mp4"}, "model_name": "video__Model"}) mock_process = MagicMock() self.video_model.process = mock_process @@ -56,10 +60,11 @@ def test_respond_with_single_video(self): mock_process.assert_called_once_with(video) self.assertEqual(result, [video]) - @patch('lib.cache.Cache') - def test_respond_with_multiple_videos(self, mock_cache): - mock_cache.get_cached_result.return_value = None - mock_cache.set_cached_result.return_value = True + @patch('lib.cache.Cache.get_cached_result') + @patch('lib.cache.Cache.set_cached_result') + def test_respond_with_multiple_videos(self, mock_cache_set, mock_cache_get): + mock_cache_get.return_value = None + mock_cache_set.return_value = True videos = [schemas.parse_message({"body": {"id": "123", "callback_url": "http://blah.com?callback_id=123", "url": "http://example.com/video.mp4"}, "model_name": "video__Model"}), schemas.parse_message({"body": {"id": "123", "callback_url": "http://blah.com?callback_id=123", "url": "http://example.com/video2.mp4"}, "model_name": "video__Model"})] mock_process = MagicMock() self.video_model.process = mock_process diff --git a/test/lib/queue/test_queue.py b/test/lib/queue/test_queue.py index ff1e536..0961802 100644 --- a/test/lib/queue/test_queue.py +++ b/test/lib/queue/test_queue.py @@ -159,7 +159,7 @@ def test_push_to_dead_letter_queue(self): # Call push_to_dead_letter_queue self.queue.push_to_dead_letter_queue(message_to_push) # Check if the message was correctly serialized and sent to the DLQ - self.mock_dlq_queue.send_message.assert_called_once_with(MessageBody='{"body": {"id": 1,"content_hash": null, "callback_url": "http://example.com", "url": null, "text": "This is a test", "raw": {}, "parameters": {}, "result": {"hash_value": null}}, "model_name": "mean_tokens__Model", "retry_count": 0}') + self.mock_dlq_queue.send_message.assert_called_once_with(MessageBody='{"body": {"id": 1, "content_hash": null, "callback_url": "http://example.com", "url": null, "text": "This is a test", "raw": {}, "parameters": {}, "result": {"hash_value": null}}, "model_name": "mean_tokens__Model", "retry_count": 0}') def test_increment_message_error_counts_exceed_max_retries(self): message_body = { From bb6080e91ac29ba41c9abb1eaf99270093fb546d Mon Sep 17 00:00:00 2001 From: Devin Gaffney Date: Tue, 18 Jun 2024 08:11:28 -0700 Subject: [PATCH 06/10] minor refactor for readability and bypass when no content hash is sent --- lib/cache.py | 13 +++++++------ lib/model/model.py | 17 +++++++++++------ 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/lib/cache.py b/lib/cache.py index 51d7962..bb1f857 100644 --- a/lib/cache.py +++ b/lib/cache.py @@ -29,12 +29,13 @@ def get_cached_result(content_hash: str, reset_ttl: bool = True, ttl: int = DEFA Returns: Optional[Any]: The cached result, or None if the key does not exist. """ - client = Cache.get_client() - cached_result = client.get(content_hash) - if cached_result is not None: - if reset_ttl: - client.expire(content_hash, ttl) - return json.loads(cached_result) + if content_hash: + client = Cache.get_client() + cached_result = client.get(content_hash) + if cached_result is not None: + if reset_ttl: + client.expire(content_hash, ttl) + return json.loads(cached_result) return None @staticmethod diff --git a/lib/model/model.py b/lib/model/model.py index 61e4308..d7c6ee4 100644 --- a/lib/model/model.py +++ b/lib/model/model.py @@ -40,6 +40,16 @@ def get_tempfile(self) -> Any: def process(self, messages: Union[List[schemas.Message], schemas.Message]) -> List[schemas.Message]: return [] + def get_response(self, message: schemas.Message) -> schemas.GenericItem: + """ + Perform a lookup on the cache for a message, and if found, return that cached value. + """ + result = Cache.get_cached_result(message.body.content_hash) + if not result: + result = self.process(message) + Cache.set_cached_result(message.body.content_hash, message.body.result) + return result + def respond(self, messages: Union[List[schemas.Message], schemas.Message]) -> List[schemas.Message]: """ Force messages as list of messages in case we get a singular item. Then, run fingerprint routine. @@ -47,12 +57,7 @@ def respond(self, messages: Union[List[schemas.Message], schemas.Message]) -> Li if not isinstance(messages, list): messages = [messages] for message in messages: - existing = Cache.get_cached_result(message.body.content_hash) - if existing: - message.body.result = existing - else: - message.body.result = self.process(message) - Cache.set_cached_result(message.body.content_hash, message.body.result) + message.body.result = self.get_response(message) return messages @classmethod From 5f59278539313b629ef4bf0494ceffeab4ad04b9 Mon Sep 17 00:00:00 2001 From: Devin Gaffney Date: Thu, 20 Jun 2024 06:02:15 -0700 Subject: [PATCH 07/10] tweaks after local testing --- lib/cache.py | 5 +++-- lib/model/model.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/cache.py b/lib/cache.py index bb1f857..41a3518 100644 --- a/lib/cache.py +++ b/lib/cache.py @@ -48,5 +48,6 @@ def set_cached_result(content_hash: str, result: Any, ttl: int = DEFAULT_TTL) -> result (Any): The result to cache. ttl (int): Time-to-live for the cache in seconds. Default is 86400 seconds (24 hours). """ - client = Cache.get_client() - client.setex(content_hash, ttl, json.dumps(result)) + if content_hash: + client = Cache.get_client() + client.setex(content_hash, ttl, json.dumps(result)) diff --git a/lib/model/model.py b/lib/model/model.py index d7c6ee4..8472193 100644 --- a/lib/model/model.py +++ b/lib/model/model.py @@ -47,7 +47,7 @@ def get_response(self, message: schemas.Message) -> schemas.GenericItem: result = Cache.get_cached_result(message.body.content_hash) if not result: result = self.process(message) - Cache.set_cached_result(message.body.content_hash, message.body.result) + Cache.set_cached_result(message.body.content_hash, result) return result def respond(self, messages: Union[List[schemas.Message], schemas.Message]) -> List[schemas.Message]: From 8b67507730551896df0a5625c1c781be9af876db Mon Sep 17 00:00:00 2001 From: Devin Gaffney Date: Thu, 20 Jun 2024 06:14:26 -0700 Subject: [PATCH 08/10] Add prefix to presto cache --- lib/cache.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/cache.py b/lib/cache.py index 41a3518..4e4d809 100644 --- a/lib/cache.py +++ b/lib/cache.py @@ -5,6 +5,7 @@ REDIS_URL = get_environment_setting("REDIS_URL") DEFAULT_TTL = int(get_environment_setting("CACHE_DEFAULT_TTL") or 24*60*60) +CACHE_PREFIX = "presto_media_cache:" class Cache: @staticmethod def get_client() -> redis.Redis: @@ -31,10 +32,10 @@ def get_cached_result(content_hash: str, reset_ttl: bool = True, ttl: int = DEFA """ if content_hash: client = Cache.get_client() - cached_result = client.get(content_hash) + cached_result = client.get(CACHE_PREFIX+content_hash) if cached_result is not None: if reset_ttl: - client.expire(content_hash, ttl) + client.expire(CACHE_PREFIX+content_hash, ttl) return json.loads(cached_result) return None @@ -50,4 +51,4 @@ def set_cached_result(content_hash: str, result: Any, ttl: int = DEFAULT_TTL) -> """ if content_hash: client = Cache.get_client() - client.setex(content_hash, ttl, json.dumps(result)) + client.setex(CACHE_PREFIX+content_hash, ttl, json.dumps(result)) From b18e080bf9385263d72fe2d54a7c3e6afe984b48 Mon Sep 17 00:00:00 2001 From: Devin Gaffney Date: Thu, 20 Jun 2024 06:25:56 -0700 Subject: [PATCH 09/10] update test --- test/lib/test_cache.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/lib/test_cache.py b/test/lib/test_cache.py index b675a1b..47ff9a2 100644 --- a/test/lib/test_cache.py +++ b/test/lib/test_cache.py @@ -16,7 +16,7 @@ def test_set_cached_result(mock_redis_client): Cache.set_cached_result(content_hash, result, ttl) - mock_instance.setex.assert_called_once_with(content_hash, ttl, '{"data": "example"}') + mock_instance.setex.assert_called_once_with('presto_media_cache:'+content_hash, ttl, '{"data": "example"}') def test_get_cached_result_exists(mock_redis_client): mock_instance = mock_redis_client.from_url.return_value @@ -28,7 +28,7 @@ def test_get_cached_result_exists(mock_redis_client): result = Cache.get_cached_result(content_hash, reset_ttl=True, ttl=ttl) assert result == {"data": "example"} - mock_instance.expire.assert_called_once_with(content_hash, ttl) + mock_instance.expire.assert_called_once_with('presto_media_cache:'+content_hash, ttl) def test_get_cached_result_not_exists(mock_redis_client): mock_instance = mock_redis_client.from_url.return_value From 3bb86dba025ffd1e3f13d73abcac7f11059ee99f Mon Sep 17 00:00:00 2001 From: Devin Gaffney Date: Mon, 24 Jun 2024 06:27:08 -0700 Subject: [PATCH 10/10] Add logging for cache hit/miss --- lib/cache.py | 7 ++++++- lib/model/model.py | 1 + lib/telemetry.py | 10 ++++++++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/lib/cache.py b/lib/cache.py index 4e4d809..e0da115 100644 --- a/lib/cache.py +++ b/lib/cache.py @@ -2,7 +2,9 @@ import json from typing import Any, Optional from lib.helpers import get_environment_setting +from lib.telemetry import OpenTelemetryExporter +OPEN_TELEMETRY_EXPORTER = OpenTelemetryExporter(service_name="QueueWorkerService", local_debug=False) REDIS_URL = get_environment_setting("REDIS_URL") DEFAULT_TTL = int(get_environment_setting("CACHE_DEFAULT_TTL") or 24*60*60) CACHE_PREFIX = "presto_media_cache:" @@ -36,7 +38,9 @@ def get_cached_result(content_hash: str, reset_ttl: bool = True, ttl: int = DEFA if cached_result is not None: if reset_ttl: client.expire(CACHE_PREFIX+content_hash, ttl) - return json.loads(cached_result) + response = json.loads(cached_result) + OPEN_TELEMETRY_EXPORTER.log_execution_status("cache_hit_response", "cache_hit_response") + return response return None @staticmethod @@ -52,3 +56,4 @@ def set_cached_result(content_hash: str, result: Any, ttl: int = DEFAULT_TTL) -> if content_hash: client = Cache.get_client() client.setex(CACHE_PREFIX+content_hash, ttl, json.dumps(result)) + OPEN_TELEMETRY_EXPORTER.log_execution_status("cache_miss_response", "cache_hit_response") diff --git a/lib/model/model.py b/lib/model/model.py index 8472193..7a0a87d 100644 --- a/lib/model/model.py +++ b/lib/model/model.py @@ -8,6 +8,7 @@ from lib.helpers import get_class from lib import schemas from lib.cache import Cache + class Model(ABC): BATCH_SIZE = 1 def __init__(self): diff --git a/lib/telemetry.py b/lib/telemetry.py index 1460082..40916fe 100644 --- a/lib/telemetry.py +++ b/lib/telemetry.py @@ -75,6 +75,16 @@ def __init__(self, service_name: str, local_debug=False) -> None: unit="s", description="Errored Message Response" ) + self.cache_hit_response = self.meter.create_counter( + name="cache_hit_response", + unit="s", + description="Returned cached response" + ) + self.cache_miss_response = self.meter.create_counter( + name="cache_miss_response", + unit="s", + description="Returned non-cached response" + ) def log_execution_time(self, func_name: str, execution_time: float): env_name = os.getenv("DEPLOY_ENV", "development")