-
Notifications
You must be signed in to change notification settings - Fork 495
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add dependencies
feature for task dependency management
#4067
base: master
Are you sure you want to change the base?
Conversation
@cblmemo Hi, could you kindly review this PR when you have a moment? Thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding this feature @andylizf ! This would be really helpful for specifying complex workflows. Left some comments. It mostly looks good to me ;)
I created a branch for our dag project and lets merge all features to this branch first. We might want to do some final checks before we merge this to our master branch and this advanced-dag
branch can serve as a buffer :))
depends_on
feature for task dependency managementdependencies
feature for task dependency management
Also, as mentioned here, lets change the target branch to merge to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @andylizf for the awesome work! Left some discussion here. I tested and the basic functionality works fine!
sky/utils/dag_utils.py
Outdated
if multi_tasks: | ||
raise ValueError('Multiple task definitions without a valid' | ||
'header.') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets default to use chain yaml for backward compatibility?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it also a chain yaml only if there is a name section?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you elaborate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see your initial point now. We can default to chain YAML without header, add a generate_dag_name
function, and use the YAML filename as the default name. Sound good?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly sounds good to me. for the name of the dag, lets keep align with current master branch?
skypilot/sky/utils/dag_utils.py
Lines 84 to 94 in 3e98afe
configs = common_utils.read_yaml_all(path) | |
dag_name = None | |
if set(configs[0].keys()) == {'name'}: | |
dag_name = configs[0]['name'] | |
configs = configs[1:] | |
elif len(configs) == 1: | |
dag_name = configs[0].get('name') | |
if len(configs) == 0: | |
# YAML has only `name: xxx`. Still instantiate a task. | |
configs = [{'name': dag_name}] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, the current master's behavior is that if the header is missing and it is a multitasking pipeline, then dag_name
will remain None
. Is this the expected behavior?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Allowing dag.name
to be None
creates a problematic cycle. If a user loads a DAG without a name, dag.name
becomes None
. When dumping this DAG to YAML, it results in name: null
. Later, when the controller loads this YAML, it parses null
back to None
. This leads to an AssertionError
in _get_dag_and_name
function, which assumes dag.name
is not None
. This cycle of None -> null -> None causes unexpected runtime errors in the existing codebase and should be addressed at once to ensure system stability.
skypilot/sky/jobs/controller.py
Lines 39 to 43 in 71a95f4
def _get_dag_and_name(dag_yaml: str) -> Tuple['sky.Dag', str]: | |
dag = dag_utils.load_chain_dag_from_yaml(dag_yaml) | |
dag_name = dag.name | |
assert dag_name is not None, dag | |
return dag, dag_name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pls check this
Line 67 in c6ae536
dag_utils.maybe_infer_and_fill_dag_and_task_names(dag) |
…#4067) provide an example, edited from pipeline.yml more focus on dependencies for user dag lib more powerful user interface load and dump new yaml format fix fix: reversed logic in add_edge rename refactor due to reviewer's comments generate task.name if not given add comments for add_edge add `print_exception_no_traceback` when raise make `Dag.tasks` a property print dependencies for `__repr__` move `get_unique_task_name` to common_utils rename methods to use downstream/edge terminology
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the prompt fix @andylizf ! Left some discussion. Most looks good to me!
sky/dag.py
Outdated
""" | ||
self.name = name | ||
self._task_name_lookup: Dict[str, 'task.Task'] = {} | ||
self.edges: Dict['task.Task', Set['task.Task']] = {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little bit confused why we need to maintain our own edges. doesnt nx.DiGraph
provide this ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. Ditched self.edges
and are now fully using nx.DiGraph
for edge stuff. Makes sense since we got rid of self.tasks
too. These help keeping things cleaner.
def get_unique_task_name(_: 'task_lib.Task') -> str: | ||
timestamp = int(time.time()) | ||
unique_suffix = uuid.uuid4().hex[:6] | ||
name = f'task_{timestamp}_{unique_suffix}' | ||
return name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets respect the original task name and append uuid suffix to it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, this function is only called when task name is None
. However, to keep each task name unique, I would do what you said and add the uuid
suffix to every task name while keeping this logic when the task name is null.
Co-authored-by: Tian Xia <[email protected]>
sky/utils/common_utils.py
Outdated
def get_unique_task_name(_: 'task_lib.Task') -> str: | ||
timestamp = int(time.time()) | ||
def get_unique_task_name(task: 'task_lib.Task') -> str: | ||
name = task.name or f'task_{int(time.time())}' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name = task.name or f'task_{int(time.time())}' | |
name = task.name if task.name is not None else f'task_{int(time.time())}' |
nit: Always use if foo is None
.
https://google.github.io/styleguide/pyguide.html#2144-decision
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's intentional. I think we should keep it as is. Catching empty strings is also what we want. If the task name is an empty string, we still want to fall back to the timestamp.
Co-authored-by: Tian Xia <[email protected]>
…amed" Otherwise, users can not refer to the task by name in the DAG. This reverts commit 8486352.
Resolve #4054
This PR introduces the
dependencies
feature to SkyPilot, enabling users to specify task dependencies in their YAML configurations. This enhancement allows for more complex workflow definitions and ensures proper execution order of interdependent tasks.Key changes:
dependencies
sectionTested (run the relevant ones):
bash format.sh
pytest tests/test_smoke.py
pytest tests/test_smoke.py::test_fill_in_the_name
conda deactivate; bash -i tests/backward_compatibility_tests.sh