Skip to content

Commit

Permalink
Merge pull request #812 from rhpvorderman/multithread_bam_read
Browse files Browse the repository at this point in the history
Multithread bam read
  • Loading branch information
marcelm authored Nov 13, 2024
2 parents ff5ba46 + 8edb888 commit 628ce98
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ development version
Option ``-Z`` (equivalent to ``--compression-level=1``) is now deprecated.
* The previously hidden option ``--compression-level`` is now shown in the
``--help`` output.
* :issue:`812`: Fixed an issue where multithreaded reading of unaligned BAM
files would cause an error.
* :issue:`820`: On Bioconda, Cutadapt is now also available for ARM64 Macs (M1/M2).
* :issue:`817`: Fixed missing statistics when ``--revcomp`` was used for
paired-end data.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ classifiers = [
requires-python = ">=3.9"
dynamic = ["version"]
dependencies = [
"dnaio >= 1.2.0",
"dnaio >= 1.2.3",
"xopen >= 1.6.0",
]

Expand Down
9 changes: 8 additions & 1 deletion src/cutadapt/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,21 @@ def __init__(
self,
*files: BinaryIO,
interleaved: bool = False,
fileformat=None,
):
self._files = files
self.interleaved = interleaved
self.fileformat = fileformat
for f in self._files:
assert f is not None

def open(self):
return dnaio.open(*self._files, interleaved=self.interleaved, mode="r")
return dnaio.open(
*self._files,
interleaved=self.interleaved,
mode="r",
fileformat=self.fileformat,
)

def close(self) -> None:
for file in self._files:
Expand Down
17 changes: 15 additions & 2 deletions src/cutadapt/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ def __init__(
read_pipe: Connection,
write_pipe: Connection,
need_work_queue: multiprocessing.Queue,
file_format,
):
super().__init__()
self._id = id_
Expand All @@ -168,6 +169,7 @@ def __init__(
self._write_pipe = write_pipe
self._need_work_queue = need_work_queue
self._proxy_files = proxy_files
self._file_format = file_format

def run(self):
try:
Expand All @@ -189,7 +191,11 @@ def run(self):
io.BytesIO(self._read_pipe.recv_bytes())
for _ in range(self._n_input_files)
]
infiles = InputFiles(*files, interleaved=self._interleaved_input)
infiles = InputFiles(
*files,
interleaved=self._interleaved_input,
fileformat=self._file_format,
)
(n, bp1, bp2) = self._pipeline.process_reads(infiles)
stats += Statistics().collect(n, bp1, bp2, [], [])
self._send_outfiles(chunk_index, n)
Expand Down Expand Up @@ -320,7 +326,13 @@ def __init__(
)
self._reader_process.daemon = True
self._reader_process.start()
self._input_file_format = self._try_receive(file_format_connection_r)
self._input_file_format: FileFormat = self._try_receive(
file_format_connection_r
)
self._file_format_string = self._input_file_format.name.lower()
if self._file_format_string == "bam":
# Individual BAM record chunks will have no header
self._file_format_string = "bam_no_header"

def _start_workers(
self, pipeline, proxy_files
Expand All @@ -338,6 +350,7 @@ def _start_workers(
self._connections[index],
conn_w,
self._need_work_queue,
file_format=self._file_format_string,
)
worker.daemon = True
worker.start()
Expand Down
12 changes: 12 additions & 0 deletions tests/cut/small_from_bam.fastq
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
@prefix:1_13_573
CGTCCGAANTAGCTACCACCCTGA
+
)3%)&&&&!.1&(6:<'67..*,:
@prefix:1_13_1259
AGCCGCTANGACGGGTTGGCCC
+
;<:&:A;A!9<<<,7:<=3=;:
@prefix:1_13_1440
CAAGATCTNCCCTGCCACATTGCCCTAGTTAAAC
+
<=A:A=57!7<';<6?5;;6:+:=)71>70<,=:
Binary file added tests/data/small.bam
Binary file not shown.
8 changes: 8 additions & 0 deletions tests/test_commandline.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ def test_small(run):
run("-a TTAGACATATCTCCGTCG", "small.fastq", "small.fastq")


def test_small_bam(run, cores):
run(
"--cores {} -a TTAGACATATCTCCGTCG".format(cores),
"small_from_bam.fastq",
"small.bam",
)


def test_empty_fastq(run, cores):
run("--cores {} -a TTAGACATATCTCCGTCG".format(cores), "empty.fastq", "empty.fastq")

Expand Down

0 comments on commit 628ce98

Please sign in to comment.