Skip to content

Commit

Permalink
Infer the interruption handler for a given integraion from the job it…
Browse files Browse the repository at this point in the history
…self

Instead of automatically attempting to load the interruption adapter implicitly this patch changes this to be discovered for a given worker based on the job itself based on its class adapter.

I believe this will work OK, and should allow a bit more flexibility around using JobIteration.

This patch also contains a bunch of renames, broadly from  `job-iteration` -> `job_iteration`.

This might be a bit contentious, but the current module naming breaks
autoloading a bit, which is not ideal, and I’d like to leverage it for loading integrations.
  • Loading branch information
plasticine committed May 27, 2021
1 parent 83e708e commit 47adc55
Show file tree
Hide file tree
Showing 23 changed files with 149 additions and 127 deletions.
2 changes: 1 addition & 1 deletion job-iteration.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

lib = File.expand_path("../lib", __FILE__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
require "job-iteration/version"
require "job_iteration/version"

Gem::Specification.new do |spec|
spec.name = "job-iteration"
Expand Down
64 changes: 0 additions & 64 deletions lib/job-iteration.rb

This file was deleted.

18 changes: 0 additions & 18 deletions lib/job-iteration/integrations/resque.rb

This file was deleted.

15 changes: 0 additions & 15 deletions lib/job-iteration/integrations/sidekiq.rb

This file was deleted.

35 changes: 35 additions & 0 deletions lib/job_iteration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# frozen_string_literal: true

require_relative "./job_iteration/version"
require_relative "./job_iteration/enumerator_builder"
require_relative "./job_iteration/iteration"
require_relative "./job_iteration/integrations"

module JobIteration
extend self

# Use this to _always_ interrupt the job after it's been running for more than N seconds.
# @example
#
# JobIteration.max_job_runtime = 5.minutes
#
# This setting will make it to always interrupt a job after it's been iterating for 5 minutes.
# Defaults to nil which means that jobs will not be interrupted except on termination signal.
attr_accessor :max_job_runtime

# Set if you want to use your own enumerator builder instead of default EnumeratorBuilder.
# @example
#
# class MyOwnBuilder < JobIteration::EnumeratorBuilder
# # ...
# end
#
# JobIteration.enumerator_builder = MyOwnBuilder
attr_accessor :enumerator_builder
self.enumerator_builder = JobIteration::EnumeratorBuilder

# Used internally for hooking into job processing frameworks like Sidekiq and Resque.
def self.load_interruption_integration(integration)
JobIteration::Integrations.load(integration)
end
end
File renamed without changes.
File renamed without changes.
File renamed without changes.
21 changes: 21 additions & 0 deletions lib/job_iteration/integrations.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true

module JobIteration
module Integrations
IntegrationLoadError = Class.new(StandardError)

extend ActiveSupport::Autoload

autoload :SidekiqIntegration
autoload :ResqueIntegration

class << self
def load(integration)
integration = const_get(integration.to_s.camelize << "Integration")
integration.new
rescue NameError
raise IntegrationLoadError, "#{integration} integration is not supported."
end
end
end
end
23 changes: 23 additions & 0 deletions lib/job_iteration/integrations/resque_integration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# frozen_string_literal: true

require "resque"

module JobIteration
module Integrations
class ResqueIntegration
module ResqueIterationExtension # @private
def initialize(*) # @private
$resque_worker = self
super
end
end

# The patch is required in order to call shutdown? on a Resque::Worker instance
Resque::Worker.prepend(ResqueIterationExtension)

def stopping?
$resque_worker.try!(:shutdown?)
end
end
end
end
17 changes: 17 additions & 0 deletions lib/job_iteration/integrations/sidekiq_integration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

require "sidekiq"

module JobIteration
module Integrations
class SidekiqIntegration
def stopping?
if defined?(Sidekiq::CLI) && Sidekiq::CLI.instance
Sidekiq::CLI.instance.launcher.stopping?
else
false
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,16 @@ def output_interrupt_summary
logger.info(Kernel.format(message, times_interrupted, total_time))
end

def interruption_integration
JobIteration.load_interruption_integration(self.class.queue_adapter_name)
end

def job_should_exit?
if ::JobIteration.max_job_runtime && start_time && (Time.now.utc - start_time) > ::JobIteration.max_job_runtime
return true
end

JobIteration.interruption_adapter.call || (defined?(super) && super)
interruption_integration.stopping? || (defined?(super) && super)
end

def run_complete_callbacks?(completed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def initialize(stop_after_count)
@calls = 0
end

def call
def stopping?
@calls += 1
(@calls % @stop_after_count) == 0
end
Expand All @@ -23,7 +23,7 @@ def call
# MyJob.perform_now
# end
def iterate_exact_times(n_times)
JobIteration.stubs(:interruption_adapter).returns(StoppingSupervisor.new(n_times.size))
JobIteration.stubs(:load_interruption_integration).returns(StoppingSupervisor.new(n_times.size))
end

# Stubs interruption adapter to interrupt the job after every sing iteration.
Expand All @@ -46,8 +46,8 @@ def mark_job_worker_as_interrupted

def stub_shutdown_adapter_to_return(value)
adapter = mock
adapter.stubs(:call).returns(value)
JobIteration.stubs(:interruption_adapter).returns(adapter)
adapter.stubs(:stopping?).returns(value)
JobIteration.stubs(:load_interruption_integration).returns(adapter)
end
end
end
File renamed without changes.
File renamed without changes.
15 changes: 1 addition & 14 deletions test/integration/integrations_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,13 @@
require "open3"

class IntegrationsTest < ActiveSupport::TestCase
test "will prevent loading two integrations" do
with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do
rubby = <<~RUBBY
require 'bundler/setup'
require 'job-iteration'
RUBBY
_stdout, stderr, status = run_ruby(rubby)

assert_equal false, status.success?
assert_match(/resque integration has already been loaded, but sidekiq is also available/, stderr)
end
end

test "successfully loads one (resque) integration" do
with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do
rubby = <<~RUBBY
require 'bundler/setup'
# Remove sidekiq, only resque will be left
$LOAD_PATH.delete_if { |p| p =~ /sidekiq/ }
require 'job-iteration'
require 'job_iteration'
RUBBY
_stdout, _stderr, status = run_ruby(rubby)

Expand Down
14 changes: 14 additions & 0 deletions test/support/async_integration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

module JobIteration
module Integrations
# https://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/AsyncAdapter.html
module AsyncIntegration
class << self
def interruption_adapter
false
end
end
end
end
end
5 changes: 3 additions & 2 deletions test/support/resque/Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
# $LOAD_PATH.unshift File.dirname(__FILE__) unless $LOAD_PATH.include?(File.dirname(__FILE__))
require "resque/tasks"

require "job-iteration"
require "job-iteration/integrations/resque"
require "job_iteration"
require "job_iteration/integrations/resque_integration"
require "active_job"
require "i18n"

require_relative "../async_integration"
require_relative "../jobs"

redis_url = if ENV["USING_DEV"] == "1"
Expand Down
5 changes: 3 additions & 2 deletions test/support/sidekiq/init.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# frozen_string_literal: true

require "job-iteration"
require "job-iteration/integrations/sidekiq"
require "job_iteration"
require "job_iteration/integrations/sidekiq_integration"

require "active_job"
require "i18n"

require_relative "../async_integration"
require_relative "../jobs"

redis_host = if ENV["USING_DEV"] == "1"
Expand Down
24 changes: 20 additions & 4 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "minitest/autorun"

ENV["ITERATION_DISABLE_AUTOCONFIGURE"] = "true"

require "job-iteration"
require "job-iteration/test_helper"
require "job_iteration"
require "job_iteration/test_helper"

require "globalid"
require "sidekiq"
Expand Down Expand Up @@ -40,6 +38,24 @@ def enqueue_at(job, _delay)
end
end

module JobIteration
module Integrations
class IterationTestIntegration
def stopping?
false
end
end

# https://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/AsyncAdapter.html
class AsyncIntegration
def stopping?
false
end
end
end
end


ActiveJob::Base.queue_adapter = :iteration_test

class Product < ActiveRecord::Base
Expand Down
4 changes: 2 additions & 2 deletions test/unit/active_job_iteration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -696,11 +696,11 @@ def test_respects_job_should_exit_from_parent_class
def test_mark_job_worker_as_interrupted
mark_job_worker_as_interrupted

assert_equal(true, JobIteration.interruption_adapter.call)
assert_equal(true, JobIteration.load_interruption_integration().stopping?)

continue_iterating

assert_equal(false, JobIteration.interruption_adapter.call)
assert_equal(false, JobIteration.load_interruption_integration().stopping?)
end

def test_reenqueue_self
Expand Down

0 comments on commit 47adc55

Please sign in to comment.