diff --git a/app/models/publishing_pipeline_state.rb b/app/models/publishing_pipeline_state.rb index 4e64f3c77..956fa979b 100644 --- a/app/models/publishing_pipeline_state.rb +++ b/app/models/publishing_pipeline_state.rb @@ -88,29 +88,40 @@ def self.most_recent_state(podcast) end def self.start_pipeline!(podcast) - PublishingQueueItem.ensure_queued!(podcast) - attempt!(podcast) + Rails.logger.tagged("PublishingPipeLineState.start_pipeline!") do + PublishingQueueItem.ensure_queued!(podcast) + attempt!(podcast) + end end # None of the methods that grab locks are threadsafe if we assume that # creating published artifacts is non-idempotent (e.g. creating remote Apple # resources) def self.attempt!(podcast, perform_later: true) - podcast.with_publish_lock do - next if PublishingQueueItem.unfinished_items(podcast).empty? - next if PublishingQueueItem.current_unfinished_item(podcast).present? - - # Dedupe the work, grab the latest unfinished item in the queue - latest_unfinished_item = PublishingQueueItem.unfinished_items(podcast).first - - PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: latest_unfinished_item, status: :created) - - if perform_later - Rails.logger.info("Scheduling PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id}) - PublishFeedJob.perform_later(podcast) - else - Rails.logger.info("Performing PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id}) - PublishFeedJob.perform_now(podcast) + Rails.logger.tagged("PublishingPipeLineState.attempt!") do + podcast.with_publish_lock do + if PublishingQueueItem.unfinished_items(podcast).empty? + Rails.logger.info("Unfinished items empty, nothing to do", podcast_id: podcast.id) + next + end + if curr_running_item = PublishingQueueItem.current_unfinished_item(podcast) + Rails.logger.info("Podcast's PublishingQueueItem already has running pipeline", podcast_id: podcast.id, running_queue_item: curr_running_item.id) + next + end + + # Dedupe the work, grab the latest unfinished item in the queue + latest_unfinished_item = PublishingQueueItem.unfinished_items(podcast).first + + Rails.logger.info("Creating publishing pipeline for podcast #{podcast.id}", {podcast_id: podcast.id, queue_item_id: latest_unfinished_item.id}) + PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: latest_unfinished_item, status: :created) + + if perform_later + Rails.logger.info("Scheduling PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id}) + PublishFeedJob.perform_later(podcast) + else + Rails.logger.info("Performing PublishFeedJob for podcast #{podcast.id}", {podcast_id: podcast.id}) + PublishFeedJob.perform_now(podcast) + end end end end @@ -166,6 +177,7 @@ def self.state_transition(podcast, to_state) podcast.with_publish_lock do pqi = PublishingQueueItem.current_unfinished_item(podcast) if pqi.present? + Rails.logger.info("Transitioning podcast #{podcast.id} publishing pipeline to state #{to_state}", {podcast_id: podcast.id, to_state: to_state}) PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: pqi, status: to_state) else Rails.logger.error("Podcast #{podcast.id} has no unfinished work, cannot transition state", {podcast_id: podcast.id, to_state: to_state}) diff --git a/app/models/publishing_queue_item.rb b/app/models/publishing_queue_item.rb index f98af237c..2fe567288 100644 --- a/app/models/publishing_queue_item.rb +++ b/app/models/publishing_queue_item.rb @@ -18,6 +18,7 @@ def self.delivery_status end def self.ensure_queued!(podcast) + Rails.logger.info("Creating new PublishingQueueItem", {podcast_id: podcast.id}) create!(podcast: podcast) end