-
Notifications
You must be signed in to change notification settings - Fork 94
Documenting Communications Layer
An outline of how the communication layer works in cylc 6.1.2.
Two interfaces provided via Pyro for communicating with suite:
- A "command_queue" -
from cylc.suite_cmd_interface import comqueue
- A "info_interface" -
from cylc.suite_info_interface import info_interface
Also using Pyro is a log interface via:
- "log_interface" -
from cylc.suite_log_interface import log_interface
The commands these work with are defined in scheduler.py
under def configure
which links the incoming messages with various commands, outlined below.
A command in the command_queue is processed in scheduler.py
by def process_command_queue( self )
which performs name, args = queue.get(False)
to extract the command and it's arguments.
If a piece of code wishes to interact with the suite and put entries in the queue it will first request a proxy from the pyro client, targetted at the appropriate named interface e.g.:
proxy = cylc_pyro_client.client( suite, pphrase, options.owner,
options.host, options.pyro_timeout,
options.port ).get_proxy( 'command-interface' )
Commands can then be issued using this proxy as:
proxy.put( <COMMAND AND ARGS> )
All of these require access to the suite passphrase. A number of them could potentially be made available as accessible by anyone e.g. "get cylc version".
Command: | ping suite |
---|---|
Calls: | info_ping_suite( self ) |
Returns: | True |
Notes: |
Command: | ping task |
---|---|
Calls: | info_ping_task( self, task_id ) |
Returns: | self.pool.ping_task( task_id ) |
Notes: |
Command: | suite info |
---|---|
Calls: | info_get_suite_info( self ) |
Returns: | [ self.config.cfg['title'], user ] |
Notes: |
Command: | task info |
---|---|
Calls: | info_get_task_info( self, task_names ) |
Returns: | info |
Notes: |
info is a dict with keys for each of the task names requested |
Command: | all families |
---|---|
Calls: | info_get_all_families( self, exclude_root=False ) |
Returns: | List of families including or excluding the "root" family |
Notes: |
Command: | triggering families |
---|---|
Calls: | info_get_triggering_families( self ) |
Returns: | self.config.triggering_families |
Notes: |
Command: | first-parent ancestors |
---|---|
Calls: | info_get_first_parent_ancestors( self, pruned=False ) |
Returns: | deepcopy(self.config.get_first_parent_ancestors(pruned) ) |
Notes: | single-inheritance hierarchy based on first parents |
Command: | first-parent descendants |
---|---|
Calls: | info_get_first_parent_descendants( self ) |
Returns: | deepcopy(self.config.get_first_parent_descendants()) |
Notes: |
Command: | graph raw |
---|---|
Calls: | info_get_graph_raw( self, cto, ctn, group_nodes, ungroup_nodes,ungroup_recursive, group_all, ungroup_all ) |
Returns: | self.config.get_graph_raw( cto, ctn, group_nodes, ungroup_nodes, ungroup_recursive, group_all, ungroup_all), self.config.suite_polling_tasks, self.config.leaves, self.config.feet |
Notes: |
Command: | task requisites |
---|---|
Calls: | info_get_task_requisites( self, in_ids ) |
Returns: | self.pool.get_task_requisites( ids ) |
Notes: |
Command: | get cylc version |
---|---|
Calls: | info_get_cylc_version(self) |
Returns: | CYLC_VERSION |
Notes: |
As with the info commands, these all require access to the suite passphrase.
Command: | stop cleanly |
---|---|
Calls: | command_set_stop_cleanly(self, kill_active_tasks=False) |
Returns: | - |
Dependency negotiation run after this? | |
Notes: | Sets shutdown flag in scheduler.py
|
Command: | stop now |
---|---|
Calls: | command_stop_now(self) |
Returns: | - |
Dependency negotiation run after this? | |
Notes: | Results in "immediate" suite shutdown |
Command: | stop after point |
---|---|
Calls: | command_set_stop_after_point( self, point_string ) |
Returns: | - |
Dependency negotiation run after this? | |
Notes: | Calls self.set_stop_point( point_string ) in scheduler |
Command: | stop after clock time |
---|---|
Calls: | command_set_stop_after_clock_time( self, arg ) |
Returns: | - |
Dependency negotiation run after this? | |
Notes: | Parses an input time and sets suite to stop then. |
Command: | stop after task |
---|---|
Calls: | command_set_stop_after_task(self, tid) |
Returns: | - |
Dependency negotiation run after this? | |
Notes: | Results in call to self.set_stop_task(tid)
|
Command: | release suite |
---|---|
Calls: | command_release_suite( self ) |
Returns: | - |
Dependency negotiation run after this? | Y |
Notes: | Calls self.release_suite()
|
Command: | release task |
---|---|
Calls: | command_release_task( self, name, point_string, is_family ) |
Returns: | - |
Dependency negotiation run after this? | Y |
Notes: | Parses inputs to result in call to self.pool.release_tasks( task_ids )
|
Command: | remove cycle |
---|---|
Calls: | command_remove_cycle( self, point_string, spawn ) |
Returns: | - |
Dependency negotiation run after this? | |
Notes: | Results in call to self.pool.remove_entire_cycle( point, spawn )
|
Command: | remove task |
---|---|
Calls: | command_remove_task( self, name, point_string, is_family, spawn ) |
Returns: | - |
Dependency negotiation run after this? | |
Notes: | Results in call to self.pool.remove_tasks( task_ids, spawn )
|
Command: | hold suite now |
---|---|
Calls: | command_hold_suite( self ) |
Returns: | - |
Dependency negotiation run after this? | |
Notes: | Just calls self.hold_suite() - ultimately an alias |
Command: | hold task now |
---|---|
Calls: | command_hold_task( self, name, point_string, is_family ) |
Returns: | - |
Dependency negotiation run after this? | |
Notes: | Processes inputs and calls self.pool.hold_tasks( task_ids )
|
Command: | set runahead |
---|---|
Calls: | command_set_runahead( self, *args ) |
Returns: | - |
Dependency negotiation run after this? | Y |
Notes: | Calls self.pool.set_runahead(*args) - just an alias. |
Command: | set verbosity |
---|---|
Calls: | command_set_verbosity(self, lvl) |
Returns: | True, 'OK' |
Dependency negotiation run after this? | |
Notes: |
Command: | purge tree |
---|---|
Calls: | command_purge_tree( self, id, stop ) |
Returns: | - |
Dependency negotiation run after this? | Y |
Notes: | Leads to call to self.pool.purge_tree( id, get_point(stop) )
|
Command: | reset task state |
---|---|
Calls: | command_reset_task_state( self, name, point_string, state, is_family ) |
Returns: | |
Dependency negotiation run after this? | Y |
Notes: | Results in call to self.pool.reset_task_states( task_ids, state )
|
Command: | trigger task |
---|---|
Calls: | command_trigger_task( self, name, point_string, is_family ) |
Returns: | - |
Dependency negotiation run after this? | Y |
Notes: | Calls self.pool.trigger_tasks( task_ids )
|
Command: | nudge suite |
---|---|
Calls: | command_nudge( self ) |
Returns: | |
Dependency negotiation run after this? | Y |
Notes: | Puts an entry in the command queue |
Command: | insert task |
---|---|
Calls: | command_insert_task( self, name, point_string, is_family, stop_point_string ) |
Returns: | - |
Dependency negotiation run after this? | Y |
Notes: |
Command: | reload suite |
---|---|
Calls: | command_reload_suite( self ) |
Returns: | - |
Dependency negotiation run after this? | Y |
Notes: | Just calls self.reconfigure()
|
Command: | add prerequisite |
---|---|
Calls: | command_add_prerequisite( self, task_id, message ) |
Returns: | - |
Dependency negotiation run after this? | |
Notes: | Calls self.pool.add_prereq_to_task( task_id, message )
|
Command: | poll tasks |
---|---|
Calls: | command_poll_tasks( self, name, point_string, is_family ) |
Returns: | - |
Dependency negotiation run after this? | |
Notes: | Extracts task ids and calls self.pool.poll_tasks( task_ids )
|
Command: | kill tasks |
---|---|
Calls: | command_kill_tasks( self, name, point_string, is_family ) |
Returns: | - |
Dependency negotiation run after this? | |
Notes: | Extracts task ids and calls self.pool.kill_tasks( task_ids )
|
Other events that trigger "dependency negotiation":
'kill cycle',
'kill task',
'prerequisite'