
fix: BoundedQueue, and restart thread / keep alive on errors #38

Merged (24 commits) · Oct 30, 2024

Conversation

@aryascripts (Collaborator) commented Oct 28, 2024

Why

OOMs for GraphQL Hive Reporter due to @queue never flushing after an error occurs in the thread.

What is happening:

  1. The thread only processes while @queue returns something
  2. When @queue returns an operation that causes an error, the thread dies and the while loop ends with it
  3. add_operation can still be called, even though nothing is processing any operations
  4. @queue is unbounded
  5. @queue size keeps increasing, causing a memory leak
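The failure mode above can be sketched with plain Ruby primitives; a hedged reproduction, where the names and shapes are illustrative rather than the gem's actual internals:

```ruby
queue = Queue.new # unbounded, like the original @queue

worker = Thread.new do
  while (op = queue.pop)
    raise "boom" if op == :bad # a single bad operation...
  end
rescue
  # ...kills the loop; nothing drains the queue anymore
end

queue.push(:bad)
worker.join

# add_operation can still enqueue, with no consumer left:
100.times { queue.push(:op) }
puts queue.size # => 100, and it grows without bound
```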

What

  • Create a bounded queue to avoid OOMs
  • Restart / keep thread alive when there are errors, so we can keep processing
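The keep-alive idea can be sketched as rescuing inside the loop rather than around it, so one bad operation doesn't end the thread. This is a minimal illustration of the approach, not the gem's exact code:

```ruby
queue = Queue.new
processed = []
errors = []

worker = Thread.new do
  while (op = queue.pop)
    begin
      raise "boom" if op == :bad
      processed << op
    rescue => e
      errors << e.message # log-and-continue keeps the loop alive
    end
  end
end

queue.push(:bad)  # error is rescued; the thread survives
queue.push(:good) # still processed
queue.push(nil)   # sentinel: ends the loop cleanly
worker.join
puts processed.inspect # => [:good]
```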

@aryascripts aryascripts changed the title Aplt 606 oom on errors hive buffer fix: BoundedQueue, and restart thread / keep alive on errors Oct 28, 2024
Comment on lines 13 to 16
if size >= @bound
@logger.error("BoundedQueue is full, discarding operation")
return
end
Collaborator Author:

There's maybe a problem with the way it's being done here because this is not safe from concurrency. There could be many puma threads adding to this queue, and potentially they both get the same number for size, and add to the queue. Though this may be safe because we always break the cycle whenever the size is >= bound
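For reference, the usual way to make a check-then-push safe is to hold one mutex across both the size check and the append; a minimal sketch, where the class and names are hypothetical rather than this PR's implementation:

```ruby
# Hypothetical bounded queue: the bound check and the append happen
# under the same mutex, so two threads can't both observe size == 3
# and push past a bound of 4.
class TinyBoundedQueue
  def initialize(bound:)
    @bound = bound
    @items = []
    @lock = Mutex.new
  end

  def push(item)
    @lock.synchronize do
      return if @items.size >= @bound # drop instead of growing
      @items << item
    end
  end

  def size
    @lock.synchronize { @items.size }
  end
end

q = TinyBoundedQueue.new(bound: 4)
threads = 8.times.map { |i| Thread.new { q.push(i) } }
threads.each(&:join)
puts q.size # => 4
```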

Collaborator:

You can write a test that uses threads to push to the queue.

subject(:queue) { GraphQL::Hive::BoundedQueue.new(bound: 4, logger: logger) }

it "should discard items and log when full" do
  threads = 2.times.map do |i|
    Thread.new do
      3.times do |ii|
        queue.push("Thread #{i} operation #{ii}")
      end
    end
  end
  threads.each(&:join)

  expect(queue.size).to eq(4)
  expect(logger).to have_received(:error).with("BoundedQueue is full, discarding operation").twice
end

Collaborator Author (@aryascripts, Oct 29, 2024):

Thank you! I just added a test very similar to this one:
#38 (comment)

@@ -36,6 +36,7 @@ class Hive < GraphQL::Tracing::PlatformTracing
read_operations: true,
report_schema: true,
buffer_size: 50,
bounded_queue_multiple: 5,
Collaborator:

I'm guessing that you added this so that we don't drop operations when the queue is being flushed. I don't think we need a new configuration value here, though. We want to drop the buffer array anyway in the future.

Suggested change:
- bounded_queue_multiple: 5,

Collaborator Author (@aryascripts, Oct 29, 2024):

This one controls the size of the @queue in UsageReporter, which is different from the buffer size. Maybe it needs a better name!

If we have a buffer size of 5, we get a @queue of size 25 by default, and the puma threads can add a max of 25 operations before we start dropping them.

While puma is adding operations, we would be flushing the @queue continuously in the reporting thread, and making room for puma to add more. But if things get too fast (higher rps), we would start dropping since there's no more room in the queue. I've tested this locally with the k6 code.

Some services could update the bounded_queue_multiple to get a higher queue size if they're OK with the memory usage of the queue. I just thought 5 was a good number to start with, but I'm open to discussion!

Collaborator:

Yes, I understand that. My comment here is that I am doubtful we want to expose this as a configuration value. What benefit do users get from configuring the bounded queue size? And why is it a multiplier, for example?

I would recommend that we do not expose the bounded_queue size and, for now, make it a slightly larger number than the buffer size. After all, we have discussed dropping the buffer and just using the queue.

I would prefer to keep configuration values consistent so we don't have to deprecate and remove them in the future.

Collaborator Author:

Ah, I see what you mean, thank you for clarifying! I might be giving too much control here to the consumer of this gem.

Collaborator Author (@aryascripts, Oct 29, 2024):

I've updated this now to remove the bounded-multiple config value and just use buffer_size. We discussed that this bound should practically never be reached anyway, since we pop after every operation. But if it is reached, we probably have a larger problem to solve (like this OOM we're trying to fix).

Comment on lines 25 to 26
queue_bound = (options[:buffer_size] * options[:bounded_queue_multiple]).to_int
@queue = BoundedQueue.new(bound: queue_bound, logger: options[:logger])
Collaborator:

Suggested change:
- queue_bound = (options[:buffer_size] * options[:bounded_queue_multiple]).to_int
- @queue = BoundedQueue.new(bound: queue_bound, logger: options[:logger])
+ queue_bound = (options[:buffer_size].to_i * 5)
+ @queue = BoundedQueue.new(bound: queue_bound, logger: options[:logger])

because

irb(main):003> ("5" * 5).to_i
=> 55555
irb(main):004> ("5".to_i * 5)
=> 25

Collaborator Author:

Good catch, thank you!

@aryascripts (Collaborator Author) left a comment:

Just leaving some comments for reviewers, and flagging the parts where I'd like feedback.

end

def push(item)
@lock.synchronize do
Collaborator Author:

Should we also synchronize the pop here? That would mean that we sync between the Hive Reporting Thread (pop) and the main Thread for puma (push).

Collaborator:

Yes, it would make size accurate.

Collaborator Author:

Hmm, I thought I'd comment here on where I'm at. I'm having deadlock issues with the current approach: @lock.synchronize on pop means the pop happens first, the thread's while(operation) loop instantly ends, and there are no more threads available to process.

In tests, this means that the test instantly fails since the processing thread dies. And in the server, we seem to be running into the puma main thread never being able to get the lock 🤔

Collaborator Author:

The puma main thread isn't able to get the lock because we're constantly popping in our thread. We discussed that this refactor can be part of, or come after, #33 so we don't make too many changes here.
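For reference, a condition variable is the standard way to let a consumer wait without monopolizing the lock: `ConditionVariable#wait` releases the mutex while sleeping, so producers can acquire it to push. A hedged sketch of that pattern, not the gem's code (and the PR deliberately defers this refactor):

```ruby
# Hypothetical queue: pop waits on a condition variable instead of
# spinning, so the lock is free for push while the consumer sleeps.
class WaitingQueue
  def initialize
    @items = []
    @lock = Mutex.new
    @cond = ConditionVariable.new
  end

  def push(item)
    @lock.synchronize do
      @items << item
      @cond.signal # wake one waiting consumer
    end
  end

  def pop
    @lock.synchronize do
      @cond.wait(@lock) while @items.empty? # releases @lock while waiting
      @items.shift
    end
  end
end

q = WaitingQueue.new
consumer = Thread.new { q.pop }
q.push(:op) # the producer can get the lock even while pop is pending
puts consumer.value.inspect # => :op
```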

Comment on lines 48 to 60
it "should be thread-safe and discard items when full" do
  threads = []
  20.times do |i|
    threads << Thread.new do
      queue.push(i)
    end
  end

  threads.each(&:join)

  expect(queue.size).to eq(2)
  expect(logger).to have_received(:error).with("BoundedQueue is full, discarding operation").exactly(18).times
end
Collaborator Author:

I'd love to get some opinions on this test here!

Collaborator:

I think this should also test cases where the queue is popped after hitting the bound. I know you test that in a single thread on line 36, but we should demonstrate that the lock and bound work as expected in a multi-threaded scenario too.

Collaborator Author:

Good point! I added a new test case for this

@aryascripts aryascripts marked this pull request as ready for review October 29, 2024 15:39
@sampler = Sampler.new(options[:collect_usage_sampling], options[:logger]) # NOTE: logs for deprecated field
@queue = BoundedQueue.new(bound: options[:buffer_size], logger: options[:logger])
Contributor:

I know that the bounded_queue_multiple config was removed, but should we still have the bound be the buffer size * 5?

Suggested change:
- @queue = BoundedQueue.new(bound: options[:buffer_size], logger: options[:logger])
+ @queue = BoundedQueue.new(bound: options[:buffer_size].to_i * 5, logger: options[:logger])

Collaborator Author:

We had a discussion here about this! #38 (comment)

We don't need this to be considerably larger than the buffer size because the thread wakes up for every single operation, and calls pop.

So ideally, this queue won't ever grow beyond a size of ~1. But to be safe, we provide a larger bound (the buffer size). If we do reach this limit, we have other things going wrong.

Contributor:

That makes sense! I thought we still wanted it to be considerably larger. Thanks for clearing that up!

spec/graphql/graphql-hive/usage_reporter_spec.rb (outdated; resolved)

@options[:logger].debug("processing operation from queue: #{operation}")
buffer << operation if @sampler.sample?(operation)

@options_mutex.synchronize do
Collaborator:

Perhaps out of scope because you are fixing the OOM issue. But I don't think this mutex does anything. There isn't another thread accessing this code.

@aryascripts aryascripts requested review from rperryng and removed request for al-yanna October 30, 2024 15:31
@aryascripts aryascripts merged commit 06bfa53 into master Oct 30, 2024
5 checks passed