Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap built-in enums and add deprecation warning for unwrapped enums #513

Merged
merged 5 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
### Main (unreleased)

Nil
- [513](https://github.com/Shopify/job-iteration/pull/513) Deprecate returning enumerators from `build_enumerator` that are not wrapped with `enumerator_builder.wrap`. The built-in enumerator builders now always wrap.

## v1.7.0 (Oct 11, 2024)

Expand Down
22 changes: 13 additions & 9 deletions guides/custom-enumerator.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,14 @@ class LoadRefundsForChargeJob < ActiveJob::Base
# Use an exponential back-off strategy when Stripe's API returns errors.

def build_enumerator(charge_id, cursor:)
StripeListEnumerator.new(
Stripe::Refund,
params: { charge: charge_id}, # "charge_id" will be a prefixed Stripe ID such as "chrg_123"
options: { api_key: "sk_test_123", stripe_version: "2018-01-18" },
cursor: cursor
).to_enumerator
enumerator_builder.wrap(
StripeListEnumerator.new(
Stripe::Refund,
params: { charge: charge_id}, # "charge_id" will be a prefixed Stripe ID such as "chrg_123"
options: { api_key: "sk_test_123", stripe_version: "2018-01-18" },
cursor: cursor
).to_enumerator
)
end

# Note that in this case `each_iteration` will only receive one positional argument per iteration.
Expand Down Expand Up @@ -114,9 +116,11 @@ class RedisPopListJob < ActiveJob::Base
# @see https://redis.io/commands/lpop/
def build_enumerator(*)
@redis = Redis.new
Enumerator.new do |yielder|
yielder.yield @redis.lpop(key), nil
end
enumerator_builder.wrap(
Enumerator.new do |yielder|
yielder.yield @redis.lpop(key), nil
end
)
end

def each_iteration(item_from_redis)
Expand Down
24 changes: 23 additions & 1 deletion guides/throttling.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,31 @@ def build_enumerator(_params, cursor:)
end
```

If you want to apply throttling on all jobs, you can subclass your own EnumeratorBuilder and override the default
enumerator builder. The builder always wraps the returned enumerators from `build_enumerator`

```ruby
class MyOwnBuilder < JobIteration::EnumeratorBuilder
class Wrapper < Enumerator
class << self
def wrap(_builder, enum)
ThrottleEnumerator.new(
enum,
nil,
throttle_on: -> { DatabaseStatus.unhealthy? },
backoff: 30.seconds
)
end
end
end
end

JobIteration.enumerator_builder = MyOwnBuilder
```

Note that it's up to you to implement `DatabaseStatus.unhealthy?` that works for your database choice. At Shopify, a helper like `DatabaseStatus` checks the following MySQL metrics:

* Replication lag across all regions
* DB threads
* DB is available for writes (otherwise indicates a failover happening)
* [Semian](https://github.com/shopify/semian) open circuits
* [Semian](https://github.com/shopify/semian) open circuits
19 changes: 11 additions & 8 deletions lib/job-iteration/enumerator_builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ class EnumeratorBuilder
# `enumerator_builder` is _always_ the type that is returned from
# `build_enumerator`. This prevents people from implementing custom
# Enumerators without wrapping them in
# `enumerator_builder.wrap(custom_enum)`. We don't do this yet for backwards
# compatibility with raw calls to EnumeratorBuilder. Think of these wrappers
# `enumerator_builder.wrap(custom_enum)`. Think of these wrappers
# the way you should a middleware.
class Wrapper < Enumerator
class << self
Expand Down Expand Up @@ -131,21 +130,24 @@ def build_active_record_enumerator_on_batch_relations(scope, wrap: true, cursor:
enum
end

def build_throttle_enumerator(enum, throttle_on:, backoff:)
JobIteration::ThrottleEnumerator.new(
enum,
def build_throttle_enumerator(enumerable, throttle_on:, backoff:)
enum = JobIteration::ThrottleEnumerator.new(
enumerable,
@job,
throttle_on: throttle_on,
backoff: backoff,
).to_enum
wrap(self, enum)
end

def build_csv_enumerator(enumerable, cursor:)
CsvEnumerator.new(enumerable).rows(cursor: cursor)
enum = CsvEnumerator.new(enumerable).rows(cursor: cursor)
wrap(self, enum)
end

def build_csv_enumerator_on_batches(enumerable, cursor:, batch_size: 100)
CsvEnumerator.new(enumerable).batches(cursor: cursor, batch_size: batch_size)
enum = CsvEnumerator.new(enumerable).batches(cursor: cursor, batch_size: batch_size)
wrap(self, enum)
end

# Builds Enumerator for nested iteration.
Expand Down Expand Up @@ -179,7 +181,8 @@ def build_csv_enumerator_on_batches(enumerable, cursor:, batch_size: 100)
# end
#
def build_nested_enumerator(enums, cursor:)
NestedEnumerator.new(enums, cursor: cursor).each
enum = NestedEnumerator.new(enums, cursor: cursor).each
wrap(self, enum)
end

alias_method :once, :build_once_enumerator
Expand Down
9 changes: 8 additions & 1 deletion lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,14 @@ def adjust_total_time
end

def assert_enumerator!(enum)
return if enum.is_a?(Enumerator)
if enum.is_a?(Enumerator)
unless enum.is_a?(JobIteration.enumerator_builder::Wrapper)
JobIteration::Deprecation.warn("Returning an unwrapped enumerator from build_enumerator is deprecated. " \
"Wrap the enumerator using enumerator_builder.wrap(my_enumerator) instead.")
end

return
end

raise ArgumentError, <<~EOS
#build_enumerator is expected to return Enumerator object, but returned #{enum.class}.
Expand Down
17 changes: 17 additions & 0 deletions test/unit/active_job_iteration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,15 @@ def each_iteration(i)
end
end

class UnwrapppedCustomEnumerator < SingleIterationJob
def build_enumerator(cursor:)
[1, 2, 3].each
end

def each_iteration(*)
end
end

class CustomEnumBuilder
def initialize(*)
end
Expand Down Expand Up @@ -695,6 +704,14 @@ def test_custom_enum_builder
JobIteration.enumerator_builder = original_builder
end

def test_unwrapped_enumerator_deprecation_warning
push(UnwrapppedCustomEnumerator)

assert_deprecated("Returning an unwrapped enumerator from build_enumerator", JobIteration::Deprecation) do
work_one_job
end
end

def test_respects_job_should_exit_from_parent_class
JobShouldExitJob.new.perform_now
assert_equal([0], JobShouldExitJob.records_performed)
Expand Down
51 changes: 5 additions & 46 deletions test/unit/enumerator_builder_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class EnumeratorBuilderTest < ActiveSupport::TestCase
end

test_builder_method(:build_active_record_enumerator_on_batch_relations) do
enumerator_builder(wraps: 1).build_active_record_enumerator_on_batch_relations(Product.all, cursor: nil)
enumerator_builder.build_active_record_enumerator_on_batch_relations(Product.all, cursor: nil)
end

test_builder_method("build_active_record_enumerator_on_batch_relations without wrap") do
Expand All @@ -53,19 +53,19 @@ class EnumeratorBuilderTest < ActiveSupport::TestCase
end

test_builder_method(:build_throttle_enumerator) do
enumerator_builder(wraps: 0).build_throttle_enumerator(nil, throttle_on: -> { false }, backoff: 1)
enumerator_builder.build_throttle_enumerator(nil, throttle_on: -> { false }, backoff: 1)
end

test_builder_method(:build_csv_enumerator) do
enumerator_builder(wraps: 0).build_csv_enumerator(CSV.new("test"), cursor: nil)
enumerator_builder.build_csv_enumerator(CSV.new("test"), cursor: nil)
end

test_builder_method(:build_csv_enumerator_on_batches) do
enumerator_builder(wraps: 0).build_csv_enumerator_on_batches(CSV.new("test"), cursor: nil)
enumerator_builder.build_csv_enumerator_on_batches(CSV.new("test"), cursor: nil)
end

test_builder_method(:build_nested_enumerator) do
enumerator_builder(wraps: 0).build_nested_enumerator(
enumerator_builder.build_nested_enumerator(
[
->(cursor) {
enumerator_builder.build_active_record_enumerator_on_records(Product.all, cursor: cursor)
Expand All @@ -81,39 +81,6 @@ class EnumeratorBuilderTest < ActiveSupport::TestCase
# checks that all the non-alias methods were tested
raise "methods not tested: #{methods.inspect}" unless methods.empty?

test "#build_csv_enumerator uses the CsvEnumerator class" do
csv = CSV.open(
sample_csv_with_headers,
converters: :integer,
headers: true,
)
builder = EnumeratorBuilder.new(mock, wrapper: mock)

enum = builder.build_csv_enumerator(csv, cursor: nil)
csv_rows = open_csv.map(&:fields)
enum.each_with_index do |element_and_cursor, index|
assert_equal [csv_rows[index], index], [element_and_cursor[0].fields, element_and_cursor[1]]
end
end

test "#build_csv_enumerator_on_batches uses the CsvEnumerator class with batches" do
csv = CSV.open(
sample_csv_with_headers,
converters: :integer,
headers: true,
)
builder = EnumeratorBuilder.new(mock, wrapper: mock)

enum = builder.build_csv_enumerator_on_batches(csv, cursor: nil, batch_size: 2)
csv_rows = open_csv.to_a
enum.each_with_index do |batch_and_cursor, index|
batch, cursor = batch_and_cursor
expected_batch = csv_rows[index * 2, 2]
assert_equal expected_batch, batch
assert_equal index, cursor
end
end
bdewater marked this conversation as resolved.
Show resolved Hide resolved

private

def enumerator_builder(wraps: 1)
Expand All @@ -123,13 +90,5 @@ def enumerator_builder(wraps: 1)
wrapper.expects(:wrap).with(builder, anything).times(wraps)
builder
end

def sample_csv_with_headers
["test", "support", "sample_csv_with_headers.csv"].join("/")
end

def open_csv(options = {})
CSV.open(sample_csv_with_headers, converters: :integer, headers: true, **options)
end
end
end
Loading