From 91d237f514d39d2cbc4f0e497c27fcb83c9d318e Mon Sep 17 00:00:00 2001 From: Marcel Martin Date: Tue, 23 Apr 2024 23:19:18 +0200 Subject: [PATCH] Open files in main process, send descriptors to reader process --- pyproject.toml | 2 +- src/cutadapt/runners.py | 53 +++++++++++++++++++++++++++++------------ 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 73511a88..b42d72b7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,7 +23,7 @@ requires-python = ">=3.8" dynamic = ["version"] dependencies = [ "dnaio >= 1.2.0", - "xopen >= 1.6.0", + "xopen >= 2.0.1", ] [project.urls] diff --git a/src/cutadapt/runners.py b/src/cutadapt/runners.py index 2bf4e09f..389c5609 100644 --- a/src/cutadapt/runners.py +++ b/src/cutadapt/runners.py @@ -50,34 +50,33 @@ class ReaderProcess(mpctx_Process): def __init__( self, - *paths: str, + input_fds_pipe: multiprocessing.Pipe, + n_files: int, file_format_connection: Connection, connections: Sequence[Connection], queue: multiprocessing.Queue, buffer_size: int, - stdin_fd, + stdin_fd: int, ): """ Args: - paths: path to input files + input_fds_pipe: a Pipe through which file descriptors for the input files are + sent + n_files: Number of input files (this many file descriptors need to be sent + over the input_fds_pipe) connections: a list of Connection objects, one for each worker. queue: a Queue of worker indices. A worker writes its own index into this queue to notify the reader that it is ready to receive more data. buffer_size: stdin_fd: - - Note: - This expects the paths to the input files as strings because these can be pickled - while file-like objects such as BufferedReader cannot. When using multiprocessing with - the "spawn" method, which is the default method on macOS, function arguments must be - picklable. """ super().__init__() - if len(paths) > 2: + self._input_fds_pipe = input_fds_pipe + if n_files > 2: raise ValueError("Reading from more than two files currently not supported") - if not paths: + if n_files < 1: raise ValueError("Must provide at least one file") - self._paths = paths + self._n_files = n_files self._file_format_connection = file_format_connection self.connections = connections self.queue = queue @@ -91,9 +90,17 @@ def run(self): try: with ExitStack() as stack: try: + fds = [ + multiprocessing.reduction.recv_handle(self._input_fds_pipe) + for _ in range(self._n_files) + ] + self._input_fds_pipe.close() + raw_files = [ + stack.enter_context(os.fdopen(fd, mode="rb")) for fd in fds + ] files = [ - stack.enter_context(xopen_rb_raise_limit(path)) - for path in self._paths + stack.enter_context(xopen_rb_raise_limit(file)) + for file in raw_files ] file_format = detect_file_format(files[0]) except Exception as e: @@ -311,8 +318,10 @@ def __init__( fileno = -1 file_format_connection_r, file_format_connection_w = mpctx.Pipe(duplex=False) + input_fds_pipe_r, input_fds_pipe_w = mpctx.Pipe() self._reader_process = ReaderProcess( - *inpaths.paths, + input_fds_pipe_r, + n_files=len(inpaths.paths), file_format_connection=file_format_connection_w, connections=connw, queue=self._need_work_queue, @@ -321,6 +330,20 @@ def __init__( ) self._reader_process.daemon = True self._reader_process.start() + + pid = 0 # FIXME needed on Windows + for path in inpaths.paths: + if path == "-": + multiprocessing.reduction.send_handle( + input_fds_pipe_w, sys.stdin.fileno(), pid + ) + else: + with open(path, "rb") as f: + multiprocessing.reduction.send_handle( + input_fds_pipe_w, f.fileno(), pid + ) + input_fds_pipe_w.close() + file_format: Optional[FileFormat] = self._try_receive(file_format_connection_r) if file_format is None: raise dnaio.exceptions.UnknownFileFormat(