Producer Not Terminating #25

Open
allcentury opened this issue Mar 19, 2015 · 4 comments

Comments

@allcentury

I'm having an issue when I try to write payloads in increments. The simplest setup I could come up with was this:

require 'krakow'
require 'securerandom'

def run
  producer = Krakow::Producer.new(
    host: 'HOST',
    port: '4150',
    topic: 'process'
  )

  # 100,000 random 24-character hex IDs
  user_ids = Array.new(100_000) { SecureRandom.hex(12) }

  # Publish in two batches of 50,000 (one multi-message write each)
  user_ids.each_slice(50_000).with_index do |usrs, i|
    puts "Publishing #{i * 50_000} - #{(i + 1) * 50_000} users to NSQ"
    producer.write(*usrs)
  end

  producer.goodbye_my_love!
end

run

When I do this, the output looks something like:

I, [2015-03-19T12:18:14.505000 #46528]  INFO -- : <Krakow::Producer:2122 {HOST:PORT} T:preprocess>: Establishing connection to: HOST:PORT
D, [2015-03-19T12:18:16.269000 #46528] DEBUG -- : <Krakow::Connection:2150 {HOST:PORT}>: Initializing connection
D, [2015-03-19T12:18:16.533000 #46528] DEBUG -- : <Krakow::Connection:2150 {HOST:PORT}>: Read wait for frame start
D, [2015-03-19T12:18:16.562000 #46528] DEBUG -- : <Krakow::Connection:2150 {HOST:PORT}>: <<< "\x00\x00\x01\f\x00\x00\x00\x00"
D, [2015-03-19T12:18:16.568000 #46528] DEBUG -- : <Krakow::Connection:2150 {HOST:PORT}>: Decoded structure: {:size=>264, :type=>0}
D, [2015-03-19T12:18:16.569000 #46528] DEBUG -- : <Krakow::Connection:2150 {HOST:PORT}>: <<< "{\"max_rdy_count\":2500,\"version\":\"0.3.2\",\"max_msg_timeout\":900000,\"msg_timeout\":60000,\"tls_v1\":false,\"deflate\":false,\"deflate_level\":0,\"max_deflate_level\":6,\"snappy\":false,\"sample_rate\":0,\"auth_required\":false,\"output_buffer_size\":16384,\"output_buffer_timeout\":250}"
D, [2015-03-19T12:18:16.571000 #46528] DEBUG -- : <Krakow::Connection:2150 {HOST:PORT}>: Struct: {:size=>264, :type=>0, :data=>"{\"max_rdy_count\":2500,\"version\":\"0.3.2\",\"max_msg_timeout\":900000,\"msg_timeout\":60000,\"tls_v1\":false,\"deflate\":false,\"deflate_level\":0,\"max_deflate_level\":6,\"snappy\":false,\"sample_rate\":0,\"auth_required\":false,\"output_buffer_size\":16384,\"output_buffer_timeout\":250}"} Frame: <Krakow::FrameType::Response:2152 [{:response=>"{\"max_rdy_count\":2500,\"version\":\"0.3.2\",\"max_msg_timeout\":900000,\"msg_timeout\":60000,\"tls_v1\":false,\"deflate\":false,\"deflate_level\":0,\"max_deflate_level\":6,\"snappy\":false,\"sample_rate\":0,\"auth_required\":false,\"output_buffer_size\":16384,\"output_buffer_timeout\":250}"}]>
I, [2015-03-19T12:18:16.584000 #46528]  INFO -- : <Krakow::Connection:2150 {HOST:PORT}>: Connection settings: {:max_rdy_count=>2500, :version=>"0.3.2", :max_msg_timeout=>900000, :msg_timeout=>60000, :tls_v1=>false, :deflate=>false, :deflate_level=>0, :max_deflate_level=>6, :snappy=>false, :sample_rate=>0, :auth_required=>false, :output_buffer_size=>16384, :output_buffer_timeout=>250}
I, [2015-03-19T12:18:16.585000 #46528]  INFO -- : <Krakow::Connection:2150 {HOST:PORT}>: Connection initialized
D, [2015-03-19T12:18:16.590000 #46528] DEBUG -- : <Krakow::Connection:2150 {HOST:PORT}>: Read wait for frame start
I, [2015-03-19T12:18:16.601000 #46528]  INFO -- : <Krakow::Producer:2122 {HOST:PORT} T:preprocess>: Connection established: <Krakow::Connection:2150 {HOST:PORT}>
Publishing 0 - 50000 users to NSQ
D, [2015-03-19T12:18:16.740000 #46528] DEBUG -- : <Krakow::Producer:2122 {HOST:PORT} T:preprocess>: Multiple message publish

It just hangs until I manually kill it from the terminal. It never reaches the second iteration of the loop (only the first 50k messages get pushed), and it never reaches the goodbye_my_love! call. Any idea what I might be doing wrong?

I'm on JRuby 1.7.19, using version 0.3.12 of the gem.

@chrisroberts
Owner

Hi! goodbye_my_love! is the finalizer method and generally shouldn't be called directly. Instead, terminate the actor:

producer.terminate
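A minimal sketch of making sure the producer is always shut down, even if a write raises, by calling terminate from an ensure clause. So that it runs without nsqd or the gem, it uses a hypothetical StubProducer in place of Krakow::Producer; only the write/terminate call shape mirrors this thread, and with_producer is an illustrative helper, not part of Krakow.

```ruby
# Hypothetical stand-in for Krakow::Producer so this runs without nsqd.
class StubProducer
  attr_reader :written, :terminated

  def initialize
    @written = []
    @terminated = false
  end

  def write(*messages)
    @written.concat(messages)
  end

  # With the real gem, terminate stops the producer actor.
  def terminate
    @terminated = true
  end
end

# Hypothetical helper: yield the producer, then always terminate it,
# whether the block finishes normally or raises.
def with_producer(producer)
  yield producer
ensure
  producer.terminate
end

producer = StubProducer.new
with_producer(producer) do |prod|
  prod.write('a', 'b')
end
```

The ensure clause is what matters here: the cleanup runs even if a write fails, so the process is not left with a live producer after an error.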

@chrisroberts
Owner

Also, by default the producer waits for a delivery confirmation on each write. If you are pushing that many messages at once, you may want more of a "fire and forget" approach. You can get that by disabling the response wait:

Krakow::Producer.new(
  :host => HOST,
  ...,
  :connection_options => {
    :options => {
      :response_wait => 0
    }
  }
)

You can also try looping and doing single writes through the producer, or reducing the number of messages sent in each mpub (i.e. use a smaller slice size in each_slice).
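A sketch of the smaller-batch approach, assuming only that the producer responds to write(*messages) as in the original script. publish_in_batches and RecordingProducer are hypothetical names; the default of 2,500 is the slice size reported to work later in this thread, not a documented limit.

```ruby
# Hypothetical helper: publish messages in fixed-size batches (one
# multi-message write per batch) instead of a single huge write.
# `producer` is anything that responds to write(*messages).
def publish_in_batches(producer, messages, batch_size = 2_500)
  raise ArgumentError, 'batch_size must be positive' unless batch_size > 0

  messages.each_slice(batch_size).with_index do |batch, i|
    first = i * batch_size
    puts "Publishing #{first} - #{first + batch.size} messages"
    producer.write(*batch)
  end
end

# Minimal recorder used to show the batch sizes without a running nsqd.
class RecordingProducer
  attr_reader :batches

  def initialize
    @batches = []
  end

  def write(*messages)
    @batches << messages
  end
end

recorder = RecordingProducer.new
publish_in_batches(recorder, (1..6_000).to_a)
recorder.batches.map(&:size) # => [2500, 2500, 1000]
```

Passing a batch size of 1 turns this into the single-write loop mentioned above.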

@allcentury
Author

Thanks @chrisroberts for the quick reply. Setting response_wait: 0 did work, but it's a little unsafe for our production environment. I was able to get it working with slices of roughly 2,500 messages, which will have to do for now.

I'll dig in and see where the bottleneck is (maybe our nsqd instance?). For reference, without setting response_wait to 0, I can push over 150k messages in one write and watch them get written to disk in nsqadmin, but then the producer hangs indefinitely. Maybe it missed the confirmation, or perhaps the confirmation was never sent...

@chrisroberts
Owner

I added a test for pushing large payloads via mpub, based on your failing example. You may need to increase response_wait to give nsqd enough time to fully accept the messages and confirm receipt. If no response arrives (and response_wait is > 0), the producer raises an exception, which may be getting caught and suppressed somewhere in your code.
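A hedged sketch of both points: build the producer options with a larger response_wait instead of 0, and let a failed write surface instead of being swallowed. The nested :connection_options/:options layout is copied from the earlier example in this thread; producer_options and write_or_raise are hypothetical helpers, the value 10 is arbitrary (no unit is assumed), and the sketch assumes only that a missing response raises some StandardError subclass.

```ruby
require 'logger'

# Hypothetical helper: build Krakow::Producer options with an explicit
# response_wait, using the nested layout from the earlier example.
def producer_options(host:, port:, topic:, response_wait:)
  {
    host: host,
    port: port,
    topic: topic,
    connection_options: {
      options: { response_wait: response_wait }
    }
  }
end

# Hypothetical wrapper: log a failed write and re-raise it, so a missing
# confirmation is reported instead of being silently suppressed.
def write_or_raise(producer, messages, logger)
  producer.write(*messages)
rescue StandardError => e
  logger.error("Publish of #{messages.size} messages failed: #{e.class}: #{e.message}")
  raise
end

opts = producer_options(host: 'HOST', port: 4150, topic: 'process', response_wait: 10)
# With the gem installed and nsqd running:
#   producer = Krakow::Producer.new(opts)
#   write_or_raise(producer, batch, Logger.new($stderr))
```

Re-raising after logging keeps the failure visible to the caller, which makes it easier to tell a slow confirmation apart from a genuine hang.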

Tests pass on several MRI and JRuby versions, so if you still see issues after tweaking the options, please let me know and I'll try to reproduce the behavior locally.

Cheers!
