Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry on error_apple non-terminal error #1148

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions app/models/publishing_pipeline_state.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
class PublishingPipelineState < ApplicationRecord
TERMINAL_STATUSES = [:complete, :error, :expired].freeze
TERMINAL_FAILURE_STATUSES = [:error, :expired].freeze
FAILURE_STATUSES = [:error, :expired, :error_apple].freeze
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, makes sense to have this grouping

UNIQUE_STATUSES = TERMINAL_STATUSES + [:created, :started]

# Handle the max timeout for a publishing pipeline: Pub RSS job + Pub Apple job + a few extra minutes of flight
Expand All @@ -19,7 +19,7 @@ class PublishingPipelineState < ApplicationRecord
scope :latest_failed_pipelines, -> {
# Grab the latest attempted Publishing Item AND the latest failed Pub Item.
# If that is a non-null intersection, then we have a current/latest+failed pipeline.
where(publishing_queue_item_id: PublishingQueueItem.latest_attempted.latest_failed.select(:id))
where(publishing_queue_item_id: PublishingQueueItem.latest_attempted.latest_failed.order(id: :asc).select(:id))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is that order(:id) appropriate to add to either the latest_attempted or latest_failed scopes?

Also, why :asc? That means the latest will be at the end? Wouldn't the most recently failed make sense as the first?

}

scope :latest_by_queue_item, -> {
Expand Down Expand Up @@ -173,8 +173,16 @@ def self.expire_pipelines!
end
end

# Queue items whose latest attempt belongs to a currently-failed pipeline
# (the intersection computed by .latest_failed_pipelines).
# Returns a PublishingQueueItem relation.
def self.latest_failed_publishing_queue_items
  failed_item_ids = latest_failed_pipelines.select(:publishing_queue_item_id).distinct
  PublishingQueueItem.where(id: failed_item_ids)
end

# Podcasts that currently have a failed publishing pipeline, derived from
# the failed queue items. Returns a Podcast relation.
def self.latest_failed_podcasts
  failed_podcast_ids = latest_failed_publishing_queue_items.select(:podcast_id).distinct
  Podcast.where(id: failed_podcast_ids)
end

def self.retry_failed_pipelines!
Podcast.where(id: latest_failed_pipelines.select(:podcast_id).distinct).each do |podcast|
latest_failed_podcasts.each do |podcast|
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, nicer to have a method for this.

Rails.logger.tagged("PublishingPipeLineState.retry_failed_pipelines!", "Podcast:#{podcast.id}") do
start_pipeline!(podcast)
end
Expand Down
2 changes: 1 addition & 1 deletion app/models/publishing_queue_item.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class PublishingQueueItem < ApplicationRecord
latest_by_status(PublishingPipelineState::TERMINAL_STATUSES)
}
# Queue items whose most recent pipeline state is any failure status —
# per FAILURE_STATUSES this includes the non-terminal :error_apple as well
# as the terminal :error and :expired.
# NOTE: the stale pre-diff line referencing TERMINAL_FAILURE_STATUSES was a
# leftover removed-diff line and has been dropped; only the new constant is used.
scope :latest_failed, -> {
  latest_by_status(PublishingPipelineState::FAILURE_STATUSES)
}

scope :latest_by_status, ->(status) {
Expand Down
47 changes: 47 additions & 0 deletions test/models/publishing_pipeline_state_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,37 @@
end
end

# Covers .latest_failed_pipelines / .latest_failed_podcasts: a pipeline counts
# as failed when its latest queue item hit any FAILURE_STATUSES state
# (:error, :expired, or the non-terminal :error_apple), even when the run
# later reached a non-error terminal status.
describe ".latest_failed_pipelines" do
  it "returns the latest failed pipelines including intermediate and terminal errors" do
    # Pipeline 1: intermediate error_apple, then completes anyway.
    pqi1 = PublishingQueueItem.ensure_queued!(podcast)
    _s1 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: pqi1)
    PublishingPipelineState.error_apple!(podcast)
    PublishingPipelineState.complete!(podcast)

    # The intermediate error marks the pipeline as failed despite completing.
    assert_equal [podcast], PublishingPipelineState.latest_failed_podcasts
    assert_equal ["created", "error_apple", "complete"], PublishingPipelineState.latest_failed_pipelines.where(podcast: podcast).map(&:status)

    # Pipeline 2: terminal error is also reported as failed.
    pqi2 = PublishingQueueItem.ensure_queued!(podcast)
    _s2 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: pqi2)
    PublishingPipelineState.error!(podcast)

    assert_equal [podcast], PublishingPipelineState.latest_failed_podcasts
    assert_equal ["created", "error"], PublishingPipelineState.latest_failed_pipelines.where(podcast: podcast).map(&:status)

    # Pipeline 3: a clean run is not included in the failed pipelines.
    pqi3 = PublishingQueueItem.ensure_queued!(podcast)
    _s3 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: pqi3)
    PublishingPipelineState.complete!(podcast)

    assert_equal [], PublishingPipelineState.latest_failed_pipelines.where(podcast: podcast)
    # BUG FIX: the original used bare `assert` with a literal array, which is
    # always truthy and asserted nothing; assert_equal pins the statuses
    # (sorted, matching the file's order-insensitive convention).
    assert_equal ["created", "complete"].sort, PublishingPipelineState.latest_pipelines.where(podcast: podcast).pluck(:status).sort
  end
end

describe ".retry_failed_pipelines!" do
it "should retry failed pipelines" do
PublishingPipelineState.start_pipeline!(podcast)
Expand All @@ -200,6 +231,22 @@
assert_equal ["created"].sort, PublishingPipelineState.latest_pipeline(podcast).map(&:status).sort
end

# Verifies that a pipeline that hit the non-terminal error_apple state but
# then finished with a non-error terminal status (complete) is still treated
# as failed and picked up by retry_failed_pipelines!. The assertions are
# order-dependent on the state transitions above them, so statements must
# not be reordered.
it "retries pipelines with intermediate error_apple and non-error terminal status" do
PublishingPipelineState.start_pipeline!(podcast)
assert_equal ["created"], PublishingPipelineState.latest_pipeline(podcast).map(&:status)

# it fails with the intermediate (non-terminal) apple error
PublishingPipelineState.error_apple!(podcast)
assert_equal ["created", "error_apple"].sort, PublishingPipelineState.latest_pipeline(podcast).map(&:status).sort

# the run still reaches a terminal :complete on top of the apple error
PublishingPipelineState.complete!(podcast)
assert_equal ["created", "error_apple", "complete"].sort, PublishingPipelineState.latest_pipeline(podcast).map(&:status).sort

# it retries: a fresh pipeline (just :created) replaces the failed one
PublishingPipelineState.retry_failed_pipelines!
assert_equal ["created"].sort, PublishingPipelineState.latest_pipeline(podcast).map(&:status).sort
end

it "ignores previously errored pipelines back in the queue" do
# A failed pipeline
PublishingPipelineState.start_pipeline!(podcast)
Expand Down
14 changes: 14 additions & 0 deletions test/models/publishing_queue_item_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,20 @@
assert_equal [pqi1].sort, PublishingQueueItem.latest_failed.where(podcast: podcast)
assert_equal [].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast)
end

# Verifies that the latest_failed scope matches queue items whose pipeline
# hit the intermediate :error_apple state, and that queuing a newer item
# drops the older failed one out of latest_attempted.latest_failed.
# Assertions are order-dependent on the records created above them.
it "includes intermediate states like error_apple" do
pqi1 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item
PublishingPipelineState.error_apple!(podcast)

# the apple error alone is enough to mark pqi1 as failed
assert_equal [pqi1].sort, PublishingQueueItem.latest_failed.where(podcast: podcast)
assert_equal [pqi1].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast)
assert_equal [podcast], PublishingPipelineState.latest_failed_podcasts

# a newer attempt supersedes pqi1 as the latest attempted item
_pqi2 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item

# pqi1 is still the latest *failed* item, but no longer the latest attempted
assert_equal [pqi1].sort, PublishingQueueItem.latest_failed.where(podcast: podcast)
assert_equal [].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast)
end
end

describe ".all_unfinished_items" do
Expand Down
Loading