Skip to content

Commit

Permalink
Add debug logging
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare committed Sep 18, 2024
1 parent 07182cf commit 78127b6
Showing 1 changed file with 29 additions and 3 deletions.
32 changes: 29 additions & 3 deletions app/models/miq_server/worker_management/kubernetes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,35 +26,51 @@ def sync_starting_workers
# Get a list of pods that aren't currently assigned to MiqWorker records
pods_without_workers = current_pods.keys - MiqWorker.server_scope.pluck(:system_uid).compact

_log.debug("AG: pods_without_workers: [#{pods_without_workers.join(", ")}]")

# Non-rails workers cannot set their own miq_worker record to started once they
# have finished initializing. Check for any starting non-rails workers whose
# pod is running and mark the miq_worker as started.
starting.reject(&:rails_worker?).each do |worker|
# If the current worker doesn't have a system_uid assigned then find the first
# pod available for our worker type and link them up.
if worker.system_uid.nil?
_log.debug("AG: worker [#{worker.type}] [#{worker.id}] has no system_uid")
system_uid = pods_without_workers.detect { |pod_name| pod_name.start_with?(worker.worker_deployment_name) }
if system_uid
_log.debug("AG: worker [#{worker.type}] [#{worker.id}] found pod [#{system_uid}]")
# We have found a pod for the current worker record so remove the pod from
# the list of pods without workers and set the pod name as the system_uid
# for the current worker record.
pods_without_workers.delete(system_uid)
worker.update!(:system_uid => system_uid)
else
_log.debug("AG: worker [#{worker.type}] [#{worker.id}] no pods")
# If we haven't found a pod for this worker record then we need to check
# whether it has been starting for too long and should be marked as
# not responding.
stop_worker(worker, MiqServer::WorkerManagement::NOT_RESPONDING) if exceeded_heartbeat_threshold?(worker)
if exceeded_heartbeat_threshold?(worker)
_log.debug("AG: worker [#{worker.type}] [#{worker.id}] no pod for too long, stopping worker")
stop_worker(worker, MiqServer::WorkerManagement::NOT_RESPONDING)
end
# Without a valid system_uid we cannot run any further logic in this
# loop.
next
end
end

worker_pod = current_pods[worker.system_uid]
next if worker_pod.nil?
if worker_pod.nil?
_log.debug("AG: no pod for worker [#{worker.type}] [#{worker.id}] [#{worker.system_uid}]")
next
end

worker.update!(:status => MiqWorker::STATUS_STARTED) if worker_pod[:running]
if worker_pod[:running]
_log.debug("AG: worker [#{worker.type}] [#{worker.id}] [#{worker.system_uid}], pod is running, setting to started")
worker.update!(:status => MiqWorker::STATUS_STARTED)
else
_log.debug("AG: worker [#{worker.type}] [#{worker.id}] [#{worker.system_uid}], pod is not running")
end
end

starting.reload
Expand Down Expand Up @@ -249,6 +265,8 @@ def collect_initial(resource = :pods)

def watch_for_events(resource, resource_version)
orchestrator.send(:"watch_#{resource}", resource_version).each do |event|
_log.debug("AG: #{resource}: event type: [#{event.type}] object: [#{event.object}]")

case event.type.downcase
when "added", "modified"
send(:"save_#{resource.to_s.singularize}", event.object)
Expand All @@ -274,15 +292,19 @@ def save_deployment(deployment)
name = deployment.metadata.name
new_hash = Concurrent::Hash.new
new_hash[:spec] = deployment.spec.to_h

_log.debug("AG: deployment: [#{new_hash}]")
current_deployments[name] ||= new_hash
current_deployments[name].merge!(new_hash)
end

def delete_deployment(deployment)
_log.debug("AG: deleting deployment: [#{deployment.metadata.name}]")
current_deployments.delete(deployment.metadata.name)
end

def save_pod(pod)
_log.debug("AG: save pod [#{pod}]")
return unless pod.status.containerStatuses

ch = Concurrent::Hash.new
Expand All @@ -291,12 +313,16 @@ def save_pod(pod)
ch[:container_restarts] = pod.status.containerStatuses.sum { |cs| cs.restartCount.to_i }
ch[:running] = pod.status.phase == "Running" && pod.status.containerStatuses.all? { |cs| cs.ready && cs.started }

_log.debug("AG: pod: [#{ch}]")

name = pod.metadata.name
current_pods[name] ||= ch
current_pods[name].merge!(ch)
end

def delete_pod(pod)
_log.debug("AG: deleting pod [#{pod.metadata.name}]")

current_pods.delete(pod.metadata.name)
end

Expand Down

0 comments on commit 78127b6

Please sign in to comment.