Processes and workflows recursion [EXPERIMENTAL] #2521
Replies: 15 comments 65 replies
-
This seems like it could be incredibly useful! I have no comments apart from my strong support.
-
I'd tried to develop a pipeline for generating simulated datasets that could be used to test variant calling tools: https://github.com/dfornika/simulate-outbreak-dataset The idea is to do a rudimentary simulation of an outbreak by introducing mutations into a reference genome, then passing those mutated genomes forward to another iteration. I was never quite able to get it working the way I wanted using DSL2 syntax. I'm looking forward to giving it another try with the new recursion syntax.
-
Christmas came early! Very excited to try this out. Thanks @pditommaso!
-
This is a good initial implementation. For this feature to be robust, I think it needs to be rooted in the concept of a finite state machine. FSMs are used to design digital circuits that require recursion or iteration. Digital circuits are just workflows implemented in hardware, so they are a good analogy for Nextflow. See also recurrent neural networks. Taking the above example from @dfornika and this state machine diagram, we can see that this example works in Nextflow only because the state and the output happen to be the same. But what if they aren't?

I think a process/workflow that supports recursion should be able to have a sense of "state" that is distinct from the output (but can be the same). Being able to define state variables for a process/workflow would (I think) solve limitations 1, 3, 4, and 5. If you have state, the input and output don't have to be identical. State can be used to keep track of the recursion level (either by hand, or we could add a convenience function). State allows you to distinguish between the initial input and the accumulated outputs in the scan example. In fact, if you can define state variables, then I think the difference between `recurse` and `scan` largely goes away.

Regarding limitation 2, passing a queue channel into a recursive process/workflow should work just like any other process/workflow: each item in the channel should be processed independently. Instead of calling a process once per item like normal, each item will trigger a recursive sequence of tasks, but the overall parallelism of the queue channel remains the same.

So basically I think we need to rework this feature to allow for state channels. I think the example by @dfornika is a good one to work with: how would this example be written if we could define state channels? Another good example would be to implement a multiplier "workflow" that takes two integers (as lists of 0s and 1s) and produces the product.

If we do go forward with this feature, I think it will be important to give some examples for when recursion should and should not be used. Many recursive/iterative use cases can just be implemented in an imperative language (a Python script) and then called in a single process. As far as I can tell, it is only when the state consists of files (especially large files) that recursion should be used at the workflow level.
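To make the state-vs-output distinction concrete, the multiplier suggested above can be sketched in plain Python (hypothetical names, not Nextflow syntax): a shift-and-add state machine whose state `(a, b, acc)` is strictly larger than its output (`acc` alone). Plain integers with bit operations stand in for the lists of 0s and 1s.

```python
# Hypothetical sketch (plain Python, not Nextflow): a shift-and-add
# multiplier modelled as a state machine whose state (a, b, acc) is
# distinct from its final output (acc alone).

def multiplier_step(state):
    """One recursion step: state in, state out."""
    a, b, acc = state
    if b & 1:            # low bit of the multiplier set?
        acc += a         # ...then add the shifted multiplicand
    return (a << 1, b >> 1, acc)

def run_until(state, done):
    """Drive the state machine until the stop condition holds."""
    while not done(state):
        state = multiplier_step(state)
    return state[2]      # the output is only part of the state

# 6 * 5 == 30
print(run_until((6, 5, 0), done=lambda s: s[1] == 0))
```

Note the stop condition inspects the state (`b == 0`), not the output, which is exactly the distinction argued for above.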
-
Another thought I had: maybe the recursion state (such as an iteration counter) could be threaded through explicitly as extra inputs and outputs. Rewriting the example:

```nextflow
nextflow.enable.dsl=2
nextflow.preview.recursion=true

process foo {
    input:
      path infiles
      val i
      path outfiles

    output:
      path infiles
      val i_
      path 'out_*.txt', includeInputs: true

    script:
      """
      echo "Task ${i} inputs: ${infiles[i]}" > out_${i}.txt
      # ${i_ = i + 1}
      """
}

workflow {
    infiles = channel.fromPath("sample*.txt").toSortedList(it -> it.name)
    foo.recurse( infiles, 0, [] ).until { infiles, i, outfiles -> (i == infiles.size()) }
    foo.out[2].view(it -> it.text)
}
```

Also this way you can keep track of inputs vs outputs.
-
How would recursion work with sub-workflows? The examples given so far appear to involve only recursive processes.
-
What if I have to change the parameters for each recursion iteration? Any ideas?
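One possible answer, sketched in plain Python (hypothetical, not an official Nextflow feature): thread an iteration counter through the recursion state and derive each iteration's parameters from it. In Nextflow terms the counter would be an extra `val` input/output, as in the explicit-state example earlier in this thread.

```python
# Sketch (plain Python, hypothetical): vary a parameter per iteration by
# carrying an iteration counter in the recursion state, so each step can
# derive its own parameter value.

def step(value, i):
    threshold = 10 * (i + 1)   # parameter derived from the iteration number
    return value + threshold, i + 1

value, i = 1, 0
while i < 3:
    value, i = step(value, i)
print(value)   # 1 + 10 + 20 + 30 = 61
```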
-
I also asked in the Nextflow-help chat and got a satisfying answer there. Thanks @bentsherman!
-
Is it possible to extend this to rerun a sub-workflow for different sets of inputs? Sort of:
-
How about an implementation like this?

```nextflow
process append_square {
    input:
      val(x)
    output:
      val(y)
    exec:
      y = x + x[-1]**2
}

workflow MyRecursion {
    take:
      data
    main:
      result = data |
        append_square |
        branch {
          done: it[-1] >= 10
          feedback: true
        }
    emit:
      result.done
    recurse:
      result.feedback
}

workflow {
    Channel.of([2], [3], [4]) |
      MyRecursion |
      view
}
```
I see a few advantages to doing it this way:
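The proposed `emit`/`recurse` branching semantics can be sketched in plain Python (hypothetical, not Nextflow): items that satisfy the `done` condition are emitted, and everything else is fed back into another iteration.

```python
# Sketch (plain Python, hypothetical): the emit/recurse feedback loop
# proposed above -- keep appending the square of the last element until
# the "done" branch condition (last element >= 10) holds.

def append_square(xs):
    return xs + [xs[-1] ** 2]

def my_recursion(data):
    done, feedback = [], data
    while feedback:
        advanced = [append_square(xs) for xs in feedback]
        done += [xs for xs in advanced if xs[-1] >= 10]     # "done" branch
        feedback = [xs for xs in advanced if xs[-1] < 10]   # fed back in
    return done

for result in my_recursion([[2], [3], [4]]):
    print(result)
```

One nice property of this formulation is that each item leaves the loop as soon as its own condition is met, rather than all items iterating the same number of times.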
-
Hi, I'm failing to use the recursive pattern on a workflow. I'm very new to Nextflow, so please forgive me for basic errors. I'm trying to figure out how to use Nextflow to "reduce" over an ordered set of items/files with a process/task in a hierarchical manner, e.g. for a channel with 8 items, named a-h, I want to calculate something like `concat(concat(concat(a,b), concat(c,d)), concat(concat(e,f), concat(g,h)))`, where `concat` is a process that combines two items. In the following example:

```nextflow
include { CREATEDATA } from './module'
include { concat_wrapper as concat_wrapper1 } from './module'
include { concat_wrapper as concat_wrapper2 } from './module'
include { concat_wrapper as concat_wrapper3 } from './module'

workflow {
    (sortkeys, files) = Channel.from( 'A'..'H' ) | CREATEDATA
    keyed_files = sortkeys.merge(files).toSortedList({ a, b -> a[0] <=> b[0] })
    keyed_files.view()
    concat_wrapper1(keyed_files)
    concat_wrapper2(concat_wrapper1.out)
    concat_wrapper3(concat_wrapper2.out)
    concat_wrapper3.out.view()
}
```

where `./module` contains:

```nextflow
process CREATEDATA {
    input:
      val key_in
    output:
      val key_out
      path "${key_out}.txt"
    script:
      key_out = key_in
      """
      echo $key_in >> ${key_in}.txt
      """
}

process CONCAT {
    input:
      tuple val(first_key), path(first_path), val(second_key), path(second_path)
    output:
      tuple val(key_out), path("${key_out}.txt")
    script:
      key_out = "${first_key}_${second_key}"
      """
      touch ${key_out}.txt
      cat $first_path >> ${key_out}.txt
      cat $second_path >> ${key_out}.txt
      """
}

workflow concat_wrapper {
    take:
      keyed_files
    main:
      keyed_files_out = CONCAT(keyed_files.flatten().collate(4)).toSortedList({ a, b -> a[0] <=> b[0] })
    emit:
      keyed_files_out
}
```

The above works as expected, but the manual looping over the `concat_wrapper` calls is what I'd like to replace with recursion:

```nextflow
nextflow.preview.recursion=true

include { CREATEDATA } from './module'
include { concat_wrapper } from './module'

workflow {
    (sortkeys, files) = Channel.from( 'A'..'H' ) | CREATEDATA
    keyed_files = sortkeys.merge(files).toSortedList({ a, b -> a[0] <=> b[0] })
    keyed_files.view()
    concat_wrapper.recurse(keyed_files).times(2)
    concat_wrapper.out.view()
}
```

But running the above does not work, and the terminal seems to hang after the first invocation of `concat_wrapper`.
-
So just to clarify, in the current state of the world, is there any option to proceed if we need to recurse and our initial recursion input happens to come from running another process or workflow? Something like:
-
A basic question: the `until` examples above evaluate the output with a simple inline condition. But if I wanted to evaluate the output in some other way, perhaps using a custom function, how would I do that? Say, for the sake of argument, I actually wanted to know how many times the letter "i" showed up in the output in order to call the recursion to a finish. I could easily design a process that counts the letter "i", but how can that be used in this evaluation?
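One way to think about it, sketched in plain Python (hypothetical; the random-letter generator just stands in for a real task): the stop condition is an arbitrary predicate over the current output, so anything computable from the output, such as a letter count, could in principle be evaluated there.

```python
# Sketch (plain Python, hypothetical): a custom stop condition -- stop
# once the letter "i" has appeared at least five times in the output.
import random

random.seed(0)

def step(text):
    # stands in for a task that appends one letter to its output each run
    return text + random.choice("irrelevant")

def recurse_until(text, done):
    while not done(text):
        text = step(text)
    return text

result = recurse_until("", done=lambda t: t.count("i") >= 5)
print(result.count("i"))   # 5 -- the loop stops the moment the count is reached
```

Whether the predicate inside Nextflow's `until { ... }` closure can call out to a separate counting process, rather than plain Groovy code reading the output file, is exactly the open question here.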
-
So does this mean that none of the steps of the iteration will be cached, since the input file is being modified?
-
Hi all, for me it seems that this feature does not quite work for my use case. Have a look at this minimal (non-)working example:

This workflow mimics a Bayesian optimization loop: in the first step, candidates are generated, which are then evaluated in parallel and then combined in a final step. If I execute the workflow, it stops without an error message before the combining step. Any idea what is going on there? Best, Johannes
-
Computational workflows are usually based on a DAG, i.e. a directed acyclic graph of the tasks to be computed; as such, the concept of recursion or iteration does not fit well into this model.
However, there are not-uncommon use cases in which recursion could be useful in a computational workflow.
Using Nextflow's DSL1, it's possible to create an iteration using an output channel linked to an upstream process, which essentially creates a continuous feedback loop.
https://nextflow-io.github.io/patterns/index.html#_feedback_loop
However, this solution is quite tricky to implement; moreover, it's not supported by Nextflow DSL2, since the `channel.create` operation is not allowed anymore.
Recursion in DSL2
As of version 21.11.0-edge, Nextflow provides an (experimental) syntax that brings native support for recursion to workflows, without the need to hack around with channel creation.
The overall idea is to provide a new `recurse` operation that re-executes the process or sub-workflow to which it is applied, feeding the last output produced back in as the next input, either for a fixed number of times or until a user-provided condition is satisfied. As usual, an example is the best way to describe it:
In the snippet above, `foo` represents an arbitrary Nextflow process taking two input values; nothing new has been added here. In order to repeat the execution of the `foo` process, the keyword `recurse` is applied to it, passing the initial input values to be used for the first iteration as arguments, e.g. `(10,20)`. Iteration i+1 automatically takes as inputs the outputs of iteration i, and the recursion stops after 4 iterations, as stated by `.times(4)`.
If we execute this piece of code, the following output is printed:
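A plain-Python sketch (hypothetical names, not Nextflow) of the `recurse(...).times(n)` semantics described above:

```python
# Sketch (plain Python, hypothetical): each iteration's outputs become
# the next iteration's inputs, for a fixed number of iterations.

def foo(x, y):
    # stands in for an arbitrary two-input, two-output process
    return x + y, x

def recurse_times(process, inputs, n):
    for _ in range(n):
        inputs = process(*inputs)
        print(inputs)
    return inputs

recurse_times(foo, (10, 20), 4)
```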
In place of `times`, it is also possible to use `until` to provide a condition that determines when to stop. For example:
Note that `until` receives as arguments the outputs of the current iteration; therefore, in the example above, `it` holds the `result.txt` file produced by the process execution, and the iteration terminates when the output file size is greater than 100 bytes.
Running this snippet, the following output is printed:
Accumulators
Another important use case when dealing with recursion is the ability to repeat the execution not just with the previous output, but with the outputs produced by all previous executions.
In reactive programming this is known as the scan operation.
Nextflow implements a similar concept by applying the `scan` operation to a workflow or process component. For example:
Running the above snippet, the following output will be printed:
Note that when using `scan`, no termination condition should be provided, because the number of times the process (or workflow) is executed depends only on the items emitted by the input channel(s), as in any other Nextflow process.
Limitations & caveats
This is still an exploratory feature with a lot of limitations:
- When using `recurse` or `scan`, the `input` and `output` definitions, either for a process or a workflow, should be identical.
- The `recurse` operation allows as input only value objects and value channels; queue channels cannot be used.
- The input of the `scan` operation contains both the actual input plus the accumulated output, and there's no way to distinguish them (apart from hacking on the file names).
Conclusion
This feature can potentially have a big impact on Nextflow workflows. For this reason, I think it's very important to collect feedback from the community and from whoever may be interested in this feature.
Do you think it's useful? How could it be improved? Your comments are welcome.
Resources
You can find the above examples at this repo.