Skip to content

Commit

Permalink
Merge pull request #12580 from KratosMultiphysics/optapp/responses/ar…
Browse files Browse the repository at this point in the history
…ithmetic_operators

[OptApp] Adding the ability to have arithmetic operators for responses
  • Loading branch information
sunethwarna authored Aug 5, 2024
2 parents 60fea14 + fee23b9 commit 2b11fc6
Show file tree
Hide file tree
Showing 9 changed files with 1,041 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from typing import Optional
from math import log

import KratosMultiphysics as Kratos
import KratosMultiphysics.OptimizationApplication as KratosOA
from KratosMultiphysics.OptimizationApplication.responses.response_function import ResponseFunction
from KratosMultiphysics.OptimizationApplication.responses.response_function import SupportedSensitivityFieldVariableTypes
from KratosMultiphysics.OptimizationApplication.utilities.union_utilities import SupportedSensitivityFieldVariableTypes
from KratosMultiphysics.OptimizationApplication.utilities.model_part_utilities import ModelPartOperation
from KratosMultiphysics.OptimizationApplication.utilities.optimization_problem import OptimizationProblem
from KratosMultiphysics.OptimizationApplication.utilities.component_data_view import ComponentDataView
from KratosMultiphysics.OptimizationApplication.utilities.response_utilities import EvaluateValue
from KratosMultiphysics.OptimizationApplication.utilities.response_utilities import EvaluateGradient
from KratosMultiphysics.OptimizationApplication.utilities.response_utilities import BinaryOperator

class BinaryOperatorResponseFunction(ResponseFunction):
def __init__(self, response_function_1: ResponseFunction, response_function_2: ResponseFunction, binary_operator: BinaryOperator, optimization_problem: OptimizationProblem):
if binary_operator == BinaryOperator.DIVIDE:
# this is because, the optimization_problem data container uses "/" as a path separator.
super().__init__(f"({response_function_1.GetName()}÷{response_function_2.GetName()})")
else:
super().__init__(f"({response_function_1.GetName()}{binary_operator.value}{response_function_2.GetName()})")

self.optimization_problem = optimization_problem
self.response_function_1 = response_function_1
self.response_function_2 = response_function_2
self.binary_operator = binary_operator
self.model_part: Optional[Kratos.ModelPart] = None

def GetImplementedPhysicalKratosVariables(self) -> 'list[SupportedSensitivityFieldVariableTypes]':
vars_list = self.response_function_1.GetImplementedPhysicalKratosVariables()
vars_list.extend(self.response_function_2.GetImplementedPhysicalKratosVariables())
return vars_list

def Initialize(self) -> None:
self.response_function_1.Initialize()
self.response_function_2.Initialize()

if len(self.response_function_1.GetImplementedPhysicalKratosVariables()) != 0 and len(self.response_function_2.GetImplementedPhysicalKratosVariables()) != 0:
self.model_part = ModelPartOperation(self.response_function_1.GetInfluencingModelPart().GetModel(), ModelPartOperation.OperationType.UNION, f"response_{self.GetName()}", [self.response_function_1.GetInfluencingModelPart().FullName(), self.response_function_2.GetInfluencingModelPart().FullName()], False).GetModelPart()
elif len(self.response_function_1.GetImplementedPhysicalKratosVariables()) != 0:
self.model_part = self.response_function_1.GetInfluencingModelPart()
elif len(self.response_function_2.GetImplementedPhysicalKratosVariables()) != 0:
self.model_part = self.response_function_2.GetInfluencingModelPart()

def Check(self) -> None:
self.response_function_1.Check()
self.response_function_2.Check()

def Finalize(self) -> None:
self.response_function_1.Finalize()
self.response_function_2.Finalize()

def GetInfluencingModelPart(self) -> Kratos.ModelPart:
return self.model_part

def CalculateValue(self) -> float:
v1 = EvaluateValue(self.response_function_1, self.optimization_problem)
v2 = EvaluateValue(self.response_function_2, self.optimization_problem)

# now do the binary arithmetics.
if self.binary_operator == BinaryOperator.ADD:
return v1 + v2
elif self.binary_operator == BinaryOperator.SUBTRACT:
return v1 - v2
elif self.binary_operator == BinaryOperator.MULTIPLY:
return v1 * v2
elif self.binary_operator == BinaryOperator.DIVIDE:
return v1 / v2
elif self.binary_operator == BinaryOperator.POWER:
return v1 ** v2

def CalculateGradient(self, physical_variable_collective_expressions: 'dict[SupportedSensitivityFieldVariableTypes, KratosOA.CollectiveExpression]') -> None:
v1 = EvaluateValue(self.response_function_1, self.optimization_problem)
v2 = EvaluateValue(self.response_function_2, self.optimization_problem)
resp_1_physical_variable_collective_expressions = EvaluateGradient(self.response_function_1, physical_variable_collective_expressions, self.optimization_problem)
resp_2_physical_variable_collective_expressions = EvaluateGradient(self.response_function_2, physical_variable_collective_expressions, self.optimization_problem)

for variable, collective_expression in physical_variable_collective_expressions.items():
for result, g1, g2 in zip(collective_expression.GetContainerExpressions(), resp_1_physical_variable_collective_expressions[variable].GetContainerExpressions(), resp_2_physical_variable_collective_expressions[variable].GetContainerExpressions()):
if self.binary_operator == BinaryOperator.ADD:
result.SetExpression((g1 + g2).GetExpression())
elif self.binary_operator == BinaryOperator.SUBTRACT:
result.SetExpression((g1 - g2).GetExpression())
elif self.binary_operator == BinaryOperator.MULTIPLY:
result.SetExpression((g1 * v2 + g2 * v1).GetExpression())
elif self.binary_operator == BinaryOperator.DIVIDE:
result.SetExpression((g1 / v2 - g2 * (v1 / v2 ** 2)).GetExpression())
elif self.binary_operator == BinaryOperator.POWER:
result.SetExpression(((g1 * (v2 / v1) + g2 * log(v1)) * (v1 ** v2)).GetExpression())

def GetChildResponses(self) -> 'list[ResponseFunction]':
return [self.response_function_1, self.response_function_2]

def __str__(self) -> str:
if self.model_part is not None:
return f"Response [type = {self.__class__.__name__}, name = {self.GetName()}, model part name = {self.model_part.FullName()}]"
else:
return f"Response [type = {self.__class__.__name__}, name = {self.GetName()}, model part name = n/a ]"
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,4 @@ def CalculateGradient(self, physical_variable_collective_expressions: 'dict[Supp
exp.SetExpression(exp.GetExpression() + (((values - value_i) ** (-2)) * (values - value_i) * 2.0).GetExpression())
exp.SetExpression(Kratos.Expression.Utils.Collapse(exp).GetExpression())
else:
raise RuntimeError(f"Unsupported sensitivity w.r.t. {physical_variable.Name()} requested. Followings are supported sensitivity variables:\n\tSHAPE")
raise RuntimeError(f"Unsupported sensitivity w.r.t. {physical_variable.Name()} requested. Followings are supported sensitivity variables:\n\t{self.variable.Name()}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import KratosMultiphysics as Kratos
import KratosMultiphysics.OptimizationApplication as KratosOA
from KratosMultiphysics.OptimizationApplication.responses.response_function import ResponseFunction
from KratosMultiphysics.OptimizationApplication.responses.response_function import SupportedSensitivityFieldVariableTypes
from KratosMultiphysics.OptimizationApplication.utilities.union_utilities import SupportedSensitivityFieldVariableTypes
from KratosMultiphysics.OptimizationApplication.utilities.component_data_view import ComponentDataView
from KratosMultiphysics.OptimizationApplication.utilities.optimization_problem import OptimizationProblem
from KratosMultiphysics.OptimizationApplication.utilities.buffered_dict import BufferedDict

class EvaluationResponseFunction(ResponseFunction):
def __init__(self, response_function: ResponseFunction, optimization_problem: OptimizationProblem):
super().__init__(f"Eval_{response_function.GetName()}")
self.response_function = response_function
self.optimization_problem = optimization_problem

def GetImplementedPhysicalKratosVariables(self) -> 'list[SupportedSensitivityFieldVariableTypes]':
return self.response_function.GetImplementedPhysicalKratosVariables()

def Initialize(self) -> None:
self.response_function.Initialize()

def Check(self) -> None:
self.response_function.Check()

def Finalize(self) -> None:
self.response_function.Finalize()

def GetInfluencingModelPart(self) -> Kratos.ModelPart:
return self.response_function.GetInfluencingModelPart()

def CalculateValue(self) -> float:
# this response is used at the top most level of the evaluated chained responses.
# so this creates a new data container under the optimization problem to avoid
# having to compute the same response value twice.

unbuffered_data = ComponentDataView("evaluated_responses", self.optimization_problem).GetUnBufferedData()

# reset data of the evaluation
self.__ResetEvaluationData(self, unbuffered_data, "values")

# now calculate
return self.response_function.CalculateValue()

def CalculateGradient(self, physical_variable_collective_expressions: 'dict[SupportedSensitivityFieldVariableTypes, KratosOA.CollectiveExpression]') -> None:
# this response is used at the top most level of the evaluated chained responses.
# so this creates a new data container under the optimization problem to avoid
# having to compute the same response gradient twice.

unbuffered_data = ComponentDataView("evaluated_responses", self.optimization_problem).GetUnBufferedData()

# reset data of the evaluation
self.__ResetEvaluationData(self, unbuffered_data, "gradients")

return self.response_function.CalculateGradient(physical_variable_collective_expressions)

def GetChildResponses(self) -> 'list[ResponseFunction]':
return [self.response_function]

def __str__(self) -> str:
return f"Response [type = {self.__class__.__name__}, name = {self.GetName()}]"

@staticmethod
def __ResetEvaluationData(response_function: ResponseFunction, unbuffered_data: BufferedDict, prefix: str) -> None:
if unbuffered_data.HasValue(f"{prefix}/{response_function.GetName()}"):
del unbuffered_data[f"{prefix}/{response_function.GetName()}"]
for child_response in response_function.GetChildResponses():
# now reset the children
EvaluationResponseFunction.__ResetEvaluationData(child_response, unbuffered_data, prefix)
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import KratosMultiphysics as Kratos
import KratosMultiphysics.OptimizationApplication as KratosOA
from KratosMultiphysics.OptimizationApplication.responses.response_function import ResponseFunction
from KratosMultiphysics.OptimizationApplication.responses.response_function import SupportedSensitivityFieldVariableTypes
from KratosMultiphysics.OptimizationApplication.utilities.union_utilities import SupportedSensitivityFieldVariableTypes

class LiteralValueResponseFunction(ResponseFunction):
def __init__(self, value: float):
super().__init__(str(value))

self.value = value

def GetImplementedPhysicalKratosVariables(self) -> 'list[SupportedSensitivityFieldVariableTypes]':
return []

def Initialize(self) -> None:
pass

def Check(self) -> None:
pass

def Finalize(self) -> None:
pass

def GetInfluencingModelPart(self) -> Kratos.ModelPart:
raise RuntimeError(f"The literal value response function does not have an influencing model part.")

def CalculateValue(self) -> float:
return self.value

def CalculateGradient(self, _: 'dict[SupportedSensitivityFieldVariableTypes, KratosOA.CollectiveExpression]') -> None:
raise RuntimeError(f"The literal value response function does not depend on any variable, hence no gradients.")

def __str__(self) -> str:
return f"Response [type = {self.__class__.__name__}, name = {self.GetName()}]"
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from math import log
import KratosMultiphysics as Kratos
import KratosMultiphysics.OptimizationApplication as KratosOA
from KratosMultiphysics.OptimizationApplication.responses.response_function import ResponseFunction
from KratosMultiphysics.OptimizationApplication.responses.response_function import SupportedSensitivityFieldVariableTypes
from KratosMultiphysics.OptimizationApplication.utilities.union_utilities import SupportedSensitivityFieldVariableTypes
from KratosMultiphysics.OptimizationApplication.utilities.optimization_problem import OptimizationProblem
from KratosMultiphysics.OptimizationApplication.utilities.response_utilities import EvaluateValue
from KratosMultiphysics.OptimizationApplication.utilities.response_utilities import EvaluateGradient

class LogResponseFunction(ResponseFunction):
def __init__(self, response_function: ResponseFunction, optimization_problem: OptimizationProblem):
super().__init__(f"log({response_function.GetName()})")
self.response_function = response_function
self.optimization_problem = optimization_problem

def GetImplementedPhysicalKratosVariables(self) -> 'list[SupportedSensitivityFieldVariableTypes]':
return self.response_function.GetImplementedPhysicalKratosVariables()

def Initialize(self) -> None:
self.response_function.Initialize()

def Check(self) -> None:
self.response_function.Check()

def Finalize(self) -> None:
self.response_function.Finalize()

def GetInfluencingModelPart(self) -> Kratos.ModelPart:
return self.response_function.GetInfluencingModelPart()

def CalculateValue(self) -> float:
return log(EvaluateValue(self.response_function, self.optimization_problem))

def CalculateGradient(self, physical_variable_collective_expressions: 'dict[SupportedSensitivityFieldVariableTypes, KratosOA.CollectiveExpression]') -> None:
v = EvaluateValue(self.response_function, self.optimization_problem)
resp_physical_variable_collective_expressions = EvaluateGradient(self.response_function, physical_variable_collective_expressions, self.optimization_problem)

for variable, collective_expression in physical_variable_collective_expressions.items():
for result, g in zip(collective_expression.GetContainerExpressions(), resp_physical_variable_collective_expressions[variable].GetContainerExpressions()):
result.SetExpression((g / v).GetExpression())

def GetChildResponses(self) -> 'list[ResponseFunction]':
return [self.response_function]

def __str__(self) -> str:
return f"Response [type = {self.__class__.__name__}, name = {self.GetName()}]"
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,16 @@ def GetInfluencingModelPart(self) -> Kratos.ModelPart:
Kratos.ModelPart: Response function model part which influences the response value.
"""
pass

def GetChildResponses(self) -> 'list[ResponseFunction]':
"""Returns the list of child responses.
This method returns list of child responses. If the current response is a leaf response,
then it will return empty list.
Needs to be implemented in non-leaf response functions.
Returns:
list[ResponseFunction]: List of child responses.
"""
return []
Loading

0 comments on commit 2b11fc6

Please sign in to comment.