diff --git a/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/ConsistentHashExchange.java b/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/ConsistentHashExchange.java index d979b659..d7398fa1 100644 --- a/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/ConsistentHashExchange.java +++ b/src/main/java/com/github/fridujo/rabbitmq/mock/exchange/ConsistentHashExchange.java @@ -55,7 +55,7 @@ private int routingKeyToWeight(String routingKey) { try { return Integer.parseInt(routingKey); } catch (NumberFormatException e) { - return routingKey.hashCode(); + throw new IllegalArgumentException("The binding key must be an integer"); } } diff --git a/src/test/java/com/github/fridujo/rabbitmq/mock/exchange/ConsistentHashExchangeTests.java b/src/test/java/com/github/fridujo/rabbitmq/mock/exchange/ConsistentHashExchangeTests.java index eec29ab3..99a0668f 100644 --- a/src/test/java/com/github/fridujo/rabbitmq/mock/exchange/ConsistentHashExchangeTests.java +++ b/src/test/java/com/github/fridujo/rabbitmq/mock/exchange/ConsistentHashExchangeTests.java @@ -1,24 +1,33 @@ package com.github.fridujo.rabbitmq.mock.exchange; -import static com.github.fridujo.rabbitmq.mock.AmqArguments.empty; -import static com.github.fridujo.rabbitmq.mock.exchange.MockExchangeCreator.creatorWithExchangeType; -import static java.util.Collections.emptyMap; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.within; -import static org.mockito.Mockito.mock; +import com.github.fridujo.rabbitmq.mock.ReceiverPointer; +import com.github.fridujo.rabbitmq.mock.ReceiverRegistry; +import com.github.fridujo.rabbitmq.mock.configuration.Configuration; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; -import org.junit.jupiter.api.Test; - -import com.github.fridujo.rabbitmq.mock.ReceiverPointer; -import com.github.fridujo.rabbitmq.mock.ReceiverRegistry; -import com.github.fridujo.rabbitmq.mock.configuration.Configuration; +import static com.github.fridujo.rabbitmq.mock.AmqArguments.empty; +import static com.github.fridujo.rabbitmq.mock.exchange.MockExchangeCreator.creatorWithExchangeType; +import static java.util.Collections.emptyMap; +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.AssertionsForClassTypes.within; +import static org.mockito.Mockito.mock; class ConsistentHashExchangeTests { @@ -47,30 +56,68 @@ void same_routing_key_dispatch_to_same_queue() { assertThat(consistentHashEx.selectReceiver(firstRoutingKey, null)).contains(firstReceiverPointerSelected); } - @Test - void dispatch_respects_queue_weight() { + @ParameterizedTest + @MethodSource("buildWeightArguments") + void dispatch_respects_queue_weight(List weights, List distributions) { SingleReceiverExchange consistentHashEx = (SingleReceiverExchange) mockExchangeFactory.build("test", "x-consistent-hash", empty(), mock(ReceiverRegistry.class)); - ReceiverPointer q1 = new ReceiverPointer(ReceiverPointer.Type.QUEUE, "Q1"); - consistentHashEx.bind(q1, "32", emptyMap()); - ReceiverPointer q2 = new ReceiverPointer(ReceiverPointer.Type.QUEUE, "Q2"); - consistentHashEx.bind(q2, "64", emptyMap()); - ReceiverPointer q3 = new ReceiverPointer(ReceiverPointer.Type.QUEUE, "Q3"); - consistentHashEx.bind(q3, " ", emptyMap()); - ReceiverPointer q4 = new ReceiverPointer(ReceiverPointer.Type.QUEUE, "Q4"); - consistentHashEx.bind(q4, "AA", emptyMap()); - consistentHashEx.unbind(q4, "AA"); - - int messagesCount = 1_000_000; - Map deliveriesByReceiver = IntStream.range(0, messagesCount) + List receiverPointers = new ArrayList<>(); + for(int i = 0; i < weights.size(); i++) { + ReceiverPointer receiverPointer = new ReceiverPointer(ReceiverPointer.Type.QUEUE, "Q" + i); + receiverPointers.add(receiverPointer); + consistentHashEx.bind(receiverPointer, weights.get(i).toString(), emptyMap()); + } + + final int minMessagesCount = 1_000; + final int maxMessagesCount = 1_000_000; + final int messageCount = ThreadLocalRandom.current().nextInt(minMessagesCount, maxMessagesCount); + + Map deliveriesByReceiver = IntStream.range(0, messageCount) .mapToObj(i -> consistentHashEx.selectReceiver(UUID.randomUUID().toString(), null)) .map(Optional::get) .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); - assertThat(deliveriesByReceiver).containsOnlyKeys(q1, q2, q3); + assertThat(deliveriesByReceiver).containsOnlyKeys(receiverPointers); + + for(int i = 0; i < receiverPointers.size(); i++) { + assertThat(Long.valueOf(deliveriesByReceiver.get(receiverPointers.get(i))).doubleValue() / messageCount).isCloseTo(distributions.get(i), within(0.02)); + } + } + + private static Stream buildWeightArguments() { + final int maxArguments = 25; + final int maxReceivers = 10; + final int maxWeight = 100; + + Function> getWeightsForReceiver = r -> IntStream.range(0, r) + .mapToObj(i -> ThreadLocalRandom.current().nextInt(1, maxWeight)) + .collect(toList()); + + Function, List> calculateDistribution = w -> { + Integer sum = w.stream().mapToInt(Integer::intValue).sum(); + return w.stream() + .map(x -> x.doubleValue() / sum) + .collect(toList()); + }; + + Function, Arguments> getDispatchDistributionForReceiver = w -> Arguments.of(w, calculateDistribution.apply(w)); + + return IntStream.range(1, maxArguments) + .mapToObj(i -> ThreadLocalRandom.current().nextInt(1, maxReceivers)) + .map(getWeightsForReceiver) + .map(getDispatchDistributionForReceiver); + } + + + @ParameterizedTest(name = "Binding Consistent hash exchange with binding key \"{0}\" throws IllegalArgumentException") + @ValueSource(strings = {"", "string", "#"}) + void binding_with_non_integer_key_throws_exception(String bindingKey) { + SingleReceiverExchange consistentHashEx = (SingleReceiverExchange) mockExchangeFactory.build("test", "x-consistent-hash", empty(), mock(ReceiverRegistry.class)); + + ReceiverPointer q1 = new ReceiverPointer(ReceiverPointer.Type.QUEUE, "Q1"); - assertThat(Long.valueOf(deliveriesByReceiver.get(q1)).doubleValue() / messagesCount).isCloseTo(0.25D, within(0.01)); - assertThat(Long.valueOf(deliveriesByReceiver.get(q2)).doubleValue() / messagesCount).isCloseTo(0.5D, within(0.01)); - assertThat(Long.valueOf(deliveriesByReceiver.get(q3)).doubleValue() / messagesCount).isCloseTo(0.25D, within(0.01)); + assertThatThrownBy(() -> consistentHashEx.bind(q1, bindingKey, emptyMap())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("The binding key must be an integer"); } }