Skip to content

Commit

Permalink
Serialize cursor_position
Browse files Browse the repository at this point in the history
Previously, `cursor_position` was handed as-is to the queue adapter. This could
lead to the queue adapter corrupting cursors of certain classes. For example,
if given a `Time` cursor, Sidekiq would save it as JSON by calling `to_s`,
resulting in the deserialized cursor being a `String` instead of a `Time`.

To prevent this, we now leverage `ActiveJob::Arguments` to (de)serialize the
`cursor_position` and ensure it will make the round trip safely.

However, as this is a breaking change (as unsafe cursors would previously be
accepted, but possibly corrupted, whereas they would now be rejected), we begin
by rescuing (de)serialization failures and emitting a deprecation warning.

Starting in Job Iteration version 2.0, the deprecation warning will be removed,
and (de)serialization failure will raise.

Application owners can opt-in to the 2.0 behavior either globally by setting

    JobIteration.enforce_serializable_cursors = true

or on an inheritable per-class basis by setting

    class MyJob < ActiveJob::Base
      include JobIteration::Iteration
      self.job_iteration_enforce_serializable_cursors = true
      # ...
    end
  • Loading branch information
sambostock committed Jul 19, 2022
1 parent c9f2f7c commit e96553e
Show file tree
Hide file tree
Showing 8 changed files with 385 additions and 158 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

- [241](https://github.com/Shopify/job-iteration/pull/241) - Require Ruby 2.7+, dropping 2.6 support
- [241](https://github.com/Shopify/job-iteration/pull/241) - Require Rails 6.0+, dropping 5.2 support
- [80](https://github.com/Shopify/job-iteration/pull/80) - Serialize cursors using ActiveJob::Arguments
- [80](https://github.com/Shopify/job-iteration/pull/80) - Deprecate un(de)serializable cursors
- [80](https://github.com/Shopify/job-iteration/pull/80) - Add `enforce_serializable_cursors` config

## v1.3.6 (Mar 9, 2022)

Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ class MyJob < ApplicationJob
end
```

See [the guide on Custom Enumerators](guides/custom-enumerator.md) for details.

## Credits

This project would not be possible without these individuals (in alphabetical order):
Expand Down
58 changes: 58 additions & 0 deletions guides/custom-enumerator.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Custom Enumerator

Iteration leverages the [Enumerator](http://ruby-doc.org/core-2.5.1/Enumerator.html) pattern from the Ruby standard library, which allows us to use almost any resource as a collection to iterate.

## Cursorless Enumerator

Consider a custom Enumerator that takes items from a Redis list. Because a Redis List is essentially a queue, we can ignore the cursor:

```ruby
Expand All @@ -19,6 +23,8 @@ class ListJob < ActiveJob::Base
end
```

## Enumerator with cursor

But what about iterating based on a cursor? Consider this Enumerator that wraps third party API (Stripe) for paginated iteration:

```ruby
Expand Down Expand Up @@ -82,6 +88,58 @@ class StripeJob < ActiveJob::Base
end
```

## Notes

We recommend that you read the implementation of the other enumerators that come with the library (`CsvEnumerator`, `ActiveRecordEnumerator`) to gain a better understanding of building Enumerator objects.

### Post-`yield` code

Code that is written after the `yield` in a custom enumerator is not guaranteed to execute. In the case that a job is forced to exit ie `job_should_exit?` is true, then the job is re-enqueued during the yield and the rest of the code in the enumerator does not run. You can follow that logic [here](https://github.com/Shopify/job-iteration/blob/9641f455b9126efff2214692c0bef423e0d12c39/lib/job-iteration/iteration.rb#L128-L131).

### Cursor types

Cursors should be of a [type that Active Job can serialize](https://edgeguides.rubyonrails.org/active_job_basics.html#supported-types-for-arguments).

For example, consider:

```ruby
FancyCursor = Struct.new(:wrapped_value) do
def to_s
wrapped_value
end
end
```

```ruby
def build_enumerator(cursor:)
Enumerator.new do |yielder|
# ...something with fancy cursor...
yield 123, FancyCursor.new(:abc)
end
end
```

If this job was interrupted, Active Job would be unable to serialize
`FancyCursor`, and Job Iteration would fallback to the legacy behavior of not
serializing the cursor. This would typically result in the queue adapter
eventually serializing the cursor as JSON by calling `.to_s` on it. Upon
deserialization, the cursor would be corrupted as `:abc`, rather than
`FancyCursor.new(:abc)`.

To avoid this, job authors should take care to ensure that their cursor is
serializable by Active Job. This can be done in a couple ways, such as:
- [adding GlobalID support to the cursor class](https://edgeguides.rubyonrails.org/active_job_basics.html#globalid)
- [implementing a custom serializer for the cursor class](https://edgeguides.rubyonrails.org/active_job_basics.html#serializers)
- handling (de)serialization in the job/enumerator itself
```diff
def build_enumerator(cursor:)
fancy_cursor = FancyCursor.new(cursor) unless cursor.nil?
Enumerator.new do |yielder|
# ...something with fancy cursor...
yield 123, FancyCursor(:abc).wrapped_value
end
end
```

Note that starting in 2.0, Job Iteration will stop supporting fallback behavior
and raise when it encounters an unserializable cursor.
20 changes: 20 additions & 0 deletions lib/job-iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ module JobIteration

INTEGRATIONS = [:resque, :sidekiq]

Deprecation = ActiveSupport::Deprecation.new("2.0", "JobIteration")

extend self

# Use this to _always_ interrupt the job after it's been running for more than N seconds.
Expand All @@ -20,6 +22,24 @@ module JobIteration
# Defaults to nil which means that jobs will not be interrupted except on termination signal.
attr_accessor :max_job_runtime

# Set this to `true` to enforce that cursors be composed of objects capable
# of built-in (de)serialization by Active Job.
#
# JobIteration.enforce_serializable_cursors = true
#
# For more granular control, this can also be configured per job class, and
# is inherited by child classes.
#
# class MyJob < ActiveJob::Base
# include JobIteration::Iteration
# self.job_iteration_enforce_serializable_cursors = false
# # ...
# end
#
# Note that non-enforcement is deprecated and enforcement will be mandatory
# in version 2.0, at which point this config will go away.
attr_accessor :enforce_serializable_cursors

# Used internally for hooking into job processing frameworks like Sidekiq and Resque.
attr_accessor :interruption_adapter

Expand Down
105 changes: 53 additions & 52 deletions lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,17 @@ module Iteration
:total_time,
)

class CursorError < ArgumentError
attr_reader :cursor

def initialize(message, cursor:)
super(message)
@cursor = cursor
end

def message
"#{super} (#{inspected_cursor})"
end

private

def inspected_cursor
cursor.inspect
rescue NoMethodError
# For those brave enough to try to use BasicObject as cursor. Nice try.
Object.instance_method(:inspect).bind(cursor).call
end
end

included do |_base|
define_callbacks :start
define_callbacks :shutdown
define_callbacks :complete

class_attribute(
:job_iteration_enforce_serializable_cursors,
instance_writer: false,
instance_predicate: false,
default: JobIteration.enforce_serializable_cursors,
)
end

module ClassMethods
Expand Down Expand Up @@ -77,16 +62,25 @@ def initialize(*arguments)
ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true)

def serialize # @private
super.merge(
"cursor_position" => cursor_position,
iteration_job_data = {
"cursor_position" => cursor_position, # Backwards compatibility
"times_interrupted" => times_interrupted,
"total_time" => total_time,
)
}

begin
iteration_job_data["serialized_cursor_position"] = serialize_cursor(cursor_position)
rescue ActiveJob::SerializationError
raise if job_iteration_enforce_serializable_cursors
# No point in duplicating the deprecation warning from assert_valid_cursor!
end

super.merge(iteration_job_data)
end

def deserialize(job_data) # @private
super
self.cursor_position = job_data["cursor_position"]
self.cursor_position = cursor_position_from_job_data(job_data)
self.times_interrupted = job_data["times_interrupted"] || 0
self.total_time = job_data["total_time"] || 0
end
Expand Down Expand Up @@ -149,8 +143,7 @@ def iterate_with_enumerator(enumerator, arguments)
@needs_reenqueue = false

enumerator.each do |object_from_enumerator, index|
# Deferred until 2.0.0
# assert_valid_cursor!(index)
assert_valid_cursor!(index)

record_unit_of_work do
found_record = true
Expand Down Expand Up @@ -209,16 +202,21 @@ def build_enumerator(params, cursor:)
EOS
end

# The adapter must be able to serialize and deserialize the cursor back into an equivalent object.
# https://github.com/mperham/sidekiq/wiki/Best-Practices#1-make-your-job-parameters-small-and-simple
def assert_valid_cursor!(cursor)
return if serializable?(cursor)
serialize_cursor(cursor)
true
rescue ActiveJob::SerializationError
raise if job_iteration_enforce_serializable_cursors

raise CursorError.new(
"Cursor must be composed of objects capable of built-in (de)serialization: " \
"Strings, Integers, Floats, Arrays, Hashes, true, false, or nil.",
cursor: cursor,
)
Deprecation.warn(<<~DEPRECATION_MESSAGE, caller_locations(3))
The Enumerator returned by #{self.class.name}#build_enumerator yielded a cursor which is unsafe to serialize.
TBD MENTION SERIALIZERS!
This will raise starting in version #{Deprecation.deprecation_horizon} of #{Deprecation.gem_name}!"
DEPRECATION_MESSAGE

false
end

def assert_implements_methods!
Expand Down Expand Up @@ -288,6 +286,25 @@ def handle_completed(completed)
raise "Unexpected thrown value: #{completed.inspect}"
end

def cursor_position_from_job_data(job_data)
if job_data.key?("serialized_cursor_position")
deserialize_cursor(job_data.fetch("serialized_cursor_position"))
else
# Backwards compatibility for
# - jobs interrupted before cursor serialization feature shipped, or
# - jobs whose cursor could not be serialized
job_data.fetch("cursor_position", nil)
end
end

def serialize_cursor(cursor)
ActiveJob::Arguments.serialize([cursor]).first
end

def deserialize_cursor(cursor)
ActiveJob::Arguments.deserialize([cursor]).first
end

def valid_cursor_parameter?(parameters)
# this condition is when people use the splat operator.
# def build_enumerator(*)
Expand All @@ -299,21 +316,5 @@ def valid_cursor_parameter?(parameters)
end
false
end

SIMPLE_SERIALIZABLE_CLASSES = [String, Integer, Float, NilClass, TrueClass, FalseClass].freeze
private_constant :SIMPLE_SERIALIZABLE_CLASSES
def serializable?(object)
# Subclasses must be excluded, hence not using is_a? or ===.
if object.instance_of?(Array)
object.all? { |element| serializable?(element) }
elsif object.instance_of?(Hash)
object.all? { |key, value| serializable?(key) && serializable?(value) }
else
SIMPLE_SERIALIZABLE_CLASSES.any? { |klass| object.instance_of?(klass) }
end
rescue NoMethodError
# BasicObject doesn't respond to instance_of, but we can't serialize it anyway
false
end
end
end
9 changes: 2 additions & 7 deletions test/integration/integration_behaviour.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,12 @@ module IntegrationBehaviour

test "unserializable corruption is prevented" do
skip_until_version("2.0.0")
# Cursors are serialized as JSON, but not all objects are serializable.
# time = Time.at(0).utc # => 1970-01-01 00:00:00 UTC
# json = JSON.dump(time) # => "\"1970-01-01 00:00:00 UTC\""
# string = JSON.parse(json) # => "1970-01-01 00:00:00 UTC"
# We serialized a Time, but it was deserialized as a String.
TimeCursorJob.perform_later
UnserializableCursorJob.perform_later
TerminateJob.perform_later
start_worker_and_wait

assert_equal(
JobIteration::Iteration::CursorError.name,
ActiveJob::SerializationError.name,
failed_job_error_class_name,
)
end
Expand Down
5 changes: 3 additions & 2 deletions test/support/jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ def each_iteration(omg)
end
end

class TimeCursorJob < ActiveJob::Base
class UnserializableCursorJob < ActiveJob::Base
include JobIteration::Iteration
UnserializableCursor = Class.new

def build_enumerator(cursor:)
return [["item", Time.now]].to_enum if cursor.nil?
return [["item", UnserializableCursor.new]].to_enum if cursor.nil?

raise "This should never run; cursor is unserializable!"
end
Expand Down
Loading

0 comments on commit e96553e

Please sign in to comment.