diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f6e75f2..59aef23f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - [310](https://github.com/Shopify/job-iteration/pull/310) - Support nested iteration - [338](https://github.com/Shopify/job-iteration/pull/338) - All logs are now `ActiveSupport::Notifications` events and logged using `ActiveSupport::LogSubscriber` to allow customization. Events now always include the `cursor_position` tag. - [341](https://github.com/Shopify/job-iteration/pull/341) - Add `JobIteration.default_retry_backoff`, which sets a default delay when jobs are re-enqueued after being interrupted. Defaults to `nil`, meaning no delay, which matches the current behaviour. +- [367](https://github.com/Shopify/job-iteration/pull/367) - Interruption adapter is now inferred from the job's `queue_adapter_name` instead of `JobIteration.interruption_adapter` (which has been removed). This allows using Iteration with multiple job backends simultaneously. ## v1.3.6 (Mar 9, 2022) diff --git a/README.md b/README.md index 18cb49da..7c0321a9 100644 --- a/README.md +++ b/README.md @@ -152,6 +152,7 @@ end ``` Iteration hooks into Sidekiq and Resque out of the box to support graceful interruption. No extra configuration is required. +Adapters for other Active Job backends can be registered with `JobIteration::Integrations.register("my_queue_adapter_name", object)`, where `object` must implement the `call` method returning `true` if the job must be interrupted and `false` otherwise. ## Guides @@ -183,7 +184,7 @@ There a few configuration assumptions that are required for Iteration to work wi **Why can't I just iterate in `#perform` method and do whatever I want?** You can, but then your job has to comply with a long list of requirements, such as the ones above. This creates leaky abstractions more easily, when instead we can expose a more powerful abstraction for developers--without exposing the underlying infrastructure. -**What happens when my job is interrupted?** A checkpoint will be persisted to Redis after the current `each_iteration`, and the job will be re-enqueued. Once it's popped off the queue, the worker will work off from the next iteration. +**What happens when my job is interrupted?** A checkpoint will be persisted after the current `each_iteration`, and the job will be re-enqueued. Once it's popped off the queue, the worker will work off from the next iteration. **What happens with retries?** An interruption of a job does not count as a retry. The iteration of job that caused the job to fail will be retried and progress will continue from there on. diff --git a/lib/job-iteration.rb b/lib/job-iteration.rb index 42fd6a10..270296b8 100644 --- a/lib/job-iteration.rb +++ b/lib/job-iteration.rb @@ -2,15 +2,12 @@ require "active_job" require_relative "./job-iteration/version" +require_relative "./job-iteration/integrations" require_relative "./job-iteration/enumerator_builder" require_relative "./job-iteration/iteration" require_relative "./job-iteration/log_subscriber" module JobIteration - IntegrationLoadError = Class.new(StandardError) - - INTEGRATIONS = [:resque, :sidekiq] - extend self attr_accessor :logger @@ -45,11 +42,6 @@ module JobIteration # where the throttle backoff value will take precedence over this setting. attr_accessor :default_retry_backoff - # Used internally for hooking into job processing frameworks like Sidekiq and Resque. - attr_accessor :interruption_adapter - - self.interruption_adapter = -> { false } - # Set if you want to use your own enumerator builder instead of default EnumeratorBuilder. # @example # @@ -61,29 +53,4 @@ module JobIteration attr_accessor :enumerator_builder self.enumerator_builder = JobIteration::EnumeratorBuilder - - def load_integrations - loaded = nil - INTEGRATIONS.each do |integration| - load_integration(integration) - if loaded - raise IntegrationLoadError, - "#{loaded} integration has already been loaded, but #{integration} is also available. " \ - "Iteration will only work with one integration." - end - loaded = integration - rescue LoadError - end - end - - def load_integration(integration) - unless INTEGRATIONS.include?(integration) - raise IntegrationLoadError, - "#{integration} integration is not supported. Available integrations: #{INTEGRATIONS.join(", ")}" - end - - require_relative "./job-iteration/integrations/#{integration}" - end end - -JobIteration.load_integrations unless ENV["ITERATION_DISABLE_AUTOCONFIGURE"] diff --git a/lib/job-iteration/integrations.rb b/lib/job-iteration/integrations.rb new file mode 100644 index 00000000..6e3f00df --- /dev/null +++ b/lib/job-iteration/integrations.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +module JobIteration + # @api private + module Integrations + LoadError = Class.new(StandardError) + + extend self + + attr_accessor :registered_integrations + + self.registered_integrations = {} + + autoload :Sidekiq, "job-iteration/integrations/sidekiq" + autoload :Resque, "job-iteration/integrations/resque" + + # @api public + def register(name, callable) + raise ArgumentError, "Interruption adapter must respond to #call" unless callable.respond_to?(:call) + + registered_integrations[name] = callable + end + + def load(name) + if (callable = registered_integrations[name]) + callable + else + begin + klass = "#{self}::#{name.camelize}".constantize + register(name, klass) + rescue NameError + raise LoadError, "Could not find integration for '#{name}'" + end + end + end + end +end diff --git a/lib/job-iteration/integrations/resque.rb b/lib/job-iteration/integrations/resque.rb index 23bf41f6..efb363dd 100644 --- a/lib/job-iteration/integrations/resque.rb +++ b/lib/job-iteration/integrations/resque.rb @@ -4,21 +4,27 @@ module JobIteration module Integrations - module ResqueIterationExtension # @private - def initialize(*) # @private - $resque_worker = self - super + module Resque + module IterationExtension + def initialize(*) + $resque_worker = self + super + end end - end - # @private - module ::Resque - class Worker - # The patch is required in order to call shutdown? on a Resque::Worker instance - prepend(ResqueIterationExtension) + # @private + module ::Resque + class Worker + # The patch is required in order to call shutdown? on a Resque::Worker instance + prepend(IterationExtension) + end end - end - JobIteration.interruption_adapter = -> { $resque_worker.try!(:shutdown?) } + class << self + def call + $resque_worker.try!(:shutdown?) + end + end + end end end diff --git a/lib/job-iteration/integrations/sidekiq.rb b/lib/job-iteration/integrations/sidekiq.rb index d5ff2d3c..598fc901 100644 --- a/lib/job-iteration/integrations/sidekiq.rb +++ b/lib/job-iteration/integrations/sidekiq.rb @@ -1,14 +1,16 @@ # frozen_string_literal: true -require "sidekiq" - module JobIteration - module Integrations # @private - JobIteration.interruption_adapter = -> do - if defined?(Sidekiq::CLI) && Sidekiq::CLI.instance - Sidekiq::CLI.instance.launcher.stopping? - else - false + module Integrations + module Sidekiq + class << self + def call + if defined?(::Sidekiq::CLI) && (instance = ::Sidekiq::CLI.instance) + instance.launcher.stopping? + else + false + end + end end end end diff --git a/lib/job-iteration/iteration.rb b/lib/job-iteration/iteration.rb index acccdc6b..6fc6d907 100644 --- a/lib/job-iteration/iteration.rb +++ b/lib/job-iteration/iteration.rb @@ -134,6 +134,10 @@ def retry_job(*, **) private + def interruption_adapter + @interruption_adapter ||= JobIteration::Integrations.load(self.class.queue_adapter_name) + end + def enumerator_builder JobIteration.enumerator_builder.new(self) end @@ -293,7 +297,7 @@ def job_should_exit? return true end - JobIteration.interruption_adapter.call || (defined?(super) && super) + interruption_adapter.call || (defined?(super) && super) end def handle_completed(completed) diff --git a/lib/job-iteration/test_helper.rb b/lib/job-iteration/test_helper.rb index 951f29b2..d6c6ab0b 100644 --- a/lib/job-iteration/test_helper.rb +++ b/lib/job-iteration/test_helper.rb @@ -23,7 +23,7 @@ def call # MyJob.perform_now # end def iterate_exact_times(n_times) - JobIteration.stubs(:interruption_adapter).returns(StoppingSupervisor.new(n_times.size)) + JobIteration::Integrations.stubs(:load).returns(StoppingSupervisor.new(n_times.size)) end # Stubs interruption adapter to interrupt the job after every sing iteration. @@ -47,7 +47,7 @@ def mark_job_worker_as_interrupted def stub_shutdown_adapter_to_return(value) adapter = mock adapter.stubs(:call).returns(value) - JobIteration.stubs(:interruption_adapter).returns(adapter) + JobIteration::Integrations.stubs(:load).returns(adapter) end end end diff --git a/test/integration/integrations_test.rb b/test/integration/integrations_test.rb index 0b248389..94f7eb2e 100644 --- a/test/integration/integrations_test.rb +++ b/test/integration/integrations_test.rb @@ -1,55 +1,46 @@ # frozen_string_literal: true require "test_helper" -require "open3" - -class IntegrationsTest < ActiveSupport::TestCase - test "will prevent loading two integrations" do - with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do - rubby = <<~RUBBY - require 'bundler/setup' - require 'job-iteration' - RUBBY - _stdout, stderr, status = run_ruby(rubby) - - assert_equal false, status.success? - assert_match(/resque integration has already been loaded, but sidekiq is also available/, stderr) + +class IntegrationsTest < IterationUnitTest + class IterationJob < ActiveJob::Base + include JobIteration::Iteration + + def build_enumerator(cursor:) + enumerator_builder.build_once_enumerator(cursor: cursor) end - end - test "successfully loads one (resque) integration" do - with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do - rubby = <<~RUBBY - require 'bundler/setup' - # Remove sidekiq, only resque will be left - $LOAD_PATH.delete_if { |p| p =~ /sidekiq/ } - require 'job-iteration' - RUBBY - _stdout, _stderr, status = run_ruby(rubby) - - assert_equal true, status.success? + def each_iteration(*) end end - private + class ResqueJob < IterationJob + self.queue_adapter = :resque + end - def run_ruby(body) - stdout, stderr, status = nil - Tempfile.open do |f| - f.write(body) - f.close + class SidekiqJob < IterationJob + self.queue_adapter = :sidekiq + end - command = "ruby #{f.path}" - stdout, stderr, status = Open3.capture3(command) + test "loads multiple integrations" do + resque_job = ResqueJob.new.serialize + ActiveJob::Base.execute(resque_job) + + sidekiq_job = SidekiqJob.new.serialize + ActiveJob::Base.execute(sidekiq_job) + end + + test ".register raises when the callable object does not implement #call" do + error = assert_raises(ArgumentError) do + JobIteration::Integrations.register("foo", "bar") end - [stdout, stderr, status] + assert_equal("Interruption adapter must respond to #call", error.message) end - def with_env(variable, value) - original = ENV[variable] - ENV[variable] = value - yield - ensure - ENV[variable] = original + test "raises for unknown Active Job queue adapter names" do + error = assert_raises(JobIteration::Integrations::LoadError) do + JobIteration::Integrations.load("unknown") + end + assert_equal("Could not find integration for 'unknown'", error.message) end end diff --git a/test/support/sidekiq/init.rb b/test/support/sidekiq/init.rb index 0c546146..47b54c82 100644 --- a/test/support/sidekiq/init.rb +++ b/test/support/sidekiq/init.rb @@ -1,7 +1,6 @@ # frozen_string_literal: true require "job-iteration" -require "job-iteration/integrations/sidekiq" require "active_job" require "i18n" diff --git a/test/test_helper.rb b/test/test_helper.rb index 828f18be..8b847db7 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -3,8 +3,6 @@ $LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__)) require "minitest/autorun" -ENV["ITERATION_DISABLE_AUTOCONFIGURE"] = "true" - require "job-iteration" require "job-iteration/test_helper" @@ -40,6 +38,7 @@ def enqueue_at(job, timestamp) end ActiveJob::Base.queue_adapter = :iteration_test +JobIteration::Integrations.register("iteration_test", -> { false }) class Product < ActiveRecord::Base has_many :comments diff --git a/test/unit/active_job_iteration_test.rb b/test/unit/active_job_iteration_test.rb index ba118c21..0178a6e3 100644 --- a/test/unit/active_job_iteration_test.rb +++ b/test/unit/active_job_iteration_test.rb @@ -724,16 +724,6 @@ def test_on_shutdown_called_before_reenqueue assert_jobs_in_queue(1) end - def test_mark_job_worker_as_interrupted - mark_job_worker_as_interrupted - - assert_equal(true, JobIteration.interruption_adapter.call) - - continue_iterating - - assert_equal(false, JobIteration.interruption_adapter.call) - end - def test_reenqueue_self iterate_exact_times(2.times)