Skip to content

Commit

Permalink
Infer interruption handler from a job's queue adapter
Browse files Browse the repository at this point in the history
...and allow Iteration to be used with multiple job backends simultaneously

Removed test_mark_job_worker_as_interrupted since it was testing stubs

Co-authored-by: Justin Morris <[email protected]>
  • Loading branch information
bdewater and plasticine committed Apr 2, 2023
1 parent a8422fa commit 7f9a996
Show file tree
Hide file tree
Showing 12 changed files with 99 additions and 109 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- [310](https://github.com/Shopify/job-iteration/pull/310) - Support nested iteration
- [338](https://github.com/Shopify/job-iteration/pull/338) - All logs are now `ActiveSupport::Notifications` events and logged using `ActiveSupport::LogSubscriber` to allow customization. Events now always include the `cursor_position` tag.
- [341](https://github.com/Shopify/job-iteration/pull/341) - Add `JobIteration.default_retry_backoff`, which sets a default delay when jobs are re-enqueued after being interrupted. Defaults to `nil`, meaning no delay, which matches the current behaviour.
- [367](https://github.com/Shopify/job-iteration/pull/367) - Interruption adapter is now inferred from the job's queue adapter instead of `JobIteration.interruption_adapter` (which has been removed). This allows using Iteration with multiple job backends simultaneously.

## v1.3.6 (Mar 9, 2022)

Expand Down
35 changes: 1 addition & 34 deletions lib/job-iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

require "active_job"
require_relative "./job-iteration/version"
require_relative "./job-iteration/integrations"
require_relative "./job-iteration/enumerator_builder"
require_relative "./job-iteration/iteration"
require_relative "./job-iteration/log_subscriber"

module JobIteration
IntegrationLoadError = Class.new(StandardError)

INTEGRATIONS = [:resque, :sidekiq]

extend self

attr_accessor :logger
Expand Down Expand Up @@ -45,11 +42,6 @@ module JobIteration
# where the throttle backoff value will take precedence over this setting.
attr_accessor :default_retry_backoff

# Used internally for hooking into job processing frameworks like Sidekiq and Resque.
attr_accessor :interruption_adapter

self.interruption_adapter = -> { false }

# Set if you want to use your own enumerator builder instead of default EnumeratorBuilder.
# @example
#
Expand All @@ -61,29 +53,4 @@ module JobIteration
attr_accessor :enumerator_builder

self.enumerator_builder = JobIteration::EnumeratorBuilder

def load_integrations
loaded = nil
INTEGRATIONS.each do |integration|
load_integration(integration)
if loaded
raise IntegrationLoadError,
"#{loaded} integration has already been loaded, but #{integration} is also available. " \
"Iteration will only work with one integration."
end
loaded = integration
rescue LoadError
end
end

def load_integration(integration)
unless INTEGRATIONS.include?(integration)
raise IntegrationLoadError,
"#{integration} integration is not supported. Available integrations: #{INTEGRATIONS.join(", ")}"
end

require_relative "./job-iteration/integrations/#{integration}"
end
end

JobIteration.load_integrations unless ENV["ITERATION_DISABLE_AUTOCONFIGURE"]
19 changes: 19 additions & 0 deletions lib/job-iteration/integrations.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# frozen_string_literal: true

module JobIteration
module Integrations # @private
IntegrationLoadError = Class.new(StandardError)

autoload :Fallback, "job-iteration/integrations/fallback"
autoload :Sidekiq, "job-iteration/integrations/sidekiq"
autoload :Resque, "job-iteration/integrations/resque"

extend self

def lookup(queue_adapter)
const_get(queue_adapter.to_s.camelize)
rescue NameError
Fallback
end
end
end
13 changes: 13 additions & 0 deletions lib/job-iteration/integrations/fallback.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# frozen_string_literal: true

module JobIteration
module Integrations
module Fallback
extend self

def call
false
end
end
end
end
25 changes: 13 additions & 12 deletions lib/job-iteration/integrations/resque.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,22 @@

module JobIteration
module Integrations
module ResqueIterationExtension # @private
def initialize(*) # @private
$resque_worker = self
super
module Resque
module IterationExtension
def initialize(*)
$resque_worker = self
super
end
end
end

# @private
module ::Resque
class Worker
# The patch is required in order to call shutdown? on a Resque::Worker instance
prepend(ResqueIterationExtension)
# The patch is required in order to call shutdown? on a Resque::Worker instance
::Resque::Worker.prepend(IterationExtension)

extend self

def call
$resque_worker.try!(:shutdown?)
end
end

JobIteration.interruption_adapter = -> { $resque_worker.try!(:shutdown?) }
end
end
16 changes: 10 additions & 6 deletions lib/job-iteration/integrations/sidekiq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@
require "sidekiq"

module JobIteration
module Integrations # @private
JobIteration.interruption_adapter = -> do
if defined?(Sidekiq::CLI) && Sidekiq::CLI.instance
Sidekiq::CLI.instance.launcher.stopping?
else
false
module Integrations
module Sidekiq
extend self

def call
if defined?(::Sidekiq::CLI) && (instance = ::Sidekiq::CLI.instance)
instance.launcher.stopping?
else
false
end
end
end
end
Expand Down
6 changes: 5 additions & 1 deletion lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ def retry_job(*, **)

private

def interruption_adapter
@interruption_adapter ||= JobIteration::Integrations.lookup(self.class.queue_adapter_name)
end

def enumerator_builder
JobIteration.enumerator_builder.new(self)
end
Expand Down Expand Up @@ -293,7 +297,7 @@ def job_should_exit?
return true
end

JobIteration.interruption_adapter.call || (defined?(super) && super)
interruption_adapter.call || (defined?(super) && super)
end

def handle_completed(completed)
Expand Down
4 changes: 2 additions & 2 deletions lib/job-iteration/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def call
# MyJob.perform_now
# end
def iterate_exact_times(n_times)
JobIteration.stubs(:interruption_adapter).returns(StoppingSupervisor.new(n_times.size))
JobIteration::Integrations.stubs(:lookup).returns(StoppingSupervisor.new(n_times.size))
end

# Stubs interruption adapter to interrupt the job after every sing iteration.
Expand All @@ -47,7 +47,7 @@ def mark_job_worker_as_interrupted
def stub_shutdown_adapter_to_return(value)
adapter = mock
adapter.stubs(:call).returns(value)
JobIteration.stubs(:interruption_adapter).returns(adapter)
JobIteration::Integrations.stubs(:lookup).returns(adapter)
end
end
end
64 changes: 23 additions & 41 deletions test/integration/integrations_test.rb
Original file line number Diff line number Diff line change
@@ -1,55 +1,37 @@
# frozen_string_literal: true

require "test_helper"
require "open3"

class IntegrationsTest < ActiveSupport::TestCase
test "will prevent loading two integrations" do
with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do
rubby = <<~RUBBY
require 'bundler/setup'
require 'job-iteration'
RUBBY
_stdout, stderr, status = run_ruby(rubby)

assert_equal false, status.success?
assert_match(/resque integration has already been loaded, but sidekiq is also available/, stderr)

class IntegrationsTest < IterationUnitTest
class IterationJob < ActiveJob::Base
include JobIteration::Iteration

def build_enumerator(cursor:)
enumerator_builder.build_once_enumerator(cursor: cursor)
end
end

test "successfully loads one (resque) integration" do
with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do
rubby = <<~RUBBY
require 'bundler/setup'
# Remove sidekiq, only resque will be left
$LOAD_PATH.delete_if { |p| p =~ /sidekiq/ }
require 'job-iteration'
RUBBY
_stdout, _stderr, status = run_ruby(rubby)

assert_equal true, status.success?
def each_iteration(*)
end
end

private
class ResqueJob < IterationJob
self.queue_adapter = :resque
end

def run_ruby(body)
stdout, stderr, status = nil
Tempfile.open do |f|
f.write(body)
f.close
class SidekiqJob < IterationJob
self.queue_adapter = :sidekiq
end

command = "ruby #{f.path}"
stdout, stderr, status = Open3.capture3(command)
end
[stdout, stderr, status]
test "will load two integrations" do
resque_job = ResqueJob.new.serialize
ActiveJob::Base.execute(resque_job)

sidekiq_job = SidekiqJob.new.serialize
ActiveJob::Base.execute(sidekiq_job)
end

def with_env(variable, value)
original = ENV[variable]
ENV[variable] = value
yield
ensure
ENV[variable] = original
test "handles unknown Active Job queue adapater names" do
interruption_adapter = JobIteration::Integrations.lookup(:unknown)
assert_equal(false, interruption_adapter.call)
end
end
1 change: 0 additions & 1 deletion test/support/sidekiq/init.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# frozen_string_literal: true

require "job-iteration"
require "job-iteration/integrations/sidekiq"

require "active_job"
require "i18n"
Expand Down
14 changes: 12 additions & 2 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "minitest/autorun"

ENV["ITERATION_DISABLE_AUTOCONFIGURE"] = "true"

require "job-iteration"
require "job-iteration/test_helper"

Expand Down Expand Up @@ -39,6 +37,18 @@ def enqueue_at(job, timestamp)
end
end

module JobIteration
module Integrations
module IterationTest
extend self

def call
false
end
end
end
end

ActiveJob::Base.queue_adapter = :iteration_test

class Product < ActiveRecord::Base
Expand Down
10 changes: 0 additions & 10 deletions test/unit/active_job_iteration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -724,16 +724,6 @@ def test_on_shutdown_called_before_reenqueue
assert_jobs_in_queue(1)
end

def test_mark_job_worker_as_interrupted
mark_job_worker_as_interrupted

assert_equal(true, JobIteration.interruption_adapter.call)

continue_iterating

assert_equal(false, JobIteration.interruption_adapter.call)
end

def test_reenqueue_self
iterate_exact_times(2.times)

Expand Down

0 comments on commit 7f9a996

Please sign in to comment.