diff --git a/example_publisher/provider.py b/example_publisher/provider.py index 650b9a9..b28e772 100644 --- a/example_publisher/provider.py +++ b/example_publisher/provider.py @@ -3,13 +3,15 @@ from dataclasses import dataclass from typing import List, Optional -Symbol = str +Symbol = str +UnixTimestamp = int @dataclass class Price: price: float conf: float + timestamp: UnixTimestamp class Provider(ABC): diff --git a/example_publisher/providers/coin_gecko.py b/example_publisher/providers/coin_gecko.py index 18d728b..fe48add 100644 --- a/example_publisher/providers/coin_gecko.py +++ b/example_publisher/providers/coin_gecko.py @@ -1,4 +1,6 @@ import asyncio +from math import floor +import time from typing import Dict, List, Optional from pycoingecko import CoinGeckoAPI from structlog import get_logger @@ -16,7 +18,7 @@ class CoinGecko(Provider): def __init__(self, config: CoinGeckoConfig) -> None: self._api: CoinGeckoAPI = CoinGeckoAPI() - self._prices: Dict[Id, float] = {} + self._prices: Dict[Id, Price] = {} self._symbol_to_id: Dict[Symbol, Id] = { product.symbol: product.coin_gecko_id for product in config.products } @@ -45,7 +47,8 @@ def _update_prices(self) -> None: ids=list(self._prices.keys()), vs_currencies=USD, precision=18 ) for id_, prices in result.items(): - self._prices[id_] = prices[USD] + price = prices[USD] + self._prices[id_] = Price(price, price * self._config.confidence_ratio_bps / 10000, floor(time.time())) log.info("updated prices from CoinGecko", prices=self._prices) def _get_price(self, id: Id) -> float: @@ -53,10 +56,4 @@ def _get_price(self, id: Id) -> float: def latest_price(self, symbol: Symbol) -> Optional[Price]: id = self._symbol_to_id.get(symbol) - if not id: - return None - - price = self._get_price(id) - if not price: - return None - return Price(price, price * self._config.confidence_ratio_bps / 10000) + return self._get_price(id) diff --git a/example_publisher/providers/pyth_replicator.py b/example_publisher/providers/pyth_replicator.py index c4d1a61..d832d58 100644 --- a/example_publisher/providers/pyth_replicator.py +++ b/example_publisher/providers/pyth_replicator.py @@ -7,13 +7,12 @@ from structlog import get_logger -from example_publisher.provider import Price, Provider, Symbol +from example_publisher.provider import Price, Provider, Symbol, UnixTimestamp from ..config import PythReplicatorConfig log = get_logger() -UnixTimestamp = int # Any feed with >= this number of min publishers is considered "coming soon". COMING_SOON_MIN_PUB_THRESHOLD = 10 @@ -51,14 +50,14 @@ async def _update_loop(self) -> None: symbol = update.product.symbol if self._prices.get(symbol) is None: - self._prices[symbol] = [None, None, None] + self._prices[symbol] = (None, None, None) if update.aggregate_price_status == PythPriceStatus.TRADING: - self._prices[symbol] = [ + self._prices[symbol] = ( update.aggregate_price, update.aggregate_price_confidence_interval, update.timestamp, - ] + ) elif ( self._config.manual_agg_enabled and update.min_publishers >= COMING_SOON_MIN_PUB_THRESHOLD @@ -93,11 +92,11 @@ async def _update_loop(self) -> None: if prices: agg_price, agg_confidence_interval = manual_aggregate(prices) - self._prices[symbol] = [ + self._prices[symbol] = ( agg_price, agg_confidence_interval, update.timestamp, - ] + ) log.info( "Received a price update", symbol=symbol, price=self._prices[symbol] @@ -132,7 +131,7 @@ def latest_price(self, symbol: Symbol) -> Optional[Price]: if time.time() - timestamp > self._config.staleness_time_in_secs: return None - return Price(price, conf) + return Price(price, conf, timestamp) def manual_aggregate(prices: List[float]) -> Tuple[float, float]: diff --git a/example_publisher/publisher.py b/example_publisher/publisher.py index 8ef0fbe..23c1107 100644 --- a/example_publisher/publisher.py +++ b/example_publisher/publisher.py @@ -45,6 +45,7 @@ def __init__(self, config: Config) -> None: ) self.subscriptions: Dict[SubscriptionId, Product] = {} self.products: List[Product] = [] + self.last_successful_update: Optional[float] = None async def start(self): await self.pythd.connect() @@ -61,6 +62,7 @@ async def _start_product_update_loop(self): await self._upd_products() await self._subscribe_notify_price_sched() await asyncio.sleep(self.config.product_update_interval_secs) + print(self.last_successful_update) async def _upd_products(self): log.debug("fetching product accounts from Pythd") @@ -141,6 +143,7 @@ async def on_notify_price_sched(self, subscription: int) -> None: await self.pythd.update_price( product.price_account, scaled_price, scaled_conf, TRADING ) + self.last_successful_update = price.timestamp def apply_exponent(self, x: float, exp: int) -> int: return int(x * (10 ** (-exp)))