
merge_partitions is slow with datasets opened without chunks argument #105

Closed · veenstrajelmer opened this issue Jun 27, 2023 · 6 comments · Fixed by #280

@veenstrajelmer (Collaborator) commented Jun 27, 2023

When using xu.merge_partitions, the process is fast (2 seconds) when providing chunks={'time': 1} to xu.open_dataset, but much slower (56 seconds) when the chunks argument is left out (or chunks=None). The topology is the same for all timesteps, so I would not expect a major difference. Furthermore, this particular dataset has only 4 timesteps.

import datetime as dt
import glob

import xugrid as xu

chunks = {'time': 1}  # merging takes 2 sec
# chunks = None  # merging takes 56 sec

file_nc = r'c:\DATA\dfm_tools_testdata\DFM_grevelingen_3D\Grevelingen-FM_0*_map.nc'
file_nc_list = glob.glob(file_nc)

partitions = []
for file_nc_one in file_nc_list:
    uds_one = xu.open_dataset(file_nc_one, chunks=chunks)
    partitions.append(uds_one)

print(f'>> xu.merge_partitions() with {len(file_nc_list)} partition(s): ', end='')
dtstart = dt.datetime.now()
uds = xu.merge_partitions(partitions)
print(f'{(dt.datetime.now() - dtstart).total_seconds():.2f} sec')

Version: xugrid 0.5.0; also reproduced with 0.4.0 and 0.3.0.

@veenstrajelmer changed the title from "merge_partitions takes ages with dataset opened without chunks argument" to "merge_partitions is slow with datasets opened without chunks argument" on Jun 27, 2023
@Huite (Collaborator) commented Jul 17, 2023

Are you sure this is unexpected? The chunked version will not load any data variables from disk into memory, whereas the unchunked version will. So we're comparing loading only the topology variables into memory versus loading all variables into memory.
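
A quick way to see the difference (a minimal sketch; "Grevelingen-FM_0000_map.nc" stands for any single partition file, "mesh2d_s1" is just an example variable name, and it assumes UgridDataset forwards xarray's .chunks attribute):

import xugrid as xu

path = "Grevelingen-FM_0000_map.nc"  # hypothetical single partition file

uds_lazy = xu.open_dataset(path, chunks={"time": 1})
uds_eager = xu.open_dataset(path)  # chunks=None

# With a chunks argument the variables are dask-backed and .chunks shows
# their layout; without it .chunks is None and any operation (such as the
# merge) pulls the full numpy arrays into memory.
print(uds_lazy["mesh2d_s1"].chunks)   # e.g. ((1, 1, 1, 1), ...)
print(uds_eager["mesh2d_s1"].chunks)  # None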

@veenstrajelmer (Collaborator, Author)
When using chunks=None, dask is disabled and xarray uses plain numpy arrays, which is slow here. There are alternatives, but the user has to be aware of the operations they want to perform and choose the most performant chunks argument accordingly. For merging only, these are some timings:

chunks = {'time': 1}  # merging takes 0.5 sec
chunks = 'auto'       # merging takes 0.5 sec
chunks = {}           # merging takes 0.5 sec
chunks = None         # merging takes 56 sec

@veenstrajelmer closed this as not planned on Jul 19, 2023
@veenstrajelmer (Collaborator, Author) commented Jul 10, 2024

I'd like to revive this issue, since chunks='auto' often fails with "ValueError: Object has inconsistent chunks along dimension time. This can be fixed by calling unify_chunks().", as in the example below with the RMM dataset, even after fixing #253. This can probably be resolved (help appreciated). However, since the merging process is independent of time, would it not be possible to make merging faster by making it independent of the time chunks?

import datetime as dt
import glob

import xugrid as xu

chunks = {'time': 1}  # merging takes 1 sec for Grevelingen, 5 sec for RMM
# chunks = None  # merging takes 56 sec for Grevelingen, takes ages for RMM
# chunks = 'auto'  # merging takes 1 sec for Grevelingen, but RMM raises
# "ValueError: Object has inconsistent chunks along dimension time. This can
# be fixed by calling unify_chunks()."

# Grevelingen testdata:
# import dfm_tools as dfmt
# file_nc = dfmt.data.fm_grevelingen_map(return_filepath=True)  # 'Grevelingen-FM_0*_map.nc'
# RMM 2D:
file_nc = 'p:\\archivedprojects\\11206813-006-kpp2021_rmm-2d\\C_Work\\31_RMM_FMmodel\\computations\\model_setup\\run_207\\results\\RMM_dflowfm_0*_map.nc'
file_nc_list = glob.glob(file_nc)

partitions = []
for file_nc_one in file_nc_list:
    uds_one = xu.open_dataset(file_nc_one, chunks=chunks)
    partitions.append(uds_one)

print(f'>> xu.merge_partitions() with {len(file_nc_list)} partition(s): ', end='')
dtstart = dt.datetime.now()
uds = xu.merge_partitions(partitions)
print(f'{(dt.datetime.now() - dtstart).total_seconds():.2f} sec')
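
A possible workaround, pending a proper fix (a sketch only, not verified against the RMM failure above; it reuses file_nc_list from the snippet above, the chunk size of 1 along time is an arbitrary example, and it assumes UgridDataset forwards xarray's .chunk method):

partitions = []
for file_nc_one in file_nc_list:
    # chunks={} keeps variables dask-backed without guessing a chunk size
    uds_one = xu.open_dataset(file_nc_one, chunks={})
    # force an identical chunk layout on every partition so merge_partitions
    # never sees inconsistent chunks
    partitions.append(uds_one.chunk({"time": 1}))

uds = xu.merge_partitions(partitions)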

@Huite (Collaborator) commented Jul 11, 2024

The inconsistent chunks error makes sense, right? Because you're opening the datasets one by one, xarray/dask picks a chunk size per dataset. Since the datasets are differently sized, the chunks are also differently sized. I'm not sure unify_chunks will do the right thing, although we could call it automatically. When does the error show up?

What does chunks='auto' do? I guess it always chunks along time only, due to the layout of the file. We could make sure any non-UGRID dimension is chunked consistently across partitions, by finding the smallest chunk size among the partitions (see the sketch below). But it's not great to rely on chunks='auto' here anyway, because it uses the partitioned datasets to define the chunk size, while you probably want a chunk size that's good for the merged one. And if you're using "auto", you're doing so because you're unsure about the appropriate size, so you'll probably also be unsure about the right chunk size for the merged result... but at the least, we should make it a bit more robust if you did choose chunks='auto'.
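
A sketch of that idea (a hypothetical helper, not part of xugrid; it assumes dask-backed partitions and that UgridDataset forwards xarray's .chunks and .chunk):

def unify_dim_chunks(partitions, dim="time"):
    """Rechunk `dim` on every partition to the smallest nominal chunk size
    found across the partitions, so the layouts are consistent.
    Hypothetical helper, not part of xugrid."""
    # max(...) takes each partition's nominal chunk size (ignoring a smaller
    # trailing remainder chunk); min(...) picks the smallest across partitions
    smallest = min(max(p.chunks[dim]) for p in partitions if dim in p.chunks)
    return [p.chunk({dim: smallest}) for p in partitions]

partitions = unify_dim_chunks(partitions, dim="time")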

I'm not sure what you mean exactly by "faster ... independent of time". The merging occurs through an xarray isel operation, which calls a dask indexing operation behind the scenes. This is "independent of time" in the sense that each time chunk will use the same indices (see the toy example below). Of course, when you require the data, it still has to do the work for each chunk.
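
To illustrate (a toy example, independent of xugrid):

import numpy as np
import xarray as xr

da = xr.DataArray(
    np.arange(4 * 10).reshape(4, 10),
    dims=("time", "face"),
).chunk({"time": 1})

# Index the face dimension as merge_partitions does internally: the same
# face indices apply to every time chunk and nothing is computed yet.
subset = da.isel(face=[0, 2, 5])
print(subset.chunks)  # ((1, 1, 1, 1), (3,)) -- still lazy dask chunks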

@veenstrajelmer (Collaborator, Author) commented Aug 14, 2024

Some additional insights. It probably all makes sense, but the behavior is different for different datasets:

  • RMM with chunks = {'time':1} is fine, but with chunks = 'auto' it raises "ValueError: Object has inconsistent chunks along dimension time. This can be fixed by calling unify_chunks()."
  • DCSM with chunks = 'auto' is fine, but with chunks = {'time':1} it raises "ValueError: Object has inconsistent chunks along dimension mesh2d_nEdges. This can be fixed by calling unify_chunks()."
  • DCSM with chunks = {'time':1} also gives the following warning for each partition: "UserWarning: The specified chunks separate the stored chunks along dimension "time" starting at index 1. This could degrade performance. Instead, consider rechunking after loading."

Example for 2 partitions of RMM and 2 partitions of DCSM output:

import datetime as dt
import glob

import xugrid as xu

# merging 2 RMM partitions takes 0.73 sec; merging 2 DCSM partitions raises
# "ValueError: Object has inconsistent chunks along dimension mesh2d_nEdges.
# This can be fixed by calling unify_chunks()." (plus the UserWarning about
# separating stored chunks)
chunks = {'time': 1}
# merging RMM raises "ValueError: Object has inconsistent chunks along
# dimension time. This can be fixed by calling unify_chunks()."; merging
# DCSM takes 5.8 sec
# chunks = 'auto'

# RMM 2D:
# file_nc = 'p:\\archivedprojects\\11206813-006-kpp2021_rmm-2d\\C_Work\\31_RMM_FMmodel\\computations\\model_setup\\run_207\\results\\RMM_dflowfm_0*_map.nc'
# DCSM:
file_nc = r"p:\11210284-011-nose-c-cycling\runs_fine_grid\B05_waq_2012_PCO2_ChlC_NPCratios_DenWat_stats_2023.01\B05_waq_2012_PCO2_ChlC_NPCratios_DenWat_stats_2023.01\DFM_OUTPUT_DCSM-FM_0_5nm_waq\DCSM-FM_0_5nm_waq_000*_map.nc"

file_nc_list = glob.glob(file_nc)
file_nc_list = file_nc_list[:2]

partitions = []
for file_nc_one in file_nc_list:
    uds_one = xu.open_dataset(file_nc_one, chunks=chunks)
    # uds_one = uds_one.unify_chunks()  # works for DCSM, not for RMM
    partitions.append(uds_one)

print(f'>> xu.merge_partitions() with {len(file_nc_list)} partition(s): ', end='')
dtstart = dt.datetime.now()
uds = xu.merge_partitions(partitions)
print(f'{(dt.datetime.now() - dtstart).total_seconds():.2f} sec')

@Huite (Collaborator) commented Aug 14, 2024

chunks="auto" should work fine, but resulted in inconsistent chunks error. This will be fixed by: #280

After merging the PR, using "auto" will give reasonable performance.
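
The pattern from the snippets above should then reduce to (a sketch, assuming file_nc_list as before):

partitions = [xu.open_dataset(f, chunks="auto") for f in file_nc_list]
uds = xu.merge_partitions(partitions)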
