Skip to content

Commit

Permalink
add timestamp to prices and keep last updated at in publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
Keyvan committed Mar 12, 2024
1 parent 665350d commit 1f7b7ca
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 18 deletions.
4 changes: 3 additions & 1 deletion example_publisher/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
from dataclasses import dataclass
from typing import List, Optional

Symbol = str

Symbol = str
UnixTimestamp = int

@dataclass
class Price:
price: float
conf: float
timestamp: UnixTimestamp


class Provider(ABC):
Expand Down
15 changes: 6 additions & 9 deletions example_publisher/providers/coin_gecko.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
from math import floor
import time
from typing import Dict, List, Optional
from pycoingecko import CoinGeckoAPI
from structlog import get_logger
Expand All @@ -16,7 +18,7 @@
class CoinGecko(Provider):
def __init__(self, config: CoinGeckoConfig) -> None:
self._api: CoinGeckoAPI = CoinGeckoAPI()
self._prices: Dict[Id, float] = {}
self._prices: Dict[Id, Price] = {}
self._symbol_to_id: Dict[Symbol, Id] = {
product.symbol: product.coin_gecko_id for product in config.products
}
Expand Down Expand Up @@ -45,18 +47,13 @@ def _update_prices(self) -> None:
ids=list(self._prices.keys()), vs_currencies=USD, precision=18
)
for id_, prices in result.items():
self._prices[id_] = prices[USD]
price = prices[USD]
self._prices[id_] = Price(price, price * self._config.confidence_ratio_bps / 10000, floor(time.time()))
log.info("updated prices from CoinGecko", prices=self._prices)

def _get_price(self, id: Id) -> float:
return self._prices.get(id, None)

def latest_price(self, symbol: Symbol) -> Optional[Price]:
id = self._symbol_to_id.get(symbol)
if not id:
return None

price = self._get_price(id)
if not price:
return None
return Price(price, price * self._config.confidence_ratio_bps / 10000)
return self._get_price(id)
15 changes: 7 additions & 8 deletions example_publisher/providers/pyth_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@

from structlog import get_logger

from example_publisher.provider import Price, Provider, Symbol
from example_publisher.provider import Price, Provider, Symbol, UnixTimestamp

from ..config import PythReplicatorConfig

log = get_logger()

UnixTimestamp = int

# Any feed with >= this number of min publishers is considered "coming soon".
COMING_SOON_MIN_PUB_THRESHOLD = 10
Expand Down Expand Up @@ -51,14 +50,14 @@ async def _update_loop(self) -> None:
symbol = update.product.symbol

if self._prices.get(symbol) is None:
self._prices[symbol] = [None, None, None]
self._prices[symbol] = (None, None, None)

if update.aggregate_price_status == PythPriceStatus.TRADING:
self._prices[symbol] = [
self._prices[symbol] = (
update.aggregate_price,
update.aggregate_price_confidence_interval,
update.timestamp,
]
)
elif (
self._config.manual_agg_enabled
and update.min_publishers >= COMING_SOON_MIN_PUB_THRESHOLD
Expand Down Expand Up @@ -93,11 +92,11 @@ async def _update_loop(self) -> None:
if prices:
agg_price, agg_confidence_interval = manual_aggregate(prices)

self._prices[symbol] = [
self._prices[symbol] = (
agg_price,
agg_confidence_interval,
update.timestamp,
]
)

log.info(
"Received a price update", symbol=symbol, price=self._prices[symbol]
Expand Down Expand Up @@ -132,7 +131,7 @@ def latest_price(self, symbol: Symbol) -> Optional[Price]:
if time.time() - timestamp > self._config.staleness_time_in_secs:
return None

return Price(price, conf)
return Price(price, conf, timestamp)


def manual_aggregate(prices: List[float]) -> Tuple[float, float]:
Expand Down
3 changes: 3 additions & 0 deletions example_publisher/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self, config: Config) -> None:
)
self.subscriptions: Dict[SubscriptionId, Product] = {}
self.products: List[Product] = []
self.last_successful_update: Optional[float] = None

async def start(self):
await self.pythd.connect()
Expand All @@ -61,6 +62,7 @@ async def _start_product_update_loop(self):
await self._upd_products()
await self._subscribe_notify_price_sched()
await asyncio.sleep(self.config.product_update_interval_secs)
print(self.last_successful_update)

async def _upd_products(self):
log.debug("fetching product accounts from Pythd")
Expand Down Expand Up @@ -141,6 +143,7 @@ async def on_notify_price_sched(self, subscription: int) -> None:
await self.pythd.update_price(
product.price_account, scaled_price, scaled_conf, TRADING
)
self.last_successful_update = price.timestamp

def apply_exponent(self, x: float, exp: int) -> int:
return int(x * (10 ** (-exp)))

0 comments on commit 1f7b7ca

Please sign in to comment.