Merge pull request #409 from Shopify/batch-collections-w-batch-enumerator

Support `ActiveRecord::Batches::BatchEnumerator` collections
adrianna-chang-shopify authored Jun 3, 2021
2 parents 66cc13e + d7b7dc5 commit 16b1070
Showing 10 changed files with 145 additions and 7 deletions.
40 changes: 40 additions & 0 deletions README.md
@@ -112,6 +112,46 @@ title,content
My Title,Hello World!
```

### Processing Batch Collections

The Maintenance Tasks gem supports processing Active Records in batches. This
can reduce the number of calls your Task makes to the database. Call
`ActiveRecord::Batches#in_batches` on the relation returned by your collection
to specify that your Task should process batches instead of individual records.
Active Record defaults to 1000 records per batch, but you can specify a custom
size with the `of:` option.

```ruby
# app/tasks/maintenance/update_posts_in_batches_task.rb
module Maintenance
  class UpdatePostsInBatchesTask < MaintenanceTasks::Task
    def collection
      Post.in_batches
    end

    def process(batch_of_posts)
      batch_of_posts.update_all(content: "New content added on #{Time.now.utc}")
    end
  end
end
```

Ensure that you've implemented the following methods:

* `collection`: return an `ActiveRecord::Batches::BatchEnumerator`.
* `process`: do the work of your Task on a batch (`ActiveRecord::Relation`).

Note that `#count` is calculated automatically based on the number of batches in
your collection, and your Task's progress will be displayed in terms of batches
(not the number of records in the relation).
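
As a rough illustration of how the batch count relates to the record count, the
plain-Ruby sketch below slices seven primary keys into batches of five. The
helper names `batch_count` and `slice_into_batches` are invented for this
example and are not part of the gem, and `each_slice` only approximates what
`in_batches` does against the database.

```ruby
# Plain Ruby, no Rails required. Both helpers are hypothetical and exist only
# to illustrate the arithmetic; they are not part of the gem.

# Number of batches needed to cover `record_count` records.
def batch_count(record_count, batch_size)
  (record_count.to_f / batch_size).ceil
end

# Groups ids into consecutive batches, similar in spirit to `in_batches`.
def slice_into_batches(ids, batch_size)
  ids.each_slice(batch_size).to_a
end

ids = (1..7).to_a
puts "records: #{ids.size}, batches: #{batch_count(ids.size, 5)}"
puts slice_into_batches(ids, 5).inspect
```

With seven records and a batch size of five, progress is reported against two
batches rather than seven records.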

**Important!** Batches should only be used if `#process` is performing a batch
operation such as `#update_all` or `#delete_all`. If you need to iterate over
individual records, you should define a collection that [returns an
`ActiveRecord::Relation`](#creating-a-task). A relation collection still uses
batching internally, but it loads the records of each batch with one SQL query.
A batch collection, by contrast, first loads only the primary keys of the
records in the batch, and then performs an additional query to load the records
when you call `each` (or any `Enumerable` method) inside `#process`.

### Throttling

Maintenance Tasks often modify a lot of data and can be taxing on your database.
26 changes: 23 additions & 3 deletions app/jobs/concerns/maintenance_tasks/task_job_concern.rb
@@ -32,17 +32,35 @@ def retry_on(*, **)
     def build_enumerator(_run, cursor:)
       cursor ||= @run.cursor
       collection = @task.collection
+      @enumerator = nil

       collection_enum = case collection
       when ActiveRecord::Relation
         enumerator_builder.active_record_on_records(collection, cursor: cursor)
+      when ActiveRecord::Batches::BatchEnumerator
+        if collection.start || collection.finish
+          raise ArgumentError, <<~MSG.squish
+            #{@task.class.name}#collection cannot support
+            a batch enumerator with the "start" or "finish" options.
+          MSG
+        end
+        # For now, only support automatic count based on the enumerator for
+        # batches
+        @enumerator = enumerator_builder.active_record_on_batch_relations(
+          collection.relation,
+          cursor: cursor,
+          batch_size: collection.batch_size,
+        )
       when Array
         enumerator_builder.build_array_enumerator(collection, cursor: cursor)
       when CSV
         JobIteration::CsvEnumerator.new(collection).rows(cursor: cursor)
       else
-        raise ArgumentError, "#{@task.class.name}#collection must be either "\
-          "an Active Record Relation, Array, or CSV."
+        raise ArgumentError, <<~MSG.squish
+          #{@task.class.name}#collection must be either an
+          Active Record Relation, ActiveRecord::Batches::BatchEnumerator,
+          Array, or CSV.
+        MSG
       end

       @task.throttle_conditions.reduce(collection_enum) do |enum, condition|
@@ -85,7 +103,9 @@ def before_perform
     end

     def on_start
-      @run.update!(started_at: Time.now, tick_total: @task.count)
+      count = @task.count
+      count = @enumerator&.size if count == :no_count
+      @run.update!(started_at: Time.now, tick_total: count)
     end

     def on_complete
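
The `start`/`finish` guard in `build_enumerator` above can be tried outside of
Rails with the minimal sketch below. Only `collection.relation` and
`collection.batch_size` are forwarded to the enumerator builder in the diff,
which is presumably why bounded enumerators are rejected. Everything named here
(`FakeBatchEnumerator`, `squish`, `ensure_unbounded!`) is a hypothetical
stand-in written for this example: the struct imitates the readers of
`ActiveRecord::Batches::BatchEnumerator`, and `squish` approximates
ActiveSupport's `String#squish`.

```ruby
# Hypothetical stand-in for ActiveRecord::Batches::BatchEnumerator, exposing
# the reader names used in the diff (start, finish, batch_size).
FakeBatchEnumerator = Struct.new(:start, :finish, :batch_size, keyword_init: true)

# Plain-Ruby approximation of ActiveSupport's String#squish.
def squish(text)
  text.gsub(/\s+/, " ").strip
end

# Returns the enumerator unchanged, or raises if it was built with bounds.
def ensure_unbounded!(task_name, enumerator)
  return enumerator unless enumerator.start || enumerator.finish

  raise ArgumentError, squish(<<~MSG)
    #{task_name}#collection cannot support
    a batch enumerator with the "start" or "finish" options.
  MSG
end

unbounded = FakeBatchEnumerator.new(batch_size: 5)
bounded = FakeBatchEnumerator.new(start: 1, finish: 10, batch_size: 5)

ensure_unbounded!("Maintenance::ExampleTask", unbounded) # passes through

begin
  ensure_unbounded!("Maintenance::ExampleTask", bounded)
rescue ArgumentError => error
  puts error.message
end
```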
1 change: 1 addition & 0 deletions app/tasks/maintenance_tasks/task.rb
@@ -131,6 +131,7 @@ def process(_item)
    #
    # @return [Integer, nil]
    def count
      :no_count
    end
  end
end
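
One plausible reading of the `:no_count` default is that it lets `on_start`
tell a Task that never overrode `#count` apart from one that explicitly
returned `nil`. The sketch below mirrors that fallback in plain Ruby.
`resolve_tick_total` is a hypothetical helper name introduced for illustration,
and an `each_slice` enumerator stands in for the batch enumerator.

```ruby
# Hypothetical helper mirroring the fallback in on_start; not part of the gem.
def resolve_tick_total(task_count, enumerator)
  # Anything other than the sentinel was chosen by the Task, so keep it.
  return task_count unless task_count == :no_count

  # Safe navigation: non-batch collections leave the enumerator nil.
  enumerator&.size
end

# Stand-in for a batch enumerator over 7 records in batches of 5.
batches = [1, 2, 3, 4, 5, 6, 7].each_slice(5)

p resolve_tick_total(:no_count, batches) # falls back to the enumerator size
p resolve_tick_total(:no_count, nil)     # no enumerator available
p resolve_tick_total(nil, batches)       # explicit nil from the Task is kept
p resolve_tick_total(42, batches)        # explicit count from the Task is kept
```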
2 changes: 2 additions & 0 deletions lib/maintenance_tasks.rb
@@ -7,6 +7,8 @@
require "job-iteration"
require "maintenance_tasks/engine"

require "patches/active_record_batch_enumerator"

# The engine's namespace module. It provides isolation between the host
# application's code and the engine-specific code. Top-level engine constants
# and variables are defined under this module.
23 changes: 23 additions & 0 deletions lib/patches/active_record_batch_enumerator.rb
@@ -0,0 +1,23 @@
# frozen_string_literal: true

# TODO: Remove this patch once all supported Rails versions include the changes
# upstream - https://github.com/rails/rails/pull/42312/commits/a031a43d969c87542c4ee8d0d338d55fcbb53376
module ActiveRecordBatchEnumerator
  # The primary key value from which the BatchEnumerator starts,
  # inclusive of the value.
  attr_reader :start

  # The primary key value at which the BatchEnumerator ends,
  # inclusive of the value.
  attr_reader :finish

  # The relation from which the BatchEnumerator yields batches.
  attr_reader :relation

  # The size of the batches yielded by the BatchEnumerator.
  def batch_size
    @of
  end
end

ActiveRecord::Batches::BatchEnumerator.include(ActiveRecordBatchEnumerator)
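
The same patching technique can be sketched without Rails. `LegacyEnumerator`
and `LegacyEnumeratorReaders` below are hypothetical names made up for this
example: the class stores its options in instance variables without exposing
them, and the module adds readers from the outside in the same shape as the
patch above.

```ruby
# Hypothetical class that keeps its options in instance variables but
# exposes no readers for them.
class LegacyEnumerator
  def initialize(of:, relation:, start: nil, finish: nil)
    @of = of
    @relation = relation
    @start = start
    @finish = finish
  end
end

# Readers added from outside the class, following the shape of the patch.
module LegacyEnumeratorReaders
  attr_reader :start, :finish, :relation

  # The option is stored as @of, so expose it under a clearer name.
  def batch_size
    @of
  end
end

LegacyEnumerator.include(LegacyEnumeratorReaders)

enumerator = LegacyEnumerator.new(of: 5, relation: :posts)
puts enumerator.batch_size
puts LegacyEnumerator.ancestors.first(2).inspect
```

Because `include` places the module behind the class in the ancestor chain,
any reader the class later defines itself takes precedence over the patched
one, so a patch of this kind stays harmless once the upstream change is
available.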
12 changes: 12 additions & 0 deletions test/dummy/app/tasks/maintenance/update_posts_in_batches_task.rb
@@ -0,0 +1,12 @@
# frozen_string_literal: true
module Maintenance
  class UpdatePostsInBatchesTask < MaintenanceTasks::Task
    def collection
      Post.in_batches(of: 5)
    end

    def process(batch_of_posts)
      batch_of_posts.update_all(content: "New content added on #{Time.now.utc}")
    end
  end
end
41 changes: 39 additions & 2 deletions test/jobs/maintenance_tasks/task_job_test.rb
@@ -247,8 +247,11 @@ class << self
       assert_predicate @run, :errored?
       assert_equal "ArgumentError", @run.error_class
       assert_empty @run.backtrace
-      expected_message = "Maintenance::TestTask#collection "\
-        "must be either an Active Record Relation, Array, or CSV."
+      expected_message = <<~MSG.squish
+        Maintenance::TestTask#collection must be either an
+        Active Record Relation, ActiveRecord::Batches::BatchEnumerator,
+        Array, or CSV.
+      MSG
       assert_equal expected_message, @run.error_message
     end

@@ -368,5 +371,39 @@ class << self

      assert_predicate run.reload, :succeeded?
    end

    test ".perform_now handles batch relation tasks" do
      5.times do |i|
        Post.create!(title: "Another Post ##{i}", content: "Content ##{i}")
      end
      # We expect 2 batches (7 posts => 5 + 2)
      Maintenance::UpdatePostsInBatchesTask.any_instance.expects(:process).twice

      run = Run.create!(task_name: "Maintenance::UpdatePostsInBatchesTask")
      TaskJob.perform_now(run)

      run.reload
      assert_equal 2, run.tick_total
      assert_equal 2, run.tick_count
    end

    test ".perform_now raises if +start+ or +finish+ options are used on batch enumerator" do
      batch_enumerator = Post.in_batches(of: 5, start: 1, finish: 10)

      Maintenance::UpdatePostsInBatchesTask.any_instance
        .expects(:collection).returns(batch_enumerator)

      run = Run.create!(task_name: "Maintenance::UpdatePostsInBatchesTask")
      TaskJob.perform_now(run)

      assert_predicate run.reload, :errored?
      assert_equal "ArgumentError", run.error_class
      assert_empty run.backtrace
      expected_message = <<~MSG.squish
        Maintenance::UpdatePostsInBatchesTask#collection cannot support a batch
        enumerator with the "start" or "finish" options.
      MSG
      assert_equal expected_message, run.error_message
    end
  end
end
1 change: 1 addition & 0 deletions test/models/maintenance_tasks/task_data_test.rb
@@ -27,6 +27,7 @@ class TaskDataTest < ActiveSupport::TestCase
"Maintenance::ImportPostsTask",
"Maintenance::ParamsTask",
"Maintenance::TestTask",
"Maintenance::UpdatePostsInBatchesTask",
"Maintenance::UpdatePostsTask",
"Maintenance::UpdatePostsThrottledTask",
]
1 change: 1 addition & 0 deletions test/system/maintenance_tasks/tasks_test.rb
@@ -24,6 +24,7 @@ class TasksTest < ApplicationSystemTestCase
"Maintenance::ImportPostsTask\nNew",
"Maintenance::ParamsTask\nNew",
"Maintenance::TestTask\nNew",
"Maintenance::UpdatePostsInBatchesTask\nNew",
"Maintenance::UpdatePostsThrottledTask\nNew",
"Completed Tasks",
"Maintenance::UpdatePostsTask\nSucceeded",
5 changes: 3 additions & 2 deletions test/tasks/maintenance_tasks/task_test.rb
@@ -11,6 +11,7 @@ class TaskTest < ActiveSupport::TestCase
"Maintenance::ImportPostsTask",
"Maintenance::ParamsTask",
"Maintenance::TestTask",
"Maintenance::UpdatePostsInBatchesTask",
"Maintenance::UpdatePostsTask",
"Maintenance::UpdatePostsThrottledTask",
]
@@ -53,9 +54,9 @@ class TaskTest < ActiveSupport::TestCase
       assert_equal 2, Maintenance::TestTask.count
     end

-    test "#count is nil by default" do
+    test "#count is :no_count by default" do
       task = Task.new
-      assert_nil task.count
+      assert_equal(:no_count, task.count)
     end

     test "#collection raises NoMethodError" do
