Skip to content

Commit c2a3e95

Browse files
committed
Add mix operator
Signed-off-by: Paolo Di Tommaso <[email protected]>
1 parent 8639e4d commit c2a3e95

File tree

2 files changed

+16
-5
lines changed

2 files changed

+16
-5
lines changed

modules/nextflow/src/main/groovy/nextflow/extension/MixOp.groovy

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,16 @@
1717

1818
package nextflow.extension
1919

20-
import static nextflow.extension.DataflowHelper.*
21-
2220
import java.util.concurrent.atomic.AtomicInteger
2321

2422
import groovy.transform.CompileStatic
2523
import groovyx.gpars.dataflow.DataflowReadChannel
2624
import groovyx.gpars.dataflow.DataflowWriteChannel
2725
import groovyx.gpars.dataflow.operator.DataflowProcessor
2826
import nextflow.Channel
27+
import nextflow.extension.op.ContextRunPerThread
2928
import nextflow.extension.op.Op
29+
import nextflow.extension.op.OpContext
3030

3131
/**
3232
* Implements Nextflow Mix operator
@@ -39,6 +39,7 @@ class MixOp {
3939
private DataflowReadChannel source
4040
private List<DataflowReadChannel> others
4141
private DataflowWriteChannel target
42+
private OpContext context = new ContextRunPerThread()
4243

4344
MixOp(DataflowReadChannel source, DataflowReadChannel other) {
4445
this.source = source
@@ -71,14 +72,22 @@ class MixOp {
7172
onComplete: { DataflowProcessor proc -> if(count.decrementAndGet()==0) { Op.bind(proc, target, Channel.STOP) } }
7273
]
7374

74-
subscribeImpl(source, handlers)
75+
subscribe0(source, handlers)
7576
for( def it : others ) {
76-
subscribeImpl(it, handlers)
77+
subscribe0(it, handlers)
7778
}
7879

7980
final allSources = [source]
8081
allSources.addAll(others)
8182
return target
8283
}
8384

85+
private void subscribe0(final DataflowReadChannel source, final Map<String,Closure> events ) {
86+
new SubscribeOp()
87+
.withSource(source)
88+
.withContext(context)
89+
.withEvents(events)
90+
.apply()
91+
}
92+
8493
}

modules/nextflow/src/test/groovy/nextflow/prov/ProvTest.groovy

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,6 @@ class ProvTest extends Dsl2Spec {
574574
.name == ['p1 (5)']
575575
}
576576

577-
@Ignore
578577
def 'should track provenance with mix operator'() {
579578
when:
580579
dsl_eval(globalConfig(), '''
@@ -608,6 +607,9 @@ class ProvTest extends Dsl2Spec {
608607
then:
609608
upstreamTasksOf('p3 (1)')
610609
.name == ['p1 (1)']
610+
and:
611+
upstreamTasksOf('p3 (2)')
612+
.name == ['p2 (1)']
611613

612614
}
613615

0 commit comments

Comments
 (0)