Skip to content

Commit

Permalink
Merge pull request bitrich-info#521 from TSavo/PoloniexSharedTickerOb…
Browse files Browse the repository at this point in the history
…servable

Change Poloniex to have a single subscription for Ticker. Fixes bitrich-info#124 and bitrich-info#98
  • Loading branch information
badgerwithagun authored Mar 1, 2020
2 parents c94e259 + c715ee3 commit c3d6d46
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 41 deletions.
2 changes: 0 additions & 2 deletions xchange-poloniex/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@
<artifactId>xchange-poloniex</artifactId>
<version>${xchange.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.2-jre</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package info.bitrich.xchangestream.poloniex;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.MinMaxPriorityQueue;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.poloniex.utils.MinMaxPriorityQueueUtils;
Expand All @@ -22,10 +24,27 @@
import java.util.*;

public class PoloniexStreamingMarketDataService implements StreamingMarketDataService {
public static final String TICKER_CHANNEL_NAME = "ticker";
private final WampStreamingService streamingService;
private final Supplier<Observable<Ticker>> streamingTickers;

public PoloniexStreamingMarketDataService(WampStreamingService streamingService) {
this.streamingService = streamingService;
this.streamingTickers = Suppliers.memoize(() -> streamingService.subscribeChannel(TICKER_CHANNEL_NAME)
.map(pubSubData -> {
PoloniexMarketData marketData = new PoloniexMarketData();
marketData.setLast(new BigDecimal(pubSubData.arguments().get(1).asText()));
marketData.setLowestAsk(new BigDecimal(pubSubData.arguments().get(2).asText()));
marketData.setHighestBid(new BigDecimal(pubSubData.arguments().get(3).asText()));
marketData.setPercentChange(new BigDecimal(pubSubData.arguments().get(4).asText()));
marketData.setBaseVolume(new BigDecimal(pubSubData.arguments().get(5).asText()));
marketData.setQuoteVolume(new BigDecimal(pubSubData.arguments().get(6).asText()));
marketData.setHigh24hr(new BigDecimal(pubSubData.arguments().get(8).asText()));
marketData.setLow24hr(new BigDecimal(pubSubData.arguments().get(9).asText()));

PoloniexTicker ticker = new PoloniexTicker(marketData, PoloniexUtils.toCurrencyPair(pubSubData.arguments().get(0).asText()));
return PoloniexAdapters.adaptPoloniexTicker(ticker, ticker.getCurrencyPair());
}).share());
}

private Map<CurrencyPair, MinMaxPriorityQueue<LimitOrder>> orderBookBids = new HashMap<>();
Expand Down Expand Up @@ -90,22 +109,7 @@ public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... a

@Override
public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {
return streamingService.subscribeChannel("ticker")
.map(pubSubData -> {
PoloniexMarketData marketData = new PoloniexMarketData();
marketData.setLast(new BigDecimal(pubSubData.arguments().get(1).asText()));
marketData.setLowestAsk(new BigDecimal(pubSubData.arguments().get(2).asText()));
marketData.setHighestBid(new BigDecimal(pubSubData.arguments().get(3).asText()));
marketData.setPercentChange(new BigDecimal(pubSubData.arguments().get(4).asText()));
marketData.setBaseVolume(new BigDecimal(pubSubData.arguments().get(5).asText()));
marketData.setQuoteVolume(new BigDecimal(pubSubData.arguments().get(6).asText()));
marketData.setHigh24hr(new BigDecimal(pubSubData.arguments().get(8).asText()));
marketData.setLow24hr(new BigDecimal(pubSubData.arguments().get(9).asText()));

PoloniexTicker ticker = new PoloniexTicker(marketData, PoloniexUtils.toCurrencyPair(pubSubData.arguments().get(0).asText()));
return PoloniexAdapters.adaptPoloniexTicker(ticker, ticker.getCurrencyPair());
})
.filter(ticker -> ticker.getCurrencyPair().equals(currencyPair));
return streamingTickers.get().filter(ticker -> ticker.getCurrencyPair().equals(currencyPair));
}

@Override
Expand Down
4 changes: 4 additions & 0 deletions xchange-poloniex2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,9 @@
<artifactId>service-netty</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,25 @@ public PoloniexStreamingExchange() {
protected void initServices() {
applyStreamingSpecification(getExchangeSpecification(), streamingService);
super.initServices();
Map<CurrencyPair, Integer> currencyPairMap = getCurrencyPairMap();
Map<Integer, CurrencyPair> currencyPairMap = getCurrencyPairMap();
streamingMarketDataService = new PoloniexStreamingMarketDataService(streamingService, currencyPairMap);
}

private Map<CurrencyPair, Integer> getCurrencyPairMap() {
Map<CurrencyPair, Integer> currencyPairMap = new HashMap<>();
private Map<Integer, CurrencyPair> getCurrencyPairMap() {
Map<Integer, CurrencyPair> currencyPairMap = new HashMap<>();
final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();

try {
URL tickerUrl = new URL(TICKER_URL);
JsonNode jsonRootTickers = mapper.readTree(tickerUrl);
Iterator<String> pairSymbols = jsonRootTickers.fieldNames();
while (pairSymbols.hasNext()) {
String pairSymbol = pairSymbols.next();
pairSymbols.forEachRemaining(pairSymbol ->{
String id = jsonRootTickers.get(pairSymbol).get("id").toString();

String[] currencies = pairSymbol.split("_");
CurrencyPair currencyPair = new CurrencyPair(new Currency(currencies[1]), new Currency(currencies[0]));
currencyPairMap.put(currencyPair, Integer.valueOf(id));
}
currencyPairMap.put(Integer.valueOf(id), currencyPair);

});
} catch (IOException e) {
e.printStackTrace();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package info.bitrich.xchangestream.poloniex2;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.poloniex2.dto.*;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
Expand All @@ -13,7 +15,10 @@
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;

import static org.knowm.xchange.poloniex.PoloniexAdapters.adaptPoloniexDepth;
import static org.knowm.xchange.poloniex.PoloniexAdapters.adaptPoloniexTicker;
Expand All @@ -23,13 +28,21 @@
*/
public class PoloniexStreamingMarketDataService implements StreamingMarketDataService {
private static final Logger LOG = LoggerFactory.getLogger(PoloniexStreamingMarketDataService.class);
private static final String TICKER_CHANNEL_ID = "1002";

private final PoloniexStreamingService service;
private final Map<CurrencyPair, Integer> currencyPairMap;
private final Supplier<Observable<Ticker>> streamingTickers;

public PoloniexStreamingMarketDataService(PoloniexStreamingService service, Map<CurrencyPair, Integer> currencyPairMap) {
public PoloniexStreamingMarketDataService(PoloniexStreamingService service, Map<Integer, CurrencyPair> currencyIdMap) {
this.service = service;
this.currencyPairMap = currencyPairMap;
final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();

streamingTickers = Suppliers.memoize(() -> service.subscribeChannel(TICKER_CHANNEL_ID)
.map(s -> {
PoloniexWebSocketTickerTransaction ticker = mapper.readValue(s.toString(), PoloniexWebSocketTickerTransaction.class);
CurrencyPair currencyPair = currencyIdMap.get(ticker.getPairId());
return adaptPoloniexTicker(ticker.toPoloniexTicker(currencyPair), currencyPair);
}).share());
}

@Override
Expand Down Expand Up @@ -59,15 +72,7 @@ public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... a

@Override
public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {
final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();

int currencyPairId = currencyPairMap.getOrDefault(currencyPair, 0);
Observable<PoloniexWebSocketTickerTransaction> subscribedChannel = service.subscribeChannel("1002")
.map(s -> mapper.readValue(s.toString(), PoloniexWebSocketTickerTransaction.class));

return subscribedChannel
.filter(s -> s.getPairId() == currencyPairId)
.map(s -> adaptPoloniexTicker(s.toPoloniexTicker(currencyPair), currencyPair));
return streamingTickers.get().filter(ticker -> ticker.getCurrencyPair().equals(currencyPair));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ public class PoloniexManualExample {

public static void main(String[] args) throws Exception {
CurrencyPair usdtBtc = new CurrencyPair(new Currency("BTC"), new Currency("USDT"));
// CertHelper.trustAllCerts();
CurrencyPair ltcBtc = new CurrencyPair(new Currency("LTC"), new Currency("BTC"));
//
// CertHelper.trustAllCerts();
StreamingExchange exchange = StreamingExchangeFactory.INSTANCE.createExchange(PoloniexStreamingExchange.class.getName());
ExchangeSpecification defaultExchangeSpecification = exchange.getDefaultExchangeSpecification();
// defaultExchangeSpecification.setExchangeSpecificParametersItem(StreamingExchange.SOCKS_PROXY_HOST, "localhost");
Expand All @@ -31,7 +33,6 @@ public static void main(String[] args) throws Exception {
exchange.connect().blockingAwait();



exchange.getStreamingMarketDataService().getOrderBook(usdtBtc).subscribe(orderBook -> {
LOG.info("First ask: {}", orderBook.getAsks().get(0));
LOG.info("First bid: {}", orderBook.getBids().get(0));
Expand All @@ -41,6 +42,11 @@ public static void main(String[] args) throws Exception {
LOG.info("TICKER: {}", ticker);
}, throwable -> LOG.error("ERROR in getting ticker: ", throwable));

exchange.getStreamingMarketDataService().getTicker(ltcBtc).subscribe(ticker -> {
LOG.info("TICKER: {}", ticker);
}, throwable -> LOG.error("ERROR in getting ticker: ", throwable));


exchange.getStreamingMarketDataService().getTrades(usdtBtc).subscribe(trade -> {
LOG.info("TRADE: {}", trade);
}, throwable -> LOG.error("ERROR in getting trades: ", throwable));
Expand Down

0 comments on commit c3d6d46

Please sign in to comment.