Skip to content

Commit

Permalink
Avoid crashing flows when job watch exits but container is still runn…
Browse files Browse the repository at this point in the history
…ing (#15728)
  • Loading branch information
soamicharan authored Oct 16, 2024
1 parent 58c881b commit 3500c2f
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 0 deletions.
14 changes: 14 additions & 0 deletions src/integrations/prefect-kubernetes/prefect_kubernetes/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,20 @@ async def _watch_job(
if not first_container_status:
logger.error(f"Job {job_name!r}: No pods found for job.")
return -1

# In some cases, the pod will still be running at this point.
# We can assume that the job is still running and return 0 to prevent marking the flow run as crashed
elif first_container_status.state and (
first_container_status.state.running is not None
or first_container_status.state.waiting is not None
):
logger.warning(
f"The worker's watch for job {job_name!r} has exited early. Check the logs for more information."
" The job is still running, but the worker will not wait for it to complete."
)
# Return 0 to prevent marking the flow run as crashed
return 0

# In some cases, such as spot instance evictions, the pod will be forcibly
# terminated and not report a status correctly.
elif (
Expand Down
2 changes: 2 additions & 0 deletions src/integrations/prefect-kubernetes/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ dev = [
"pytest-asyncio",
"pytest",
"pytest-env",
"pytest-timeout",
"pytest-xdist",
]

Expand Down Expand Up @@ -80,3 +81,4 @@ show_missing = true
[tool.pytest.ini_options]
asyncio_mode = "auto"
env = ["PREFECT_TEST_MODE=1"]
timeout = 30
63 changes: 63 additions & 0 deletions src/integrations/prefect-kubernetes/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2430,6 +2430,17 @@ async def test_streaming_pod_logs_timeout_warns(
mock_pod,
mock_job,
):
job_pod = MagicMock(spec=kubernetes_asyncio.client.V1Pod)
mock_container_status = MagicMock(
spec=kubernetes_asyncio.client.V1ContainerStatus
)
mock_container_status.state.running = None
mock_container_status.state.waiting = None
job_pod.status.container_statuses = [mock_container_status]
mock_core_client.return_value.list_namespaced_pod.return_value.items = [
job_pod
]

async def mock_stream(*args, **kwargs):
mock_job.status.completion_time = pendulum.now("utc").timestamp()
stream = [
Expand Down Expand Up @@ -2516,6 +2527,16 @@ async def mock_log_stream(*args, **kwargs):

mock_core_client.return_value.read_namespaced_pod_log.return_value.stream = mock_log_stream
mock_watch.return_value.stream = mock.Mock(side_effect=mock_stream)
job_pod = MagicMock(spec=kubernetes_asyncio.client.V1Pod)
mock_container_status = MagicMock(
spec=kubernetes_asyncio.client.V1ContainerStatus
)
mock_container_status.state.running = None
mock_container_status.state.waiting = None
job_pod.status.container_statuses = [mock_container_status]
mock_core_client.return_value.list_namespaced_pod.return_value.items = [
job_pod
]

default_configuration.job_watch_timeout_seconds = 100
async with KubernetesWorker(work_pool_name="test") as k8s_worker:
Expand Down Expand Up @@ -2595,6 +2616,8 @@ async def test_watch_stops_after_backoff_limit_reached(
spec=kubernetes_asyncio.client.V1ContainerStatus
)
mock_container_status.state.terminated.exit_code = 137
mock_container_status.state.running = None
mock_container_status.state.waiting = None
job_pod.status.container_statuses = [mock_container_status]
mock_core_client.return_value.list_namespaced_pod.return_value.items = [
job_pod
Expand Down Expand Up @@ -2679,6 +2702,8 @@ async def test_watch_handles_pod_without_exit_code(
# The container may exist but because it has been forcefully terminated
# it will not have an exit code.
mock_container_status.state.terminated = None
mock_container_status.state.running = None
mock_container_status.state.waiting = None
job_pod.status.container_statuses = [mock_container_status]
mock_core_client.return_value.list_namespaced_pod.return_value.items = [
job_pod
Expand Down Expand Up @@ -2761,6 +2786,44 @@ async def mock_stream(*args, **kwargs):
]
)

async def test_watch_early_exit(
    self,
    default_configuration: KubernetesWorkerJobConfiguration,
    flow_run,
    mock_batch_client,
    mock_core_client,
    mock_watch,
    mock_job,
    mock_pod,
):
    """Check that a failing job watch does not yield a non-zero status code.

    The job watch stream is made to raise while the only listed pod has a
    container whose state reports ``running``. The run result is then
    expected to carry ``status_code == 0``.
    """
    batch_api = mock_batch_client.return_value
    core_api = mock_core_client.return_value

    # The job itself never reports a completion time.
    batch_api.read_namespaced_job.return_value.status.completion_time = None

    # A container status whose state is "running".
    running_status = MagicMock(spec=kubernetes_asyncio.client.V1ContainerStatus)
    running_status.state.running = MagicMock(start_time=pendulum.now("utc"))

    # The single pod returned when pods are listed for the job.
    running_pod = MagicMock(spec=kubernetes_asyncio.client.V1Pod)
    running_pod.status.phase = "Running"
    running_pod.status.container_statuses = [running_status]
    core_api.list_namespaced_pod.return_value.items = [running_pod]

    async def mock_stream(*args, **kwargs):
        watched = kwargs["func"]

        # Pod watch: emit a single ADDED event.
        if watched == core_api.list_namespaced_pod:
            yield {"object": mock_pod, "type": "ADDED"}

        # Job watch: fail immediately to simulate the watch exiting early.
        if watched == batch_api.list_namespaced_job:
            raise Exception("This is a test exception")

    mock_watch.return_value.stream = mock.Mock(side_effect=mock_stream)

    async with KubernetesWorker(work_pool_name="test") as k8s_worker:
        result = await k8s_worker.run(flow_run, default_configuration)

    assert result.status_code == 0

@pytest.fixture
async def mock_events(self, mock_core_client):
mock_core_client.return_value.list_namespaced_event.return_value = (
Expand Down

0 comments on commit 3500c2f

Please sign in to comment.