Skip to content

Commit 9930e35

Browse files
authored
Fix invalid cache write when cacheable split operation fails (#6588)
Signed-off-by: Ben Sherman <[email protected]>
1 parent 7c40ee5 commit 9930e35

File tree

5 files changed

+110
-47
lines changed

5 files changed

+110
-47
lines changed

modules/nextflow/src/main/groovy/nextflow/splitter/AbstractSplitter.groovy

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -141,24 +141,24 @@ abstract class AbstractSplitter<T> implements SplitterStrategy {
141141

142142
setSource(source)
143143

144-
145-
final chunks = collector = createCollector()
146-
if( chunks instanceof CacheableCollector && chunks.checkCached() ) {
147-
log.debug "Operator `$operatorName` reusing cached chunks at path: ${chunks.getBaseFile()}"
148-
result = resumeFromCache(chunks)
144+
this.collector = createCollector()
145+
if( collector instanceof CacheableCollector && collector.checkCached() ) {
146+
log.debug "Operator `$operatorName` reusing cached chunks at path: ${collector.getBaseFile()}"
147+
result = resumeFromCache(collector)
149148
}
150-
151149
else {
152150
try {
153-
def stream = normalizeSource(source)
151+
final stream = normalizeSource(source)
154152
result = process(stream)
153+
154+
if( collector instanceof CacheableCollector )
155+
collector.markComplete()
155156
}
156157
catch ( StopSplitIterationException e ) {
157158
log.trace 'Split iteration interrupted'
158159
}
159160
}
160161

161-
162162
/*
163163
* now close and return the result
164164
* - when the target it's a channel, send stop message
@@ -246,7 +246,7 @@ abstract class AbstractSplitter<T> implements SplitterStrategy {
246246
* @param index the current split count
247247
* @return Either {@link groovyx.gpars.dataflow.DataflowChannel} or a {@code List} which holds the splitted chunks
248248
*/
249-
protected abstract process( T targetObject )
249+
abstract protected process( T targetObject )
250250

251251
/**
252252
* Normalise the source object to be splitted

modules/nextflow/src/main/groovy/nextflow/splitter/AbstractTextSplitter.groovy

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ abstract class AbstractTextSplitter extends AbstractSplitter<Reader> {
5656

5757
private long itemsCount
5858

59+
@Override
5960
AbstractTextSplitter options(Map options) {
6061
super.options(options)
6162

@@ -108,6 +109,7 @@ abstract class AbstractTextSplitter extends AbstractSplitter<Reader> {
108109
* @return A {@link Reader} for the given object.
109110
* @throws IllegalArgumentException if the object specified is of a type not supported
110111
*/
112+
@Override
111113
protected Reader normalizeSource( obj ) {
112114
Reader reader = normalizeSource0( obj )
113115
// detect if starts with bom and position after it
@@ -178,6 +180,7 @@ abstract class AbstractTextSplitter extends AbstractSplitter<Reader> {
178180
* @param offset
179181
* @return
180182
*/
183+
@Override
181184
protected process( Reader targetObject ) {
182185

183186
def result = null
@@ -244,6 +247,7 @@ abstract class AbstractTextSplitter extends AbstractSplitter<Reader> {
244247
* @return A {@link CollectorStrategy} object implementing a concrete
245248
* strategy according the user provided options
246249
*/
250+
@Override
247251
protected CollectorStrategy createCollector() {
248252

249253
if( !isCollectorEnabled() )
@@ -326,7 +330,6 @@ abstract class AbstractTextSplitter extends AbstractSplitter<Reader> {
326330
*/
327331
abstract protected fetchRecord( BufferedReader reader )
328332

329-
330333
protected int positionAfterBOM(Reader reader ){
331334
if( !reader.markSupported() )
332335
return 0

modules/nextflow/src/main/groovy/nextflow/splitter/TextFileCollector.groovy

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ class TextFileCollector implements CollectorStrategy, CacheableCollector, Header
7979
return file.resolveSibling( fileName )
8080
}
8181

82+
@Override
8283
void setHeader(String value) {
8384
this.header = value
8485
}
@@ -139,8 +140,6 @@ class TextFileCollector implements CollectorStrategy, CacheableCollector, Header
139140
@Override
140141
void close() throws IOException {
141142
closeWriter()
142-
markComplete()
143143
}
144144

145-
146145
}

modules/nextflow/src/test/groovy/nextflow/splitter/AbstractSplitterTest.groovy

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,62 @@ class AbstractSplitterTest extends Specification {
119119
thrown(IllegalArgumentException)
120120
}
121121

122+
def 'test markComplete called when process succeeds'() {
123+
given:
124+
def mockCollector = Mock(TextFileCollector)
125+
def splitter = new AbstractSplitter() {
126+
@Override
127+
protected Object process(Object targetObject) {
128+
return "success"
129+
}
130+
131+
@Override
132+
protected Object normalizeSource(Object object) {
133+
return object
134+
}
135+
136+
@Override
137+
protected CollectorStrategy createCollector() {
138+
return mockCollector
139+
}
140+
}
141+
splitter.target("test")
142+
143+
when:
144+
splitter.apply()
145+
146+
then:
147+
1 * mockCollector.checkCached() >> false
148+
1 * mockCollector.markComplete()
149+
}
150+
151+
def 'test markComplete not called when process fails'() {
152+
given:
153+
def mockCollector = Mock(TextFileCollector)
154+
def splitter = new AbstractSplitter() {
155+
@Override
156+
protected Object process(Object targetObject) {
157+
throw new OutOfMemoryError("Test error")
158+
}
159+
160+
@Override
161+
protected Object normalizeSource(Object object) {
162+
return object
163+
}
164+
165+
@Override
166+
protected CollectorStrategy createCollector() {
167+
return mockCollector
168+
}
169+
}
170+
splitter.target("test")
171+
172+
when:
173+
splitter.apply()
174+
175+
then:
176+
thrown(OutOfMemoryError)
177+
0 * mockCollector.markComplete()
178+
}
179+
122180
}

modules/nextflow/src/test/groovy/nextflow/splitter/TextFileCollectorTest.groovy

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -36,47 +36,48 @@ class TextFileCollectorTest extends Specification {
3636

3737
given:
3838
def base = new TextFileCollector.CachePath(Paths.get('.'))
39-
def buffer = new TextFileCollector(base)
39+
def collector = new TextFileCollector(base)
4040

4141
expect:
42-
buffer.getNextNameFor(Paths.get('/some/file.fa'),1) == Paths.get('/some/file.1.fa')
43-
buffer.getNextNameFor(Paths.get('/some/file.fa'),2) == Paths.get('/some/file.2.fa')
44-
buffer.getNextNameFor(Paths.get('/some/file.fa'),3) == Paths.get('/some/file.3.fa')
42+
collector.getNextNameFor(Paths.get('/some/file.fa'),1) == Paths.get('/some/file.1.fa')
43+
collector.getNextNameFor(Paths.get('/some/file.fa'),2) == Paths.get('/some/file.2.fa')
44+
collector.getNextNameFor(Paths.get('/some/file.fa'),3) == Paths.get('/some/file.3.fa')
4545
}
4646

4747
def 'test add text' () {
4848

4949
given:
5050
def base = Files.createTempDirectory('test').resolve('sample.fasta')
51-
def buffer = new TextFileCollector(new CachePath(base))
51+
def collector = new TextFileCollector(new CachePath(base))
5252

5353
when:
54-
buffer.add('>seq1\n')
55-
buffer.add('alpha\n')
56-
assert buffer.nextChunk() == base.resolveSibling('sample.1.fasta')
54+
collector.add('>seq1\n')
55+
collector.add('alpha\n')
56+
assert collector.nextChunk() == base.resolveSibling('sample.1.fasta')
5757

58-
buffer.add('>seq2\n')
59-
buffer.add('gamma\n')
60-
buffer.add('>seq3\n')
61-
buffer.add('beta\n')
62-
assert buffer.nextChunk() == base.resolveSibling('sample.2.fasta')
58+
collector.add('>seq2\n')
59+
collector.add('gamma\n')
60+
collector.add('>seq3\n')
61+
collector.add('beta\n')
62+
assert collector.nextChunk() == base.resolveSibling('sample.2.fasta')
6363

64-
buffer.add('>seq4\n')
65-
buffer.add('kappa\n')
66-
buffer.add('>seq5\n')
67-
buffer.add('iota\n')
68-
buffer.add('delta\n')
69-
assert buffer.nextChunk() == base.resolveSibling('sample.3.fasta')
64+
collector.add('>seq4\n')
65+
collector.add('kappa\n')
66+
collector.add('>seq5\n')
67+
collector.add('iota\n')
68+
collector.add('delta\n')
69+
assert collector.nextChunk() == base.resolveSibling('sample.3.fasta')
7070

71-
buffer.close()
71+
collector.markComplete()
72+
collector.close()
7273

7374
then:
7475
base.resolveSibling('sample.1.fasta').text == '>seq1\nalpha\n'
7576
base.resolveSibling('sample.2.fasta').text == '>seq2\ngamma\n>seq3\nbeta\n'
7677
base.resolveSibling('sample.3.fasta').text == '>seq4\nkappa\n>seq5\niota\ndelta\n'
7778
base.resolveSibling('.chunks.sample.fasta').exists()
78-
buffer.checkCached()
79-
buffer.getAllChunks()*.name == ['sample.1.fasta','sample.2.fasta','sample.3.fasta']
79+
collector.checkCached()
80+
collector.getAllChunks()*.name == ['sample.1.fasta','sample.2.fasta','sample.3.fasta']
8081

8182
cleanup:
8283
base?.parent?.deleteDir()
@@ -89,29 +90,29 @@ class TextFileCollectorTest extends Specification {
8990
def base = Files.createTempDirectory('test').resolve('chunk.fasta')
9091

9192
when:
92-
def buffer = new TextFileCollector(new CachePath(base))
93+
def collector = new TextFileCollector(new CachePath(base))
9394
then:
94-
!buffer.hasChunk()
95+
!collector.hasChunk()
9596

9697
when:
97-
buffer.add('>seq1\n')
98-
buffer.add('alpha\n')
98+
collector.add('>seq1\n')
99+
collector.add('alpha\n')
99100
then:
100-
buffer.hasChunk()
101+
collector.hasChunk()
101102

102103
when:
103-
buffer.nextChunk()
104-
buffer.add('>seq2\n')
105-
buffer.add('gamma\n')
106-
buffer.add('>seq3\n')
107-
buffer.add('beta\n')
104+
collector.nextChunk()
105+
collector.add('>seq2\n')
106+
collector.add('gamma\n')
107+
collector.add('>seq3\n')
108+
collector.add('beta\n')
108109
then:
109-
buffer.hasChunk()
110+
collector.hasChunk()
110111

111112
when:
112-
buffer.nextChunk()
113+
collector.nextChunk()
113114
then:
114-
!buffer.hasChunk()
115+
!collector.hasChunk()
115116

116117
cleanup:
117118
base?.parent?.deleteDir()
@@ -141,6 +142,7 @@ class TextFileCollectorTest extends Specification {
141142
collector.add('delta\n')
142143
assert collector.nextChunk() == base.resolveSibling('sample.3.fasta.gz')
143144

145+
collector.markComplete()
144146
collector.close()
145147

146148
then:
@@ -171,6 +173,7 @@ class TextFileCollectorTest extends Specification {
171173
collector.add('zzz')
172174
assert collector.nextChunk() == base.resolveSibling('sample.3.fasta')
173175

176+
collector.markComplete()
174177
collector.close()
175178

176179
then:

0 commit comments

Comments
 (0)