Skip to content

Commit

Permalink
Better error handling for workflow_request() (#379)
Browse files Browse the repository at this point in the history
* Better error handling for async_request
  • Loading branch information
MetRonnie authored Aug 10, 2023
1 parent 2b866af commit 3fdfafa
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 151 deletions.
9 changes: 9 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ creating a new release entry be sure to copy & paste the span tag with the
`actions:bind` attribute, which is used by a regex to find the text to be
updated. Only the first match gets replaced, so it's fine to leave the old
ones in. -->
-------------------------------------------------------------------------------
## __cylc-uiserver-1.3.1 (<span actions:bind='release-date'>Upcoming</span>)__

<!-- [Updated cylc-ui to x.y.z](https://github.com/cylc/cylc-ui/blob/master/CHANGES.md) -->

### Fixes

[#379](https://github.com/cylc/cylc-uiserver/pull/379) - Fixed lack of info
for errors recorded in logs.

-------------------------------------------------------------------------------
## __cylc-uiserver-1.3.0 (<span actions:bind='release-date'>Released 2023-07-21</span>)__
Expand Down
79 changes: 34 additions & 45 deletions cylc/uiserver/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
import time
from typing import Dict, Optional, Set

from cylc.flow.exceptions import WorkflowStopped
from cylc.flow.id import Tokens
from cylc.flow.network.server import PB_METHOD_MAP
from cylc.flow.network import MSG_TIMEOUT
from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
from cylc.flow.data_store_mgr import (
EDGES, DATA_TEMPLATE, ALL_DELTAS, DELTAS_MAP, WORKFLOW,
Expand Down Expand Up @@ -337,7 +337,7 @@ def _reconcile_update(self, topic, delta, w_id):
),
self.loop
)
_, new_delta_msg = future.result(self.RECONCILE_TIMEOUT)
new_delta_msg = future.result(self.RECONCILE_TIMEOUT)
new_delta = DELTAS_MAP[topic]()
new_delta.ParseFromString(new_delta_msg)
self._clear_data_field(w_id, topic)
Expand Down Expand Up @@ -366,52 +366,41 @@ async def _entire_workflow_update(

# Request new data
req_method = 'pb_entire_workflow'
req_kwargs = (
{
'client': info['req_client'],
'command': req_method,
'req_context': w_id
}

requests = {
w_id: workflow_request(
client=info['req_client'], command=req_method, log=self.log
)
for w_id, info in self.workflows_mgr.workflows.items()
if info.get('req_client') # skip stopped workflows
and (not ids or w_id in ids)
}
results = await asyncio.gather(
*requests.values(), return_exceptions=True
)

gathers = [
workflow_request(**kwargs)
for kwargs in req_kwargs
if not ids or kwargs['req_context'] in ids
]
items = await asyncio.gather(*gathers, return_exceptions=True)

successes: Set[str] = set()
for item in items:
if isinstance(item, Exception):
self.log.exception(
'Failed to update entire local data-store '
'of a workflow', exc_info=item
)
else:
w_id, result = item
if result is not None and result != MSG_TIMEOUT:
pb_data = PB_METHOD_MAP[req_method]()
pb_data.ParseFromString(result)
new_data = deepcopy(DATA_TEMPLATE)
for field, value in pb_data.ListFields():
if field.name == WORKFLOW:
new_data[field.name].CopyFrom(value)
new_data['delta_times'] = {
key: value.last_updated
for key in DATA_TEMPLATE
}
continue
new_data[field.name] = {n.id: n for n in value}
self.data[w_id] = new_data
successes.add(w_id)
else:
for w_id, result in zip(requests, results):
if isinstance(result, Exception):
if not isinstance(result, WorkflowStopped):
self.log.error(
f'Error: communicating with {w_id} - {result}'
'Failed to update entire local data-store '
f'of a workflow: {result}'
)

continue
pb_data = PB_METHOD_MAP[req_method]()
pb_data.ParseFromString(result)
new_data = deepcopy(DATA_TEMPLATE)
for field, value in pb_data.ListFields():
if field.name == WORKFLOW:
new_data[field.name].CopyFrom(value)
new_data['delta_times'] = {
key: value.last_updated
for key in DATA_TEMPLATE
}
continue
new_data[field.name] = {n.id: n for n in value}
self.data[w_id] = new_data
successes.add(w_id)
return successes

def _update_contact(
Expand Down Expand Up @@ -488,12 +477,12 @@ def _get_status_msg(self, w_id: str, is_active: bool) -> str:
if is_active:
# this will get overridden when we sync with the workflow
# set a sensible default here in case the sync takes a while
return 'Running'
return 'running'
w_id = Tokens(w_id)['workflow']
db_file = Path(get_workflow_srv_dir(w_id), WorkflowFiles.Service.DB)
if db_file.exists():
# the workflow has previously run
return 'Stopped'
return 'stopped'
else:
# the workflow has not yet run
return 'Not yet run'
return 'not yet run'
4 changes: 3 additions & 1 deletion cylc/uiserver/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Test code and fixtures."""

import asyncio
from getpass import getuser
import inspect
import logging
Expand Down Expand Up @@ -47,6 +46,7 @@ class AsyncClientFixture(WorkflowRuntimeClient):
pattern = zmq.REQ
host = ''
port = 0
workflow = 'myflow'

def __init__(self):
self.returns = None
Expand All @@ -57,6 +57,8 @@ def will_return(self, returns):
async def async_request(
self, command, args=None, timeout=None, req_meta=None
):
if isinstance(self.returns, Exception):
raise self.returns
if (
inspect.isclass(self.returns)
and issubclass(self.returns, Exception)
Expand Down
43 changes: 30 additions & 13 deletions cylc/uiserver/tests/test_data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
import pytest
import zmq

from cylc.flow.exceptions import ClientTimeout, WorkflowStopped
from cylc.flow.id import Tokens
from cylc.flow.network import (MSG_TIMEOUT, ZMQSocketBase)
from cylc.flow.network import ZMQSocketBase
from cylc.flow.workflow_files import ContactFileFields as CFF

from cylc.uiserver.data_store_mgr import DataStoreMgr
Expand Down Expand Up @@ -69,7 +70,7 @@ async def test_entire_workflow_update_ignores_timeout_message(
raises a ``ClientTimeout`` error.
"""
w_id = 'workflow_id'
async_client.will_return(MSG_TIMEOUT)
async_client.will_return(ClientTimeout)

# Set the client used by our test workflow.
data_store_mgr.workflows_mgr.workflows[w_id] = {
Expand All @@ -81,7 +82,7 @@ async def test_entire_workflow_update_ignores_timeout_message(
# calling ``workflow_request``.
await data_store_mgr._entire_workflow_update()

# When a ``MSG_TIMEOUT`` happens, the ``DataStoreMgr`` object ignores
# When a ClientTimeout happens, the ``DataStoreMgr`` object ignores
# that message. So it means that its ``.data`` dictionary MUST NOT
# have an entry for the Workflow ID.
assert w_id not in data_store_mgr.data
Expand All @@ -90,7 +91,7 @@ async def test_entire_workflow_update_ignores_timeout_message(
async def test_entire_workflow_update_gather_error(
async_client: AsyncClientFixture,
data_store_mgr: DataStoreMgr,
caplog,
caplog: pytest.LogCaptureFixture,
):
"""
Test that if ``asyncio.gather`` in ``entire_workflow_update``
Expand All @@ -102,8 +103,7 @@ async def test_entire_workflow_update_gather_error(
#
# This test wants to confirm this is not raised, but instead the
# error is returned, so that we can inspect, log, etc.
error_type = ValueError
async_client.will_return(error_type)
async_client.will_return(ValueError)

# Set the client used by our test workflow.
data_store_mgr.workflows_mgr.workflows['workflow_id'] = {
Expand All @@ -113,16 +113,33 @@ async def test_entire_workflow_update_gather_error(
# Call the entire_workflow_update function.
# This should use the client defined above (``async_client``) when
# calling ``workflow_request``.
caplog.clear()
await data_store_mgr._entire_workflow_update()
assert caplog.record_tuples == [
(
'cylc',
40,
'Failed to update entire local data-store of a workflow'
)
('cylc', 40, 'Error communicating with myflow'),
('cylc', 40, 'x'),
('cylc', 40,
'Failed to update entire local data-store of a workflow: x'),
]
exc_info = caplog.records[1].exc_info
assert exc_info and exc_info[0] == ValueError


async def test_entire_workflow_update__stopped_workflow(
async_client: AsyncClientFixture,
data_store_mgr: DataStoreMgr,
caplog: pytest.LogCaptureFixture,
):
"""Test that DataStoreMgr._entire_workflow_update() handles a stopped
workflow reasonably."""
exc = WorkflowStopped('myflow')
async_client.will_return(exc)
data_store_mgr.workflows_mgr.workflows['workflow_id'] = {
'req_client': async_client
}
await data_store_mgr._entire_workflow_update()
assert caplog.record_tuples == [
('cylc', 40, f'WorkflowStopped: {exc}'),
]
assert caplog.records[0].exc_info[0] == error_type


async def test_register_workflow(
Expand Down
72 changes: 27 additions & 45 deletions cylc/uiserver/tests/test_workflows_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from itertools import product
import logging
from random import random
from typing import Type

import pytest

Expand All @@ -40,49 +41,27 @@

# --- workflow_request

async def test_workflow_request_client_timeout(
async_client: AsyncClientFixture):
async_client.will_return(ClientTimeout)
ctx, msg = await workflow_request(client=async_client, command='')
assert not ctx
assert 'timeout' in msg.lower() # type: ignore[attr-defined]


@pytest.mark.parametrize(
'exc', [ClientError, ClientTimeout]
)
async def test_workflow_request_client_error(
async_client: AsyncClientFixture, caplog):
caplog.set_level(logging.CRITICAL, logger='cylc')
async_client.will_return(ClientError)
ctx, msg = await workflow_request(client=async_client, command='')
assert not ctx
assert not msg
exc: Type[Exception],
async_client: AsyncClientFixture,
caplog: pytest.LogCaptureFixture
):
caplog.set_level(logging.ERROR, logger='cylc')
logger = logging.getLogger('cylc')
async_client.will_return(exc)
with pytest.raises(exc):
await workflow_request(client=async_client, command='', log=logger)
assert exc.__name__ in caplog.text


@pytest.mark.parametrize(
"returns,command,req_context,expected_ctx,expected_msg",
[
pytest.param(
42, 'cmd', None, 'cmd', 42
),
pytest.param(
42, '', None, '', 42
),
pytest.param(
42, 'cmd', 'some-context', 'some-context', 42
)
])
async def test_workflow_request(
async_client: AsyncClientFixture,
returns,
command,
req_context,
expected_ctx,
expected_msg
):
async_client.will_return(returns)
ctx, msg = await workflow_request(
client=async_client, command=command, req_context=req_context)
assert expected_ctx == ctx
assert expected_msg == msg
async def test_workflow_request(async_client: AsyncClientFixture):
"""Test normal response of workflow_request matches async_request"""
async_client.will_return(42)
res = await workflow_request(client=async_client, command='')
assert res == 42


# --- WorkflowsManager
Expand Down Expand Up @@ -226,7 +205,7 @@ async def test_workflow_state_change_uuid(


async def test_multi_request(
workflows_manager,
workflows_manager: WorkflowsManager,
async_client: AsyncClientFixture
):
workflow_id = 'multi-request-workflow'
Expand All @@ -251,17 +230,18 @@ async def test_multi_request(
response = await workflows_manager.multi_request(
'', [workflow_id], None, multi_args)
assert len(response) == 1
assert value == response[0]
assert response[0] == res


async def test_multi_request_gather_errors(
workflows_manager,
async_client: AsyncClientFixture,
caplog
caplog: pytest.LogCaptureFixture
):
workflow_id = 'gather-error-workflow'
error_type = ValueError
async_client.will_return(error_type)
async_client.workflow = workflow_id

workflows_manager.workflows[workflow_id] = {
'req_client': async_client
Expand All @@ -270,9 +250,11 @@ async def test_multi_request_gather_errors(
caplog.clear()
await workflows_manager.multi_request('', [workflow_id], None, None)
assert caplog.record_tuples == [
('cylc', 40, 'Failed to send requests to multiple workflows')
('cylc', 40, f'Error communicating with {workflow_id}'),
('cylc', 40, 'x'),
]
assert caplog.records[0].exc_info[0] == error_type
exc_info = caplog.records[1].exc_info
assert exc_info and exc_info[0] == error_type


async def test_crashed_workflow(one_workflow_aiter, caplog, uis_caplog):
Expand Down
Loading

0 comments on commit 3fdfafa

Please sign in to comment.