
Automatic output path generation #19

Open
hbredin opened this issue Mar 22, 2016 · 4 comments

hbredin commented Mar 22, 2016

I had difficulty finding a good naming convention for all my out_xxxx paths once my workflow became complicated (e.g. with one task taking three other tasks as input: how should I name its output?)

Therefore, I have created a sciluigi.Task mixin called AutoOutput that automatically adds an out_put method to a task (see below). Maybe it can be useful for others...

All you have to do to use it is the following:

  • add a workdir luigi.Parameter to the WorkflowTask
  • add the AutoOutput mixin to the task you are adding to the workflow
class MyTask(sciluigi.Task, AutoOutput):
    in_put1 = None
    in_put2 = None
    paramA = luigi.Parameter()
    paramB = luigi.Parameter()

    def run(self):
        with self.out_put().open('w') as fp:
            pass

class MyWorkflow(sciluigi.WorkflowTask):
    workdir = luigi.Parameter()

    def workflow(self):
        task1 = self.new_task('task1', Task1)
        task2 = self.new_task('task2', Task2)

        task = self.new_task('task', MyTask)
        task.in_put1 = task1.out_put
        task.in_put2 = task2.out_put
        return task

It does have a few limitations, the main one being that it does not support tasks with structured inputs.

This will work:

task.in_put1 = task1.out_put
task.in_put2 = task2.out_put

This will not work:

task.in_put = {
    'input1': task1.out_put,
    'input2': task2.out_put,
}

Here is the code of the AutoOutput mixin:

import hashlib
import json

import six
import sciluigi


class AutoOutput(object):

    def _output_from_hash(self):

        # working directory within which all automatic outputs will be stored
        workdir = self.workflow_task.workdir

        description = {}

        # add one {key: value} per in_xxxx attribute
        # key = 'in_xxxx'
        # value = in_xxxx().path, made relative to workdir when possible
        for attrname, attrval in six.iteritems(self.__dict__):
            if attrname.startswith('in_'):
                path = attrval().path
                if path.startswith(workdir):
                    path = path[len(workdir):]
                description[attrname] = path


        # add one {key: value} per task parameter
        # key = parameter name
        # value = parameter value
        params = self.get_params()
        params = [name for name, _ in params]
        for param_name in params:
            # do not take 'instance_name' and 'workflow_task' into account
            if param_name in ['instance_name', 'workflow_task']:
                continue
            description[param_name] = getattr(self, param_name)

        # hash the resulting dictionary (sort keys so the digest is
        # deterministic; encode for Python 3 compatibility)
        digest = hashlib.sha1(
            json.dumps(description, sort_keys=True).encode('utf-8')).hexdigest()

        # generate out_put path automatically
        output_path = '{workdir}/{workflow_name}/{instance_name}/{digest}'
        return output_path.format(
            workdir=workdir,
            instance_name=self.instance_name,
            workflow_name=self.workflow_task.__class__.__name__,
            digest=digest)

    def out_put(self):
        # automagically get out_put path
        path = self._output_from_hash()
        return sciluigi.TargetInfo(self, path)
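The hashing scheme above can also be exercised on its own. The sketch below (hypothetical helper names, not part of the mixin) shows how a deterministic digest is derived from inputs and parameters, and how dict-valued inputs could be flattened before hashing to lift the structured-inputs limitation mentioned earlier:

```python
import hashlib
import json


def flatten_inputs(description, attrname, attrval):
    """Hypothetical helper: flatten a dict-valued in_xxxx attribute
    into one description entry per sub-input, so structured inputs
    hash the same way as plain ones."""
    if isinstance(attrval, dict):
        for key, target in attrval.items():
            description['{0}.{1}'.format(attrname, key)] = target
    else:
        description[attrname] = attrval


def digest_for(description):
    """Same recipe as _output_from_hash: sort keys so the digest is
    stable regardless of insertion order."""
    return hashlib.sha1(
        json.dumps(description, sort_keys=True).encode('utf-8')).hexdigest()


# flat and structured inputs end up in one flat description dict
description = {}
flatten_inputs(description, 'in_put', {'input1': 'a.txt', 'input2': 'b.txt'})
flatten_inputs(description, 'paramA', 'value')

# insertion order does not change the digest
reordered = dict(reversed(list(description.items())))
assert digest_for(description) == digest_for(reordered)
```

With this flattening in place, `_output_from_hash` would only need to call the helper instead of assigning `description[attrname]` directly.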
hbredin commented Mar 22, 2016

The question that follows is somewhat related to #18 (rather, a generalization of it).
Since paths are now generated automatically, it is not obvious where output files were written (this is true even without the AutoOutput mixin).

Is there an easy way to access the outputs of all tasks constituting the workflow?
I tried the following:

import six

def getAllOutputs(workflow):
    outputs = {}
    for instance_name, task in six.iteritems(workflow._tasks):
        outputs[instance_name] = task.out_put().path
    return outputs

workflow = MyWorkflow()
outputs = getAllOutputs(workflow)

But it looks like, at this point, the tasks constituting the workflow (workflow._tasks) are not instantiated yet, and all we get are output paths based on default parameter values.

What does workflow._tasks contain exactly?

samuell commented Mar 24, 2016

(Hi, sorry, have been a bit busy, will look at this now!)

samuell commented Mar 24, 2016

> I had difficulty finding a good naming convention for all my out_xxxx path, when my workflow would become complicated (e.g. with one task taking three other tasks as input: how should I name its output?)

This is hard to say without a concrete example. We have had cases where we often have multiple outputs, so it has been central for us to give each output a unique name and thus an "identity". In cases with a single output, we have kept to the same pattern and tried to give it a descriptive name, such as .out_concatenated, .out_traindata, or .out_testdata.

> But it looks like, at this point, tasks constituting the workflow (workflow._tasks) are not instantiated yet, and all we got are output paths based on default parameter values.

Yeah, I can't say for sure without testing this case, but I have often run into problems with the fact that Luigi separates scheduling and workflow execution into two phases, so tasks are not fully instantiated until the scheduling phase is finished and execution has started.

Our biggest problem with this is that it makes it hard, for example, to initialize a new task with parameter values calculated by a previous task, since parameter values must be provided at scheduling time, and scheduling is over by the time execution starts. As a side note, this is one reason why we are experimenting with a fully dataflow-based approach in scipipe, where scheduling and execution can happen interchangeably (but it's not production ready yet).
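The two-phase issue can be illustrated without Luigi itself. The toy model below (purely illustrative, not Luigi's API) freezes all parameter values while the dependency graph is built, so a value computed in run() arrives too late to parameterize a downstream task:

```python
class ToyTask:
    """Toy stand-in for a Luigi task: parameters are bound at
    construction (scheduling time); run() happens later."""

    def __init__(self, **params):
        self.params = params  # frozen at scheduling time
        self.result = None

    def run(self):
        self.result = sum(self.params.get('values', []))


# Phase 1: scheduling -- the whole graph is built up front,
# so downstream parameters must already be known here.
upstream = ToyTask(values=[1, 2, 3])
downstream = ToyTask(threshold=upstream.result)  # result is still None!

# Phase 2: execution
upstream.run()

# downstream was parameterized before upstream ever ran
assert upstream.result == 6
assert downstream.params['threshold'] is None
```

This is the same shape of problem as in Luigi: by the time upstream results exist, the downstream task's parameters have long been fixed.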

> Is there an easy way to access the output of all tasks constituting the workflow.
> I tried the following

Will have to test a little before getting back on this and the other remaining questions. Will get back to you shortly!

hbredin commented Apr 5, 2016

FYI, I ended up saving every automagically generated output path in an attribute of the parent workflow:
https://github.com/pyannote/pyannote-workflows/blob/master/pyannote_workflows/utils.py#L56-L68
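A minimal sketch of that idea (hypothetical class and attribute names, not the exact code behind the link): each task records its generated path in a dict on the parent workflow as out_put() is called, so all outputs can be listed after the fact:

```python
class OutputRegistry:
    """Toy stand-in for the parent workflow: collects every
    automatically generated output path, keyed by instance name."""

    def __init__(self):
        self.output_paths = {}


class RegisteringTask:
    """Toy task whose out_put() records its path on the parent
    workflow before returning it (illustrative names only)."""

    def __init__(self, workflow_task, instance_name, path):
        self.workflow_task = workflow_task
        self.instance_name = instance_name
        self._path = path

    def out_put(self):
        self.workflow_task.output_paths[self.instance_name] = self._path
        return self._path


wf = OutputRegistry()
t1 = RegisteringTask(wf, 'task1', '/work/task1/abc123')
t2 = RegisteringTask(wf, 'task2', '/work/task2/def456')
t1.out_put()
t2.out_put()

# the workflow now knows where every output went
assert wf.output_paths == {'task1': '/work/task1/abc123',
                           'task2': '/work/task2/def456'}
```

This sidesteps the instantiation-timing problem above: paths are harvested as tasks produce them, rather than queried from workflow._tasks up front.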
