From e367b1ea33d756ee95e2f68265b774d8a294f9c9 Mon Sep 17 00:00:00 2001 From: Daniel Schierbeck Date: Wed, 26 Aug 2020 15:05:36 +0200 Subject: [PATCH] Refactor a bit --- spec/integration/consumer_spec.rb | 78 +++++++++++++++---------------- 1 file changed, 38 insertions(+), 40 deletions(-) diff --git a/spec/integration/consumer_spec.rb b/spec/integration/consumer_spec.rb index 81641f58..3815d643 100644 --- a/spec/integration/consumer_spec.rb +++ b/spec/integration/consumer_spec.rb @@ -2,43 +2,43 @@ require "racecar/cli" require "racecar/ctl" +def generate_token + SecureRandom.hex(8) +end + class EchoConsumer < Racecar::Consumer subscribes_to "input" - self.group_id = "test-consumer-#{SecureRandom.hex(8)}" + self.group_id = "echo-consumer-#{generate_token}" def process(message) produce message.value, key: message.key, topic: "output" end end -module IntegrationSupport - INCOMING_MESSAGES = [] - - CONSUMER = Thread.new do - consumer = Rdkafka::Config.new({ - "bootstrap.servers": Racecar.config.brokers.join(","), - "client.id": Racecar.config.client_id, - "group.id": "racecar-tests", - }.merge(Racecar.config.rdkafka_consumer)).consumer +class ResultsConsumer < Racecar::Consumer + subscribes_to "output" - consumer.subscribe("output") + self.group_id = "results-consumer-#{generate_token}" - consumer.each do |message| - puts "Received message #{message}" - INCOMING_MESSAGES << message - end - end + MESSAGES = [] - def self.incoming_messages - INCOMING_MESSAGES + def process(message) + puts "Got result #{message.inspect}" + MESSAGES << message end end RSpec.context "Integrating with a real Kafka cluster" do + before :all do + Thread.new do + Racecar::Cli.main(["ResultsConsumer"]) + end + end + describe "Single-consumer groups" do it "can consume and produce messages" do - token = SecureRandom.hex(8) + token = generate_token worker = Thread.new do Racecar::Cli.main(["EchoConsumer"]) @@ -48,26 +48,18 @@ def self.incoming_messages produce -t input -v #{token} -k greetings ) - message = nil - attempt = 1 - - while message.nil? && attempt <= 10 - puts "Waiting for message..." - sleep 2 ** attempt - message = IntegrationSupport.incoming_messages.last - attempt += 1 - end + message = wait_for_token(token) expect(message).not_to be_nil expect(message.topic).to eq "output" - expect(message.payload).to eq token + expect(message.value).to eq token expect(message.key).to eq "greetings" end end describe "Multi-consumer groups" do it "can consume and produce messages" do - token = SecureRandom.hex(8) + token = generate_token worker1 = Thread.new do Racecar::Cli.main(["EchoConsumer"]) @@ -85,20 +77,26 @@ def self.incoming_messages produce -t input -v #{token} -k greetings ) - message = nil - attempt = 1 - - while message.nil? && attempt <= 10 - puts "Waiting for message..." - sleep 2 ** attempt - message = IntegrationSupport.incoming_messages.last - attempt += 1 - end + message = wait_for_token(token) expect(message).not_to be_nil expect(message.topic).to eq "output" - expect(message.payload).to eq token + expect(message.value).to eq token expect(message.key).to eq "greetings" end end + + def wait_for_token(token) + message = nil + attempt = 1 + + while message.nil? && attempt <= 10 + puts "Waiting for token #{token}..." + sleep 2 ** attempt + message = ResultsConsumer::MESSAGES.find {|m| m.value == token } + attempt += 1 + end + + message + end end