Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support nested iteration #295

Closed
wants to merge 1 commit into from

Conversation

fatkodima
Copy link
Contributor

Fixes #63.
This is a generalized version of #100, as suggested in #63 (comment).

Example usage:

class NestedIterationJob < ApplicationJob
  include JobIteration::Iteration

  def build_enumerator(cursor:)
    enumerator_builder.nested(
      [
        ->(cursor) { enumerator_builder.active_record_on_records(Shop.all, cursor: cursor) },
        ->(shop, cursor) { enumerator_builder.active_record_on_records(shop.products, cursor: cursor) },
        ->(_shop, product, cursor) { enumerator_builder.active_record_on_batch_relations(product.product_variants, cursor: cursor) }
      ],
      cursor: cursor
    )
  end

  def each_iteration(product_variants_relation)
    # do something
  end
end

cc @bdewater

@fatkodima
Copy link
Contributor Author

I signed CLA, a re-run is needed.

@bdewater
Copy link
Contributor

bdewater commented Nov 2, 2022

Thank you for tackling this 🙇 the API looks great!

Once CI runs you'll hit failures against Rails edge, you may want to get ahead of that by borrowing and extending #294

@Mangara Mangara mentioned this pull request Nov 2, 2022
Copy link
Contributor

@Mangara Mangara left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this contribution! It's been on my wishlist for a long time, so it's great to finally see it become reality ❤️

@Mangara
Copy link
Contributor

Mangara commented Nov 4, 2022

Would you mind rebasing on the latest main? We merged the fix for the Rails edge tests, so CI should be 🟢 after that.

@fatkodima
Copy link
Contributor Author

Rebased.

@Mangara
Copy link
Contributor

Mangara commented Nov 4, 2022

There are some CI failures 🤔

1) Failure:
JobIteration::NestedEnumeratorTest#test_cursor_can_be_used_to_resume [/home/runner/work/job-iteration/job-iteration/test/unit/nested_enumerator_test.rb:48]:
--- expected
+++ actual
@@ -1 +1 @@
-[#<Comment id: 7, content: "lipstick 0 comment #0", product_id: 1>, [1, 7]]
+[#<Comment id: 2, content: "lipstick 1 comment #0", product_id: 2>, [2, 2]]

Is this test flaky? Or did one of the gem bumps break it?

@fatkodima
Copy link
Contributor Author

fatkodima commented Nov 4, 2022

@Mangara I identified what is the problem.

Do not know how I overlooked it, but this iterator isn't working correctly when created from an existing cursor when resuming iteration. Each cursor in the chain is independently advanced. For example, if the cursor was [1, 2], the next yielded record will have the cursor [2, 3] (instead of [1, 3] or [2, 1]).

And additionally the test was weak enough to identify it when running in isolation.

The simplest solution is to just store the previous cursor

self.cursor_position = index
when the job is interrupted. But the next time the same record will be processed once again. Is it a problem? In ActiveJob/Sidekiq each job already can be processed multiple times because of errors/etc, so people are already encouraged to write idempotent code in jobs.

We can slightly improve the situation with the same record reruns mentioned above, by storing previous cursor value for cursors with multiple values, and the current value having only one.

Upd: We need to start from the current cursor in enumerators (not the next), e.g. -

def count_of_processed_rows(cursor)
cursor.nil? ? 0 : cursor + 1
end

But the next time the same record will be processed once again. Is it a problem? In ActiveJob/Sidekiq each job already can be processed multiple times because of errors/etc, so people are already encouraged to write idempotent code in jobs.

comments_count.times.map { |n| { content: "#{product.name} comment ##{n}", product_id: product.id } }
end.flatten

Comment.insert_all!(comments)
end

def truncate_fixtures
Copy link
Contributor

@bdewater bdewater Nov 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want to change the line below to ActiveRecord::Base.connection.truncate(Product.table_name, Comment.table_name) to clean up this new model as well.

@fatkodima
Copy link
Contributor Author

@Mangara @bdewater Do you have opinions on my last comment? I could implement that changes regarding cursors separately, if agreed, and the nested iteration will become usable.

@bdewater
Copy link
Contributor

bdewater commented Nov 26, 2022

But the next time the same record will be processed once again. Is it a problem?

Is it different than how it is today? That doesn't seem a problem to me in that case.

I had a bit of trouble parsing this and the test cases, so I played with these to make sure I understand it correctly (and please correct me if I'm still wrong)

test "array cursor can be used to resume" do
  enumerator_builder = EnumeratorBuilder.new(mock)
  integers = [0, 1, 2]

  enum = enumerator_builder.array(integers, cursor: 0)

  assert_equal [1, 2], enum.map { |value, _cursor| value }
end

test "nested array cursor can be used to resume" do
  enumerator_builder = EnumeratorBuilder.new(mock)
  integers = [0, 1, 2]
  strings = [["0a", "0b", "0c"], ["1a", "1b", "1c"], ["2a", "2b", "2c"]]

  enum = enumerator_builder.nested(
    [
      ->(cursor) { enumerator_builder.array(integers, cursor: cursor) },
      ->(integer, cursor) { enumerator_builder.array(strings[integer], cursor: cursor) },
    ],
    cursor: [0, 0]
  )

  assert_equal ["1b", "1c", "2a", "2b", "2c"], enum.map { |value, _cursor| value }
end

The second test is correctly specified, but currently fails with:

Expected: ["1b", "1c", "2a", "2b", "2c"]
  Actual: ["1b", "1c", "2b", "2c"]

@fatkodima
Copy link
Contributor Author

Is it different than how it is today?

Yes. Currently (main branch), resumed iteration will start from the cursor + 1 (actually, passed cursors value is a value from the latest previous iteration).

The first test case has assert_equal [1, 2], enum.map { |value, _cursor| value }, but with the new behavior of starting from the latest cursor (without +1) will be assert_equal [0, 1, 2], enum.map { |value, _cursor| value }.

Second test case: since we are starting from [0, 0], the expected next value should be from [0, 1], isn't it? - that's why we need to restart from the previous cursor value because otherwise some values will be incorrectly skipped when doing nested iteration.

Not sure if it is more clear now. Feel free to ask, and I will try rephrase and somehow improve my explanation.


enum.each do |item, cursor_value|
if index == @cursor.size - 1
yield item, current_cursor + [cursor_value]
Copy link
Contributor

@bdewater bdewater Nov 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been stepping through the debugger a few times and what stood out to me is cursor = @cursor[index], the resumed cursor value stays 'stuck' for subsequent iterations, but when not resuming is initialized as an array of nils.

In my nested array example this means skipping over "2a" but also "3a" if you extend the pattern as such:

integers = [0, 1, 2, 3]
strings = [["0a", "0b", "0c"], ["1a", "1b", "1c"], ["2a", "2b", "2c"], ["3a", "3b", "3c"]]

This seems to do the trick unless I'm missing something (it's getting late here)?

Suggested change
yield item, current_cursor + [cursor_value]
yield item, current_cursor + [cursor_value]
@cursor[index] = nil

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to do this inside each, would work under cursor = @cursor[index] as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will fix the mentioned issue.
But for the cursor like [0, 1], it will start from [1, something], because when the iteration is resumed, it starts from the cursor + 1. So 0b and 0c will be incorrectly skipped.

I would say that this PR is correctly implemented, but works incorrectly, because as I mentioned previously, we need to resume from the same cursor value each time (without +1).

A separate PR with changes like fatkodima/sidekiq-iteration@5d3fe18 needs to be made for this PR to start working and not skipping values.

Copy link
Contributor

@bdewater bdewater Nov 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess what I am not grasping is why the cursor logic throughout this library needs to change to make nesting work. AFAIK its nil to start at the beginning of a collection, and a non-nil cursor to resume after. With the generic way nested enumerators are set up I don't see why this should change? The individual enumerators shouldn't need to care if it's nested or not, the nested enumerator should feed the correct value to them and handle the nesting level (index).

To whiteboard the possibly cursor values for ["0a", "0b", "0c"] and the item processed in each_iteration:

  • cursor: nil, item: "0a"
  • cursor: 0, item: "0b"
  • cursor: 1, item: "0c"

and nested with: [["0a", "0b", "0c"], ["1a", "1b", "1c"], ["2a", "2b", "2c"]]:

  • cursor: [nil, nil], item: "0a"
  • cursor: [nil, 0], item: "0b"
  • cursor: [nil, 1], item: "0c"
  • cursor: [0, nil], item: "1a"
  • cursor: [0, 0], item: "1b"
  • cursor: [0, 1], item: "1c"
  • cursor: [1, nil], item: "2a"

etc. It seems to me resetting the right cursor index to nil at the correct time should make the above possible and make it work without changes to the rest of the library.

Having debugged some more I think it's also possible to refactor out current_cursor in the iterate method this is needed to serialize the cursor, and inspecting that I am beginning to see what you described :) Inside JobIteration::Iteration#iterate_with_enumerator for value object_from_enumerator="1a", index=[1, 0] and that would be serialized as cursor_position. A more integration-style test asserting that pausing and resuming a nested job will cover all the records would be nice. Will play with this a bit more!

Either way - thanks for taking the time to work on this and answering my questions 🙇‍♂️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try to get a PR up in the next few days.

Looking forward for it 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support for nested iteration
3 participants