Major changes on ImageDeconvolution, DeconvolutionAlgorithmBase, RichardsonLucy

DeconvolutionAlgorithmBase was modified to allow using multiple observations and optimizing multiple background models.
The implementation of the RL algorithm was modified to handle multiple observations and background models.
DeconvolutionAlgorithmBase defines the following abstract methods, which its subclasses must override (a minimal sketch follows the list):
- initialization
- pre_processing
- Estep
- Mstep
- post_processing
- register_result
- check_stopping_criteria
- finalization
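A minimal subclass sketch of the new interface (illustrative only: the import path and the constructor signature are assumed from this commit, and the method bodies are placeholders rather than the actual RichardsonLucy implementation):

    import copy

    from cosipy.image_deconvolution import DeconvolutionAlgorithmBase  # assumed import path

    class MyAlgorithm(DeconvolutionAlgorithmBase):

        def __init__(self, initial_model, dataset, mask, parameter):
            super().__init__(initial_model, dataset, mask, parameter)

        def initialization(self):
            # run once before the iteration loop
            self.iteration_count = 0
            self.results.clear()
            self.model = copy.deepcopy(self.initial_model)

        def pre_processing(self):
            pass

        def Estep(self):
            # expected counts for every observation in self.dataset
            self.expectation_list = self.calc_expectation_list(model = self.model, dict_bkg_norm = self.dict_bkg_norm)

        def Mstep(self):
            # update self.model (and optionally self.dict_bkg_norm) from the data/expectation ratios
            pass

        def post_processing(self):
            pass

        def register_result(self):
            self.results.append({"iteration": self.iteration_count, "model": copy.deepcopy(self.model)})

        def check_stopping_criteria(self):
            return self.iteration_count >= self.iteration_max

        def finalization(self):
            pass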
hiyoneda committed Jun 10, 2024
1 parent b8e2a80 commit e3c5006
Showing 3 changed files with 481 additions and 402 deletions.
250 changes: 151 additions & 99 deletions cosipy/image_deconvolution/RichardsonLucy.py
@@ -1,7 +1,11 @@
import os
import copy
import numpy as np
import astropy.units as u
import astropy.io.fits as fits
from tqdm.autonotebook import tqdm
import logging
logger = logging.getLogger(__name__)

from histpy import Histogram

@@ -13,50 +17,79 @@ class RichardsonLucy(DeconvolutionAlgorithmBase):
The algorithm here is based on Knoedlseder+99, Knoedlseder+05, Siegert+20.
"""

def __init__(self, initial_model_map, data, parameter):
def __init__(self, initial_model, dataset, mask, parameter):

DeconvolutionAlgorithmBase.__init__(self, initial_model_map, data, parameter)
DeconvolutionAlgorithmBase.__init__(self, initial_model, dataset, mask, parameter)

self.do_acceleration = parameter.get('acceleration', False)

self.alpha_max = parameter.get('alpha_max', 1.0)

self.do_response_weighting = parameter.get('response_weighting', False)
if self.do_response_weighting:
self.response_weighting_index = parameter.get('response_weighting_index', 0.5)

self.do_smoothing = parameter.get('smoothing', False)
if self.do_smoothing:
self.smoothing_fwhm = parameter['smoothing_FWHM']['value'] * u.Unit(parameter['smoothing_FWHM']['unit'])
logger.info(f"Gaussian filter with FWHM of {self.smoothing_fwhm} will be applied to delta images ...")

self.do_bkg_norm_fitting = parameter.get('background_normalization_fitting', False)

if self.do_bkg_norm_fitting:
self.bkg_norm_range = parameter.get('background_normalization_range', [0.5, 1.5])
self.dict_bkg_norm_range = parameter.get('background_normalization_range', {key: [0.0, 100.0] for key in self.dict_bkg_norm.keys()})

print("... calculating the expected events with the initial model map ...")
# self.expectation = self.calc_expectation(self.initial_model_map, self.data)
self.expectation = self.data.calc_expectation(self.initial_model_map, bkg_norm = self.bkg_norm)
self.save_results = parameter.get('save_results', False)
self.save_results_directory = parameter.get('save_results_directory', './results')

if self.do_response_weighting:
print("... calculating the response weighting filter...")
if self.save_results is True:
if os.path.isdir(self.save_results_directory):
logger.warning(f"A directory {self.save_results_directory} already exists. Files in {self.save_results_directory} may be overwritten. Make sure that is not a problem.")
else:
os.makedirs(self.save_results_directory)

response_weighting_index = parameter.get('response_weighting_index', 0.5)
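# Illustrative configuration for the parameters read above (added example, not part
# of RichardsonLucy.py; values are placeholders, the background key is hypothetical,
# and the real parameter object may be a configuration wrapper rather than a dict):
parameter_example = {
    "acceleration": True,
    "alpha_max": 10.0,
    "response_weighting": True,
    "response_weighting_index": 0.5,
    "smoothing": True,
    "smoothing_FWHM": {"value": 2.0, "unit": "deg"},
    "background_normalization_fitting": True,
    "background_normalization_range": {"albedo": [0.9, 1.1]},  # keys are the background model names
    "save_results": True,
    "save_results_directory": "./results",
}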
def initialization(self):
"""
initialization before running the image deconvolution
"""
# clear counter
self.iteration_count = 0

self.response_weighting_filter = data.exposure_map.contents / np.max(data.exposure_map.contents)
# clear results
self.results.clear()

self.response_weighting_filter = self.response_weighting_filter**response_weighting_index
# copy model
self.model = copy.deepcopy(self.initial_model)

if self.do_smoothing:
self.smoothing_fwhm = parameter['smoothing_FWHM'] * u.deg
print(f"... We will apply the Gaussian filter with FWHM of {self.smoothing_fwhm} to delta images ...")
# calculate exposure map
self.summed_exposure_map = self.calc_summed_exposure_map()

# mask setting
if self.mask is None and np.any(self.summed_exposure_map.contents == 0):
self.mask = Histogram(self.model.axes, contents = self.summed_exposure_map.contents > 0)
self.model = self.model.mask_pixels(self.mask)
logger.info("There are zero-exposure pixels. A mask to ignore them was set.")

# response-weighting filter
if self.do_response_weighting:
self.response_weighting_filter = (self.summed_exposure_map.contents / np.max(self.summed_exposure_map.contents))**self.response_weighting_index
logger.info("The response weighting filter was calculated.")

# expected count histograms
self.expectation_list = self.calc_expectation_list(model = self.initial_model, dict_bkg_norm = self.dict_bkg_norm)
logger.info("The expected count histograms were calculated with the initial model map.")

# calculate summed background models for M-step
if self.do_bkg_norm_fitting:
self.dict_summed_bkg_model = {}
for key in self.dict_bkg_norm.keys():
self.dict_summed_bkg_model[key] = self.calc_summed_bkg_model(key)

def pre_processing(self):
pass

def Estep(self):
"""
Notes
-----
The expected count histograms are updated in the post-processing step.
"""
print("... skip E-step ...")
# Note that self.expectation_list is updated in self.post_processing().
pass

def Mstep(self):
"""
@@ -68,41 +101,28 @@ def Mstep(self):
Currently we use a single normalization parameter.
In the future, the normalization will be optimized for each background group defined in some file.
"""
# Currently (2024-01-12) this method works for both the local-coordinate CDS and the galactic-coordinate CDS.
# This is only because in DC2 the rotated response for the galactic-coordinate CDS does not have an axis for time/scatt binning.
# However, it is likely to gain such an axis in the future in order to account for background variability with time, pointing direction, etc.
# In that case the implementation here will no longer work, so keep in mind that it must be modified once the response format is fixed.

diff = self.data.event_dense / self.expectation - 1
ratio_list = [ data.event / expectation for data, expectation in zip(self.dataset, self.expectation_list) ]

# # part1
# delta_map_part1 = self.model_map / self.data.exposure_map
#
# # part2
# delta_map_part2 = Histogram(self.model_map.axes, unit = self.data.exposure_map.unit)
#
# diff_x_response = np.tensordot(diff.contents, self.data.image_response_dense.contents, axes = ([1,2,3], [2,3,4]))
# # [Time/ScAtt, Em, Phi, PsiChi] x [NuLambda, Ei, Em, Phi, PsiChi] -> [Time/ScAtt, NuLambda, Ei]
#
# delta_map_part2[:] = np.tensordot(self.data.coordsys_conv_matrix.contents, diff_x_response, axes = ([0,2], [0,1])) \
# * diff_x_response.unit * self.data.coordsys_conv_matrix.unit
# # [Time/ScAtt, lb, NuLambda] x [Time/ScAtt, NuLambda, Ei] -> [lb, Ei]
# # note that coordsys_conv_matrix is the sparse, so the unit should be recovered.
# delta model
sum_T_product = self.calc_summed_T_product(ratio_list)
self.delta_model = self.model * (sum_T_product/self.summed_exposure_map - 1)

# delta map
self.delta_map = (self.model_map / self.data.exposure_map) * self.data.calc_T_product(diff)
# background normalization optimization
if self.do_bkg_norm_fitting:
for key in self.dict_bkg_norm.keys():

# mask for zero-exposure pixels
if self.data.mask is not None:
self.delta_map.mask_pixels(self.data.mask, 0)
sum_bkg_T_product = self.calc_summed_bkg_model_product(key, ratio_list)
sum_bkg_model = self.dict_summed_bkg_model[key]
bkg_norm = self.dict_bkg_norm[key] * (sum_bkg_T_product / sum_bkg_model)

if self.do_bkg_norm_fitting:
self.bkg_norm += self.bkg_norm * np.sum(diff * self.data.bkg_dense) / np.sum(self.data.bkg_dense)
bkg_range = self.dict_bkg_norm_range[key]
if bkg_norm < bkg_range[0]:
bkg_norm = bkg_range[0]
elif bkg_norm > bkg_range[1]:
bkg_norm = bkg_range[1]

if self.bkg_norm < self.bkg_norm_range[0]:
self.bkg_norm = self.bkg_norm_range[0]
elif self.bkg_norm > self.bkg_norm_range[1]:
self.bkg_norm = self.bkg_norm_range[1]
self.dict_bkg_norm[key] = bkg_norm
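# Summary of the update implemented above (comment added for clarity; notation follows
# Knoedlseder et al. 1999). With ratio_d = data_d / expectation_d for each dataset d,
#
#   delta_model = model * ( sum_d T_d^T(ratio_d) / summed_exposure_map - 1 )
#
# and, when background fitting is enabled, each normalization b_k is updated as
#
#   b_k <- b_k * sum_d <bkg_model_{k,d}, ratio_d> / sum_d <bkg_model_{k,d}, 1>
#
# before being clipped to the allowed range in self.dict_bkg_norm_range.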

def post_processing(self):
"""
@@ -112,79 +132,111 @@ def post_processing(self):
- acceleration of the RL algorithm: the normalization of the delta map is increased as long as the updated image has no negative components.
"""

self.processed_delta_model = copy.deepcopy(self.delta_model)

if self.do_response_weighting:
self.delta_map[:,:] *= self.response_weighting_filter
self.processed_delta_model[:] *= self.response_weighting_filter

if self.do_smoothing:
self.delta_map = self.delta_map.smoothing(fwhm = self.smoothing_fwhm)
self.processed_delta_model = self.processed_delta_model.smoothing(fwhm = self.smoothing_fwhm)

if self.do_acceleration:
self.alpha = self.calc_alpha(self.delta_map, self.model_map)
self.alpha = self.calc_alpha(self.processed_delta_model, self.model)
else:
self.alpha = 1.0

model_map_new = self.model_map + self.delta_map * self.alpha
model_map_new[:] = np.where(model_map_new.contents < self.minimum_flux * model_map_new.unit,
self.minimum_flux * model_map_new.unit, model_map_new.contents)
self.model = self.model + self.processed_delta_model * self.alpha
self.model[:] = np.where(self.model.contents < self.minimum_flux, self.minimum_flux, self.model.contents)

self.processed_delta_map = model_map_new - self.model_map
self.model_map = model_map_new
if self.mask is not None:
self.model = self.model.mask_pixels(self.mask)

# update expectation_list
self.expectation_list = self.calc_expectation_list(self.model, dict_bkg_norm = self.dict_bkg_norm)
logger.debug("The expected count histograms were updated with the new model map.")

print("... calculating the expected events with the updated model map ...")
#self.expectation = self.calc_expectation(self.model_map, self.data)
self.expectation = self.data.calc_expectation(self.model_map, bkg_norm = self.bkg_norm)
# update loglikelihood_list
self.loglikelihood_list = self.calc_loglikelihood_list(self.expectation_list)
logger.debug("The loglikelihood list was updated with the new expected count histograms.")

def check_stopping_criteria(self, i_iteration):
def register_result(self):
"""
If i_iteration is smaller than iteration_max, the iterative process will continue.
The values below are stored at the end of each iteration.
- iteration: iteration number
- model: updated image
- delta_model: delta map after M-step
- processed_delta_model: delta map after post-processing
- alpha: acceleration parameter in the RL algorithm
- background_normalization: optimized background normalization
- loglikelihood: log-likelihood
"""

this_result = {"iteration": self.iteration_count,
"model": copy.deepcopy(self.model),
"delta_model": copy.deepcopy(self.delta_model),
"processed_delta_model": copy.deepcopy(self.processed_delta_model),
"background_normalization": copy.deepcopy(self.dict_bkg_norm),
"alpha": self.alpha,
"loglikelihood": copy.deepcopy(self.loglikelihood_list)}

# show intermediate results
logger.info(f' alpha: {this_result["alpha"]}')
logger.info(f' background_normalization: {this_result["background_normalization"]}')
logger.info(f' loglikelihood: {this_result["loglikelihood"]}')

# register this_result in self.results
self.results.append(this_result)
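# Example of inspecting the registered results after a run (illustrative snippet, not
# part of RichardsonLucy.py):
#   last = self.results[-1]
#   last["model"], last["alpha"], last["background_normalization"], last["loglikelihood"]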

def check_stopping_criteria(self):
"""
If iteration_count is smaller than iteration_max, the iterative process will continue.
Returns
-------
bool
"""
if i_iteration < self.iteration_max:
if self.iteration_count < self.iteration_max:
return False
return True

def register_result(self, i_iteration):
def finalization(self):
"""
The values below are stored at the end of each iteration.
- iteration: iteration number
- model_map: updated image
- delta_map: delta map after M-step
- processed_delta_map: delta map after post-processing
- alpha: acceleration parameter in the RL algorithm
- background_normalization: optimized background normalization
- loglikelihood: log-likelihood
finalization after running the image deconvolution
"""
loglikelihood = self.calc_loglikelihood(self.data, self.model_map, self.expectation)
if self.save_results == True:
logger.info(f'Saving results in {self.save_results_directory}')

this_result = {"iteration": i_iteration,
"model_map": copy.deepcopy(self.model_map),
"delta_map": copy.deepcopy(self.delta_map),
"processed_delta_map": copy.copy(self.processed_delta_map),
"alpha": self.alpha,
"background_normalization": self.bkg_norm,
"loglikelihood": loglikelihood}
# model
for this_result in self.results:
iteration_count = this_result["iteration"]

this_result["model"].write(f"{self.save_results_directory}/model_itr{iteration_count}.hdf5", overwrite = True)
this_result["delta_model"].write(f"{self.save_results_directory}/delta_model_itr{iteration_count}.hdf5", overwrite = True)
this_result["processed_delta_model"].write(f"{self.save_results_directory}/processed_delta_model_itr{iteration_count}.hdf5", overwrite = True)

#fits
primary_hdu = fits.PrimaryHDU()

col_iteration = fits.Column(name='iteration', array=[float(result['iteration']) for result in self.results], format='K')
col_alpha = fits.Column(name='alpha', array=[float(result['alpha']) for result in self.results], format='D')
cols_bkg_norm = [fits.Column(name=key, array=[float(result['background_normalization'][key]) for result in self.results], format='D')
for key in self.dict_bkg_norm.keys()]
cols_loglikelihood = [fits.Column(name=f"{self.dataset[i].name}", array=[float(result['loglikelihood'][i]) for result in self.results], format='D')
for i in range(len(self.dataset))]

self.result = this_result
table_alpha = fits.BinTableHDU.from_columns([col_iteration, col_alpha])
table_alpha.name = "alpha"

def save_result(self, i_iteration):
self.result["model_map"].write(f"model_map_itr{i_iteration}.hdf5", overwrite = True)
self.result["delta_map"].write(f"delta_map_itr{i_iteration}.hdf5", overwrite = True)
self.result["processed_delta_map"].write(f"processed_delta_map_itr{i_iteration}.hdf5", overwrite = True)
table_bkg_norm = fits.BinTableHDU.from_columns([col_iteration] + cols_bkg_norm)
table_bkg_norm.name = "bkg_norm"

with open(f"result_itr{i_iteration}.dat", "w") as f:
f.write(f'alpha: {self.result["alpha"]}\n')
f.write(f'loglikelihood: {self.result["loglikelihood"]}\n')
f.write(f'background_normalization: {self.result["background_normalization"]}\n')
table_loglikelihood = fits.BinTableHDU.from_columns([col_iteration] + cols_loglikelihood)
table_loglikelihood.name = "loglikelihood"

def show_result(self, i_iteration):
print(f' alpha: {self.result["alpha"]}')
print(f' loglikelihood: {self.result["loglikelihood"]}')
print(f' background_normalization: {self.result["background_normalization"]}')
hdul = fits.HDUList([primary_hdu, table_alpha, table_bkg_norm, table_loglikelihood])
hdul.writeto(f'{self.save_results_directory}/results.fits', overwrite=True)
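# Example of reading the summary file written above back in (illustrative snippet, not
# part of RichardsonLucy.py; the path and extension names follow the writeto() call and
# the BinTableHDU names set in finalization()):
import astropy.io.fits as fits

with fits.open("./results/results.fits") as hdul:
    alpha_per_iteration = hdul["alpha"].data["alpha"]         # acceleration parameter per iteration
    bkg_norm_table = hdul["bkg_norm"].data                    # background normalizations per iteration
    loglikelihood_table = hdul["loglikelihood"].data          # per-dataset log-likelihoods per iteration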

def calc_alpha(self, delta, model_map):
def calc_alpha(self, delta_model, model):
"""
Calculate the acceleration parameter in RL algorithm.
@@ -193,12 +245,12 @@ def calc_alpha(self, delta, model_map):
float
Acceleration parameter
"""
diff = -1 * (model_map / delta).contents
diff = -1 * (model / delta_model).contents

diff[(diff <= 0) | (delta.contents == 0)] = np.inf
diff[(diff <= 0) | (delta_model.contents == 0)] = np.inf

if self.data.mask is not None:
diff[np.invert(self.data.mask.contents)] = np.inf
if self.mask is not None:
diff[np.invert(self.mask.contents)] = np.inf

alpha = min(np.min(diff), self.alpha_max)
