Skip to content

Commit

Permalink
Merge pull request bitrich-info#526 from lepus/develop
Browse files Browse the repository at this point in the history
[BINANCE] Add specific param: use higher update frequency
  • Loading branch information
badgerwithagun authored Mar 1, 2020
2 parents c3d6d46 + 9988509 commit e749233
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
package info.bitrich.xchangestream.binance;

import com.google.common.base.MoreObjects;
import info.bitrich.xchangestream.binance.BinanceUserDataChannel.NoActiveChannelException;
import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.util.Events;

import io.reactivex.Completable;
import io.reactivex.Observable;

import si.mazi.rescu.RestProxyFactory;

import org.knowm.xchange.binance.BinanceAuthenticated;
import org.knowm.xchange.binance.BinanceExchange;
import org.knowm.xchange.binance.service.BinanceMarketDataService;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.service.BaseExchangeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import si.mazi.rescu.RestProxyFactory;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -27,6 +25,8 @@ public class BinanceStreamingExchange extends BinanceExchange implements Streami

private static final Logger LOG = LoggerFactory.getLogger(BinanceStreamingExchange.class);
private static final String API_BASE_URI = "wss://stream.binance.com:9443/";
protected final static String USE_HIGHER_UPDATE_FREQUENCY = "Binance_Orderbook_Use_Higher_Frequency";


private BinanceStreamingService streamingService;
private BinanceUserDataStreamingService userDataStreamingService;
Expand All @@ -37,11 +37,19 @@ public class BinanceStreamingExchange extends BinanceExchange implements Streami

private BinanceUserDataChannel userDataChannel;
private Runnable onApiCall;
private String orderBookUpdateFrequencyParameter = "";

@Override
protected void initServices() {
super.initServices();
this.onApiCall = Events.onApiCall(exchangeSpecification);
Boolean userHigherFrequency = MoreObjects.firstNonNull(
(Boolean) exchangeSpecification.getExchangeSpecificParametersItem(USE_HIGHER_UPDATE_FREQUENCY),
Boolean.FALSE);

if (userHigherFrequency) {
orderBookUpdateFrequencyParameter = "@100ms";
}
}

/**
Expand Down Expand Up @@ -88,7 +96,8 @@ public Completable connect(ProductSubscription... args) {
}
}

streamingMarketDataService = new BinanceStreamingMarketDataService(streamingService, (BinanceMarketDataService) marketDataService, onApiCall);
streamingMarketDataService = new BinanceStreamingMarketDataService(streamingService,
(BinanceMarketDataService) marketDataService, onApiCall, orderBookUpdateFrequencyParameter);
streamingAccountService = new BinanceStreamingAccountService(userDataStreamingService);
streamingTradeService = new BinanceStreamingTradeService(userDataStreamingService);

Expand Down Expand Up @@ -165,16 +174,23 @@ private BinanceStreamingService createStreamingService(ProductSubscription subsc
return new BinanceStreamingService(path, subscription);
}

public static String buildSubscriptionStreams(ProductSubscription subscription) {
public String buildSubscriptionStreams(ProductSubscription subscription) {
return Stream.of(buildSubscriptionStrings(subscription.getTicker(), "ticker"),
buildSubscriptionStrings(subscription.getOrderBook(), "depth"),
buildSubscriptionStrings(subscription.getTrades(), "trade"))
.filter(s -> !s.isEmpty())
.collect(Collectors.joining("/"));
}

private static String buildSubscriptionStrings(List<CurrencyPair> currencyPairs, String subscriptionType) {
return subscriptionStrings(currencyPairs).map( s -> s + "@" + subscriptionType).collect(Collectors.joining("/"));
private String buildSubscriptionStrings(List<CurrencyPair> currencyPairs, String subscriptionType) {
if ("depth".equals(subscriptionType)) {
return subscriptionStrings(currencyPairs)
.map(s -> s + "@" + subscriptionType + orderBookUpdateFrequencyParameter)
.collect(Collectors.joining("/"));
} else {
return subscriptionStrings(currencyPairs).map(s -> s + "@" + subscriptionType)
.collect(Collectors.joining("/"));
}
}

private static Stream<String> subscriptionStrings(List<CurrencyPair> currencyPairs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.RateLimiter;

import info.bitrich.xchangestream.binance.dto.BinanceRawTrade;
import info.bitrich.xchangestream.binance.dto.BinanceWebsocketTransaction;
import info.bitrich.xchangestream.binance.dto.DepthBinanceWebSocketTransaction;
Expand All @@ -14,10 +13,8 @@
import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;

import io.reactivex.Observable;
import io.reactivex.functions.Consumer;

import org.knowm.xchange.binance.BinanceAdapters;
import org.knowm.xchange.binance.BinanceErrorAdapter;
import org.knowm.xchange.binance.dto.BinanceException;
Expand Down Expand Up @@ -59,6 +56,7 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer
.constructType(new TypeReference<BinanceWebsocketTransaction<DepthBinanceWebSocketTransaction>>() {});

private final BinanceStreamingService service;
private final String orderBookUpdateFrequencyParameter;

private final Map<CurrencyPair, OrderbookSubscription> orderbooks = new HashMap<>();
private final Map<CurrencyPair, Observable<BinanceTicker24h>> tickerSubscriptions = new HashMap<>();
Expand All @@ -72,8 +70,10 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer
private final AtomicBoolean fallenBack = new AtomicBoolean();
private final AtomicReference<Runnable> fallbackOnApiCall = new AtomicReference<>(() -> {});

public BinanceStreamingMarketDataService(BinanceStreamingService service, BinanceMarketDataService marketDataService, Runnable onApiCall) {
public BinanceStreamingMarketDataService(BinanceStreamingService service, BinanceMarketDataService marketDataService, Runnable onApiCall,
final String orderBookUpdateFrequencyParameter) {
this.service = service;
this.orderBookUpdateFrequencyParameter = orderBookUpdateFrequencyParameter;
this.marketDataService = marketDataService;
this.onApiCall = onApiCall;
}
Expand Down Expand Up @@ -119,9 +119,15 @@ public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
));
}

private static String channelFromCurrency(CurrencyPair currencyPair, String subscriptionType) {
private String channelFromCurrency(CurrencyPair currencyPair, String subscriptionType) {
String currency = String.join("", currencyPair.toString().split("/")).toLowerCase();
return currency + "@" + subscriptionType;
String currencyChannel = currency + "@" + subscriptionType;

if ("depth".equals(subscriptionType)) {
return currencyChannel + orderBookUpdateFrequencyParameter;
} else {
return currencyChannel;
}
}

/**
Expand Down Expand Up @@ -210,6 +216,7 @@ private OrderbookSubscription connectOrderBook(CurrencyPair currencyPair) {

// 1. Open a stream to wss://stream.binance.com:9443/ws/bnbbtc@depth
// 2. Buffer the events you receive from the stream.

subscription.stream = service.subscribeChannel(channelFromCurrency(currencyPair, "depth"))
.map(this::depthTransaction)
.filter(transaction -> transaction.getData().getCurrencyPair().equals(currencyPair));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,41 @@
package info.bitrich.xchangestream.binance;

import info.bitrich.xchangestream.core.ProductSubscription;

import info.bitrich.xchangestream.core.StreamingExchangeFactory;
import org.junit.Assert;
import org.junit.Test;
import org.knowm.xchange.ExchangeSpecification;
import org.knowm.xchange.currency.CurrencyPair;

import static info.bitrich.xchangestream.binance.BinanceStreamingExchange.USE_HIGHER_UPDATE_FREQUENCY;

public class BinanceTest {

@Test
public void channelCreateUrlTest() {
BinanceStreamingExchange exchange = (BinanceStreamingExchange) StreamingExchangeFactory.INSTANCE
.createExchange(BinanceStreamingExchange.class.getName());
ProductSubscription.ProductSubscriptionBuilder builder = ProductSubscription.create();
builder.addTicker(CurrencyPair.BTC_USD).addTicker(CurrencyPair.DASH_BTC);
String buildSubscriptionStreams = BinanceStreamingExchange.buildSubscriptionStreams(builder.build());
String buildSubscriptionStreams = exchange.buildSubscriptionStreams(builder.build());
Assert.assertEquals("btcusd@ticker/dashbtc@ticker", buildSubscriptionStreams);

ProductSubscription.ProductSubscriptionBuilder builder2 = ProductSubscription.create();
builder2.addTicker(CurrencyPair.BTC_USD).addTicker(CurrencyPair.DASH_BTC).addOrderbook(CurrencyPair.ETH_BTC);
String buildSubscriptionStreams2 = BinanceStreamingExchange.buildSubscriptionStreams(builder2.build());
String buildSubscriptionStreams2 = exchange.buildSubscriptionStreams(builder2.build());
Assert.assertEquals("btcusd@ticker/dashbtc@ticker/ethbtc@depth", buildSubscriptionStreams2);
}


@Test
public void channelCreateUrlWithUpdateFrequencyTest() {
ProductSubscription.ProductSubscriptionBuilder builder = ProductSubscription.create();
builder.addTicker(CurrencyPair.BTC_USD).addTicker(CurrencyPair.DASH_BTC).addOrderbook(CurrencyPair.ETH_BTC);
ExchangeSpecification spec = StreamingExchangeFactory.INSTANCE.createExchange(
BinanceStreamingExchange.class.getName()).getDefaultExchangeSpecification();
spec.setExchangeSpecificParametersItem(USE_HIGHER_UPDATE_FREQUENCY, true);
BinanceStreamingExchange exchange = (BinanceStreamingExchange) StreamingExchangeFactory.INSTANCE
.createExchange(spec);
String buildSubscriptionStreams = exchange.buildSubscriptionStreams(builder.build());
Assert.assertEquals("btcusd@ticker/dashbtc@ticker/ethbtc@depth@100ms", buildSubscriptionStreams);
}
}

0 comments on commit e749233

Please sign in to comment.