Skip to content

Commit

Permalink
Issue fridujo#127 - it is illegal to bind to a consistent hash exchan…
Browse files Browse the repository at this point in the history
…ge with a non integer binding key
  • Loading branch information
RedMu committed Jul 21, 2020
1 parent 8930b81 commit c543dcf
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private int routingKeyToWeight(String routingKey) {
try {
return Integer.parseInt(routingKey);
} catch (NumberFormatException e) {
return routingKey.hashCode();
throw new IllegalArgumentException("The binding key must be an integer");
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,33 @@
package com.github.fridujo.rabbitmq.mock.exchange;

import static com.github.fridujo.rabbitmq.mock.AmqArguments.empty;
import static com.github.fridujo.rabbitmq.mock.exchange.MockExchangeCreator.creatorWithExchangeType;
import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.within;
import static org.mockito.Mockito.mock;
import com.github.fridujo.rabbitmq.mock.ReceiverPointer;
import com.github.fridujo.rabbitmq.mock.ReceiverRegistry;
import com.github.fridujo.rabbitmq.mock.configuration.Configuration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.junit.jupiter.api.Test;

import com.github.fridujo.rabbitmq.mock.ReceiverPointer;
import com.github.fridujo.rabbitmq.mock.ReceiverRegistry;
import com.github.fridujo.rabbitmq.mock.configuration.Configuration;
import static com.github.fridujo.rabbitmq.mock.AmqArguments.empty;
import static com.github.fridujo.rabbitmq.mock.exchange.MockExchangeCreator.creatorWithExchangeType;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.AssertionsForClassTypes.within;
import static org.mockito.Mockito.mock;

class ConsistentHashExchangeTests {

Expand Down Expand Up @@ -47,30 +56,68 @@ void same_routing_key_dispatch_to_same_queue() {
assertThat(consistentHashEx.selectReceiver(firstRoutingKey, null)).contains(firstReceiverPointerSelected);
}

@Test
void dispatch_respects_queue_weight() {
@ParameterizedTest
@MethodSource("buildWeightArguments")
void dispatch_respects_queue_weight(List<Integer> weights, List<Double> distributions) {
SingleReceiverExchange consistentHashEx = (SingleReceiverExchange) mockExchangeFactory.build("test", "x-consistent-hash", empty(), mock(ReceiverRegistry.class));

ReceiverPointer q1 = new ReceiverPointer(ReceiverPointer.Type.QUEUE, "Q1");
consistentHashEx.bind(q1, "32", emptyMap());
ReceiverPointer q2 = new ReceiverPointer(ReceiverPointer.Type.QUEUE, "Q2");
consistentHashEx.bind(q2, "64", emptyMap());
ReceiverPointer q3 = new ReceiverPointer(ReceiverPointer.Type.QUEUE, "Q3");
consistentHashEx.bind(q3, " ", emptyMap());
ReceiverPointer q4 = new ReceiverPointer(ReceiverPointer.Type.QUEUE, "Q4");
consistentHashEx.bind(q4, "AA", emptyMap());
consistentHashEx.unbind(q4, "AA");

int messagesCount = 1_000_000;
Map<ReceiverPointer, Long> deliveriesByReceiver = IntStream.range(0, messagesCount)
List<ReceiverPointer> receiverPointers = new ArrayList<>();
for(int i = 0; i < weights.size(); i++) {
ReceiverPointer receiverPointer = new ReceiverPointer(ReceiverPointer.Type.QUEUE, "Q" + i);
receiverPointers.add(receiverPointer);
consistentHashEx.bind(receiverPointer, weights.get(i).toString(), emptyMap());
}

final int minMessagesCount = 1_000;
final int maxMessagesCount = 1_000_000;
final int messageCount = ThreadLocalRandom.current().nextInt(minMessagesCount, maxMessagesCount);

Map<ReceiverPointer, Long> deliveriesByReceiver = IntStream.range(0, messageCount)
.mapToObj(i -> consistentHashEx.selectReceiver(UUID.randomUUID().toString(), null))
.map(Optional::get)
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

assertThat(deliveriesByReceiver).containsOnlyKeys(q1, q2, q3);
assertThat(deliveriesByReceiver).containsOnlyKeys(receiverPointers);

for(int i = 0; i < receiverPointers.size(); i++) {
assertThat(Long.valueOf(deliveriesByReceiver.get(receiverPointers.get(i))).doubleValue() / messageCount).isCloseTo(distributions.get(i), within(0.02));
}
}

private static Stream<Arguments> buildWeightArguments() {
final int maxArguments = 25;
final int maxReceivers = 10;
final int maxWeight = 100;

Function<Integer, List<Integer>> getWeightsForReceiver = r -> IntStream.range(0, r)
.mapToObj(i -> ThreadLocalRandom.current().nextInt(1, maxWeight))
.collect(toList());

Function<List<Integer>, List<Double>> calculateDistribution = w -> {
Integer sum = w.stream().mapToInt(Integer::intValue).sum();
return w.stream()
.map(x -> x.doubleValue() / sum)
.collect(toList());
};

Function<List<Integer>, Arguments> getDispatchDistributionForReceiver = w -> Arguments.of(w, calculateDistribution.apply(w));

return IntStream.range(1, maxArguments)
.mapToObj(i -> ThreadLocalRandom.current().nextInt(1, maxReceivers))
.map(getWeightsForReceiver)
.map(getDispatchDistributionForReceiver);
}


@ParameterizedTest(name = "Binding Consistent hash exchange with binding key \"{0}\" throws IllegalArgumentException")
@ValueSource(strings = {"", "string", "#"})
void binding_with_non_integer_key_throws_exception(String bindingKey) {
SingleReceiverExchange consistentHashEx = (SingleReceiverExchange) mockExchangeFactory.build("test", "x-consistent-hash", empty(), mock(ReceiverRegistry.class));

ReceiverPointer q1 = new ReceiverPointer(ReceiverPointer.Type.QUEUE, "Q1");

assertThat(Long.valueOf(deliveriesByReceiver.get(q1)).doubleValue() / messagesCount).isCloseTo(0.25D, within(0.01));
assertThat(Long.valueOf(deliveriesByReceiver.get(q2)).doubleValue() / messagesCount).isCloseTo(0.5D, within(0.01));
assertThat(Long.valueOf(deliveriesByReceiver.get(q3)).doubleValue() / messagesCount).isCloseTo(0.25D, within(0.01));
assertThatThrownBy(() -> consistentHashEx.bind(q1, bindingKey, emptyMap()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("The binding key must be an integer");
}
}

0 comments on commit c543dcf

Please sign in to comment.