
Parallelize I/O operations #97

Merged: 14 commits into sgkit-dev:main on Nov 19, 2024
Conversation

Will-Tyler
Contributor

Overview

Currently, vcztools view loads data from Zarr and writes the VCF output sequentially. These operations can be done in parallel. This pull request uses multi-threading to parallelize reading from Zarr and writing the VCF output.

I am opening this pull request as a draft because I think additional changes are likely needed.

Approach

The sequential implementation has the following form:

def vcztools_view():
    for chunk in chunks:
        handle_chunk(chunk)

def handle_chunk(chunk):
    load_dataset1()
    load_dataset2()
    write_output()

Here is the form of this pull request's multi-threading approach:

from concurrent.futures import ThreadPoolExecutor, wait

def vcztools_view():
    with ThreadPoolExecutor() as executor:
        preceding_future = None
        for chunk in chunks:
            future = executor.submit(handle_chunk, chunk, preceding_future)
            if preceding_future is not None:
                wait([preceding_future])  # keep at most two chunks in flight
            preceding_future = future

def handle_chunk(chunk, preceding_future):
    load_dataset1()
    load_dataset2()
    if preceding_future is not None:
        wait([preceding_future])  # the preceding chunk must be written first
    write_output()

This approach allows at most two chunks to be in flight at once and requires the preceding chunk to finish before the current chunk starts writing, which keeps the output in order: while chunk k is being written, chunk k + 1 can load its data concurrently.

Testing

I added a multi-chunk dataset for unit testing. The dataset is a 100-variant by 100-sample subset of the chromosome 22 performance-testing data, with variant and sample chunk sizes of 10. Having a multi-chunk dataset should help us detect concurrent-programming errors such as writing the output out of order.

Results

The results are disappointing so far. There is practically no improvement over the sequential implementation's results in #94.

bcftools view data/chr22.vcf.gz
1.67GiB 0:00:10 [ 162MiB/s] [<=>]

real    0m10.533s
user    0m10.142s
sys     0m0.481s

vcztools view data/chr22.vcz
1.67GiB 0:00:24 [70.1MiB/s] [<=>]

real    0m24.411s
user    0m23.836s
sys     0m4.271s

Profiling

Interestingly, the profiling results show most of the time being spent in acquire, i.e. the threads spend most of their time blocked waiting on locks.

         815373 function calls (794975 primitive calls) in 23.215 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      104   22.278    0.214   22.278    0.214 {method 'acquire' of '_thread.lock' objects}
      115    0.385    0.003    0.388    0.003 {built-in method _imp.create_dynamic}
      772    0.085    0.000    0.085    0.000 {method 'read' of '_io.BufferedReader' objects}
   115/77    0.064    0.001    0.137    0.002 {built-in method _imp.exec_dynamic}
      654    0.039    0.000    0.039    0.000 {built-in method marshal.loads}
      656    0.020    0.000    0.020    0.000 {built-in method io.open_code}
     3155    0.012    0.000    0.012    0.000 {method 'sub' of 're.Pattern' objects}


@Will-Tyler
Contributor Author

I suspect that moving the variant-wise encoding for-loop to the VCZ encoder library will help.

for j in range(num_variants):
    failed = True
    while failed:
        try:
            line = encoder.encode(j, buflen)
            failed = False
        except _vcztools.VczBufferTooSmall:
            buflen *= 2
            # print("Bumping buflen to", buflen)
    print(line, file=output)

This way, the C extension can release the GIL once for all of the lines rather than acquiring and releasing it for every line.

@Will-Tyler
Contributor Author

I pushed the for-loop down to the C extension, and now I am seeing the hoped-for results:

bcftools view data/chr22.vcf.gz
1.67GiB 0:00:10 [ 162MiB/s] [<=>]

real    0m10.525s
user    0m10.173s
sys     0m0.468s

vcztools view data/chr22.vcz
1.67GiB 0:00:17 [95.7MiB/s] [<=>]

real    0m17.874s
user    0m23.969s
sys     0m3.347s

To get this prototype working, I used puts to print the output from C. However, vcztools does not always write to the standard output, and I am thinking about the best way to write the output from the C extension. Some of Python's I/O objects do not have file descriptors associated with them. For example, the test cases appear to use a TextIO object that raises an exception when fileno() is called.


@Will-Tyler
Contributor Author

In the latest commit, the encode_all method uses fputs and fputc to write the output to the output file descriptor. In Python land, if the output I/O object has a file descriptor, then Python passes it to encode_all. If not, such as in the test cases, then Python creates a pipe for encode_all to write to and copies the pipe data to the output I/O object.
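For illustration, the Python-side dispatch could look roughly like the sketch below. The encode_all signature and the write_via_encode_all wrapper are assumptions for the sake of the example, not the exact code in this pull request; only the fd-or-pipe wiring is the point.

import io
import os
import threading

def write_via_encode_all(encoder, output):
    # Hand encode_all a real file descriptor when the output object has
    # one; otherwise bridge through a pipe and copy the data in Python.
    try:
        fd = output.fileno()
    except (AttributeError, io.UnsupportedOperation):
        fd = None
    if fd is not None:
        encoder.encode_all(fd)
    else:
        read_fd, write_fd = os.pipe()

        def drain():
            with os.fdopen(read_fd) as reader:
                for line in reader:
                    output.write(line)

        copier = threading.Thread(target=drain)
        copier.start()
        try:
            encoder.encode_all(write_fd)
        finally:
            os.close(write_fd)  # signal EOF so the copier thread exits
        copier.join()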

With this approach, all the test cases pass locally and the performance is still improved.

I am not sure why the GitHub checks are failing. They seem to be unrelated to this pull request's changes.

@tomwhite
Contributor

tomwhite commented Nov 5, 2024

I am not sure why the GitHub checks are failing. They seem to be unrelated to this pull request's changes.

Sorry @Will-Tyler that's my fault! The changes in #95 caused ruff to change Optional[X] to X | None since that is allowed in Python 3.10 (see https://docs.astral.sh/ruff/rules/non-pep604-annotation/).
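Concretely, the rule rewrites annotations like this (illustrative example, not code from this repo):

from typing import Optional

def view(path: Optional[str]) -> None: ...  # before: PEP 484 style

def view(path: str | None) -> None: ...  # after: PEP 604 style, allowed since Python 3.10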

You should be able to rebase and run pre-commit run -v --all-files to get ruff to update your changes. (You may need to pip install pre-commit first.)

@jeromekelleher
Contributor

jeromekelleher commented Nov 5, 2024

Very nice @Will-Tyler! I'm nervous about the C code writing directly to the FD though; there's a lot that can go wrong here, and we'd need to work quite a bit harder to make things airtight (e.g., there's no error checking at the moment, so if a write fails it's silent). What you can and can't do between Py_BEGIN_ALLOW_THREADS and Py_END_ALLOW_THREADS is also very subtle, and it definitely pays to keep things as simple as possible.

What I had in mind was to write multiple lines at once to a buffer, and then flush these buffers out in order to the output in Python. So, you could have something like this (sketched below):

  • Zarr decode thread: decodes variant chunks and pushes them onto a queue
  • VCF encode thread(s): encode numpy array slices to VCF, returning a buffer and pushing it onto a priority queue (tagged by chunk and slice index)
  • IO write thread: consumes encoded VCF buffers in order, writing them to the output
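A minimal sketch of that three-stage pipeline, to make the shape concrete. Here decode_chunk, encode_slice, and chunks are illustrative stand-ins for the real Zarr-reading and VCF-encoding code, and it is simplified to a single encoder thread (multiple encoders would need per-worker sentinels):

import heapq
import queue
import sys
import threading

decode_q = queue.Queue(maxsize=4)  # bounded so the decoder cannot run far ahead
encoded_q = queue.Queue()

def decoder():
    for index, chunk in enumerate(chunks):
        decode_q.put((index, decode_chunk(chunk)))  # Zarr -> numpy arrays
    decode_q.put(None)  # sentinel: no more work

def encoder_worker():
    while (item := decode_q.get()) is not None:
        index, arrays = item
        encoded_q.put((index, encode_slice(arrays)))  # numpy -> VCF text
    encoded_q.put(None)

def writer(output):
    heap, next_index = [], 0
    while (item := encoded_q.get()) is not None:
        heapq.heappush(heap, item)  # priority queue keyed by chunk index
        while heap and heap[0][0] == next_index:  # flush whatever is now in order
            output.write(heapq.heappop(heap)[1])
            next_index += 1

threads = [
    threading.Thread(target=decoder),
    threading.Thread(target=encoder_worker),
    threading.Thread(target=writer, args=(sys.stdout,)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()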

This is quite a bit more complicated than what you have here, as you'd need to set up some queues etc. to keep the threads fed, but I think it would perform quite well ultimately. We wouldn't need to go to the full thing initially, though, and I think just writing the output to a pre-supplied buffer from C would be a good start.

So, rather than writing one line, we write a range of lines. Also, let's just raise an exception from C if the buffer size is too small and do the doubling in Python, rather than trying to do the adaptive updates in C.
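From Python, that contract might look roughly like this (encode_range is a hypothetical name, and start, stop, and buflen are assumed to be set up elsewhere):

# C fills a single buffer for variants [start, stop) with the GIL
# released, and raises if buflen is too small; the doubling stays in Python.
while True:
    try:
        text = encoder.encode_range(start, stop, buflen)
        break
    except _vcztools.VczBufferTooSmall:
        buflen *= 2
output.write(text)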

Keeping things as simple as possible in C is really important here. There's no point in trying to optimise the corner cases (i.e., you've supplied a buffer size that's too small): do the important nominal case fast in C, and leave everything else to Python.

Also, do as little as possible in the CPython interface code. If you find yourself doing anything other than massaging input or output, it should probably be moved into the library code.

So, for this first PR, let's use your current architecture, but write the full chunk to a string that we return to Python rather than doing the IO in C. Does this sound OK?

@Will-Tyler
Contributor Author

I think that to get the most from parallelism, the extension module should do as much work as possible with the GIL released. If I understand correctly, you are proposing to encode multiple lines into a single buffer and raise an exception from C if the buffer is too small. I'm concerned that if the number of lines is too small, the module will acquire the GIL too frequently, and if the number of lines is too large, vcztools might redo a lot of work whenever the buffer is too small. And if Python does the I/O, there is less parallelism, because the I/O thread would need the GIL, although simply printing does not seem to be a top time consumer.

What you can and can't do while holding Py_BEGIN_ALLOW_THREADS etc is also very subtle.

This is true. I believe my current implementation is thread-safe; the raw memory interface my implementation uses is documented as thread-safe.

There's a lot that can go wrong here and we'd need to work quite a bit harder to make things air tight (e.g., there's no error checking at the moment, so if a write fails it's silent).

I feel like I am missing something, but it seems overall simpler and more efficient to use C's standard I/O API and add error handling than to share buffers between threads and between C and Python.

@jeromekelleher
Contributor

The buffer won't be too small very often (we'll make the default much bigger, and make better guesses), so we don't need to worry about that. By writing multiple lines at once, we can do a significant chunk of work with the GIL released. The difference between doing file IO in C and Python is negligible if you're writing a lot of text at a time, so there's no point in doing the IO in C unless you have to. It's easy to get stuff like this working initially, but it's very hard to get it fully airtight, and any minor efficiency you gain by doing stuff directly in C is bought at a high cost in subtle and difficult-to-track-down bugs later on (I have war stories!).

I'm happy to pick this up and take it the last few steps, but I do want to take the C-level IO out before merging. You can just take out the encode_all method and not worry about the performance for now.

@Will-Tyler
Contributor Author

I trust your judgement and experience—I reverted some of the changes so that this pull request is mostly about setting up the parallel threads.

Will-Tyler marked this pull request as ready for review on November 6, 2024.
jeromekelleher merged commit 4308f1b into sgkit-dev:main on Nov 19, 2024.
11 checks passed