Commit
This was an old attempt to provide an API for calibration; it is not used at the moment, so it has been removed.
Showing 2 changed files with 17 additions and 66 deletions.
@@ -1,45 +1,21 @@
import os
import uuid
import time
import shutil
from rcapi.models.models import Task # Import your data models
import h5py
from ..config.app_config import load_config
from pyambit.nexus_parser import SpectrumParser

config = load_config()
async def process(task : Task,process_config : dict, nexus_dataset_url: str,base_url: str):
    try:
        process_class = process_config["class"]
        process_class.process(task,nexus_dataset_url,base_url)

        task.status = "Completed"
    except (ImportError, AttributeError) as e:
        task.status = "Error"
        task.error = f"Failed to load plugin or class: {e}"
    except Exception as e:
        task.status = "Error"
        task.error = f"{e}"
    task.completed=int(time.time() * 1000)
config = load_config()
UPLOAD_DIR = config.upload_dir
os.makedirs(UPLOAD_DIR, exist_ok=True)

async def process_new(task : Task,nexus_dataset_url: str,base_url: str):
    open_dataset(nexus_dataset_url,base_url)
    task.status = "Completed"
    task.completed=int(time.time() * 1000)


def open_dataset(nexus_dataset_url: str,base_url: str):
    if nexus_dataset_url.startswith(base_url):
        uuid = nexus_dataset_url.split("/")[-1]
        spectrum_parser = SpectrumParser()
        spectrum_parser.parse(os.path.join(UPLOAD_DIR,f"{uuid}.nxs"))
        # Access the spectrum data
        for key in spectrum_parser.parsed_objects:
            spe = spectrum_parser.parsed_objects[key]
            print("Spectrum data", key, spe)
            #spe.plot()

    else:
        pass
async def process(task: Task, process_config: dict,
                  nexus_dataset_url: str, base_url: str):
    task.status = "Error"
    task.error = "Not implemented"
    task.completed = int(time.time() * 1000)


async def process_new(task: Task, nexus_dataset_url: str, base_url: str):
    task.status = "Error"
    task.completed = int(time.time() * 1000)
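For context, the removed open_dataset helper parsed a NeXus file with pyambit's SpectrumParser and iterated the parsed spectra. A minimal standalone sketch of that usage, assuming a local NeXus file exists; the path "example.nxs" is hypothetical and not part of this commit:

import os

from pyambit.nexus_parser import SpectrumParser

# Hypothetical local file; the removed code built this path from UPLOAD_DIR and a task uuid.
nexus_path = os.path.join(".", "example.nxs")

parser = SpectrumParser()
parser.parse(nexus_path)

# parsed_objects maps entry keys to spectrum objects, as iterated in the removed loop above.
for key in parser.parsed_objects:
    spe = parser.parsed_objects[key]
    print("Spectrum data", key, spe)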
@@ -1,45 +1,20 @@
import os
from rcapi.config.app_config import initialize_dirs
from rcapi.models.models import Task
from pyambit.nexus_parser import SpectrumParser
from pyambit.nexus_spectra import peaks2nxdata

#from ramanchada2.protocols.calibration import CalibrationModel

config, UPLOAD_DIR, NEXUS_DIR, TEMPLATE_DIR = initialize_dirs()

def open_dataset(nexus_dataset_url: str,base_url: str):
    print(nexus_dataset_url,base_url)
    if nexus_dataset_url.startswith(base_url):
        uuid = nexus_dataset_url.split("/")[-1]
        spectrum_parser = SpectrumParser()
        spectrum_parser.parse(os.path.join(NEXUS_DIR,f"{uuid}.nxs"))
        return spectrum_parser
    else:
        return None

class ProcessMock:
    def process(task : Task,nexus_dataset_url: str,base_url: str):
        spectrum_parser : SpectrumParser = open_dataset(nexus_dataset_url,base_url)
        for key in spectrum_parser.parsed_objects:
            spe = spectrum_parser.parsed_objects[key]
            print("Spectrum data", key, spe)
            #spe.plot()
    def process(task: Task, nexus_dataset_url: str, base_url: str):
        pass


class ProcessCalibrate:
    def process(task : Task,nexus_dataset_url: str,base_url: str):
        #calmodel = CalibrationModel(laser_wl)
        #calmodel.derive_model_x(spe_neon,spe_neon_units="cm-1",ref_neon=None,ref_neon_units="nm",spe_sil=None,spe_sil_units="cm-1",ref_sil=None,ref_sil_units="cm-1")
    def process(task: Task, nexus_dataset_url: str, base_url: str):
        pass


class ProcessFindPeak:
    def process(task : Task,nexus_dataset_url: str,base_url: str):
        spectrum_parser : SpectrumParser = open_dataset(nexus_dataset_url,base_url)
        for key in spectrum_parser.parsed_objects:
            spe = spectrum_parser.parsed_objects[key]
            print("Spectrum data", key, spe)
            peak_candidates = spe.find_peak_multipeak(sharpening='hht', strategy='topo')
            fitres = spe.fit_peak_multimodel(profile='Moffat', candidates=peak_candidates, no_fit=True)
            print(fitres.to_dataframe_peaks())
    def process(task: Task, nexus_dataset_url: str, base_url: str):
        pass
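For reference, the removed ProcessFindPeak body chained peak detection and peak fitting on each spectrum returned by the parser, using the calls shown in the diff above. A minimal sketch of that flow, assuming a hypothetical local file "example.nxs" (not from this commit) and that pyambit and ramanchada2 are installed:

import os

from pyambit.nexus_parser import SpectrumParser

parser = SpectrumParser()
parser.parse(os.path.join(".", "example.nxs"))  # hypothetical path, not from this commit

for key in parser.parsed_objects:
    spe = parser.parsed_objects[key]
    # Same calls as the removed code: locate candidate peaks, then fit them with a Moffat profile.
    peak_candidates = spe.find_peak_multipeak(sharpening='hht', strategy='topo')
    fitres = spe.fit_peak_multimodel(profile='Moffat', candidates=peak_candidates, no_fit=True)
    print(fitres.to_dataframe_peaks())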