Infer interruption handler from a job's queue adapter #367

Merged on Aug 26, 2023 (1 commit)
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -1,6 +1,6 @@
### Main (unreleased)

Nil
- [367](https://github.com/Shopify/job-iteration/pull/367) - Iteration can use multiple Active Job backends simultaneously by inferring the interruption adapter from the job's `queue_adapter_name`. `JobIteration.interruption_adapter` and `.load_integrations` have been removed. `JobIteration::Integrations.register` has been added.

## v1.4.0 (Aug 23, 2023)

3 changes: 2 additions & 1 deletion README.md
@@ -152,6 +152,7 @@ end
```

Iteration hooks into Sidekiq and Resque out of the box to support graceful interruption. No extra configuration is required.
Adapters for other Active Job backends can be registered with `JobIteration::Integrations.register("my_queue_adapter_name", object)`, where `object` must implement the `call` method returning `true` if the job must be interrupted and `false` otherwise.
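
As a minimal sketch of the registration described above: the adapter below assumes a hypothetical backend whose Active Job adapter name is `"my_queue_adapter_name"`, and a hypothetical `ShutdownFlag` object that the worker process flips when it is asked to stop. `ShutdownFlag` and `SHUTDOWN_FLAG` are illustrative names, not part of job-iteration, and the `register` call is guarded so the snippet also runs without the gem loaded.

```ruby
# Hypothetical interruption adapter: any object that responds to #call works.
class ShutdownFlag
  def initialize
    @stopping = false
  end

  # Meant to be called from the (hypothetical) worker's shutdown hook.
  def stop!
    @stopping = true
  end

  # Iteration consults this to decide whether the job must be interrupted.
  def call
    @stopping
  end
end

SHUTDOWN_FLAG = ShutdownFlag.new

# Only register when job-iteration (with this PR applied) is loaded.
if defined?(JobIteration::Integrations)
  JobIteration::Integrations.register("my_queue_adapter_name", SHUTDOWN_FLAG)
end
```

A dedicated object rather than a bare lambda keeps the shutdown state in one place, but a lambda closing over a flag would satisfy the same `call` contract.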

## Guides

@@ -183,7 +184,7 @@ There a few configuration assumptions that are required for Iteration to work wi

**Why can't I just iterate in `#perform` method and do whatever I want?** You can, but then your job has to comply with a long list of requirements, such as the ones above. This creates leaky abstractions more easily, when instead we can expose a more powerful abstraction for developers--without exposing the underlying infrastructure.

**What happens when my job is interrupted?** A checkpoint will be persisted to Redis after the current `each_iteration`, and the job will be re-enqueued. Once it's popped off the queue, the worker will work off from the next iteration.
**What happens when my job is interrupted?** A checkpoint will be persisted after the current `each_iteration`, and the job will be re-enqueued. Once it's popped off the queue, the worker will work off from the next iteration.
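
The interrupt-and-resume cycle in that answer can be illustrated with a simplified simulation. This is a sketch under stated assumptions, not job-iteration's implementation: `run_until_interrupted`, the integer cursor, and the result hash are all hypothetical, and "re-enqueueing" is modelled as simply calling the method again with the saved cursor.

```ruby
# Simplified simulation of interrupt-and-resume; not job-iteration's internals.
# Processes items after `cursor` and stops early once `interrupt` returns true.
def run_until_interrupted(items, cursor:, interrupt:)
  processed = []
  start = cursor.nil? ? 0 : cursor + 1
  (start...items.size).each do |index|
    processed << items[index] # stands in for each_iteration
    cursor = index            # checkpoint after the completed iteration
    break if interrupt.call
  end
  { processed: processed, cursor: cursor, done: cursor == items.size - 1 }
end

items = ["a", "b", "c", "d"]

# First run: the worker is asked to shut down after two iterations.
calls = 0
first = run_until_interrupted(items, cursor: nil, interrupt: -> { (calls += 1) >= 2 })

# "Re-enqueued" run: resumes from the iteration after the saved cursor.
second = run_until_interrupted(items, cursor: first[:cursor], interrupt: -> { false })
```

The first run stops with the cursor at the last completed item, and the second run picks up at the following one, so no item is processed twice.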

**What happens with retries?** An interruption of a job does not count as a retry. The iteration of job that caused the job to fail will be retried and progress will continue from there on.

35 changes: 1 addition & 34 deletions lib/job-iteration.rb
@@ -2,15 +2,12 @@

require "active_job"
require_relative "./job-iteration/version"
require_relative "./job-iteration/integrations"
require_relative "./job-iteration/enumerator_builder"
require_relative "./job-iteration/iteration"
require_relative "./job-iteration/log_subscriber"

module JobIteration
IntegrationLoadError = Class.new(StandardError)

INTEGRATIONS = [:resque, :sidekiq]

extend self

attr_accessor :logger
@@ -45,11 +42,6 @@ module JobIteration
# where the throttle backoff value will take precedence over this setting.
attr_accessor :default_retry_backoff

# Used internally for hooking into job processing frameworks like Sidekiq and Resque.
attr_accessor :interruption_adapter

self.interruption_adapter = -> { false }

# Set if you want to use your own enumerator builder instead of default EnumeratorBuilder.
# @example
#
@@ -61,29 +53,4 @@ module JobIteration
attr_accessor :enumerator_builder

self.enumerator_builder = JobIteration::EnumeratorBuilder

def load_integrations
loaded = nil
INTEGRATIONS.each do |integration|
load_integration(integration)
if loaded
raise IntegrationLoadError,
"#{loaded} integration has already been loaded, but #{integration} is also available. " \
"Iteration will only work with one integration."
end
loaded = integration
rescue LoadError
end
end

def load_integration(integration)
unless INTEGRATIONS.include?(integration)
raise IntegrationLoadError,
"#{integration} integration is not supported. Available integrations: #{INTEGRATIONS.join(", ")}"
end

require_relative "./job-iteration/integrations/#{integration}"
end
end

JobIteration.load_integrations unless ENV["ITERATION_DISABLE_AUTOCONFIGURE"]
37 changes: 37 additions & 0 deletions lib/job-iteration/integrations.rb
@@ -0,0 +1,37 @@
# frozen_string_literal: true

module JobIteration
# @api private
module Integrations
LoadError = Class.new(StandardError)

extend self

attr_accessor :registered_integrations

self.registered_integrations = {}

autoload :Sidekiq, "job-iteration/integrations/sidekiq"
autoload :Resque, "job-iteration/integrations/resque"

# @api public
def register(name, callable)
raise ArgumentError, "Interruption adapter must respond to #call" unless callable.respond_to?(:call)

registered_integrations[name] = callable
end

def load(name)
if (callable = registered_integrations[name])
callable
else
begin
klass = "#{self}::#{name.camelize}".constantize
register(name, klass)
rescue NameError
raise LoadError, "Could not find integration for '#{name}'"

This should register a dummy handler, not raise.
Otherwise, this results in an exception for any queue adapter that isn't recognized.

Contributor Author

This change was by design; see the discussion above regarding a Fallback adapter I originally included but then removed.

The problem with the old behaviour is that, depending on the queue adapter, it can lead to lost jobs: the job would never be interrupted, so the process could not re-enqueue it and shut down gracefully. This is no longer silently allowed to happen. If you need this to work as it used to, you can register your own interruption adapter with `JobIteration::Integrations.register("queue_adapter_name", -> { false })`.

It is a breaking change that should be called out explicitly in the changelog; I opened #425 for that.

@lavoiesl lavoiesl Aug 30, 2023

But this breaks existing users. Our team uses qless (yes, I know, it's unmaintained, and we're moving away from it).

How about this:

  • Add a graceful fallback, preserving the current behaviour, but with a deprecation notice
  • Release a patch version on the current minor version
  • Plan to remove the graceful fallback in the next major version

Contributor Author

To my understanding, this PR will be part of a new major release, because there are other breaking changes: if you had written your own interruption adapter for `active_qless`, you can't set it via `JobIteration.interruption_adapter` anymore. It makes sense to break related things together :)

Is there a reason you can't register your own adapter, as in my previous message, if you need to be on the latest main? Alternatively, you could cut a 1-0-stable branch starting from 7e6c385, cherry-pick non-breaking changes from main, and cut a new point release, if for some reason you absolutely have to use Iteration from RubyGems.

That's pretty much what I ended up doing, yes, but in general, I think it'd be good to keep a non-breaking 1.x.

Merging to main without already having a plan to make a new major release means that the releaser will have a tougher challenge on their hands, having to disentangle which changes are breaking and which aren't.

Perhaps we're already at a point where we should be starting a 1.x branch?

I assume that after we release 2.x, we'll likely want to backport some fixes to 1.x

Contributor

> I assume that after we release 2.x, we'll likely want to backport some fixes to 1.x

Because the breaking change is fairly minimal (primarily replacing `JobIteration.interruption_adapter` with `JobIteration::Integrations.register`), I'm not sure it's worth the effort to maintain a separate 1.x branch and backport fixes to it. I was planning to release 2.0 pretty soon, without cutting another minor version in between.

Alright then 👍🏼

end
end
end
end
end
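
The lookup order in `load` (registered callable first, then a constant named after the adapter, otherwise an error) can be tried in isolation with the stand-in below. It is a sketch that runs without ActiveSupport: `MiniIntegrations`, `Inline`, and the private `camelize` helper are hypothetical names, and `const_get` replaces the `camelize`/`constantize` calls used in the diff.

```ruby
# Stand-in for the lookup flow in this diff, without ActiveSupport.
# `MiniIntegrations` and `Inline` are hypothetical names for illustration.
module MiniIntegrations
  LoadError = Class.new(StandardError)

  # A built-in integration, found by constant lookup on first use.
  module Inline
    def self.call
      false
    end
  end

  @registered = {}

  class << self
    attr_reader :registered

    def register(name, callable)
      raise ArgumentError, "Interruption adapter must respond to #call" unless callable.respond_to?(:call)

      @registered[name] = callable
    end

    # Returns the registered callable, or registers and returns the constant
    # named after the adapter; unknown names raise MiniIntegrations::LoadError.
    def load(name)
      @registered[name] || register(name, const_get(camelize(name), false))
    rescue NameError
      raise LoadError, "Could not find integration for '#{name}'"
    end

    private

    # Minimal replacement for ActiveSupport's String#camelize.
    def camelize(name)
      name.to_s.split("_").map(&:capitalize).join
    end
  end
end

MiniIntegrations.register("custom", -> { true })
custom = MiniIntegrations.load("custom")  # the registered lambda
inline = MiniIntegrations.load("inline")  # resolved via constant lookup, then cached
```

Raising for unknown names mirrors the behaviour discussed in the review thread: an unrecognized adapter fails loudly instead of silently never interrupting.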
30 changes: 18 additions & 12 deletions lib/job-iteration/integrations/resque.rb
@@ -4,21 +4,27 @@

module JobIteration
module Integrations
module ResqueIterationExtension # @private
def initialize(*) # @private
$resque_worker = self
super
module Resque
module IterationExtension
def initialize(*)
$resque_worker = self
super
end
end
end

# @private
module ::Resque
class Worker
# The patch is required in order to call shutdown? on a Resque::Worker instance
prepend(ResqueIterationExtension)
# @private
module ::Resque
class Worker
# The patch is required in order to call shutdown? on a Resque::Worker instance
prepend(IterationExtension)
end
end
end

JobIteration.interruption_adapter = -> { $resque_worker.try!(:shutdown?) }
class << self
def call
$resque_worker.try!(:shutdown?)
end
end
end
end
end
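
The Resque integration relies on `Module#prepend` to capture the worker instance at construction time. A minimal sketch of that technique, assuming a hypothetical `FakeWorker` in place of `Resque::Worker` and using `&.` where the diff uses ActiveSupport's `try!`:

```ruby
# Hypothetical stand-in for Resque::Worker; only #shutdown? matters here.
class FakeWorker
  def initialize(name)
    @name = name
    @shutdown = false
  end

  def shutdown!
    @shutdown = true
  end

  def shutdown?
    @shutdown
  end
end

# Prepended module: its #initialize runs first, records the instance,
# then hands the original arguments on to FakeWorker#initialize.
module CaptureWorker
  def initialize(*)
    $current_worker = self
    super
  end
end

FakeWorker.prepend(CaptureWorker)

# Adapter in the same shape as the diff: returns nil while no worker exists.
adapter = -> { $current_worker&.shutdown? }
```

Because the hook stores the most recently constructed worker in a global, this approach assumes one worker instance per process.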
18 changes: 10 additions & 8 deletions lib/job-iteration/integrations/sidekiq.rb
@@ -1,14 +1,16 @@
# frozen_string_literal: true

require "sidekiq"

module JobIteration
module Integrations # @private
JobIteration.interruption_adapter = -> do
if defined?(Sidekiq::CLI) && Sidekiq::CLI.instance
Sidekiq::CLI.instance.launcher.stopping?
else
false
module Integrations
module Sidekiq
class << self
def call
if defined?(::Sidekiq::CLI) && (instance = ::Sidekiq::CLI.instance)
instance.launcher.stopping?
else
false
end
end
end
end
end
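
One detail in this file is the switch to `::Sidekiq::CLI`. Once the integration lives in a module that is itself named `Sidekiq`, an unqualified `Sidekiq` inside it resolves to the integration module rather than the gem. The sketch below reproduces that with hypothetical names (`Lib` for the gem, `Wrapper::Lib` for the integration module); it does not load Sidekiq.

```ruby
# Hypothetical top-level library, standing in for the real Sidekiq gem.
module Lib
  module CLI
    def self.instance
      nil # no CLI instance, e.g. when not running inside the server process
    end
  end
end

module Wrapper
  # Same name as the top-level library, as in JobIteration::Integrations::Sidekiq.
  module Lib
    class << self
      def call
        # The leading `::` forces lookup from the top level.
        if defined?(::Lib::CLI) && (instance = ::Lib::CLI.instance)
          instance.stopping?
        else
          false
        end
      end

      # Without `::`, `Lib` resolves to Wrapper::Lib, which has no CLI constant.
      def unqualified_cli_defined?
        !defined?(Lib::CLI).nil?
      end
    end
  end
end
```

Here `Wrapper::Lib.call` returns `false` because the stand-in `CLI.instance` is `nil`, while `unqualified_cli_defined?` shows why the unqualified form would never find the library's `CLI`.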
6 changes: 5 additions & 1 deletion lib/job-iteration/iteration.rb
@@ -136,6 +136,10 @@ def retry_job(*, **)

private

def interruption_adapter
@interruption_adapter ||= JobIteration::Integrations.load(self.class.queue_adapter_name)
end

def enumerator_builder
JobIteration.enumerator_builder.new(self)
end
@@ -295,7 +299,7 @@ def job_should_exit?
return true
end

JobIteration.interruption_adapter.call || (defined?(super) && super)
interruption_adapter.call || (defined?(super) && super)
end

def handle_completed(completed)
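
The effect of this change is that each job resolves its adapter from its own class's queue adapter name, so jobs on different backends can coexist in one process. A self-contained sketch of that resolution, where `REGISTRY`, `BaseJob`, and the two subclasses are hypothetical stand-ins for `JobIteration::Integrations.load` and Active Job's `queue_adapter_name`:

```ruby
# Hypothetical registry keyed by adapter name; the lambdas return fixed
# values only so the two backends are distinguishable in this sketch.
REGISTRY = {
  "sidekiq" => -> { false },
  "resque" => -> { true },
}.freeze

class BaseJob
  def self.queue_adapter_name
    raise NotImplementedError
  end

  def job_should_exit?
    interruption_adapter.call
  end

  private

  # Resolved once per job instance, from the job class's adapter name.
  def interruption_adapter
    @interruption_adapter ||= REGISTRY.fetch(self.class.queue_adapter_name)
  end
end

class SidekiqBackedJob < BaseJob
  def self.queue_adapter_name
    "sidekiq"
  end
end

class ResqueBackedJob < BaseJob
  def self.queue_adapter_name
    "resque"
  end
end
```

Memoizing on the instance keeps the registry lookup out of the per-iteration path while still letting each job class pick its own adapter.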
4 changes: 2 additions & 2 deletions lib/job-iteration/test_helper.rb
@@ -23,7 +23,7 @@ def call
# MyJob.perform_now
# end
def iterate_exact_times(n_times)
JobIteration.stubs(:interruption_adapter).returns(StoppingSupervisor.new(n_times.size))
JobIteration::Integrations.stubs(:load).returns(StoppingSupervisor.new(n_times.size))
end

# Stubs interruption adapter to interrupt the job after every sing iteration.
@@ -47,7 +47,7 @@ def mark_job_worker_as_interrupted
def stub_shutdown_adapter_to_return(value)
adapter = mock
adapter.stubs(:call).returns(value)
JobIteration.stubs(:interruption_adapter).returns(adapter)
JobIteration::Integrations.stubs(:load).returns(adapter)
end
end
end
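
Both helpers now stub `JobIteration::Integrations.load` to return a callable. To illustrate the kind of callable that can stop a job after a fixed number of checks, here is a sketch with a hypothetical `CountdownAdapter`. It is an assumption about the general shape of such a helper, not the gem's `StoppingSupervisor`, whose body is not shown in this diff.

```ruby
# Hypothetical adapter: reports "interrupt" once it has been consulted
# `limit` times, and keeps reporting it afterwards.
class CountdownAdapter
  def initialize(limit)
    @remaining = limit
  end

  def call
    @remaining -= 1
    @remaining <= 0
  end
end

adapter = CountdownAdapter.new(2)
results = 3.times.map { adapter.call } # => [false, true, true]
```

Since `2.times.size` is `2`, passing an enumerator such as `2.times` and reading its `size`, as `iterate_exact_times` does, yields the integer limit.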
77 changes: 37 additions & 40 deletions test/integration/integrations_test.rb
@@ -1,55 +1,52 @@
# frozen_string_literal: true

require "test_helper"
require "open3"

class IntegrationsTest < ActiveSupport::TestCase
test "will prevent loading two integrations" do
with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do
rubby = <<~RUBBY
require 'bundler/setup'
require 'job-iteration'
RUBBY
_stdout, stderr, status = run_ruby(rubby)

assert_equal false, status.success?
assert_match(/resque integration has already been loaded, but sidekiq is also available/, stderr)

class IntegrationsTest < IterationUnitTest
class IterationJob < ActiveJob::Base
include JobIteration::Iteration

def build_enumerator(cursor:)
enumerator_builder.build_once_enumerator(cursor: cursor)
end
end

test "successfully loads one (resque) integration" do
with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do
rubby = <<~RUBBY
require 'bundler/setup'
# Remove sidekiq, only resque will be left
$LOAD_PATH.delete_if { |p| p =~ /sidekiq/ }
require 'job-iteration'
RUBBY
_stdout, _stderr, status = run_ruby(rubby)

assert_equal true, status.success?
def each_iteration(*)
end
end

private
class ResqueJob < IterationJob
self.queue_adapter = :resque
end

class SidekiqJob < IterationJob
self.queue_adapter = :sidekiq
end

test "loads multiple integrations" do
resque_job = ResqueJob.new.serialize
ActiveJob::Base.execute(resque_job)

sidekiq_job = SidekiqJob.new.serialize
ActiveJob::Base.execute(sidekiq_job)
end

test ".register accepts an object does implementing #call" do
JobIteration::Integrations.register(:registration_test, -> { true })

def run_ruby(body)
stdout, stderr, status = nil
Tempfile.open do |f|
f.write(body)
f.close
assert(JobIteration::Integrations.registered_integrations[:registration_test].call)
end

command = "ruby #{f.path}"
stdout, stderr, status = Open3.capture3(command)
test ".register raises when the callable object does not implement #call" do
error = assert_raises(ArgumentError) do
JobIteration::Integrations.register("foo", "bar")
end
[stdout, stderr, status]
assert_equal("Interruption adapter must respond to #call", error.message)
end

def with_env(variable, value)
original = ENV[variable]
ENV[variable] = value
yield
ensure
ENV[variable] = original
test "raises for unknown Active Job queue adapter names" do
error = assert_raises(JobIteration::Integrations::LoadError) do
JobIteration::Integrations.load("unknown")
end
assert_equal("Could not find integration for 'unknown'", error.message)
end
end
1 change: 0 additions & 1 deletion test/support/sidekiq/init.rb
@@ -1,7 +1,6 @@
# frozen_string_literal: true

require "job-iteration"
require "job-iteration/integrations/sidekiq"

require "active_job"
require "i18n"
3 changes: 1 addition & 2 deletions test/test_helper.rb
@@ -3,8 +3,6 @@
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "minitest/autorun"

ENV["ITERATION_DISABLE_AUTOCONFIGURE"] = "true"

require "job-iteration"
require "job-iteration/test_helper"

@@ -40,6 +38,7 @@ def enqueue_at(job, timestamp)
end

ActiveJob::Base.queue_adapter = :iteration_test
JobIteration::Integrations.register("iteration_test", -> { false })

class Product < ActiveRecord::Base
has_many :comments
10 changes: 0 additions & 10 deletions test/unit/active_job_iteration_test.rb
@@ -724,16 +724,6 @@ def test_on_shutdown_called_before_reenqueue
assert_jobs_in_queue(1)
end

def test_mark_job_worker_as_interrupted
mark_job_worker_as_interrupted

assert_equal(true, JobIteration.interruption_adapter.call)

continue_iterating

assert_equal(false, JobIteration.interruption_adapter.call)
end

def test_reenqueue_self
iterate_exact_times(2.times)
