From 47adc55c40d2f95761f1bd7c49f9c935d4f2583e Mon Sep 17 00:00:00 2001 From: Justin Morris Date: Tue, 18 May 2021 11:22:46 +1000 Subject: [PATCH] Infer the interruption handler for a given integraion from the job itself MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of automatically attempting to load the interruption adapter implicitly this patch changes this to be discovered for a given worker based on the job itself based on its class adapter. I believe this will work OK, and should allow a bit more flexibility around using JobIteration. This patch also contains a bunch of renames, broadly from `job-iteration` -> `job_iteration`. This might be a bit contentious, but the current module naming breaks autoloading a bit, which is not ideal, and I’d like to leverage it for loading integrations. --- job-iteration.gemspec | 2 +- lib/job-iteration.rb | 64 ------------------- lib/job-iteration/integrations/resque.rb | 18 ------ lib/job-iteration/integrations/sidekiq.rb | 15 ----- lib/job_iteration.rb | 35 ++++++++++ .../active_record_batch_enumerator.rb | 0 .../active_record_cursor.rb | 0 .../active_record_enumerator.rb | 0 .../csv_enumerator.rb | 0 .../enumerator_builder.rb | 0 lib/job_iteration/integrations.rb | 21 ++++++ .../integrations/resque_integration.rb | 23 +++++++ .../integrations/sidekiq_integration.rb | 17 +++++ .../iteration.rb | 6 +- .../test_helper.rb | 8 +-- .../throttle_enumerator.rb | 0 .../version.rb | 0 test/integration/integrations_test.rb | 15 +---- test/support/async_integration.rb | 14 ++++ test/support/resque/Rakefile | 5 +- test/support/sidekiq/init.rb | 5 +- test/test_helper.rb | 24 +++++-- test/unit/active_job_iteration_test.rb | 4 +- 23 files changed, 149 insertions(+), 127 deletions(-) delete mode 100644 lib/job-iteration.rb delete mode 100644 lib/job-iteration/integrations/resque.rb delete mode 100644 lib/job-iteration/integrations/sidekiq.rb create mode 100644 lib/job_iteration.rb rename lib/{job-iteration => job_iteration}/active_record_batch_enumerator.rb (100%) rename lib/{job-iteration => job_iteration}/active_record_cursor.rb (100%) rename lib/{job-iteration => job_iteration}/active_record_enumerator.rb (100%) rename lib/{job-iteration => job_iteration}/csv_enumerator.rb (100%) rename lib/{job-iteration => job_iteration}/enumerator_builder.rb (100%) create mode 100644 lib/job_iteration/integrations.rb create mode 100644 lib/job_iteration/integrations/resque_integration.rb create mode 100644 lib/job_iteration/integrations/sidekiq_integration.rb rename lib/{job-iteration => job_iteration}/iteration.rb (97%) rename lib/{job-iteration => job_iteration}/test_helper.rb (84%) rename lib/{job-iteration => job_iteration}/throttle_enumerator.rb (100%) rename lib/{job-iteration => job_iteration}/version.rb (100%) create mode 100644 test/support/async_integration.rb diff --git a/job-iteration.gemspec b/job-iteration.gemspec index 1240999b..844a6225 100644 --- a/job-iteration.gemspec +++ b/job-iteration.gemspec @@ -2,7 +2,7 @@ lib = File.expand_path("../lib", __FILE__) $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) -require "job-iteration/version" +require "job_iteration/version" Gem::Specification.new do |spec| spec.name = "job-iteration" diff --git a/lib/job-iteration.rb b/lib/job-iteration.rb deleted file mode 100644 index 863fa832..00000000 --- a/lib/job-iteration.rb +++ /dev/null @@ -1,64 +0,0 @@ -# frozen_string_literal: true - -require_relative "./job-iteration/version" -require_relative "./job-iteration/enumerator_builder" -require_relative "./job-iteration/iteration" - -module JobIteration - IntegrationLoadError = Class.new(StandardError) - - INTEGRATIONS = [:resque, :sidekiq] - - extend self - - # Use this to _always_ interrupt the job after it's been running for more than N seconds. - # @example - # - # JobIteration.max_job_runtime = 5.minutes - # - # This setting will make it to always interrupt a job after it's been iterating for 5 minutes. - # Defaults to nil which means that jobs will not be interrupted except on termination signal. - attr_accessor :max_job_runtime - - # Used internally for hooking into job processing frameworks like Sidekiq and Resque. - attr_accessor :interruption_adapter - self.interruption_adapter = -> { false } - - # Set if you want to use your own enumerator builder instead of default EnumeratorBuilder. - # @example - # - # class MyOwnBuilder < JobIteration::EnumeratorBuilder - # # ... - # end - # - # JobIteration.enumerator_builder = MyOwnBuilder - attr_accessor :enumerator_builder - self.enumerator_builder = JobIteration::EnumeratorBuilder - - def load_integrations - loaded = nil - INTEGRATIONS.each do |integration| - begin - load_integration(integration) - if loaded - raise IntegrationLoadError, - "#{loaded} integration has already been loaded, but #{integration} is also available. " \ - "Iteration will only work with one integration." - end - loaded = integration - rescue LoadError - end - end - end - - def load_integration(integration) - unless INTEGRATIONS.include?(integration) - raise IntegrationLoadError, - "#{integration} integration is not supported. Available integrations: #{INTEGRATIONS.join(", ")}" - end - - require_relative "./job-iteration/integrations/#{integration}" - end -end - -JobIteration.load_integrations unless ENV["ITERATION_DISABLE_AUTOCONFIGURE"] diff --git a/lib/job-iteration/integrations/resque.rb b/lib/job-iteration/integrations/resque.rb deleted file mode 100644 index bfdb37d9..00000000 --- a/lib/job-iteration/integrations/resque.rb +++ /dev/null @@ -1,18 +0,0 @@ -# frozen_string_literal: true - -require "resque" - -module JobIteration - module Integrations - module ResqueIterationExtension # @private - def initialize(*) # @private - $resque_worker = self - super - end - end - # The patch is required in order to call shutdown? on a Resque::Worker instance - Resque::Worker.prepend(ResqueIterationExtension) - - JobIteration.interruption_adapter = -> { $resque_worker.try!(:shutdown?) } - end -end diff --git a/lib/job-iteration/integrations/sidekiq.rb b/lib/job-iteration/integrations/sidekiq.rb deleted file mode 100644 index d5ff2d3c..00000000 --- a/lib/job-iteration/integrations/sidekiq.rb +++ /dev/null @@ -1,15 +0,0 @@ -# frozen_string_literal: true - -require "sidekiq" - -module JobIteration - module Integrations # @private - JobIteration.interruption_adapter = -> do - if defined?(Sidekiq::CLI) && Sidekiq::CLI.instance - Sidekiq::CLI.instance.launcher.stopping? - else - false - end - end - end -end diff --git a/lib/job_iteration.rb b/lib/job_iteration.rb new file mode 100644 index 00000000..6388e494 --- /dev/null +++ b/lib/job_iteration.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true + +require_relative "./job_iteration/version" +require_relative "./job_iteration/enumerator_builder" +require_relative "./job_iteration/iteration" +require_relative "./job_iteration/integrations" + +module JobIteration + extend self + + # Use this to _always_ interrupt the job after it's been running for more than N seconds. + # @example + # + # JobIteration.max_job_runtime = 5.minutes + # + # This setting will make it to always interrupt a job after it's been iterating for 5 minutes. + # Defaults to nil which means that jobs will not be interrupted except on termination signal. + attr_accessor :max_job_runtime + + # Set if you want to use your own enumerator builder instead of default EnumeratorBuilder. + # @example + # + # class MyOwnBuilder < JobIteration::EnumeratorBuilder + # # ... + # end + # + # JobIteration.enumerator_builder = MyOwnBuilder + attr_accessor :enumerator_builder + self.enumerator_builder = JobIteration::EnumeratorBuilder + + # Used internally for hooking into job processing frameworks like Sidekiq and Resque. + def self.load_interruption_integration(integration) + JobIteration::Integrations.load(integration) + end +end diff --git a/lib/job-iteration/active_record_batch_enumerator.rb b/lib/job_iteration/active_record_batch_enumerator.rb similarity index 100% rename from lib/job-iteration/active_record_batch_enumerator.rb rename to lib/job_iteration/active_record_batch_enumerator.rb diff --git a/lib/job-iteration/active_record_cursor.rb b/lib/job_iteration/active_record_cursor.rb similarity index 100% rename from lib/job-iteration/active_record_cursor.rb rename to lib/job_iteration/active_record_cursor.rb diff --git a/lib/job-iteration/active_record_enumerator.rb b/lib/job_iteration/active_record_enumerator.rb similarity index 100% rename from lib/job-iteration/active_record_enumerator.rb rename to lib/job_iteration/active_record_enumerator.rb diff --git a/lib/job-iteration/csv_enumerator.rb b/lib/job_iteration/csv_enumerator.rb similarity index 100% rename from lib/job-iteration/csv_enumerator.rb rename to lib/job_iteration/csv_enumerator.rb diff --git a/lib/job-iteration/enumerator_builder.rb b/lib/job_iteration/enumerator_builder.rb similarity index 100% rename from lib/job-iteration/enumerator_builder.rb rename to lib/job_iteration/enumerator_builder.rb diff --git a/lib/job_iteration/integrations.rb b/lib/job_iteration/integrations.rb new file mode 100644 index 00000000..271b25ad --- /dev/null +++ b/lib/job_iteration/integrations.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +module JobIteration + module Integrations + IntegrationLoadError = Class.new(StandardError) + + extend ActiveSupport::Autoload + + autoload :SidekiqIntegration + autoload :ResqueIntegration + + class << self + def load(integration) + integration = const_get(integration.to_s.camelize << "Integration") + integration.new + rescue NameError + raise IntegrationLoadError, "#{integration} integration is not supported." + end + end + end +end diff --git a/lib/job_iteration/integrations/resque_integration.rb b/lib/job_iteration/integrations/resque_integration.rb new file mode 100644 index 00000000..e68099f1 --- /dev/null +++ b/lib/job_iteration/integrations/resque_integration.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +require "resque" + +module JobIteration + module Integrations + class ResqueIntegration + module ResqueIterationExtension # @private + def initialize(*) # @private + $resque_worker = self + super + end + end + + # The patch is required in order to call shutdown? on a Resque::Worker instance + Resque::Worker.prepend(ResqueIterationExtension) + + def stopping? + $resque_worker.try!(:shutdown?) + end + end + end +end diff --git a/lib/job_iteration/integrations/sidekiq_integration.rb b/lib/job_iteration/integrations/sidekiq_integration.rb new file mode 100644 index 00000000..c94e7092 --- /dev/null +++ b/lib/job_iteration/integrations/sidekiq_integration.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +require "sidekiq" + +module JobIteration + module Integrations + class SidekiqIntegration + def stopping? + if defined?(Sidekiq::CLI) && Sidekiq::CLI.instance + Sidekiq::CLI.instance.launcher.stopping? + else + false + end + end + end + end +end diff --git a/lib/job-iteration/iteration.rb b/lib/job_iteration/iteration.rb similarity index 97% rename from lib/job-iteration/iteration.rb rename to lib/job_iteration/iteration.rb index e0f263b7..1d8685d3 100644 --- a/lib/job-iteration/iteration.rb +++ b/lib/job_iteration/iteration.rb @@ -255,12 +255,16 @@ def output_interrupt_summary logger.info(Kernel.format(message, times_interrupted, total_time)) end + def interruption_integration + JobIteration.load_interruption_integration(self.class.queue_adapter_name) + end + def job_should_exit? if ::JobIteration.max_job_runtime && start_time && (Time.now.utc - start_time) > ::JobIteration.max_job_runtime return true end - JobIteration.interruption_adapter.call || (defined?(super) && super) + interruption_integration.stopping? || (defined?(super) && super) end def run_complete_callbacks?(completed) diff --git a/lib/job-iteration/test_helper.rb b/lib/job_iteration/test_helper.rb similarity index 84% rename from lib/job-iteration/test_helper.rb rename to lib/job_iteration/test_helper.rb index 951f29b2..66c23f03 100644 --- a/lib/job-iteration/test_helper.rb +++ b/lib/job_iteration/test_helper.rb @@ -9,7 +9,7 @@ def initialize(stop_after_count) @calls = 0 end - def call + def stopping? @calls += 1 (@calls % @stop_after_count) == 0 end @@ -23,7 +23,7 @@ def call # MyJob.perform_now # end def iterate_exact_times(n_times) - JobIteration.stubs(:interruption_adapter).returns(StoppingSupervisor.new(n_times.size)) + JobIteration.stubs(:load_interruption_integration).returns(StoppingSupervisor.new(n_times.size)) end # Stubs interruption adapter to interrupt the job after every sing iteration. @@ -46,8 +46,8 @@ def mark_job_worker_as_interrupted def stub_shutdown_adapter_to_return(value) adapter = mock - adapter.stubs(:call).returns(value) - JobIteration.stubs(:interruption_adapter).returns(adapter) + adapter.stubs(:stopping?).returns(value) + JobIteration.stubs(:load_interruption_integration).returns(adapter) end end end diff --git a/lib/job-iteration/throttle_enumerator.rb b/lib/job_iteration/throttle_enumerator.rb similarity index 100% rename from lib/job-iteration/throttle_enumerator.rb rename to lib/job_iteration/throttle_enumerator.rb diff --git a/lib/job-iteration/version.rb b/lib/job_iteration/version.rb similarity index 100% rename from lib/job-iteration/version.rb rename to lib/job_iteration/version.rb diff --git a/test/integration/integrations_test.rb b/test/integration/integrations_test.rb index 0b248389..2a4a5fd4 100644 --- a/test/integration/integrations_test.rb +++ b/test/integration/integrations_test.rb @@ -4,26 +4,13 @@ require "open3" class IntegrationsTest < ActiveSupport::TestCase - test "will prevent loading two integrations" do - with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do - rubby = <<~RUBBY - require 'bundler/setup' - require 'job-iteration' - RUBBY - _stdout, stderr, status = run_ruby(rubby) - - assert_equal false, status.success? - assert_match(/resque integration has already been loaded, but sidekiq is also available/, stderr) - end - end - test "successfully loads one (resque) integration" do with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do rubby = <<~RUBBY require 'bundler/setup' # Remove sidekiq, only resque will be left $LOAD_PATH.delete_if { |p| p =~ /sidekiq/ } - require 'job-iteration' + require 'job_iteration' RUBBY _stdout, _stderr, status = run_ruby(rubby) diff --git a/test/support/async_integration.rb b/test/support/async_integration.rb new file mode 100644 index 00000000..f2d291f6 --- /dev/null +++ b/test/support/async_integration.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +module JobIteration + module Integrations + # https://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/AsyncAdapter.html + module AsyncIntegration + class << self + def interruption_adapter + false + end + end + end + end +end diff --git a/test/support/resque/Rakefile b/test/support/resque/Rakefile index cdea6332..2557f006 100644 --- a/test/support/resque/Rakefile +++ b/test/support/resque/Rakefile @@ -4,11 +4,12 @@ # $LOAD_PATH.unshift File.dirname(__FILE__) unless $LOAD_PATH.include?(File.dirname(__FILE__)) require "resque/tasks" -require "job-iteration" -require "job-iteration/integrations/resque" +require "job_iteration" +require "job_iteration/integrations/resque_integration" require "active_job" require "i18n" +require_relative "../async_integration" require_relative "../jobs" redis_url = if ENV["USING_DEV"] == "1" diff --git a/test/support/sidekiq/init.rb b/test/support/sidekiq/init.rb index 0c546146..988e87dd 100644 --- a/test/support/sidekiq/init.rb +++ b/test/support/sidekiq/init.rb @@ -1,11 +1,12 @@ # frozen_string_literal: true -require "job-iteration" -require "job-iteration/integrations/sidekiq" +require "job_iteration" +require "job_iteration/integrations/sidekiq_integration" require "active_job" require "i18n" +require_relative "../async_integration" require_relative "../jobs" redis_host = if ENV["USING_DEV"] == "1" diff --git a/test/test_helper.rb b/test/test_helper.rb index b8f1a600..2b705255 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -3,10 +3,8 @@ $LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__)) require "minitest/autorun" -ENV["ITERATION_DISABLE_AUTOCONFIGURE"] = "true" - -require "job-iteration" -require "job-iteration/test_helper" +require "job_iteration" +require "job_iteration/test_helper" require "globalid" require "sidekiq" @@ -40,6 +38,24 @@ def enqueue_at(job, _delay) end end +module JobIteration + module Integrations + class IterationTestIntegration + def stopping? + false + end + end + + # https://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/AsyncAdapter.html + class AsyncIntegration + def stopping? + false + end + end + end +end + + ActiveJob::Base.queue_adapter = :iteration_test class Product < ActiveRecord::Base diff --git a/test/unit/active_job_iteration_test.rb b/test/unit/active_job_iteration_test.rb index 64e88855..c392786c 100644 --- a/test/unit/active_job_iteration_test.rb +++ b/test/unit/active_job_iteration_test.rb @@ -696,11 +696,11 @@ def test_respects_job_should_exit_from_parent_class def test_mark_job_worker_as_interrupted mark_job_worker_as_interrupted - assert_equal(true, JobIteration.interruption_adapter.call) + assert_equal(true, JobIteration.load_interruption_integration().stopping?) continue_iterating - assert_equal(false, JobIteration.interruption_adapter.call) + assert_equal(false, JobIteration.load_interruption_integration().stopping?) end def test_reenqueue_self