Skip to content

Commit

Permalink
Merge pull request #4850 from systeminit/jkeiser/lago-upload-metrics
Browse files Browse the repository at this point in the history
Billing lambda: log upload stats using INFO
  • Loading branch information
jkeiser authored Oct 22, 2024
2 parents 085a3df + 42c1019 commit 0ee941c
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 5 deletions.
2 changes: 1 addition & 1 deletion component/lambda/functions/si_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@


logger = logging.getLogger()
logging.basicConfig(level=logging.INFO)
logger.setLevel("INFO")
14 changes: 11 additions & 3 deletions component/lambda/functions/si_redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from botocore.client import Config
import time
from si_logger import logger
import logging


class DatabaseConnectParams(TypedDict):
Expand Down Expand Up @@ -54,12 +55,15 @@ def from_env(
def __init__(
self,
session: boto3.Session,
*,
wait_interval_seconds: float = 0.25,
report_interval_seconds: float = 5,
**database_params: Unpack[DatabaseConnectParams],
):
self._session = session
self._database_params = database_params
self._wait_interval_seconds = wait_interval_seconds
self._report_interval_seconds = report_interval_seconds
self._client = self._connect()

def query(self, Sql: str, **Parameters: object):
Expand Down Expand Up @@ -94,6 +98,8 @@ def execute(self, Sql: str, **Parameters: object):
)
)

last_report = time.time()

while True:
response = self.with_client(
lambda client: client.describe_statement(Id=statement["Id"])
Expand All @@ -110,9 +116,11 @@ def execute(self, Sql: str, **Parameters: object):
case "ABORTED":
raise Exception(f"Query aborted (Id={statement['Id']})")

logger.debug(
f"Query status: {status}. Waiting {self._wait_interval_seconds}s for completion... (Id={statement['Id']})"
)
if time.time() - last_report >= self._report_interval_seconds:
last_report = time.time()
logger.log(logging.INFO,
f"Query status: {status}. Waiting {self._wait_interval_seconds}s for completion... (Id={statement['Id']})"
)

time.sleep(self._wait_interval_seconds)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def lambda_handler(lambda_event={}, _context=None):
last_hour_end = None
break

logger.warning(
logger.info(
f"Uploaded {uploaded_events} events in {batch_hours}-hour batch starting {-first_hour_start} hours ago!"
)
last_hour_end = first_hour_start if uploaded_events > 0 else None
Expand Down

0 comments on commit 0ee941c

Please sign in to comment.