Skip to content

Commit

Permalink
Fix cylc play service
Browse files Browse the repository at this point in the history
- cylc version arg was not working
- could not play multiple workflows
  • Loading branch information
MetRonnie committed Nov 20, 2023
1 parent e2a4ffa commit cd6b6fa
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 55 deletions.
73 changes: 20 additions & 53 deletions cylc/uiserver/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,26 +95,6 @@ def snake_to_kebab(snake):
raise TypeError(type(snake))


def check_cylc_version(version):
"""Check the provided Cylc version is available on the CLI.
Sets CYLC_VERSION=version and tests the result of cylc --version
to make sure the requested version is installed and selectable via
the CYLC_VERSION environment variable.
"""
proc = Popen(
['cylc', '--version'],
env={**os.environ, 'CYLC_VERSION': version},
stdin=DEVNULL,
stdout=PIPE,
stderr=PIPE,
text=True
)
ret = proc.wait(timeout=5)
out, err = proc.communicate()
return ret or out.strip() == version


def _build_cmd(cmd: List, args: Dict) -> List:
"""Add args to command.
Expand Down Expand Up @@ -289,32 +269,19 @@ async def scan(
return cls._return("Scan requested")

@classmethod
async def play(cls, workflows, args, workflows_mgr, log):
async def play(
cls,
workflows: Iterable[Tokens],
args: Dict[str, Any],
workflows_mgr: 'WorkflowsManager',
log: 'Logger',
) -> List[Union[bool, str]]:
"""Calls `cylc play`."""
response = []
# get ready to run the command
try:
# check that the request cylc version is available
cylc_version = None
if 'cylc_version' in args:
cylc_version = args['cylc_version']
if not check_cylc_version(cylc_version):
return cls._error(
f'cylc version not available: {cylc_version}'
)
args = dict(args)
args.pop('cylc_version')

# build the command
cmd = ['cylc', 'play', '--color=never']
cmd = _build_cmd(cmd, args)

except Exception as exc:
# oh noes, something went wrong, send back confirmation
return cls._error(exc)
# start each requested flow
cylc_version = args.pop('cylc_version', None)
for tokens in workflows:
try:
cmd = _build_cmd(['cylc', 'play', '--color=never'], args)

if tokens['user'] and tokens['user'] != getuser():
return cls._error(
'Cannot start workflows for other users.'
Expand All @@ -329,9 +296,15 @@ async def play(cls, workflows, args, workflows_mgr, log):
cmd_repr = f'CYLC_VERSION={cylc_version} {cmd_repr}'
log.info(f'$ {cmd_repr}')

# run cylc run
env = os.environ.copy()
env.pop('CYLC_ENV_NAME', None)
if cylc_version:
env['CYLC_VERSION'] = cylc_version

# run cylc play
proc = Popen(
cmd,
env=env,
stdin=DEVNULL,
stdout=PIPE,
stderr=PIPE,
Expand All @@ -346,22 +319,16 @@ async def play(cls, workflows, args, workflows_mgr, log):
f'Could not start {tokens["workflow"]}'
f' - {cmd_repr}'
)
raise Exception(
msg
)
raise Exception(msg)

except Exception as exc:
# oh noes, something went wrong, send back confirmation
return cls._error(exc)

else:
# send a success message
return cls._return(
'Workflow started'
)
# trigger a re-scan
await workflows_mgr.scan()
return response
# send a success message
return cls._return('Workflow(s) started')

@staticmethod
async def enqueue(stream, queue):
Expand Down
126 changes: 124 additions & 2 deletions cylc/uiserver/tests/test_resolvers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import asyncio
from typing import Any, Dict, List, Optional
from async_timeout import timeout
import logging
import os
import pytest
from unittest import mock
from unittest.mock import MagicMock, Mock
from subprocess import Popen, TimeoutExpired

from cylc.flow import CYLC_LOG
from cylc.flow.id import Tokens
Expand All @@ -13,6 +16,7 @@
Services,
process_cat_log_stderr,
)
from cylc.uiserver.workflows_mgr import WorkflowsManager

services = Services()

Expand Down Expand Up @@ -47,6 +51,124 @@ def test_Services_anciliary_methods(func, message, expect):
assert func(message) == expect


@pytest.mark.parametrize(
'workflows, args, env, popen_ret_code, expected_ret, expected_env',
[
pytest.param(
[Tokens('wflow1'), Tokens('~murray/wflow2')],
{},
{},
0,
[True, "Workflow(s) started"],
{},
id="multiple"
),
pytest.param(
[Tokens('~feynman/wflow1')],
{},
{},
None,
[False, "Cannot start workflows for other users."],
{},
id="other user's wflow"
),
pytest.param(
[Tokens('wflow1')],
{},
{},
1,
[False, "strange"],
{},
id="command failed"
),
pytest.param(
[Tokens('wflow1')],
{'cylc_version': 'top'},
{'CYLC_VERSION': 'bottom', 'CYLC_ENV_NAME': 'quark'},
0,
[True, "Workflow(s) started"],
{'CYLC_VERSION': 'top'},
id="cylc version overrides env"
),
]
)
async def test_play(
monkeypatch: pytest.MonkeyPatch,
workflows: List[Tokens],
args: Dict[str, Any],
env: Dict[str, str],
popen_ret_code: Optional[int],
expected_ret: list,
expected_env: Dict[str, str],
):
"""It runs cylc play correctly.
Params:
workflows: list of workflow tokens
args: any args/options for cylc play
env: any environment variables
popen_ret_code: return code from cylc play
expected_ret: expected return value
expected_env: any expected environment variables
"""
for k, v in env.items():
monkeypatch.setenv(k, v)
monkeypatch.setattr('cylc.uiserver.resolvers.getuser', lambda: 'murray')
mock_popen = Mock(
spec=Popen,
return_value=Mock(
spec=Popen,
wait=Mock(return_value=popen_ret_code),
communicate=Mock(return_value=('charm', 'strange')),
)
)
monkeypatch.setattr('cylc.uiserver.resolvers.Popen', mock_popen)

ret = await Services.play(
workflows,
{'some': 'opt', **args},
workflows_mgr=Mock(spec=WorkflowsManager),
log=Mock(),
)

assert ret == expected_ret

expected_env = {**os.environ, **expected_env}
expected_env.pop('CYLC_ENV_NAME', None)

for i, call_args in enumerate(mock_popen.call_args_list):
cmd_str = ' '.join(call_args.args[0])
assert cmd_str.startswith('cylc play')
assert '--some opt' in cmd_str
assert workflows[i]['workflow'] in cmd_str

assert call_args.kwargs['env'] == expected_env


async def test_play_timeout(monkeypatch: pytest.MonkeyPatch):
"""It returns an error if cylc play times out."""
def timeout(*args, **kwargs):
raise TimeoutExpired('cylc play', 42)

mock_popen = Mock(
spec=Popen,
return_value=Mock(
spec=Popen,
wait=timeout,
)
)
monkeypatch.setattr('cylc.uiserver.resolvers.Popen', mock_popen)

ret = await Services.play(
[Tokens('wflow1')],
{},
workflows_mgr=Mock(spec=WorkflowsManager),
log=Mock(),
)

assert ret == [False, "Command 'cylc play' timed out after 42 seconds"]


async def test_cat_log(workflow_run_dir):
"""This is a functional test for cat_log subscription resolver.
Expand Down Expand Up @@ -80,7 +202,7 @@ async def test_cat_log(workflow_run_dir):
log_file = log_dir / '01-start-01.log'
log_file.write_text(log_file_content)
expected = log_file.read_text()
info = mock.MagicMock()
info = MagicMock()
info.root_value = 2
# mock the context
info.context = {'sub_statuses': {2: "start"}}
Expand Down

0 comments on commit cd6b6fa

Please sign in to comment.