Skip to content

Commit

Permalink
Remote table access should use the db public user.
Browse files Browse the repository at this point in the history
https://team-1617704806227.atlassian.net/browse/MIP-746
Added local and public user in node service config.
Added udf result access grant query when result must be sent to remote node.
Minor refactorings.
  • Loading branch information
ThanKarab committed Sep 15, 2023
1 parent e0aac15 commit 35f1ff9
Show file tree
Hide file tree
Showing 30 changed files with 603 additions and 499 deletions.
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,23 @@
[[nodes]]
id = "localnode1"
role = "LOCALNODE"
monetdb_port=50001
monetdb_password="executor"
rabbitmq_port=5671
monetdb_port=50001
local_monetdb_username="executor"
local_monetdb_password="executor"
public_monetdb_username="guest"
public_monetdb_password="guest"
smpc_client_port=9001
[[nodes]]
id = "localnode2"
role = "LOCALNODE"
monetdb_port=50002
monetdb_password="executor"
rabbitmq_port=5672
monetdb_port=50001
local_monetdb_username="executor"
local_monetdb_password="executor"
public_monetdb_username="guest"
public_monetdb_password="guest"
smpc_client_port=9002
```
Expand Down
6 changes: 4 additions & 2 deletions exareme2/node/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ vhost = "user_vhost"
ip = "$MONETDB_IP"
port = "$MONETDB_PORT"
database = "db"
username = "executor"
password = "$MONETDB_PASSWORD"
local_username = "$MONETDB_LOCAL_USERNAME"
local_password = "$MONETDB_LOCAL_PASSWORD"
public_username = "$MONETDB_PUBLIC_USERNAME"
public_password = "$MONETDB_PUBLIC_PASSWORD"

[smpc]
enabled = "$SMPC_ENABLED"
Expand Down
21 changes: 15 additions & 6 deletions exareme2/node/monetdb_interface/monet_db_facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ def _connection():
conn = pymonetdb.connect(
hostname=node_config.monetdb.ip,
port=node_config.monetdb.port,
username=node_config.monetdb.username,
password=node_config.monetdb.password,
username=node_config.monetdb.local_username,
password=node_config.monetdb.local_password,
database=node_config.monetdb.database,
)
yield conn
Expand Down Expand Up @@ -95,14 +95,23 @@ def _lock(query_lock, timeout):
query_lock.release()


def _validate_exception_is_recoverable(exc):
def _validate_exception_could_be_recovered(exc):
"""
Check whether the query needs to be re-executed and return True or False accordingly.
Check the error message and decide if this is an exception that could be successful if re-executed.
ValueError: Cannot be recovered since it's thrown from the udfs.
InsufficientPrivileges: Cannot be recovered since the user provided doesn't have access.
ProgrammingError: Cannot be recovered due to udf error.
"""
if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
return True
elif isinstance(exc, DatabaseError):
return "ValueError" not in str(exc) and not isinstance(exc, ProgrammingError)
if "ValueError" in str(exc):
return False
elif "insufficient privileges" in str(exc):
return False
elif isinstance(exc, ProgrammingError):
return False
return True
else:
return False

Expand All @@ -128,7 +137,7 @@ def error_handling(**kwargs):
try:
return func(**kwargs)
except Exception as exc:
if not _validate_exception_is_recoverable(exc):
if not _validate_exception_could_be_recovered(exc):
logger.error(
f"Error occurred: Exception type: '{type(exc)}' and exception message: '{exc}'"
)
Expand Down
20 changes: 11 additions & 9 deletions exareme2/node/monetdb_interface/remote_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,26 @@ def get_remote_table_names(context_id: str) -> List[str]:


@sql_injection_guard(
name=str.isidentifier,
table_name=str.isidentifier,
monetdb_socket_address=is_socket_address,
schema=is_valid_table_schema,
username=str.isidentifier,
password=str.isidentifier,
table_creator_username=str.isidentifier,
public_username=str.isidentifier,
public_password=str.isidentifier,
)
def create_remote_table(
name: str,
table_name: str,
schema: TableSchema,
monetdb_socket_address: str,
username: str,
password: str,
table_creator_username: str,
public_username: str,
public_password: str,
):
columns_schema = convert_schema_to_sql_query_format(schema)
db_execute_query(
f"""
CREATE REMOTE TABLE {username}.{name}
( {columns_schema}) ON 'mapi:monetdb://{monetdb_socket_address}/db/{username}/{name}'
WITH USER '{username}' PASSWORD '{password}'
CREATE REMOTE TABLE {table_name}
( {columns_schema}) ON 'mapi:monetdb://{monetdb_socket_address}/db/{table_creator_username}/{table_name}'
WITH USER '{public_username}' PASSWORD '{public_password}'
"""
)
12 changes: 7 additions & 5 deletions exareme2/node/tasks/remote_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ def create_remote_table(
The monetdb_socket_address of the monetdb that we want to create the remote table from.
"""
schema = TableSchema.parse_raw(table_schema_json)
username = node_config.monetdb.username
password = node_config.monetdb.password
local_username = node_config.monetdb.local_username
public_username = node_config.monetdb.public_username
public_password = node_config.monetdb.public_password
remote_tables.create_remote_table(
name=table_name,
table_name=table_name,
schema=schema,
monetdb_socket_address=monetdb_socket_address,
username=username,
password=password,
table_creator_username=local_username,
public_username=public_username,
public_password=public_password,
)
56 changes: 51 additions & 5 deletions exareme2/node/tasks/udfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from exareme2.node.monetdb_interface.guard import udf_kwargs_validator
from exareme2.node.monetdb_interface.guard import udf_posargs_validator
from exareme2.node.node_logger import initialise_logger
from exareme2.node_tasks_DTOs import ColumnInfo
from exareme2.node_tasks_DTOs import NodeLiteralDTO
from exareme2.node_tasks_DTOs import NodeSMPCDTO
from exareme2.node_tasks_DTOs import NodeTableDTO
Expand Down Expand Up @@ -228,14 +227,15 @@ def _generate_udf_statements(
output_names = _make_output_table_names(outputnum, node_id, context_id, command_id)

# UDF generation
# --------------
udf_definition = udfgen.get_definition(udf_name, output_names)
udf_exec_stmt = udfgen.get_exec_stmt(udf_name, output_names)
udf_results = udfgen.get_results(output_names)

# Create list of udf statements
udf_definitions = [res.create_query for res in udf_results]
udf_definitions.append(udf_definition)
# Create list of udf definitions
table_creation_queries = _get_udf_table_creation_queries(udf_results)
public_username = node_config.monetdb.public_username
table_sharing_queries = _get_udf_table_sharing_queries(udf_results, public_username)
udf_definitions = [*table_creation_queries, *table_sharing_queries, udf_definition]

# Convert results
results = [_convert_result(res) for res in udf_results]
Expand All @@ -259,6 +259,52 @@ def _make_output_table_names(
]


def _get_udf_table_creation_queries(udf_results: List[UDFGenResult]) -> List[str]:
queries = []
for result in udf_results:
if isinstance(result, UDFGenTableResult):
queries.append(result.create_query)
elif isinstance(result, UDFGenSMPCResult):
queries.append(result.template.create_query)
if result.sum_op_values is not None:
queries.append(result.sum_op_values.create_query)
if result.min_op_values is not None:
queries.append(result.min_op_values.create_query)
if result.max_op_values is not None:
queries.append(result.max_op_values.create_query)
else:
raise NotImplementedError
return queries


def _get_table_share_query(tablename: str, public_username: str) -> str:
return f"GRANT SELECT ON TABLE {tablename} TO {public_username}"


def _get_udf_table_sharing_queries(
udf_results: List[UDFGenResult], public_username
) -> List[str]:
"""
Tables should be shared (accessible through the "public" user) in the following cases:
1) The result is of UDFGenTableResult type and the share property is True,
2) The result is of UDFGenSMPCResult type, so the template should be shared, the rest of the tables
will pass through the SMPC.
"""
queries = []
for result in udf_results:
if isinstance(result, UDFGenTableResult):
queries.append(
_get_table_share_query(result.table_name, public_username)
) if result.share else None
elif isinstance(result, UDFGenSMPCResult):
queries.append(
_get_table_share_query(result.template.table_name, public_username)
)
else:
raise NotImplementedError
return queries


def _convert_result(result: UDFGenResult) -> NodeUDFDTO:
if isinstance(result, UDFGenTableResult):
return _convert_table_result(result)
Expand Down
8 changes: 5 additions & 3 deletions exareme2/udfgen/iotypes.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC
from abc import abstractproperty
from abc import abstractmethod
from typing import TypeVar

from exareme2 import DType as dt
Expand Down Expand Up @@ -50,7 +50,8 @@ class InputType(IOType):


class OutputType(IOType):
@abstractproperty
@property
@abstractmethod
def schema(self):
raise NotImplementedError

Expand Down Expand Up @@ -100,7 +101,8 @@ def placeholder(name):


class TableType(ABC):
@abstractproperty
@property
@abstractmethod
def schema(self):
raise NotImplementedError

Expand Down
37 changes: 27 additions & 10 deletions exareme2/udfgen/py_udfgenerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ def output_types(self) -> List[Union[OutputType, LoopbackOutputType]]:
main_output_type = relation(schema=self.output_schema)
return (main_output_type,)

# case: output type has to be infered at execution time
# case: output type has to be inferred at execution time
main_output_type, *_ = self.funcparts.output_types
if (
isinstance(main_output_type, ParametrizedType)
Expand Down Expand Up @@ -520,7 +520,7 @@ def get_results(self, output_table_names: List[str]) -> List[UDFGenResult]:
List[UDFGenResult]
UDF results
"""
builder = UdfResultBuilder(self.output_types, self.smpc_used)
builder = UdfResultBuilder(self.output_types)
results = builder.build_results(output_table_names)
return results

Expand Down Expand Up @@ -920,9 +920,8 @@ def _make_relations_where_clause(relations):
class UdfResultBuilder:
"""Builder class for the UDF result objects"""

def __init__(self, output_types: List[OutputType], smpc_used: bool = False) -> None:
def __init__(self, output_types: List[OutputType]) -> None:
self.output_types = output_types
self.smpc_used = smpc_used

def build_results(self, output_table_names: List[str]) -> List[UDFGenResult]:
main_table_name, *sec_table_names = output_table_names
Expand All @@ -935,26 +934,44 @@ def build_results(self, output_table_names: List[str]) -> List[UDFGenResult]:
]
return udf_outputs

@staticmethod
def _is_output_type_shareable(output_type: OutputType) -> bool:
# TransferType tables are meant to be shared with the remote nodes
if isinstance(output_type, TransferType):
return True

# SecureTransferType should be shared if smpc is disabled.
# If smpc is enabled SecureTransferType results would be cast to SMPCSecureTransferType as well
if isinstance(output_type, SecureTransferType) and not isinstance(
output_type, SMPCSecureTransferType
):
return True
return False

def _make_result(
self,
output_type: OutputType,
table_name: str,
) -> UDFGenResult:
if isinstance(output_type, SecureTransferType) and self.smpc_used:
if isinstance(output_type, SMPCSecureTransferType):
return self._make_smpc_result(output_type, table_name)
else:
return self._make_table_result(output_type, table_name)
return self._make_table_result(
output_type, table_name, self._is_output_type_shareable(output_type)
)

@staticmethod
def _make_table_result(
output_type: OutputType,
table_name: str,
share: bool,
) -> UDFGenResult:
create = CreateTable(table_name, output_type.schema).compile()
return UDFGenTableResult(
table_name=table_name,
table_schema=output_type.schema,
create_query=create,
share=share,
)

def _make_smpc_result(
Expand All @@ -965,15 +982,15 @@ def _make_smpc_result(
placeholders = get_smpc_tablename_placeholders(table_name_prefix)
template_ph, sum_op_ph, min_op_ph, max_op_ph = placeholders

template = self._make_table_result(output_type, template_ph)
template = self._make_table_result(output_type, template_ph, True)

sum_op, min_op, max_op = None, None, None
if output_type.sum_op:
sum_op = self._make_table_result(output_type, sum_op_ph)
sum_op = self._make_table_result(output_type, sum_op_ph, False)
if output_type.min_op:
min_op = self._make_table_result(output_type, min_op_ph)
min_op = self._make_table_result(output_type, min_op_ph, False)
if output_type.max_op:
max_op = self._make_table_result(output_type, max_op_ph)
max_op = self._make_table_result(output_type, max_op_ph, False)

return UDFGenSMPCResult(
template=template,
Expand Down
12 changes: 1 addition & 11 deletions exareme2/udfgen/udfgen_DTOs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,11 @@ class UDFGenTableResult(UDFGenResult):
table_name: str
table_schema: List[Tuple[str, DType]]
create_query: str
share: bool = False


class UDFGenSMPCResult(UDFGenResult):
template: UDFGenTableResult
sum_op_values: Optional[UDFGenTableResult] = None
min_op_values: Optional[UDFGenTableResult] = None
max_op_values: Optional[UDFGenTableResult] = None

@property
def create_query(self):
queries = [self.template.create_query]
if self.sum_op_values is not None:
queries.append(self.sum_op_values.create_query)
if self.min_op_values is not None:
queries.append(self.min_op_values.create_query)
if self.max_op_values is not None:
queries.append(self.max_op_values.create_query)
return "".join(queries)
8 changes: 7 additions & 1 deletion kubernetes/templates/mipengine-globalnode.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,14 @@ spec:
fieldPath: status.podIP
- name: MONETDB_PORT
value: "50000"
- name: MONETDB_PASSWORD
- name: MONETDB_LOCAL_USERNAME
value: "executor"
- name: MONETDB_LOCAL_PASSWORD
value: "executor"
- name: MONETDB_PUBLIC_USERNAME
value: "guest"
- name: MONETDB_PUBLIC_PASSWORD
value: "guest"
- name: SMPC_ENABLED
value: {{ quote .Values.smpc.enabled }}
{{ if .Values.smpc.enabled }}
Expand Down
Loading

0 comments on commit 35f1ff9

Please sign in to comment.