Skip to content

Commit

Permalink
implement set-task --pre [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Aug 1, 2023
1 parent 569797d commit 1901c8c
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 41 deletions.
14 changes: 8 additions & 6 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,26 +811,28 @@ def force_spawn_children(
self,
tasks: Iterable[str],
outputs: Optional[Iterable[str]] = None,
flow: Iterable[str] = None,
prerequisites: Optional[Iterable[str]] = None,
flow: Optional[Iterable[str]] = None,
flow_wait: bool = False,
flow_descr: str = ""
flow_descr: Optional[str] = None,
) -> Tuple[bool, str]:
"""Spawn children of given task outputs.
User-facing method name: set_task.
Args:
tasks: List of identifiers or task globs.
outputs: List of outputs to spawn on.
flow (list):
Flow ownership of triggered tasks.
tasks: Identifiers or task globs.
outputs: Outputs to set complete.
prerequisites: Prerequisites to set satisfied.
flow: Flows that spawned tasks should belong to.
"""
self.schd.command_queue.put(
(
"force_spawn_children",
(tasks,),
{
"outputs": outputs,
"prerequisites": prerequisites,
"flow": flow,
"flow_wait": flow_wait,
"flow_descr": flow_descr,
Expand Down
1 change: 0 additions & 1 deletion cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2111,7 +2111,6 @@ class Meta:
class Arguments(TaskMutation.Arguments, FlowMutationArguments):
outputs = graphene.List(
String,
default_value=[TASK_OUTPUT_SUCCEEDED],
description='List of task outputs to set complete.'
)
prerequisites = graphene.List(
Expand Down
5 changes: 3 additions & 2 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1676,6 +1676,7 @@ async def main_loop(self) -> None:
tinit = time()

# Useful for debugging core scheduler issues:
# import logging
# self.pool.log_task_pool(logging.CRITICAL)
if self.incomplete_ri_map:
self.manage_remote_init()
Expand Down Expand Up @@ -2125,14 +2126,14 @@ def command_force_trigger_tasks(self, items, flow, flow_wait, flow_descr):
items, flow, flow_wait, flow_descr)

def command_force_spawn_children(
self, items, outputs, flow, flow_wait, flow_descr
self, items, outputs, prerequisites, flow, flow_wait, flow_descr
):
"""Force spawn task successors.
User-facing method name: set_task.
"""
return self.pool.force_spawn_children(
items, outputs, flow, flow_wait, flow_descr
items, outputs, prerequisites, flow, flow_wait, flow_descr
)

def _update_profile_info(self, category, amount, amount_format="%s"):
Expand Down
30 changes: 29 additions & 1 deletion cylc/flow/scripts/set_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,15 @@
- started implies submitted
- succeeded and failed imply started
- custom outputs and expired do not imply any other outputs
Specify prerequisites in the form "point/task:message".
"""

from functools import partial
from optparse import Values

from cylc.flow.exceptions import InputError
from cylc.flow.network.client_factory import get_client
from cylc.flow.network.multi import call_multi
from cylc.flow.option_parsers import (
Expand All @@ -53,6 +57,8 @@
ERR_OPT_FLOW_WAIT,
validate_flow_opts
)
from cylc.flow.task_id import TaskID
from cylc.flow.task_pool import REC_CLI_PREREQ


MUTATION = '''
Expand Down Expand Up @@ -105,6 +111,7 @@ def get_option_parser() -> COP:
"Set task prerequisites satisfied."
' May be "all", which is equivalent to "cylc trigger".'
" (Multiple use allowed, may be comma separated)."
" Prerequisite format: 'point/task:message'."
),
action="append", default=None, dest="prerequisites"
)
Expand Down Expand Up @@ -134,6 +141,25 @@ def get_option_parser() -> COP:
return parser


def get_prerequisite_opts(options):
    """Convert prerequisite inputs to a single flat list, and validate.

    This:
       --pre=a --pre=b,c
    is equivalent to this:
       --pre=a,b,c

    Validation: each item must match the format <point>/<name>:<qualifier>.

    Args:
        options: Parsed command line options; only the "prerequisites"
            attribute is read. It is a list of (possibly comma-separated)
            strings, or None if --pre was not used.

    Returns:
        Flat list of prerequisite strings (empty if --pre was not used).

    Raises:
        InputError: if any item does not match REC_CLI_PREREQ.

    """
    result = []
    # The --pre option is declared with default=None, so guard against
    # iterating over None when the option was not given on the command line.
    for item in options.prerequisites or []:
        result.extend(item.split(','))

    for pre in result:
        # NOTE(review): match() only anchors at the start of the string, so
        # trailing characters after the qualifier are not rejected here.
        if not REC_CLI_PREREQ.match(pre):
            raise InputError(f"Bad prerequisite: {pre}")
    return result


async def run(options: 'Values', workflow_id: str, *tokens_list) -> None:
pclient = get_client(workflow_id, timeout=options.comms_timeout)

Expand All @@ -146,7 +172,7 @@ async def run(options: 'Values', workflow_id: str, *tokens_list) -> None:
for tokens in tokens_list
],
'outputs': options.outputs,
'prerequisites': options.prerequisites,
'prerequisites': get_prerequisite_opts(options),
'flow': options.flow,
'flowWait': options.flow_wait,
'flowDescr': options.flow_descr
Expand All @@ -158,8 +184,10 @@ async def run(options: 'Values', workflow_id: str, *tokens_list) -> None:

@cli_function(get_option_parser)
def main(parser: COP, options: 'Values', *ids) -> None:

if options.flow is None:
options.flow = [FLOW_ALL] # default to all active flows

validate_flow_opts(options)

call_multi(
Expand Down
108 changes: 77 additions & 31 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

"""Wrangle task proxies to manage the workflow."""

import re
from contextlib import suppress
from collections import Counter
import json
Expand Down Expand Up @@ -85,6 +86,15 @@
Pool = Dict['PointBase', Dict[str, TaskProxy]]


# Compiled pattern for prerequisites given on the command line, written as
# "point/name:label". Three capture groups: (point), (name), (label).
# TODO: formally define qualifier RE?
REC_CLI_PREREQ = re.compile(
    rf"({TaskID.POINT_RE}){TaskID.DELIM2}({TaskID.NAME_RE}):(\w+)"
)


class TaskPool:
"""Task pool of a workflow."""

Expand Down Expand Up @@ -702,7 +712,7 @@ def _get_spawned_or_merged_task(
# ntask does not exist: spawn it in the flow.
ntask = self.spawn_task(name, point, flow_nums)
else:
# ntask already exists (n=0 or incomplete): merge flows.
# ntask already exists (n=0): merge flows.
self.merge_flows(ntask, flow_nums)
return ntask # may be None

Expand Down Expand Up @@ -1259,7 +1269,7 @@ def spawn_on_output(self, itask, output, forced=False):
Args:
tasks: List of identifiers or task globs.
outputs: List of outputs to spawn on.
output: Output to spawn on.
forced: If True this is a manual spawn command.
"""
Expand Down Expand Up @@ -1576,45 +1586,79 @@ def spawn_task(
self.db_add_new_flow_rows(itask)
return itask

# TODO RENAME THIS METHOD
def force_spawn_children(
self,
items: Iterable[str],
outputs: List[str],
prerequisites: List[str],
flow: List[str],
flow_wait: bool = False,
flow_descr: str = "",
flow_descr: Optional[str] = None
):
"""Spawn downstream children of given outputs, on user command.
"""Force set prerequisites satisfied and outputs completed.
User-facing command name: set_task. Creates a transient parent just
for the purpose of spawning children.
For prerequisites:
- spawn target task if necessary, and set the prerequisites
For outputs:
- spawn child tasks if necessary, and spawn/update prereqs of
children
- TODO: set outputs completed in the target task (DB, and task
proxy if already spawned - but don't spawn a new one)
Args:
items: Identifiers for matching task definitions, each with the
form "point/name".
outputs: List of outputs to spawn on
flow: Flow number to attribute the outputs
items: Identifiers for matching task definitions
prerequisites: prerequisites to set and spawn children of
outputs: Outputs to set and spawn children of
flow: Flow numbers for spawned or updated tasks
flow_wait: wait for flows to catch up before continuing
flow_descr: description of new flow
"""
outputs = outputs or [TASK_OUTPUT_SUCCEEDED]
flow_nums = self._flow_cmd_helper(flow)
if not outputs and not prerequisites:
# Default: set all required outputs.
outputs = outputs or [TASK_OUTPUT_SUCCEEDED]

flow_nums = self._flow_cmd_helper(flow, flow_descr)
if flow_nums is None:
return
return

n_warnings, task_items = self.match_taskdefs(items)
for (_, point), taskdef in sorted(task_items.items()):
# This the parent task:
itask = TaskProxy(
self.tokens,
taskdef,
point,
flow_nums=flow_nums,

itask = self._get_spawned_or_merged_task(
point, taskdef.name, flow_nums
)
# Spawn children of selected outputs.
for trig, out, _ in itask.state.outputs.get_all():
if trig in outputs:
LOG.info(f"[{itask}] Forced spawning on {out}")
self.spawn_on_output(itask, out, forced=True)
if itask is None:
# Not in pool but was spawned already in this flow.
return

if outputs:
# Spawn children of outputs, add them to the pool.
# (Don't add the target task to pool if we just spawned it)
for trig, out, _ in itask.state.outputs.get_all():
if trig in outputs:
LOG.info(f"[{itask}] Forced spawning on {out}")
self.spawn_on_output(itask, out, forced=True)
self.workflow_db_mgr.put_update_task_outputs(itask)

if prerequisites:
for pre in prerequisites:
m = REC_CLI_PREREQ.match(pre)
if m is not None:
itask.state.satisfy_me({m.groups()})
else:
# TODO warn here? (checked on CLI)
continue

self.data_store_mgr.delta_task_prerequisite(itask)
self.add_to_pool(itask) # move from hidden if necessary
if (
self.runahead_limit_point is not None
and itask.point <= self.runahead_limit_point
):
self.rh_release_and_queue(itask)

def _get_active_flow_nums(self) -> Set[int]:
"""Return all active, or most recent previous, flow numbers.
Expand All @@ -1639,8 +1683,12 @@ def remove_tasks(self, items):
self.release_runahead_tasks()
return len(bad_items)

def _flow_cmd_helper(self, flow):
# TODO type hints
def _flow_cmd_helper(
self,
flow: List[str],
flow_descr: Optional[str]
) -> Optional[Set[int]]:
"""TODO"""
if set(flow).intersection({FLOW_ALL, FLOW_NEW, FLOW_NONE}):
if len(flow) != 1:
LOG.warning(
Expand Down Expand Up @@ -1669,7 +1717,7 @@ def force_trigger_tasks(
flow: List[str],
flow_wait: bool = False,
flow_descr: Optional[str] = None
) -> int:
):
"""Manual task triggering.
Don't get a new flow number for existing n=0 tasks (e.g. incomplete
Expand All @@ -1678,9 +1726,9 @@ def force_trigger_tasks(
Queue the task if not queued, otherwise release it to run.
"""
flow_nums = self._flow_cmd_helper(flow)
flow_nums = self._flow_cmd_helper(flow, flow_descr)
if flow_nums is None:
return
return

# n_warnings, task_items = self.match_taskdefs(items)
itasks, future_tasks, unmatched = self.filter_task_proxies(
Expand Down Expand Up @@ -1729,8 +1777,6 @@ def force_trigger_tasks(
# De-queue it to run now.
self.task_queue_mgr.force_release_task(itask)

return len(unmatched)

def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool:
"""Simulation mode: simulate task run times and set states."""
if not self.config.run_mode('simulation'):
Expand Down

0 comments on commit 1901c8c

Please sign in to comment.