Skip to content

Commit

Permalink
Add logging around attempts and state transitions
Browse files Browse the repository at this point in the history
  • Loading branch information
svevang committed Jul 20, 2023
1 parent 05d5462 commit 0e9b0ff
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 17 deletions.
46 changes: 29 additions & 17 deletions app/models/publishing_pipeline_state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,29 +88,40 @@ def self.most_recent_state(podcast)
end

def self.start_pipeline!(podcast)
PublishingQueueItem.ensure_queued!(podcast)
attempt!(podcast)
Rails.logger.tagged("PublishingPipeLineState.start_pipeline!") do
PublishingQueueItem.ensure_queued!(podcast)
attempt!(podcast)
end
end

# None of the methods that grab locks are threadsafe if we assume that
# creating published artifacts is non-idempotent (e.g. creating remote Apple
# resources)
def self.attempt!(podcast, perform_later: true)
podcast.with_publish_lock do
next if PublishingQueueItem.unfinished_items(podcast).empty?
next if PublishingQueueItem.current_unfinished_item(podcast).present?

# Dedupe the work, grab the latest unfinished item in the queue
latest_unfinished_item = PublishingQueueItem.unfinished_items(podcast).first

PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: latest_unfinished_item, status: :created)

if perform_later
Rails.logger.info("Scheduling PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id})
PublishFeedJob.perform_later(podcast)
else
Rails.logger.info("Performing PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id})
PublishFeedJob.perform_now(podcast)
Rails.logger.tagged("PublishingPipeLineState.attempt!") do
podcast.with_publish_lock do
if PublishingQueueItem.unfinished_items(podcast).empty?
Rails.logger.info("Unfinished items empty, nothing to do", podcast_id: podcast.id)
next
end
if curr_running_item = PublishingQueueItem.current_unfinished_item(podcast)
Rails.logger.info("Podcast's PublishingQueueItem already has running pipeline", podcast_id: podcast.id, running_queue_item: curr_running_item.id)
next
end

# Dedupe the work, grab the latest unfinished item in the queue
latest_unfinished_item = PublishingQueueItem.unfinished_items(podcast).first

Rails.logger.info("Creating publishing pipeline for podcast #{podcast.id}", {podcast_id: podcast.id, queue_item_id: latest_unfinished_item.id})
PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: latest_unfinished_item, status: :created)

if perform_later
Rails.logger.info("Scheduling PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id})
PublishFeedJob.perform_later(podcast)
else
Rails.logger.info("Performing PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id})
PublishFeedJob.perform_now(podcast)
end
end
end
end
Expand Down Expand Up @@ -166,6 +177,7 @@ def self.state_transition(podcast, to_state)
podcast.with_publish_lock do
pqi = PublishingQueueItem.current_unfinished_item(podcast)
if pqi.present?
Rails.logger.info("Transitioning podcast #{podcast.id} publishing pipeline to state #{to_state}", {podcast_id: podcast.id, to_state: to_state})
PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: pqi, status: to_state)
else
Rails.logger.error("Podcast #{podcast.id} has no unfinished work, cannot transition state", {podcast_id: podcast.id, to_state: to_state})
Expand Down
1 change: 1 addition & 0 deletions app/models/publishing_queue_item.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def self.delivery_status
end

def self.ensure_queued!(podcast)
Rails.logger.info("Creating new PublishingQueueItem", {podcast_id: podcast.id})
create!(podcast: podcast)
end

Expand Down

0 comments on commit 0e9b0ff

Please sign in to comment.