Skip to content

Commit

Permalink
feat: cache get_latest_market_data
Browse files Browse the repository at this point in the history
  • Loading branch information
fkondej committed Oct 11, 2023
1 parent 459623a commit b366055
Showing 1 changed file with 47 additions and 9 deletions.
56 changes: 47 additions & 9 deletions vega_sim/service.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from __future__ import annotations

import threading
import copy
import datetime
import logging
Expand Down Expand Up @@ -165,6 +165,15 @@ def __init__(

self.governance_symbol = governance_symbol

self._latest_market_data = {}
self._latest_market_data_lock = threading.Lock()
self._latest_market_data_thread = threading.Thread(
target=self._latest_market_data_refresh,
#args=(),
daemon=True,
)
self._latest_market_data_thread.start()

@property
def market_price_decimals(self) -> int:
if self._market_price_decimals is None:
Expand Down Expand Up @@ -1300,21 +1309,50 @@ def market_accounts(
data_client=self.trading_data_client_v2,
)

def _latest_market_data_refresh(self):
step_length_seconds = 2
while True:
try:
t_start = time.time()
market_id_list = []
with self._latest_market_data_lock:
market_id_list = list(self._latest_market_data.keys())
for market_id in market_id_list:
self._latest_market_data[market_id] = data.get_latest_market_data(
market_id=market_id,
data_client=self.trading_data_client_v2,
market_price_decimals_map=self.market_price_decimals,
market_position_decimals_map=self.market_pos_decimals,
market_to_asset_map=self.market_to_asset,
asset_decimals_map=self.asset_decimals,
)
t_elapsed = time.time() - t_start
if t_elapsed <= step_length_seconds:
logger.debug(f"refreshed vega.latest_market_data for {len(market_id_list)} markets: {round(t_elapsed,2)}s")
time.sleep(step_length_seconds - t_elapsed)
else:
logger.warning(f"SLOW: refreshed vega.latest_market_data for {len(market_id_list)} markets: {round(t_elapsed,2)}s (limit: {step_length_seconds}s)")
except Exception as e:
logger.error(f"Failed to refresh vega.latest_market_data {e}")

def get_latest_market_data(
self,
market_id: str,
) -> vega_protos.vega.MarketData:
"""
Output market info.
"""
return data.get_latest_market_data(
market_id=market_id,
data_client=self.trading_data_client_v2,
market_price_decimals_map=self.market_price_decimals,
market_position_decimals_map=self.market_pos_decimals,
market_to_asset_map=self.market_to_asset,
asset_decimals_map=self.asset_decimals,
)
latest_market_data = self._latest_market_data.get(market_id, "==empty==")
if latest_market_data == "==empty==":
with self._latest_market_data_lock:
latest_market_data = self._latest_market_data.get(market_id, "==empty==")
if latest_market_data == "==empty==":
self._latest_market_data[market_id] = None
latest_market_data = None
while latest_market_data is None:
time.sleep(0.33)
latest_market_data = self._latest_market_data[market_id]
return latest_market_data

def market_account(
self,
Expand Down

0 comments on commit b366055

Please sign in to comment.