Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow-specific hold/release #5698

Draft
wants to merge 18 commits into
base: master
Choose a base branch
from
Draft
1 change: 1 addition & 0 deletions changes.d/5698.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Flow-specific task hold and release.
5 changes: 1 addition & 4 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1012,10 +1012,7 @@ def generate_ghost_task(
id=tp_id,
task=t_id,
cycle_point=point_string,
is_held=(
(name, point)
in self.schd.pool.tasks_to_hold
),
is_held=self.schd.pool.hold_mgr.is_held(name, point),
depth=task_def.depth,
name=name,
)
Expand Down
10 changes: 10 additions & 0 deletions cylc/flow/flow_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import datetime

from cylc.flow import LOG
from cylc.flow.exceptions import InputError
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager


Expand All @@ -30,6 +31,15 @@
FLOW_NONE = "none"


def validate_flow_opt(val):
Copy link
Member

@wxtim wxtim Oct 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this semi duplicate the logic in cylc trigger? Surely all use-cases of --flow should behave roughly the same, and probably be centralized in option_parser.py. Centralizing them here is reasonable too, but on balance this is more about options than flows?

You might want to give it a kwarg where you pass it a list of acceptable strings?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note, there are some differences, e.g. cylc trigger --flow=new makes sense, but cylc hold --flow=new doesn't.

"""Validate command line --flow opions."""
if val is not None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will the parser allow this value to be None?

try:
int(val)
except ValueError:
raise InputError(f"--flow={val}: value must be integer.")


class FlowMgr:
"""Logic to manage flow counter and flow metadata."""

Expand Down
14 changes: 13 additions & 1 deletion cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1735,7 +1735,9 @@ class Arguments:
description='Hold all tasks after the specified cycle point.',
required=True
)

flow_num = Int(
description='Number of flow to hold.'
)
Comment on lines +1738 to +1740
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the default be flow=all?

The pattern used in other mutations is a field called flow of type Flow which accepts either an integer or a string. See FlowMutationArguments in this file.

In this case new and none don't make sense, so it might need a new type.

result = GenericScalar()


Expand Down Expand Up @@ -2035,6 +2037,11 @@ class Meta:
''')
resolver = mutator

class Arguments(TaskMutation.Arguments):
flow_num = Int(
description='Number of flow to hold.'
)


class Release(Mutation, TaskMutation):
class Meta:
Expand All @@ -2047,6 +2054,11 @@ class Meta:
''')
resolver = mutator

class Arguments(TaskMutation.Arguments):
flow_num = Int(
description='Number of flow to release.'
)


class Kill(Mutation, TaskMutation):
# TODO: This should be a job mutation?
Expand Down
5 changes: 3 additions & 2 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ class CylcWorkflowDAO:
TABLE_TASKS_TO_HOLD: [
["name"],
["cycle"],
["flow"],
],
}

Expand Down Expand Up @@ -939,11 +940,11 @@ def select_task_prerequisites(
stmt_args = [cycle, name, flow_nums]
return list(self.connect().execute(stmt, stmt_args))

def select_tasks_to_hold(self) -> List[Tuple[str, str]]:
def select_tasks_to_hold(self) -> List[Tuple[str, str, str]]:
"""Return all tasks to hold stored in the DB."""
stmt = rf'''
SELECT
name, cycle
name, cycle, flow
FROM
{self.TABLE_TASKS_TO_HOLD}
''' # nosec (table name is code constant)
Expand Down
60 changes: 42 additions & 18 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,8 @@
def queue_command(self, command: str, kwargs: dict) -> None:
self.command_queue.put((
command,
tuple(kwargs.values()), {}
(),
kwargs,
))

async def process_command_queue(self) -> None:
Expand Down Expand Up @@ -1011,9 +1012,13 @@
self.stop_mode = stop_mode
self.update_data_store()

def command_release(self, task_globs: Iterable[str]) -> int:
def command_release(
self,
tasks: Iterable[str],
flow_num: Optional[int] = None
) -> int:
"""Release held tasks."""
return self.pool.release_held_tasks(task_globs)
return self.pool.release_held_tasks(tasks, flow_num)

def command_release_hold_point(self) -> None:
"""Release all held tasks and unset workflow hold after cycle point,
Expand All @@ -1025,57 +1030,68 @@
"""Resume paused workflow."""
self.resume_workflow()

def command_poll_tasks(self, items: List[str]) -> int:
def command_poll_tasks(self, tasks: List[str]) -> int:
"""Poll pollable tasks or a task or family if options are provided."""
if self.config.run_mode('simulation'):
return 0
itasks, _, bad_items = self.pool.filter_task_proxies(items)
itasks, _, bad_items = self.pool.filter_task_proxies(tasks)
self.task_job_mgr.poll_task_jobs(self.workflow, itasks)
return len(bad_items)

def command_kill_tasks(self, items: List[str]) -> int:
def command_kill_tasks(self, tasks: List[str]) -> int:
"""Kill all tasks or a task/family if options are provided."""
itasks, _, bad_items = self.pool.filter_task_proxies(items)
itasks, _, bad_items = self.pool.filter_task_proxies(tasks)
if self.config.run_mode('simulation'):
for itask in itasks:
if itask.state(*TASK_STATUSES_ACTIVE):
itask.state_reset(TASK_STATUS_FAILED)
self.data_store_mgr.delta_task_state(itask)
return len(bad_items)
self.task_job_mgr.kill_task_jobs(self.workflow, itasks)
to_kill = self.task_job_mgr.kill_task_jobs(self.workflow, itasks)
# Hold killed tasks to prevent automatic retry.
for itask in to_kill:
self.pool.hold_mgr.hold_active_task(itask)
return len(bad_items)

def command_hold(self, task_globs: Iterable[str]) -> int:
def command_hold(
self,
tasks: Iterable[str],
flow_num: Optional[int] = None
) -> int:
"""Hold specified tasks."""
return self.pool.hold_tasks(task_globs)
return self.pool.hold_tasks(tasks, flow_num)

def command_set_hold_point(self, point: str) -> None:
def command_set_hold_point(
self,
point: str,
flow_num: Optional[int] = None
) -> None:
"""Hold all tasks after the specified cycle point."""
cycle_point = TaskID.get_standardised_point(point)
if cycle_point is None:
raise CyclingError("Cannot set hold point to None")
LOG.info(
f"Setting hold cycle point: {cycle_point}\n"
"All tasks after this point will be held.")
self.pool.set_hold_point(cycle_point)
self.pool.set_hold_point(cycle_point, flow_num)

def command_pause(self) -> None:
"""Pause the workflow."""
self.pause_workflow()

@staticmethod
def command_set_verbosity(lvl: Union[int, str]) -> None:
def command_set_verbosity(level: Union[int, str]) -> None:
"""Set workflow verbosity."""
try:
lvl = int(lvl)
LOG.setLevel(lvl)
level = int(level)
LOG.setLevel(level)
except (TypeError, ValueError) as exc:
raise CommandFailedError(exc)
cylc.flow.flags.verbosity = log_level_to_verbosity(lvl)
cylc.flow.flags.verbosity = log_level_to_verbosity(level)

def command_remove_tasks(self, items) -> int:
def command_remove_tasks(self, tasks) -> int:
"""Remove tasks."""
return self.pool.remove_tasks(items)
return self.pool.remove_tasks(tasks)

async def command_reload_workflow(self) -> None:
"""Reload workflow configuration."""
Expand Down Expand Up @@ -1329,6 +1345,8 @@
* Original workflow run time zone.
"""
LOG.info('LOADING workflow parameters')
self.options.holdcp_flow = None # (not CLI but needed on restart)

for key, value in params:
if value is None:
continue
Expand Down Expand Up @@ -1370,6 +1388,12 @@
):
self.options.holdcp = value
LOG.info(f"+ hold point = {value}")
elif (
key == self.workflow_db_mgr.KEY_HOLD_CYCLE_POINT_FLOW
and self.options.holdcp_flow is None
):
self.options.holdcp_flow = value
LOG.info(f"+ hold point flow = {value}")

Check warning on line 1396 in cylc/flow/scheduler.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scheduler.py#L1395-L1396

Added lines #L1395 - L1396 were not covered by tests
elif key == self.workflow_db_mgr.KEY_STOP_CLOCK_TIME:
int_val = int(value)
msg = f"stop clock time = {int_val} ({time2str(int_val)})"
Expand Down
33 changes: 25 additions & 8 deletions cylc/flow/scripts/hold.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
from typing import TYPE_CHECKING

from cylc.flow.exceptions import InputError
from cylc.flow.flow_mgr import validate_flow_opt
from cylc.flow.network.client_factory import get_client
from cylc.flow.option_parsers import (
FULL_ID_MULTI_ARG_DOC,
Expand All @@ -67,18 +68,21 @@
from cylc.flow.terminal import cli_function
from cylc.flow.network.multi import call_multi


if TYPE_CHECKING:
from optparse import Values


HOLD_MUTATION = '''
mutation (
$wFlows: [WorkflowID]!,
$tasks: [NamespaceIDGlob]!
$tasks: [NamespaceIDGlob]!,
$flowNum: Int
) {
hold (
workflows: $wFlows,
tasks: $tasks
tasks: $tasks,
flowNum: $flowNum
) {
result
}
Expand All @@ -88,11 +92,13 @@
SET_HOLD_POINT_MUTATION = '''
mutation (
$wFlows: [WorkflowID]!,
$point: CyclePoint!
$point: CyclePoint!,
$flowNum: Int
) {
setHoldPoint (
workflows: $wFlows,
point: $point
point: $point,
flowNum: $flowNum
) {
result
}
Expand All @@ -114,6 +120,11 @@ def get_option_parser() -> COP:
help="Hold all tasks after this cycle point.",
metavar="CYCLE_POINT", action="store", dest="hold_point_string")

parser.add_option(
"--flow",
help="Hold tasks that belong to a specific flow.",
metavar="INT", action="store", dest="flow_num")
Comment on lines +123 to +126
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumably --flow=all is the default?


return parser


Expand All @@ -123,12 +134,14 @@ def _validate(options: 'Values', *task_globs: str) -> None:
if task_globs:
raise InputError(
"Cannot combine --after with Cylc/Task IDs.\n"
"`cylc hold --after` holds all tasks after the given "
"cycle point.")
"`cylc hold --after` holds ALL tasks after the given "
"cycle point. Can be used with `--flow`.")
elif not task_globs:
raise InputError(
"Must define Cycles/Tasks. See `cylc hold --help`.")

validate_flow_opt(options.flow_num)


async def run(options, workflow_id, *tokens_list):
_validate(options, *tokens_list)
Expand All @@ -137,14 +150,18 @@ async def run(options, workflow_id, *tokens_list):

if options.hold_point_string:
mutation = SET_HOLD_POINT_MUTATION
args = {'point': options.hold_point_string}
args = {
'point': options.hold_point_string,
'flowNum': options.flow_num
}
else:
mutation = HOLD_MUTATION
args = {
'tasks': [
id_.relative_id_with_selectors
for id_ in tokens_list
]
],
'flowNum': options.flow_num
}

mutation_kwargs = {
Expand Down
15 changes: 13 additions & 2 deletions cylc/flow/scripts/release.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from typing import TYPE_CHECKING

from cylc.flow.exceptions import InputError
from cylc.flow.flow_mgr import validate_flow_opt
from cylc.flow.network.client_factory import get_client
from cylc.flow.network.multi import call_multi
from cylc.flow.option_parsers import (
Expand All @@ -57,11 +58,13 @@
RELEASE_MUTATION = '''
mutation (
$wFlows: [WorkflowID]!,
$tasks: [NamespaceIDGlob]!
$tasks: [NamespaceIDGlob]!,
$flowNum: Int
) {
release (
workflows: $wFlows,
tasks: $tasks,
flowNum: $flowNum
) {
result
}
Expand Down Expand Up @@ -97,6 +100,11 @@ def get_option_parser() -> COP:
"if set."),
action="store_true", dest="release_all")

parser.add_option(
"--flow",
help="Release tasks that belong to a specific flow.",
metavar="INT", action="store", dest="flow_num")

return parser


Expand All @@ -111,6 +119,8 @@ def _validate(options: 'Values', *tokens_list: str) -> None:
"Must define Cycles/Tasks. See `cylc release --help`."
)

validate_flow_opt(options.flow_num)


async def run(options: 'Values', workflow_id, *tokens_list):
_validate(options, *tokens_list)
Expand All @@ -126,7 +136,8 @@ async def run(options: 'Values', workflow_id, *tokens_list):
'tasks': [
tokens.relative_id_with_selectors
for tokens in tokens_list
]
],
'flowNum': options.flow_num
}

mutation_kwargs = {
Expand Down
Loading
Loading