Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,24 +141,24 @@ abstract class AbstractSplitter<T> implements SplitterStrategy {

setSource(source)


final chunks = collector = createCollector()
if( chunks instanceof CacheableCollector && chunks.checkCached() ) {
log.debug "Operator `$operatorName` reusing cached chunks at path: ${chunks.getBaseFile()}"
result = resumeFromCache(chunks)
this.collector = createCollector()
if( collector instanceof CacheableCollector && collector.checkCached() ) {
log.debug "Operator `$operatorName` reusing cached chunks at path: ${collector.getBaseFile()}"
result = resumeFromCache(collector)
}

else {
try {
def stream = normalizeSource(source)
final stream = normalizeSource(source)
result = process(stream)

if( collector instanceof CacheableCollector )
collector.markComplete()
}
catch ( StopSplitIterationException e ) {
log.trace 'Split iteration interrupted'
}
}


/*
* now close and return the result
* - when the target is a channel, send stop message
Expand Down Expand Up @@ -246,7 +246,7 @@ abstract class AbstractSplitter<T> implements SplitterStrategy {
* @param index the current split count
* @return Either {@link groovyx.gpars.dataflow.DataflowChannel} or a {@code List} which holds the split chunks
*/
protected abstract process( T targetObject )
abstract protected process( T targetObject )

/**
* Normalise the source object to be split
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ abstract class AbstractTextSplitter extends AbstractSplitter<Reader> {

private long itemsCount

@Override
AbstractTextSplitter options(Map options) {
super.options(options)

Expand Down Expand Up @@ -108,6 +109,7 @@ abstract class AbstractTextSplitter extends AbstractSplitter<Reader> {
* @return A {@link Reader} for the given object.
* @throws IllegalArgumentException if the object specified is of a type not supported
*/
@Override
protected Reader normalizeSource( obj ) {
Reader reader = normalizeSource0( obj )
// detect if starts with bom and position after it
Expand Down Expand Up @@ -178,6 +180,7 @@ abstract class AbstractTextSplitter extends AbstractSplitter<Reader> {
* @param offset
* @return
*/
@Override
protected process( Reader targetObject ) {

def result = null
Expand Down Expand Up @@ -244,6 +247,7 @@ abstract class AbstractTextSplitter extends AbstractSplitter<Reader> {
* @return A {@link CollectorStrategy} object implementing a concrete
* strategy according to the user-provided options
*/
@Override
protected CollectorStrategy createCollector() {

if( !isCollectorEnabled() )
Expand Down Expand Up @@ -326,7 +330,6 @@ abstract class AbstractTextSplitter extends AbstractSplitter<Reader> {
*/
abstract protected fetchRecord( BufferedReader reader )


protected int positionAfterBOM(Reader reader ){
if( !reader.markSupported() )
return 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class TextFileCollector implements CollectorStrategy, CacheableCollector, Header
return file.resolveSibling( fileName )
}

/**
 * Stores the header text for this collector.
 *
 * NOTE(review): presumably the header is written at the start of each chunk
 * file produced by this collector — confirm against the chunk-writing logic.
 *
 * @param value the header text, or {@code null} for no header
 */
@Override
void setHeader(String value) {
this.header = value
}
Expand Down Expand Up @@ -139,8 +140,6 @@ class TextFileCollector implements CollectorStrategy, CacheableCollector, Header
@Override
void close() throws IOException {
closeWriter()
markComplete()
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,62 @@ class AbstractSplitterTest extends Specification {
thrown(IllegalArgumentException)
}

// Verifies that apply() invokes markComplete() on the collector exactly once
// after process() returns normally, i.e. cached chunks are only marked
// reusable when the split has actually completed.
def 'test markComplete called when process succeeds'() {
given:
def mockCollector = Mock(TextFileCollector)
// Minimal concrete splitter: process() succeeds immediately and
// createCollector() hands back the mock so its interactions can be verified.
def splitter = new AbstractSplitter() {
@Override
protected Object process(Object targetObject) {
return "success"
}

@Override
protected Object normalizeSource(Object object) {
return object
}

@Override
protected CollectorStrategy createCollector() {
return mockCollector
}
}
splitter.target("test")

when:
splitter.apply()

then:
// checkCached() is stubbed to false so apply() takes the non-cached branch
// and runs process(); a successful run must mark the collector complete.
1 * mockCollector.checkCached() >> false
1 * mockCollector.markComplete()
}

// Verifies the failure path: when process() throws, apply() must NOT mark the
// collector complete, so a partial split is never treated as a valid cache.
def 'test markComplete not called when process fails'() {
given:
def mockCollector = Mock(TextFileCollector)
def splitter = new AbstractSplitter() {
@Override
protected Object process(Object targetObject) {
// An Error (not a StopSplitIterationException, which apply() catches and
// swallows) is thrown so the failure is guaranteed to propagate out.
// NOTE(review): a plain RuntimeException would also propagate here —
// confirm whether OutOfMemoryError was chosen deliberately.
throw new OutOfMemoryError("Test error")
}

@Override
protected Object normalizeSource(Object object) {
return object
}

@Override
protected CollectorStrategy createCollector() {
return mockCollector
}
}
splitter.target("test")

when:
splitter.apply()

then:
// The error propagates out of apply() and markComplete() is never invoked.
thrown(OutOfMemoryError)
0 * mockCollector.markComplete()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,47 +36,48 @@ class TextFileCollectorTest extends Specification {

given:
def base = new TextFileCollector.CachePath(Paths.get('.'))
def buffer = new TextFileCollector(base)
def collector = new TextFileCollector(base)

expect:
buffer.getNextNameFor(Paths.get('/some/file.fa'),1) == Paths.get('/some/file.1.fa')
buffer.getNextNameFor(Paths.get('/some/file.fa'),2) == Paths.get('/some/file.2.fa')
buffer.getNextNameFor(Paths.get('/some/file.fa'),3) == Paths.get('/some/file.3.fa')
collector.getNextNameFor(Paths.get('/some/file.fa'),1) == Paths.get('/some/file.1.fa')
collector.getNextNameFor(Paths.get('/some/file.fa'),2) == Paths.get('/some/file.2.fa')
collector.getNextNameFor(Paths.get('/some/file.fa'),3) == Paths.get('/some/file.3.fa')
}

def 'test add text' () {

given:
def base = Files.createTempDirectory('test').resolve('sample.fasta')
def buffer = new TextFileCollector(new CachePath(base))
def collector = new TextFileCollector(new CachePath(base))

when:
buffer.add('>seq1\n')
buffer.add('alpha\n')
assert buffer.nextChunk() == base.resolveSibling('sample.1.fasta')
collector.add('>seq1\n')
collector.add('alpha\n')
assert collector.nextChunk() == base.resolveSibling('sample.1.fasta')

buffer.add('>seq2\n')
buffer.add('gamma\n')
buffer.add('>seq3\n')
buffer.add('beta\n')
assert buffer.nextChunk() == base.resolveSibling('sample.2.fasta')
collector.add('>seq2\n')
collector.add('gamma\n')
collector.add('>seq3\n')
collector.add('beta\n')
assert collector.nextChunk() == base.resolveSibling('sample.2.fasta')

buffer.add('>seq4\n')
buffer.add('kappa\n')
buffer.add('>seq5\n')
buffer.add('iota\n')
buffer.add('delta\n')
assert buffer.nextChunk() == base.resolveSibling('sample.3.fasta')
collector.add('>seq4\n')
collector.add('kappa\n')
collector.add('>seq5\n')
collector.add('iota\n')
collector.add('delta\n')
assert collector.nextChunk() == base.resolveSibling('sample.3.fasta')

buffer.close()
collector.markComplete()
collector.close()

then:
base.resolveSibling('sample.1.fasta').text == '>seq1\nalpha\n'
base.resolveSibling('sample.2.fasta').text == '>seq2\ngamma\n>seq3\nbeta\n'
base.resolveSibling('sample.3.fasta').text == '>seq4\nkappa\n>seq5\niota\ndelta\n'
base.resolveSibling('.chunks.sample.fasta').exists()
buffer.checkCached()
buffer.getAllChunks()*.name == ['sample.1.fasta','sample.2.fasta','sample.3.fasta']
collector.checkCached()
collector.getAllChunks()*.name == ['sample.1.fasta','sample.2.fasta','sample.3.fasta']

cleanup:
base?.parent?.deleteDir()
Expand All @@ -89,29 +90,29 @@ class TextFileCollectorTest extends Specification {
def base = Files.createTempDirectory('test').resolve('chunk.fasta')

when:
def buffer = new TextFileCollector(new CachePath(base))
def collector = new TextFileCollector(new CachePath(base))
then:
!buffer.hasChunk()
!collector.hasChunk()

when:
buffer.add('>seq1\n')
buffer.add('alpha\n')
collector.add('>seq1\n')
collector.add('alpha\n')
then:
buffer.hasChunk()
collector.hasChunk()

when:
buffer.nextChunk()
buffer.add('>seq2\n')
buffer.add('gamma\n')
buffer.add('>seq3\n')
buffer.add('beta\n')
collector.nextChunk()
collector.add('>seq2\n')
collector.add('gamma\n')
collector.add('>seq3\n')
collector.add('beta\n')
then:
buffer.hasChunk()
collector.hasChunk()

when:
buffer.nextChunk()
collector.nextChunk()
then:
!buffer.hasChunk()
!collector.hasChunk()

cleanup:
base?.parent?.deleteDir()
Expand Down Expand Up @@ -141,6 +142,7 @@ class TextFileCollectorTest extends Specification {
collector.add('delta\n')
assert collector.nextChunk() == base.resolveSibling('sample.3.fasta.gz')

collector.markComplete()
collector.close()

then:
Expand Down Expand Up @@ -171,6 +173,7 @@ class TextFileCollectorTest extends Specification {
collector.add('zzz')
assert collector.nextChunk() == base.resolveSibling('sample.3.fasta')

collector.markComplete()
collector.close()

then:
Expand Down
Loading