From 67a4b902c08252d220d5779d016e1fe353e55d28 Mon Sep 17 00:00:00 2001 From: T Savo Date: Thu, 27 Feb 2020 11:57:30 -0800 Subject: [PATCH] Change Poloniex to have a single subscription for Ticker which is then shared by all subscribers. Make it memoized so it gets lazy loaded on first request. [Gemini] Fix socket excessive close/reconnect 11:58:07.702[WARN ][opGroup-15-1][i.b.x.g.GeminiProductStreamingService] Resubscribing channels 11:58:08.703[INFO ][opGroup-16-1][i.b.x.s.n.WebSocketClientHandler] WebSocket Client connected! [id: 0x4fc02e39, L:/192.168.0.100:49178 - R:api.gemini.com/18.211.184.12:443] 11:58:08.703[WARN ][opGroup-16-1][i.b.x.g.GeminiProductStreamingService] Resubscribing channels 11:58:08.817[INFO ][opGroup-17-1][i.b.x.s.n.WebSocketClientHandler] WebSocket Client connected! [id: 0x41b1b382, L:/192.168.0.100:49185 - R:api.gemini.com/18.211.184.12:443] 11:58:08.818[WARN ][opGroup-17-1][i.b.x.g.GeminiProductStreamingService] Resubscribing channels [Kraken] use correct field for userRef dto.refId -> dto.userrRef https://api.kraken.com/0/private/OpenOrders Change the initialization of the stream to lazy by memoizing the result of creation/sharing. Make the result memoized so it gets lazy loaded on first request. Fixed the imports Revert this to a null change and squash Revert this to a null change and squash Bringing in changes from development and reformatting. --- xchange-poloniex/pom.xml | 2 -- .../PoloniexStreamingMarketDataService.java | 36 ++++++++++--------- xchange-poloniex2/pom.xml | 4 +++ .../poloniex2/PoloniexStreamingExchange.java | 15 ++++---- .../PoloniexStreamingMarketDataService.java | 31 +++++++++------- .../poloniex2/PoloniexManualExample.java | 10 ++++-- 6 files changed, 57 insertions(+), 41 deletions(-) diff --git a/xchange-poloniex/pom.xml b/xchange-poloniex/pom.xml index 0b4e95202..0f230920d 100644 --- a/xchange-poloniex/pom.xml +++ b/xchange-poloniex/pom.xml @@ -25,11 +25,9 @@ xchange-poloniex ${xchange.version} - com.google.guava guava - 28.2-jre \ No newline at end of file diff --git a/xchange-poloniex/src/main/java/info/bitrich/xchangestream/poloniex/PoloniexStreamingMarketDataService.java b/xchange-poloniex/src/main/java/info/bitrich/xchangestream/poloniex/PoloniexStreamingMarketDataService.java index 194f9a589..80c0a17c8 100644 --- a/xchange-poloniex/src/main/java/info/bitrich/xchangestream/poloniex/PoloniexStreamingMarketDataService.java +++ b/xchange-poloniex/src/main/java/info/bitrich/xchangestream/poloniex/PoloniexStreamingMarketDataService.java @@ -1,6 +1,8 @@ package info.bitrich.xchangestream.poloniex; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.MinMaxPriorityQueue; import info.bitrich.xchangestream.core.StreamingMarketDataService; import info.bitrich.xchangestream.poloniex.utils.MinMaxPriorityQueueUtils; @@ -22,10 +24,27 @@ import java.util.*; public class PoloniexStreamingMarketDataService implements StreamingMarketDataService { + public static final String TICKER_CHANNEL_NAME = "ticker"; private final WampStreamingService streamingService; + private final Supplier> streamingTickers; public PoloniexStreamingMarketDataService(WampStreamingService streamingService) { this.streamingService = streamingService; + this.streamingTickers = Suppliers.memoize(() -> streamingService.subscribeChannel(TICKER_CHANNEL_NAME) + .map(pubSubData -> { + PoloniexMarketData marketData = new PoloniexMarketData(); + marketData.setLast(new BigDecimal(pubSubData.arguments().get(1).asText())); + marketData.setLowestAsk(new BigDecimal(pubSubData.arguments().get(2).asText())); + marketData.setHighestBid(new BigDecimal(pubSubData.arguments().get(3).asText())); + marketData.setPercentChange(new BigDecimal(pubSubData.arguments().get(4).asText())); + marketData.setBaseVolume(new BigDecimal(pubSubData.arguments().get(5).asText())); + marketData.setQuoteVolume(new BigDecimal(pubSubData.arguments().get(6).asText())); + marketData.setHigh24hr(new BigDecimal(pubSubData.arguments().get(8).asText())); + marketData.setLow24hr(new BigDecimal(pubSubData.arguments().get(9).asText())); + + PoloniexTicker ticker = new PoloniexTicker(marketData, PoloniexUtils.toCurrencyPair(pubSubData.arguments().get(0).asText())); + return PoloniexAdapters.adaptPoloniexTicker(ticker, ticker.getCurrencyPair()); + }).share()); } private Map> orderBookBids = new HashMap<>(); @@ -90,22 +109,7 @@ public Observable getOrderBook(CurrencyPair currencyPair, Object... a @Override public Observable getTicker(CurrencyPair currencyPair, Object... args) { - return streamingService.subscribeChannel("ticker") - .map(pubSubData -> { - PoloniexMarketData marketData = new PoloniexMarketData(); - marketData.setLast(new BigDecimal(pubSubData.arguments().get(1).asText())); - marketData.setLowestAsk(new BigDecimal(pubSubData.arguments().get(2).asText())); - marketData.setHighestBid(new BigDecimal(pubSubData.arguments().get(3).asText())); - marketData.setPercentChange(new BigDecimal(pubSubData.arguments().get(4).asText())); - marketData.setBaseVolume(new BigDecimal(pubSubData.arguments().get(5).asText())); - marketData.setQuoteVolume(new BigDecimal(pubSubData.arguments().get(6).asText())); - marketData.setHigh24hr(new BigDecimal(pubSubData.arguments().get(8).asText())); - marketData.setLow24hr(new BigDecimal(pubSubData.arguments().get(9).asText())); - - PoloniexTicker ticker = new PoloniexTicker(marketData, PoloniexUtils.toCurrencyPair(pubSubData.arguments().get(0).asText())); - return PoloniexAdapters.adaptPoloniexTicker(ticker, ticker.getCurrencyPair()); - }) - .filter(ticker -> ticker.getCurrencyPair().equals(currencyPair)); + return streamingTickers.get().filter(ticker -> ticker.getCurrencyPair().equals(currencyPair)); } @Override diff --git a/xchange-poloniex2/pom.xml b/xchange-poloniex2/pom.xml index 8da494618..d2382cb06 100644 --- a/xchange-poloniex2/pom.xml +++ b/xchange-poloniex2/pom.xml @@ -25,5 +25,9 @@ service-netty ${project.parent.version} + + com.google.guava + guava + \ No newline at end of file diff --git a/xchange-poloniex2/src/main/java/info/bitrich/xchangestream/poloniex2/PoloniexStreamingExchange.java b/xchange-poloniex2/src/main/java/info/bitrich/xchangestream/poloniex2/PoloniexStreamingExchange.java index 9113e97ee..e45e5c560 100644 --- a/xchange-poloniex2/src/main/java/info/bitrich/xchangestream/poloniex2/PoloniexStreamingExchange.java +++ b/xchange-poloniex2/src/main/java/info/bitrich/xchangestream/poloniex2/PoloniexStreamingExchange.java @@ -37,26 +37,25 @@ public PoloniexStreamingExchange() { protected void initServices() { applyStreamingSpecification(getExchangeSpecification(), streamingService); super.initServices(); - Map currencyPairMap = getCurrencyPairMap(); + Map currencyPairMap = getCurrencyPairMap(); streamingMarketDataService = new PoloniexStreamingMarketDataService(streamingService, currencyPairMap); } - private Map getCurrencyPairMap() { - Map currencyPairMap = new HashMap<>(); + private Map getCurrencyPairMap() { + Map currencyPairMap = new HashMap<>(); final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); try { URL tickerUrl = new URL(TICKER_URL); JsonNode jsonRootTickers = mapper.readTree(tickerUrl); Iterator pairSymbols = jsonRootTickers.fieldNames(); - while (pairSymbols.hasNext()) { - String pairSymbol = pairSymbols.next(); + pairSymbols.forEachRemaining(pairSymbol ->{ String id = jsonRootTickers.get(pairSymbol).get("id").toString(); - String[] currencies = pairSymbol.split("_"); CurrencyPair currencyPair = new CurrencyPair(new Currency(currencies[1]), new Currency(currencies[0])); - currencyPairMap.put(currencyPair, Integer.valueOf(id)); - } + currencyPairMap.put(Integer.valueOf(id), currencyPair); + + }); } catch (IOException e) { e.printStackTrace(); } diff --git a/xchange-poloniex2/src/main/java/info/bitrich/xchangestream/poloniex2/PoloniexStreamingMarketDataService.java b/xchange-poloniex2/src/main/java/info/bitrich/xchangestream/poloniex2/PoloniexStreamingMarketDataService.java index a8386b95d..343d436a4 100644 --- a/xchange-poloniex2/src/main/java/info/bitrich/xchangestream/poloniex2/PoloniexStreamingMarketDataService.java +++ b/xchange-poloniex2/src/main/java/info/bitrich/xchangestream/poloniex2/PoloniexStreamingMarketDataService.java @@ -1,6 +1,8 @@ package info.bitrich.xchangestream.poloniex2; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import info.bitrich.xchangestream.core.StreamingMarketDataService; import info.bitrich.xchangestream.poloniex2.dto.*; import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper; @@ -13,7 +15,10 @@ import org.slf4j.LoggerFactory; import java.math.BigDecimal; -import java.util.*; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.SortedMap; import static org.knowm.xchange.poloniex.PoloniexAdapters.adaptPoloniexDepth; import static org.knowm.xchange.poloniex.PoloniexAdapters.adaptPoloniexTicker; @@ -23,13 +28,21 @@ */ public class PoloniexStreamingMarketDataService implements StreamingMarketDataService { private static final Logger LOG = LoggerFactory.getLogger(PoloniexStreamingMarketDataService.class); + private static final String TICKER_CHANNEL_ID = "1002"; private final PoloniexStreamingService service; - private final Map currencyPairMap; + private final Supplier> streamingTickers; - public PoloniexStreamingMarketDataService(PoloniexStreamingService service, Map currencyPairMap) { + public PoloniexStreamingMarketDataService(PoloniexStreamingService service, Map currencyIdMap) { this.service = service; - this.currencyPairMap = currencyPairMap; + final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); + + streamingTickers = Suppliers.memoize(() -> service.subscribeChannel(TICKER_CHANNEL_ID) + .map(s -> { + PoloniexWebSocketTickerTransaction ticker = mapper.readValue(s.toString(), PoloniexWebSocketTickerTransaction.class); + CurrencyPair currencyPair = currencyIdMap.get(ticker.getPairId()); + return adaptPoloniexTicker(ticker.toPoloniexTicker(currencyPair), currencyPair); + }).share()); } @Override @@ -59,15 +72,7 @@ public Observable getOrderBook(CurrencyPair currencyPair, Object... a @Override public Observable getTicker(CurrencyPair currencyPair, Object... args) { - final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); - - int currencyPairId = currencyPairMap.getOrDefault(currencyPair, 0); - Observable subscribedChannel = service.subscribeChannel("1002") - .map(s -> mapper.readValue(s.toString(), PoloniexWebSocketTickerTransaction.class)); - - return subscribedChannel - .filter(s -> s.getPairId() == currencyPairId) - .map(s -> adaptPoloniexTicker(s.toPoloniexTicker(currencyPair), currencyPair)); + return streamingTickers.get().filter(ticker -> ticker.getCurrencyPair().equals(currencyPair)); } @Override diff --git a/xchange-poloniex2/src/test/java/info/bitrich/xchangestream/poloniex2/PoloniexManualExample.java b/xchange-poloniex2/src/test/java/info/bitrich/xchangestream/poloniex2/PoloniexManualExample.java index 8816682a7..1eae518fe 100644 --- a/xchange-poloniex2/src/test/java/info/bitrich/xchangestream/poloniex2/PoloniexManualExample.java +++ b/xchange-poloniex2/src/test/java/info/bitrich/xchangestream/poloniex2/PoloniexManualExample.java @@ -13,7 +13,9 @@ public class PoloniexManualExample { public static void main(String[] args) throws Exception { CurrencyPair usdtBtc = new CurrencyPair(new Currency("BTC"), new Currency("USDT")); -// CertHelper.trustAllCerts(); + CurrencyPair ltcBtc = new CurrencyPair(new Currency("LTC"), new Currency("BTC")); +// + // CertHelper.trustAllCerts(); StreamingExchange exchange = StreamingExchangeFactory.INSTANCE.createExchange(PoloniexStreamingExchange.class.getName()); ExchangeSpecification defaultExchangeSpecification = exchange.getDefaultExchangeSpecification(); // defaultExchangeSpecification.setExchangeSpecificParametersItem(StreamingExchange.SOCKS_PROXY_HOST, "localhost"); @@ -31,7 +33,6 @@ public static void main(String[] args) throws Exception { exchange.connect().blockingAwait(); - exchange.getStreamingMarketDataService().getOrderBook(usdtBtc).subscribe(orderBook -> { LOG.info("First ask: {}", orderBook.getAsks().get(0)); LOG.info("First bid: {}", orderBook.getBids().get(0)); @@ -41,6 +42,11 @@ public static void main(String[] args) throws Exception { LOG.info("TICKER: {}", ticker); }, throwable -> LOG.error("ERROR in getting ticker: ", throwable)); + exchange.getStreamingMarketDataService().getTicker(ltcBtc).subscribe(ticker -> { + LOG.info("TICKER: {}", ticker); + }, throwable -> LOG.error("ERROR in getting ticker: ", throwable)); + + exchange.getStreamingMarketDataService().getTrades(usdtBtc).subscribe(trade -> { LOG.info("TRADE: {}", trade); }, throwable -> LOG.error("ERROR in getting trades: ", throwable));