Rasterio doesn't work with multiprocessing #512

Open
longshuicy opened this issue Mar 6, 2024 · 3 comments
Labels: 3storypoints (between 7-15 hours of work, requiring back-and-forth communication to clarify a complex problem), bug (something isn't working)

Comments

@longshuicy
Member

longshuicy commented Mar 6, 2024

File <stringsource>:2, in rasterio._io.DatasetReaderBase.__reduce_cython__()
TypeError: self._hds cannot be converted to a Python object for pickling

See the comments below for the complete error.

To reproduce the error, try this notebook (Box link): https://uofi.app.box.com/folder/252253923688


This might be related to opening rasterio datasets under multiprocessing, likely in `get_raster_value(self, x, y)`.

Similar issues:
rasterio/rasterio#1731
dymaxionlabs/dask-rasterio#3
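
Both linked issues point at the same constraint: an open rasterio.DatasetReader holds a GDAL handle and cannot be pickled, so it cannot be passed to worker processes. The usual workaround is to send only the file path and open the dataset inside each worker. A minimal sketch of that pattern (the function signature and file names are illustrative, not pyincore's API):

import concurrent.futures

import rasterio

def get_raster_value(path, x, y):
    # Open the dataset inside the worker process; only the picklable path
    # and coordinates cross the process boundary.
    with rasterio.open(path) as src:
        return next(src.sample([(x, y)]))[0]

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        values = list(executor.map(get_raster_value,
                                   ["max_wave_height.tif"] * 2,
                                   [-94.80, -94.85],
                                   [29.30, 29.31]))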

@longshuicy
Member Author

---------------------------------------------------------------------------
_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/Users/cwang138/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/multiprocessing/queues.py", line 264, in _feed
    obj = _ForkingPickler.dumps(obj)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/cwang138/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "<stringsource>", line 2, in rasterio._io.DatasetReaderBase.__reduce_cython__
TypeError: self._hds cannot be converted to a Python object for pickling
"""

The above exception was the direct cause of the following exception:

TypeError                                 Traceback (most recent call last)
Cell In[6], line 10
      8 w_bldg_dmg.set_input_hazard("hazard", hurricane)
      9 w_bldg_dmg.set_parameter("num_cpu", 1)
---> 10 w_bldg_dmg.run_analysis()

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/site-packages/pyincore/baseanalysis.py:400, in BaseAnalysis.run_analysis(self)
    397         print("Error reading parameter: " + result[1])
    398         return result
--> 400 return self.run()

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/site-packages/pyincore/analyses/buildingdamage/buildingdamage.py:87, in BuildingDamage.run(self)
     84     inventory_args.append(inventory_list[count:count + avg_bulk_input_size])
     85     count += avg_bulk_input_size
---> 87 (ds_results, damage_results) = self.building_damage_concurrent_future(self.building_damage_analysis_bulk_input,
     88                                                                       num_workers,
     89                                                                       inventory_args)
     91 self.set_result_csv_data("ds_result", ds_results, name=self.get_parameter("result_name"))
     92 self.set_result_json_data("damage_result",
     93                           damage_results,
     94                           name=self.get_parameter("result_name") + "_additional_info")

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/site-packages/pyincore/analyses/buildingdamage/buildingdamage.py:117, in BuildingDamage.building_damage_concurrent_future(self, function_name, parallelism, *args)
    115 output_dmg = []
    116 with concurrent.futures.ProcessPoolExecutor(max_workers=parallelism) as executor:
--> 117     for ret1, ret2 in executor.map(function_name, *args):
    118         output_ds.extend(ret1)
    119         output_dmg.extend(ret2)

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/concurrent/futures/process.py:642, in _chain_from_iterable_of_lists(iterable)
    636 def _chain_from_iterable_of_lists(iterable):
    637     """
    638     Specialized implementation of itertools.chain.from_iterable.
    639     Each item in *iterable* should be a list.  This function is
    640     careful not to keep references to yielded objects.
    641     """
--> 642     for element in iterable:
    643         element.reverse()
    644         while element:

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/concurrent/futures/_base.py:619, in Executor.map.<locals>.result_iterator()
    616 while fs:
    617     # Careful not to keep a reference to the popped future
    618     if timeout is None:
--> 619         yield _result_or_cancel(fs.pop())
    620     else:
    621         yield _result_or_cancel(fs.pop(), end_time - time.monotonic())

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/concurrent/futures/_base.py:317, in _result_or_cancel(***failed resolving arguments***)
    315 try:
    316     try:
--> 317         return fut.result(timeout)
    318     finally:
    319         fut.cancel()

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/concurrent/futures/_base.py:456, in Future.result(self, timeout)
    454     raise CancelledError()
    455 elif self._state == FINISHED:
--> 456     return self.__get_result()
    457 else:
    458     raise TimeoutError()

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/concurrent/futures/_base.py:401, in Future.__get_result(self)
    399 if self._exception:
    400     try:
--> 401         raise self._exception
    402     finally:
    403         # Break a reference cycle with the exception in self._exception
    404         self = None

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/multiprocessing/queues.py:264, in Queue._feed(buffer, notempty, send_bytes, writelock, reader_close, writer_close, ignore_epipe, onerror, queue_sem)
    261     return
    263 # serialize the data before acquiring the lock
--> 264 obj = _ForkingPickler.dumps(obj)
    265 if wacquire is None:
    266     send_bytes(obj)

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/multiprocessing/reduction.py:51, in ForkingPickler.dumps(cls, obj, protocol)
     48 @classmethod
     49 def dumps(cls, obj, protocol=None):
     50     buf = io.BytesIO()
---> 51     cls(buf, protocol).dump(obj)
     52     return buf.getbuffer()

File <stringsource>:2, in rasterio._io.DatasetReaderBase.__reduce_cython__()

TypeError: self._hds cannot be converted to a Python object for pickling
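
The final frame shows the root cause: rasterio dataset objects wrap a GDAL handle (self._hds) that cannot be pickled, so any attempt to send an open dataset through a multiprocessing queue fails. The error can be reproduced in isolation (a minimal sketch; the file name is a placeholder for any valid raster):

import pickle

import rasterio

# Pickling any open rasterio dataset raises the same TypeError seen above.
with rasterio.open("max_wave_height.tif") as src:
    pickle.dumps(src)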

longshuicy added the bug and 3storypoints labels on Mar 6, 2024
@longshuicy
Member Author

longshuicy commented Mar 6, 2024

# Load the hurricane from a local JSON file and attach the local raster
# files to each of its hazard datasets
hurricane = Hurricane.from_json_file(destination_file_path)
hurricane.hazardDatasets[0].from_file(os.path.join(raster_path, "max_wave_height.tif"),
                                      data_type="ncsa:deterministicHurricaneRaster")
hurricane.hazardDatasets[1].from_file(os.path.join(raster_path, "inundation_duration.tif"),
                                      data_type="ncsa:deterministicHurricaneRaster")
hurricane.hazardDatasets[2].from_file(os.path.join(raster_path, "max_surge_dept.tif"),
                                      data_type="ncsa:deterministicHurricaneRaster")
hurricane.hazardDatasets[3].from_file(os.path.join(raster_path, "wave_direction.tif"),
                                      data_type="ncsa:deterministicHurricaneRaster")
hurricane.hazardDatasets[4].from_file(os.path.join(raster_path, "max_wave_velocity.tif"),
                                      data_type="ncsa:deterministicHurricaneRaster")
hurricane.hazardDatasets[5].from_file(os.path.join(raster_path, "max_wind_velocity.tif"),
                                      data_type="ncsa:deterministicHurricaneRaster")

bldg_dataset_id = "63ff6b135c35c0353d5ed3ac"  # island

bldg_dmg = BuildingDamage(client)
bldg_dmg.load_remote_input_dataset("buildings", bldg_dataset_id)

# Hurricane building mapping (with equation)
mapping_id = "62fef3a6cef2881193f2261d"
fragility_service = FragilityService(client)
mapping_set = MappingSet(fragility_service.get_mapping(mapping_id))
bldg_dmg.set_input_dataset('dfr3_mapping_set', mapping_set)
bldg_dmg.set_input_hazard("hazard", hurricane)
bldg_dmg.set_parameter("result_name", "galveston_local_hurr_dmg_result")
bldg_dmg.set_parameter("num_cpu", 4)
bldg_dmg.run_analysis()

This actually works... I am not sure what was wrong.

@longshuicy
Member Author

I found "why" it is not running... there is a problem with the function earthquake.read_hazard_values(), I assume it occurs exactly with earthquake.read_local_raster_hazard_values().
(I have not gone to check in detail why)
If you create a local hazard and avoid calling this function, then you'll be able to run your damage analysis. However, if you call the earthquake.read_hazard_values() function at any point, and after that you try to run the analysis you receive the error self._hds cannot be converted to a Python object for pickling from rasterio._io.DatasetReaderBase.reduce_cython() (I am glad the problem is not with how rasterio reads the raster or it would be more difficult to solve as you commented in the other thread!). Hope it helps for the issue #512!
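
If the cause is indeed that read_hazard_values() leaves an open rasterio DatasetReader cached on the hazard object, that cached handle would make the whole object unpicklable for the worker processes. A minimal sketch of the suspected difference, using hypothetical class and method names rather than pyincore's actual internals:

import rasterio

class LocalRasterHazard:
    # Hypothetical stand-in for a pyincore local hazard object.

    def read_values_cached(self, path, xys):
        # Caching the open handle stores unpicklable state on self, so any
        # later attempt to send this object to a worker process fails.
        if not hasattr(self, "_src"):
            self._src = rasterio.open(path)
        return [vals[0] for vals in self._src.sample(xys)]

    def read_values_per_call(self, path, xys):
        # Opening the dataset per call keeps the object picklable.
        with rasterio.open(path) as src:
            return [vals[0] for vals in src.sample(xys)]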
