Skip to content

Commit

Permalink
add simulations to vcml read/write and vcml driven simulations
Browse files Browse the repository at this point in the history
  • Loading branch information
jcschaff committed Feb 25, 2025
1 parent 7c28ab5 commit d480175
Show file tree
Hide file tree
Showing 14 changed files with 684 additions and 210 deletions.
62 changes: 61 additions & 1 deletion pyvcell/data_model/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pyvcell.api.vcell_client import ApiClient, ApiResponse, Configuration, SolverResourceApi
from pyvcell.data_model.result import Result
from pyvcell.data_model.spatial_model import SpatialModel
from pyvcell.data_model.vcml_spatial_model import VcmlSpatialModel
from pyvcell.solvers.fvsolver import solve as fvsolve


Expand Down Expand Up @@ -40,7 +41,9 @@ def run(self, duration: float | None = None, output_time_step: float | None = No
# create temp file to write sbml document to
sbml_path = self.out_dir / "model.xml"
self.model.export(sbml_path)
response: ApiResponse[bytearray] = solver_api.get_fv_solver_input_from_sbml_with_http_info(str(sbml_path))
response: ApiResponse[bytearray] = solver_api.get_fv_solver_input_from_sbml_with_http_info(
str(sbml_path), duration=duration, output_time_step=output_time_step
)
sbml_path.unlink()
if response.status_code != 200:
raise ValueError(f"Failed to get solver input files: {response}")
Expand Down Expand Up @@ -69,3 +72,60 @@ def run(self, duration: float | None = None, output_time_step: float | None = No

def cleanup(self) -> None:
shutil.rmtree(self.out_dir)


class VcmlSpatialSimulation:
model: VcmlSpatialModel
out_dir: Path

def __init__(self, model: VcmlSpatialModel, out_dir: Path | None = None):
self.model = model
if out_dir is None:
self.out_dir = Path(tempfile.mkdtemp(prefix="out_dir_"))
else:
self.out_dir = out_dir

def run(self, simulation_name: str) -> Result:
# create an unauthenticated API client
api_url: str = "https://vcell-dev.cam.uchc.edu" # vcell base url
api_client = ApiClient(Configuration(host=api_url))
solver_api = SolverResourceApi(api_client)

# prepare solver input files
# 1. upload the SBML model and retrieve generated solver inputs as a zip file
# 2. extract the zip archive into the output directory
# 3. remove the zip archive
# create temp file to write sbml document to
vcml_path = self.out_dir / "model.xml"
self.model.export(vcml_path)
response: ApiResponse[bytearray] = solver_api.get_fv_solver_input_from_vcml_with_http_info(
vcml_file=str(vcml_path), simulation_name=simulation_name
)
vcml_path.unlink()
if response.status_code != 200:
raise ValueError(f"Failed to get solver input files: {response}")
zip_archive = self.out_dir / "solver_input_files.zip"
with open(zip_archive, "wb") as f:
f.write(response.data)
shutil.unpack_archive(zip_archive, self.out_dir)
zip_archive.unlink()

# identify sim_id and job_id from the solver input files
files: list[str] = os.listdir(self.out_dir)
fv_input_file: Path | None = next((self.out_dir / file for file in files if file.endswith(".fvinput")), None)
vcg_input_file: Path | None = next((self.out_dir / file for file in files if file.endswith(".vcg")), None)
if fv_input_file is None or vcg_input_file is None:
raise ValueError(".fvinput file or .vcg file not found")
sim_id = int(fv_input_file.name.split("_")[1])
job_id = int(fv_input_file.name.split("_")[2])

# run the simulation
ret_code = fvsolve(input_file=fv_input_file, vcg_file=vcg_input_file, output_dir=self.out_dir)
if ret_code != 0:
raise ValueError(f"Error in solve: {ret_code}")

# return the result
return Result(solver_output_dir=self.out_dir, sim_id=sim_id, job_id=job_id)

def cleanup(self) -> None:
shutil.rmtree(self.out_dir)
64 changes: 64 additions & 0 deletions pyvcell/data_model/vcml_spatial_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import os
from pathlib import Path
from typing import Any, Union

import pyvcell.vcml as vc


class VcmlSpatialModel:
"""
Spatial extension of `libsbml.Model`. All class methods are inherited from `libsbml.Model`: see libsbml documentation for more details.
This class is constructed with one of 3 entrypoints: either the filepath to a valid SBMLSpatial model, OR level, version, model_id, OR model_id
"""

_bio_model: vc.Biomodel

def __init__(self, filepath: Path) -> None:
reader: vc.VcmlReader = vc.VcmlReader()
# read filepath as string
with open(filepath, "r") as file:
self._bio_model = reader.parse_biomodel(file.read())

@property
def bio_model(self) -> vc.Biomodel:
return self._bio_model

@property
def model(self) -> vc.Model:
if self._bio_model.model is None:
raise ValueError("Model is not defined in the VCML document.")
return self._bio_model.model

def export(self, filename: Union[os.PathLike[str], str]) -> None:
document = vc.VCMLDocument(biomodel=self._bio_model)
vcml_str: str = vc.VcmlWriter().write_vcml(document=document)
with open(filename, "w") as file:
file.write(vcml_str)

@property
def model_parameters(self) -> list[vc.ModelParameter]:
if self._bio_model.model is None:
raise ValueError("Model is not defined in the VCML document.")
return self._bio_model.model.model_parameters

@property
def kinetics_parameters(self) -> list[vc.KineticsParameter]:
kinetics_parameters: list[vc.KineticsParameter] = []
for reaction in self.model.reactions:
if reaction.kinetics is None:
continue
for param in reaction.kinetics.kinetics_parameters:
param.reaction_name = reaction.name
kinetics_parameters.append(param)
return kinetics_parameters

@property
def applications(self) -> list[vc.Application]:
return self.bio_model.applications

@property
def simulations(self) -> list[vc.Simulation]:
sims: list[vc.Simulation] = []
for app in self.applications:
sims.extend(app.simulations)
return sims
3 changes: 3 additions & 0 deletions pyvcell/vcml/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
Model,
ModelParameter,
Reaction,
Simulation,
Species,
SpeciesMapping,
SpeciesReference,
Expand Down Expand Up @@ -40,4 +41,6 @@
"SurfaceClass",
"SpeciesMapping",
"BoundaryType",
"Application",
"Simulation",
]
10 changes: 9 additions & 1 deletion pyvcell/vcml/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ModelParameter(Parameter):


class KineticsParameter(Parameter):
pass
reaction_name: str


class Kinetics(VcmlNode):
Expand Down Expand Up @@ -145,13 +145,21 @@ class ReactionMapping(VcmlNode):
included: bool = True


class Simulation(VcmlNode):
name: str
duration: float
output_time_step: float
mesh_size: tuple[int, int, int]


class Application(VcmlNode):
name: str
stochastic: bool
geometry: Geometry
compartment_mappings: list[CompartmentMapping] = Field(default_factory=list)
species_mappings: list[SpeciesMapping] = Field(default_factory=list)
reaction_mappings: list[ReactionMapping] = Field(default_factory=list)
simulations: list[Simulation] = Field(default_factory=list)


class Biomodel(VcmlNode):
Expand Down
38 changes: 36 additions & 2 deletions pyvcell/vcml/vcml_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ def strip_namespace(tag: str) -> str:

class VcmlReader:
@staticmethod
def parse_biomodel(xml_string: str) -> vc.Biomodel | None:
def parse_biomodel(xml_string: str) -> vc.Biomodel:
xml_string = xml_string.replace('<?xml version="1.0" encoding="UTF-8"?>', "")
xml_string = xml_string.replace("<?xml version='1.0' encoding='UTF-8'?>", "")
root = etree.fromstring(xml_string)
document = vc.VCMLDocument()
visitor = BiomodelVisitor(document)
visitor.visit(root, document)
if visitor.document.biomodel is None:
raise ValueError("No biomodel found")
return visitor.document.biomodel

@classmethod
Expand Down Expand Up @@ -135,7 +137,13 @@ def visit_Parameter(self, element: _Element, node: vc.Model | vc.Kinetics) -> No
parameter = model_parameter
elif strip_namespace(parent.tag) == "Kinetics":
kinetics: vc.Kinetics = node # type: ignore[assignment]
kinetics_parameter = vc.KineticsParameter(name=name, value=value, role=role, unit=unit)
reaction_node = parent.getparent()
if reaction_node is None:
raise ValueError("Kinetics element has no parent")
reaction_name = reaction_node.get("Name", default="unknown")
kinetics_parameter = vc.KineticsParameter(
name=name, value=value, role=role, unit=unit, reaction_name=reaction_name
)
kinetics.kinetics_parameters.append(kinetics_parameter)
parameter = kinetics_parameter
else:
Expand All @@ -150,6 +158,32 @@ def visit_SimulationSpec(self, element: _Element, node: vc.Biomodel) -> None:
node.applications.append(application)
self.generic_visit(element, application)

def visit_Simulation(self, element: _Element, node: vc.Application) -> None:
name: str = element.get("Name", default="unnamed")
duration: float | None = None
output_time_step: float | None = None
mesh_size: tuple[int, int, int] | None = None
for sim_child in element:
if strip_namespace(sim_child.tag) == "SolverTaskDescription":
solver_task_description_element = sim_child
for child in solver_task_description_element:
if strip_namespace(child.tag) == "TimeBound":
duration = float(child.get("EndTime", default="5.0"))
elif strip_namespace(child.tag) == "OutputOptions":
output_time_step = float(child.get("OutputTimeStep", default="0.1"))
elif strip_namespace(sim_child.tag) == "MeshSpecification":
mesh_specification_element = sim_child
for mesh_child in mesh_specification_element:
if strip_namespace(mesh_child.tag) == "Size":
mesh_x = int(mesh_child.get("X", default="1"))
mesh_y = int(mesh_child.get("Y", default="1"))
mesh_z = int(mesh_child.get("Z", default="1"))
mesh_size = (mesh_x, mesh_y, mesh_z)
if duration is None or output_time_step is None or mesh_size is None:
raise ValueError("Simulation element is missing required child elements")
simulation = vc.Simulation(name=name, duration=duration, output_time_step=output_time_step, mesh_size=mesh_size)
node.simulations.append(simulation)

def visit_Geometry(self, element: _Element, node: vc.Application) -> None:
name: str = element.get("Name", default="unnamed")
dim = int(element.get("Dimension", default="0"))
Expand Down
63 changes: 60 additions & 3 deletions pyvcell/vcml/vcml_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,19 @@ def write_model(self, model: vc.Model, parent: _Element) -> None:
parameter_element = Element("Parameter", Name=parameter.name, Role=parameter.role, Unit=parameter.unit)
parameter_element.text = str(parameter.value)
model_parameters_element.append(parameter_element)
for species in model.species:
species_type_element = Element("Compound", Name=species.name)
annotation_element = Element("Annotation")
annotation_element.text = species.name
species_type_element.append(annotation_element)
parent.append(species_type_element)
for compartment in model.compartments:
compartment_element = Element("Feature" if compartment.dim == 3 else "Membrane", Name=compartment.name)
parent.append(compartment_element)
for species in model.species:
species_element = Element("LocalizedCompound", Name=species.name, Structure=species.structure_name)
species_element = Element(
"LocalizedCompound", Name=species.name, CompoundRef=species.name, Structure=species.structure_name
)
parent.append(species_element)
for reaction in model.reactions:
reaction_element = Element("SimpleReaction", Structure=reaction.compartment_name, Name=reaction.name)
Expand Down Expand Up @@ -73,6 +81,8 @@ def write_application(self, application: vc.Application, parent: _Element) -> No
geometry_element = Element("Geometry", Name=application.geometry.name, Dimension=str(application.geometry.dim))
parent.append(geometry_element)
self.write_geometry(application.geometry, geometry_element)

# ---- geometry context -----
geometry_context_element = Element("GeometryContext")
parent.append(geometry_context_element)
for compartment_mapping in application.compartment_mappings:
Expand All @@ -94,17 +104,64 @@ def write_application(self, application: vc.Application, parent: _Element) -> No
)
mapping_element.append(boundaries_types_element)
geometry_context_element.append(mapping_element)

# ---- reaction context -----
reaction_context_element = Element("ReactionContext")
parent.append(reaction_context_element)
for species_mapping in application.species_mappings:
mapping_element = Element("LocalizedCompoundSpec", LocalizedCompoundRef=species_mapping.species_name)
parent.append(mapping_element)
reaction_context_element.append(mapping_element)
self.write_species_mapping(species_mapping, mapping_element)
for reaction_mapping in application.reaction_mappings:
mapping_element = Element(
"ReactionSpec",
ReactionStepRef=reaction_mapping.reaction_name,
ReactionMapping="included" if reaction_mapping.included else "excluded",
)
parent.append(mapping_element)
reaction_context_element.append(mapping_element)

# ---- mathDescription ---- (skip this for now)
math_description_element = Element("MathDescription", Name="dummy_math_description")
parent.append(math_description_element)

# ---- simulations -----
for simulation in application.simulations:
simulation_element = Element("Simulation", Name=simulation.name)
parent.append(simulation_element)
solver_task_description_element = Element(
"SolverTaskDescription",
TaskType="Unsteady",
UseSymbolicJacobian="false",
Solver="Sundials Stiff PDE Solver (Variable Time Step)",
)
simulation_element.append(solver_task_description_element)
solver_task_description_element.append(
Element("TimeBound", StartTime="0.0", EndTime=str(simulation.duration))
)
solver_task_description_element.append(
Element("TimeStep", DefaultTime="0.05", MinTime="0.0", MaxTime="0.1")
)
solver_task_description_element.append(Element("ErrorTolerance", Absolut="1.0E-9", Relative="1.0E-7"))
solver_task_description_element.append(
Element("OutputOptions", OutputTimeStep=str(simulation.output_time_step))
)

sundials_solver_options_element = Element("SundialsSolverOptions")
max_order_advection_element = Element("maxOrderAdvection")
max_order_advection_element.text = "2"
sundials_solver_options_element.append(max_order_advection_element)
solver_task_description_element.append(sundials_solver_options_element)
number_processors_element = Element("NumberProcessors")
number_processors_element.text = "1"
solver_task_description_element.append(number_processors_element)
simulation_element.append(Element("MathOverrides"))

mesh_specification_element = Element("MeshSpecification")
size_element = Element(
"Size", X=str(simulation.mesh_size[0]), Y=str(simulation.mesh_size[1]), Z=str(simulation.mesh_size[2])
)
mesh_specification_element.append(size_element)
simulation_element.append(mesh_specification_element)

def write_geometry(self, geometry: vc.Geometry, parent: _Element) -> None:
extent_element = Element(
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from tests.fixtures.data_fixtures import solver_output_path, solver_output_simid_jobid, zarr_path # noqa: F401
from tests.fixtures.model_fixtures import sbml_spatial_model_1d_path, sbml_spatial_model_3d_path # noqa: F401
from tests.fixtures.vcell_model_fixtures import vcml_spatial_model_1d_path # noqa: F401
from tests.fixtures.vcell_model_fixtures import vcml_spatial_model_1d_path, vcml_spatial_small_3d_path # noqa: F401
Loading

0 comments on commit d480175

Please sign in to comment.