From 0418d0ca835fe6f5b5fa5b726773872183301e3d Mon Sep 17 00:00:00 2001 From: Abderrahmane Smimite Date: Fri, 29 Nov 2024 19:15:21 +0100 Subject: [PATCH 01/15] Fix the exporter as a starting point --- backend/core/views.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/backend/core/views.py b/backend/core/views.py index 0f243c4fc..ff175389f 100644 --- a/backend/core/views.py +++ b/backend/core/views.py @@ -653,26 +653,38 @@ def risk_assessment_csv(self, request, pk): "name", "description", "existing_controls", + "current_impact", + "current_proba", "current_level", "applied_controls", + "residual_impact", + "residual_proba", "residual_level", "treatment", ] writer.writerow(columns) - for scenario in risk_assessment.risk_scenarios.all().order_by("created_at"): - applied_controls = ",".join( + for scenario in risk_assessment.risk_scenarios.all().order_by("ref_id"): + extra_controls = ",".join( [m.csv_value for m in scenario.applied_controls.all()] ) + existing_controls = ",".join( + [m.csv_value for m in scenario.existing_applied_controls.all()] + ) + threats = ",".join([t.name for t in scenario.threats.all()]) row = [ scenario.ref_id, threats, scenario.name, scenario.description, - scenario.existing_controls, + existing_controls, + scenario.get_current_impact()["name"], + scenario.get_current_proba()["name"], scenario.get_current_risk()["name"], - applied_controls, + extra_controls, + scenario.get_residual_impact()["name"], + scenario.get_residual_proba()["name"], scenario.get_residual_risk()["name"], scenario.treatment, ] From 6f9ffa9404ef9ad5a7e514f87ed640842ca095e1 Mon Sep 17 00:00:00 2001 From: Abderrahmane Smimite Date: Fri, 29 Nov 2024 20:16:14 +0100 Subject: [PATCH 02/15] Enablers on CLICA --- cli/clica.py | 61 ++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 47 insertions(+), 14 deletions(-) diff --git a/cli/clica.py b/cli/clica.py index 5ecbb1a89..8b80bc3ec 100755 --- a/cli/clica.py +++ b/cli/clica.py @@ -6,7 +6,8 @@ import pandas as pd import requests import yaml -from rich import print +import json +from rich import print as rprint cli_cfg = dict() auth_data = dict() @@ -81,7 +82,6 @@ def init_config(): def check_auth(): if Path(".tmp.yaml").exists(): - click.echo("Found auth data. Trying them", err=True) with open(".tmp.yaml", "r") as yfile: auth_data = yaml.safe_load(yfile) return auth_data["token"] @@ -143,6 +143,36 @@ def get_folders(): print(res) +@click.command() +def get_projects(): + """getting projects as a json""" + url = f"{API_URL}/projects/" + headers = {"Authorization": f"Token {TOKEN}"} + res = requests.get(url, headers=headers, verify=VERIFY_CERTIFICATE) + if res.status_code == 200: + print(json.dumps(res.json(), ensure_ascii=False)) + + +@click.command() +def get_loaded_matrix(): + """getting loaded matrix as a json""" + url = f"{API_URL}/risk-matrices/" + headers = {"Authorization": f"Token {TOKEN}"} + res = requests.get(url, headers=headers, verify=VERIFY_CERTIFICATE) + if res.status_code == 200: + print(json.dumps(res.json(), ensure_ascii=False)) + + +@click.command() +@click.option("--file", required=True, help="") +@click.option("--file", required=True, help="") +@click.option("--file", required=True, help="") +@click.option("--file", required=True, help="") +def import_risk_assessment(file, project, name, matrix): + """this will parse the items of a risk assessment and create the assoicated objects""" + pass + + @click.command() @click.option("--file", required=True, help="Path of the csv file with assets") def import_assets(file): @@ -172,9 +202,9 @@ def import_assets(file): ) if res.status_code != 201: click.echo("❌ something went wrong", err=True) - print(res.json()) + rprint(res.json()) else: - print(f"✅ {name} created", file=sys.stderr) + rprint(f"✅ {name} created", file=sys.stderr) @click.command() @@ -208,9 +238,9 @@ def import_controls(file): ) if res.status_code != 201: click.echo("❌ something went wrong", err=True) - print(res.json()) + rprint(res.json()) else: - print(f"✅ {name} created", file=sys.stderr) + rprint(f"✅ {name} created", file=sys.stderr) @click.command() @@ -240,9 +270,9 @@ def import_evidences(file): ) if res.status_code != 201: click.echo("❌ something went wrong", err=True) - print(res.json()) + rprint(res.json()) else: - print(f"✅ {row['name']} created", file=sys.stderr) + rprint(f"✅ {row['name']} created", file=sys.stderr) @click.command() @@ -260,13 +290,13 @@ def upload_attachment(file, name): url, headers=headers, params={"name": name}, verify=VERIFY_CERTIFICATE ) data = res.json() - print(data) + rprint(data) if res.status_code != 200: - print(data) - print(f"Error: check credentials or filename.", file=sys.stderr) + rprint(data) + rprint(f"Error: check credentials or filename.", file=sys.stderr) return if not data["results"]: - print(f"Error: No evidence found with name '{name}'", file=sys.stderr) + rprint(f"Error: No evidence found with name '{name}'", file=sys.stderr) return evidence_id = data["results"][0]["id"] @@ -280,16 +310,19 @@ def upload_attachment(file, name): } with open(file, "rb") as f: res = requests.post(url, headers=headers, data=f, verify=VERIFY_CERTIFICATE) - print(res) - print(res.text) + rprint(res) + rprint(res.text) cli.add_command(get_folders) +cli.add_command(get_projects) cli.add_command(auth) cli.add_command(import_assets) cli.add_command(import_controls) cli.add_command(import_evidences) cli.add_command(init_config) cli.add_command(upload_attachment) +cli.add_command(import_risk_assessment) +cli.add_command(get_loaded_matrix) if __name__ == "__main__": cli() From 1541a85a9526e5b9297f2c8bdbd8635e0ff5a00b Mon Sep 17 00:00:00 2001 From: Abderrahmane Smimite Date: Fri, 29 Nov 2024 21:05:49 +0100 Subject: [PATCH 03/15] wip --- cli/clica.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/cli/clica.py b/cli/clica.py index 8b80bc3ec..615070cfb 100755 --- a/cli/clica.py +++ b/cli/clica.py @@ -165,12 +165,21 @@ def get_loaded_matrix(): @click.command() @click.option("--file", required=True, help="") -@click.option("--file", required=True, help="") -@click.option("--file", required=True, help="") -@click.option("--file", required=True, help="") +@click.option("--project", required=True, help="") +@click.option("--matrix", required=True, help="") +@click.option("--name", required=True, help="") def import_risk_assessment(file, project, name, matrix): """this will parse the items of a risk assessment and create the assoicated objects""" - pass + df = pd.read_csv(file) + url = f"{API_URL}/assets/" + headers = { + "Authorization": f"Token {TOKEN}", + } + # post to create risk assessment + # get the id + # sequential post to create the assets if any + # sequential post to create the threats if any + # sequential post over the scenarios @click.command() From 03c31dbddc670a931ee3850e02942c3f820a35d8 Mon Sep 17 00:00:00 2001 From: Abderrahmane Smimite Date: Sat, 30 Nov 2024 15:46:01 +0100 Subject: [PATCH 04/15] More enablers --- backend/core/views.py | 63 +++++++++++++++++++++++++++++++++++++++++++ cli/clica.py | 30 ++++++++++++++++++--- 2 files changed, 90 insertions(+), 3 deletions(-) diff --git a/backend/core/views.py b/backend/core/views.py index ff175389f..c9298df16 100644 --- a/backend/core/views.py +++ b/backend/core/views.py @@ -298,6 +298,22 @@ def quality_check_detail(self, request, pk): else: return Response(status=HTTP_403_FORBIDDEN) + @action(detail=False, methods=["get"]) + def ids(self, request): + my_map = dict() + + (viewable_items, _, _) = RoleAssignment.get_accessible_object_ids( + folder=Folder.get_root_folder(), + user=request.user, + object_type=Project, + ) + for item in Project.objects.filter(id__in=viewable_items): + if my_map.get(item.folder.name) is None: + my_map[item.folder.name] = {} + my_map[item.folder.name].update({item.name: item.id}) + + return Response(my_map) + class ThreatViewSet(BaseModelViewSet): """ @@ -318,6 +334,21 @@ def retrieve(self, request, *args, **kwargs): def threats_count(self, request): return Response({"results": threats_count_per_name(request.user)}) + @action(detail=False, methods=["get"]) + def ids(self, request): + my_map = dict() + + (viewable_items, _, _) = RoleAssignment.get_accessible_object_ids( + folder=Folder.get_root_folder(), + user=request.user, + object_type=Threat, + ) + for item in Threat.objects.filter(id__in=viewable_items): + if my_map.get(item.folder.name) is None: + my_map[item.folder.name] = {} + my_map[item.folder.name].update({item.name: item.id}) + return Response(my_map) + class AssetViewSet(BaseModelViewSet): """ @@ -395,6 +426,21 @@ def graph(self, request): {"nodes": nodes, "links": links, "categories": categories, "meta": meta} ) + @action(detail=False, methods=["get"]) + def ids(self, request): + my_map = dict() + + (viewable_items, _, _) = RoleAssignment.get_accessible_object_ids( + folder=Folder.get_root_folder(), + user=request.user, + object_type=Asset, + ) + for item in Asset.objects.filter(id__in=viewable_items): + if my_map.get(item.folder.name) is None: + my_map[item.folder.name] = {} + my_map[item.folder.name].update({item.name: item.id}) + return Response(my_map) + @action(detail=False, name="Get security objectives") def security_objectives(self, request): return Response({"results": Asset.DEFAULT_SECURITY_OBJECTIVES}) @@ -649,6 +695,7 @@ def risk_assessment_csv(self, request, pk): writer = csv.writer(response, delimiter=";") columns = [ "ref_id", + "assets", "threats", "name", "description", @@ -673,8 +720,11 @@ def risk_assessment_csv(self, request, pk): ) threats = ",".join([t.name for t in scenario.threats.all()]) + assets = ",".join([t.name for t in scenario.assets.all()]) + row = [ scenario.ref_id, + assets, threats, scenario.name, scenario.description, @@ -1513,6 +1563,19 @@ def org_tree(self, request): return Response(tree) + @action(detail=False, methods=["get"]) + def ids(self, request): + my_map = dict() + + (viewable_items, _, _) = RoleAssignment.get_accessible_object_ids( + folder=Folder.get_root_folder(), + user=request.user, + object_type=Folder, + ) + for item in Folder.objects.filter(id__in=viewable_items): + my_map[item.name] = item.id + return Response(my_map) + @action(detail=False, methods=["get"]) def my_assignments(self, request): risk_assessments = RiskAssessment.objects.filter( diff --git a/cli/clica.py b/cli/clica.py index 615070cfb..f5aea68a5 100755 --- a/cli/clica.py +++ b/cli/clica.py @@ -165,20 +165,44 @@ def get_loaded_matrix(): @click.command() @click.option("--file", required=True, help="") +@click.option("--folder", required=True, help="") @click.option("--project", required=True, help="") @click.option("--matrix", required=True, help="") @click.option("--name", required=True, help="") -def import_risk_assessment(file, project, name, matrix): +@click.option( + "--create-all", + required=False, + is_flag=True, + default=True, + help="Create all associated objects (threats, assets)", +) +def import_risk_assessment(file, folder, project, name, matrix): """this will parse the items of a risk assessment and create the assoicated objects""" df = pd.read_csv(file) url = f"{API_URL}/assets/" headers = { "Authorization": f"Token {TOKEN}", } + # post to create risk assessment - # get the id - # sequential post to create the assets if any + data = { + "name": name, + "folder": folder, + "project": project, + "risk_matrix": matrix, + } + res = requests.post( + f"{API_URL}/risk-assessments/", + json=data, + headers=headers, + verify=VERIFY_CERTIFICATE, + ) + ra_id = None + if res.status_code == 200: + ra_id = res.json().get("id") + # sequential post to create the threats if any + # sequential post to create the assets if any # sequential post over the scenarios From 13e62ba2a1529a5f8700ce74f2393146c5642d90 Mon Sep 17 00:00:00 2001 From: Abderrahmane Smimite Date: Sat, 30 Nov 2024 16:25:39 +0100 Subject: [PATCH 05/15] enablers for the cli --- backend/core/views.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/backend/core/views.py b/backend/core/views.py index c9298df16..d3f2bf2e7 100644 --- a/backend/core/views.py +++ b/backend/core/views.py @@ -1132,6 +1132,22 @@ def get_timeline_info(self, request): colorMap[domain.name] = next(color_cycle) return Response({"entries": entries, "colorMap": colorMap}) + @action(detail=False, methods=["get"]) + def ids(self, request): + my_map = dict() + + (viewable_items, _, _) = RoleAssignment.get_accessible_object_ids( + folder=Folder.get_root_folder(), + user=request.user, + object_type=AppliedControl, + ) + for item in AppliedControl.objects.filter(id__in=viewable_items): + if my_map.get(item.folder.name) is None: + my_map[item.folder.name] = {} + my_map[item.folder.name].update({item.name: item.id}) + + return Response(my_map) + class PolicyViewSet(AppliedControlViewSet): model = Policy From 68fb96cb684c875aba4200751e179afd5029072d Mon Sep 17 00:00:00 2001 From: Abderrahmane Smimite Date: Sat, 30 Nov 2024 18:53:35 +0100 Subject: [PATCH 06/15] cleaner implem and more enablers --- backend/core/views.py | 26 +++++++++++++---- cli/clica.py | 66 +++++++++++++++++++++++++++---------------- 2 files changed, 62 insertions(+), 30 deletions(-) diff --git a/backend/core/views.py b/backend/core/views.py index d3f2bf2e7..278c6d53a 100644 --- a/backend/core/views.py +++ b/backend/core/views.py @@ -505,6 +505,22 @@ def used(self, request): ) return Response({"results": used_matrices}) + @action(detail=False, methods=["get"]) + def ids(self, request): + my_map = dict() + + (viewable_items, _, _) = RoleAssignment.get_accessible_object_ids( + folder=Folder.get_root_folder(), + user=request.user, + object_type=RiskMatrix, + ) + for item in RiskMatrix.objects.filter(id__in=viewable_items): + if my_map.get(item.folder.name) is None: + my_map[item.folder.name] = {} + my_map[item.folder.name].update({item.name: item.id}) + + return Response(my_map) + class VulnerabilityViewSet(BaseModelViewSet): """ @@ -703,7 +719,7 @@ def risk_assessment_csv(self, request, pk): "current_impact", "current_proba", "current_level", - "applied_controls", + "additional_controls", "residual_impact", "residual_proba", "residual_level", @@ -712,11 +728,11 @@ def risk_assessment_csv(self, request, pk): writer.writerow(columns) for scenario in risk_assessment.risk_scenarios.all().order_by("ref_id"): - extra_controls = ",".join( - [m.csv_value for m in scenario.applied_controls.all()] + additional_controls = ",".join( + [m.name for m in scenario.applied_controls.all()] ) existing_controls = ",".join( - [m.csv_value for m in scenario.existing_applied_controls.all()] + [m.name for m in scenario.existing_applied_controls.all()] ) threats = ",".join([t.name for t in scenario.threats.all()]) @@ -732,7 +748,7 @@ def risk_assessment_csv(self, request, pk): scenario.get_current_impact()["name"], scenario.get_current_proba()["name"], scenario.get_current_risk()["name"], - extra_controls, + additional_controls, scenario.get_residual_impact()["name"], scenario.get_residual_proba()["name"], scenario.get_residual_risk()["name"], diff --git a/cli/clica.py b/cli/clica.py index f5aea68a5..ff6da04bc 100755 --- a/cli/clica.py +++ b/cli/clica.py @@ -7,7 +7,7 @@ import requests import yaml import json -from rich import print as rprint +from rich import print as rprint, print_json cli_cfg = dict() auth_data = dict() @@ -123,6 +123,18 @@ def auth(email, password): print(res.json()) +def ids_map(model, folder=None): + my_map = dict() + url = f"{API_URL}/{model}/ids/" + headers = {"Authorization": f"Token {TOKEN}"} + res = requests.get(url, headers=headers, verify=VERIFY_CERTIFICATE) + if folder: + my_map = res.json().get(folder) + else: + my_map = res.json() + return my_map + + def _get_folders(): url = f"{API_URL}/folders/" headers = {"Authorization": f"Token {TOKEN}"} @@ -138,29 +150,19 @@ def _get_folders(): @click.command() def get_folders(): """Get folders.""" - GLOBAL_FOLDER_ID, res = _get_folders() - print("GLOBAL_FOLDER_ID: ", GLOBAL_FOLDER_ID) - print(res) + print(json.dumps(ids_map("folders"), ensure_ascii=False)) @click.command() def get_projects(): """getting projects as a json""" - url = f"{API_URL}/projects/" - headers = {"Authorization": f"Token {TOKEN}"} - res = requests.get(url, headers=headers, verify=VERIFY_CERTIFICATE) - if res.status_code == 200: - print(json.dumps(res.json(), ensure_ascii=False)) + print(json.dumps(ids_map("projects"), ensure_ascii=False)) @click.command() -def get_loaded_matrix(): +def get_matrices(): """getting loaded matrix as a json""" - url = f"{API_URL}/risk-matrices/" - headers = {"Authorization": f"Token {TOKEN}"} - res = requests.get(url, headers=headers, verify=VERIFY_CERTIFICATE) - if res.status_code == 200: - print(json.dumps(res.json(), ensure_ascii=False)) + print(json.dumps(ids_map("risk-matrices", folder="Global"), ensure_ascii=False)) @click.command() @@ -170,26 +172,28 @@ def get_loaded_matrix(): @click.option("--matrix", required=True, help="") @click.option("--name", required=True, help="") @click.option( - "--create-all", + "--create_all", required=False, is_flag=True, default=True, help="Create all associated objects (threats, assets)", ) -def import_risk_assessment(file, folder, project, name, matrix): - """this will parse the items of a risk assessment and create the assoicated objects""" - df = pd.read_csv(file) - url = f"{API_URL}/assets/" +def import_risk_assessment(file, folder, project, name, matrix, create_all): + """crawl a risk assessment (see template) and create the assoicated objects""" + df = pd.read_csv(file, delimiter=";") headers = { "Authorization": f"Token {TOKEN}", } + folder_id = ids_map("folders").get(folder) + project_id = ids_map("projects", folder=folder).get(project) + matrix_id = ids_map("risk-matrices", folder="Global").get(matrix) # post to create risk assessment data = { "name": name, - "folder": folder, - "project": project, - "risk_matrix": matrix, + "folder": folder_id, + "project": project_id, + "risk_matrix": matrix_id, } res = requests.post( f"{API_URL}/risk-assessments/", @@ -198,11 +202,23 @@ def import_risk_assessment(file, folder, project, name, matrix): verify=VERIFY_CERTIFICATE, ) ra_id = None - if res.status_code == 200: + if res.status_code == 201: ra_id = res.json().get("id") + print("ok") + else: + print("something went wrong.") + print(res.json()) # sequential post to create the threats if any + rprint(df["threats"]) + rprint(df["assets"]) + rprint(df["existing_controls"]) + rprint(df["additional_controls"]) + # sequential post to create the assets if any + + # sequential post to create the controls if any + # sequential post over the scenarios @@ -356,6 +372,6 @@ def upload_attachment(file, name): cli.add_command(init_config) cli.add_command(upload_attachment) cli.add_command(import_risk_assessment) -cli.add_command(get_loaded_matrix) +cli.add_command(get_matrices) if __name__ == "__main__": cli() From e352039006c9ab9e3f86d88b2bb7331e45e11cd1 Mon Sep 17 00:00:00 2001 From: Abderrahmane Smimite Date: Sat, 30 Nov 2024 20:13:14 +0100 Subject: [PATCH 07/15] create associated objects --- cli/clica.py | 42 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/cli/clica.py b/cli/clica.py index ff6da04bc..2c7394396 100755 --- a/cli/clica.py +++ b/cli/clica.py @@ -165,6 +165,34 @@ def get_matrices(): print(json.dumps(ids_map("risk-matrices", folder="Global"), ensure_ascii=False)) +def get_unique_parsed_values(df, column_name): + unique_values = df[column_name].dropna().unique() + parsed_values = [] + + for value in unique_values: + value_str = str(value) + split_values = [v.strip() for v in value_str.split(",")] + parsed_values.extend(split_values) + + return set(parsed_values) + + +def batch_create(model, items, folder_id): + headers = { + "Authorization": f"Token {TOKEN}", + } + url = f"{API_URL}/{model}/" + for item in items: + data = { + "folder": folder_id, + "name": item, + } + res = requests.post(url, json=data, headers=headers) + if res.status_code != 201: + print("something went wrong") + print(res.json()) + + @click.command() @click.option("--file", required=True, help="") @click.option("--folder", required=True, help="") @@ -209,11 +237,15 @@ def import_risk_assessment(file, folder, project, name, matrix, create_all): print("something went wrong.") print(res.json()) - # sequential post to create the threats if any - rprint(df["threats"]) - rprint(df["assets"]) - rprint(df["existing_controls"]) - rprint(df["additional_controls"]) + if create_all: + threats = get_unique_parsed_values(df, "threats") + batch_create("threats", threats, folder_id) + assets = get_unique_parsed_values(df, "assets") + batch_create("assets", assets, folder_id) + existing_controls = get_unique_parsed_values(df, "existing_controls") + batch_create("applied-controls", existing_controls, folder_id) + additional_controls = get_unique_parsed_values(df, "additional_controls") + batch_create("applied-controls", additional_controls, folder_id) # sequential post to create the assets if any From 949b688c37e0684f6202ad76fdbb4bd633738300 Mon Sep 17 00:00:00 2001 From: Abderrahmane Smimite Date: Sat, 30 Nov 2024 20:30:22 +0100 Subject: [PATCH 08/15] Refactor code --- cli/RA_sample.csv | 4 ++++ cli/clica.py | 15 +++++++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) create mode 100644 cli/RA_sample.csv diff --git a/cli/RA_sample.csv b/cli/RA_sample.csv new file mode 100644 index 000000000..27c6048f5 --- /dev/null +++ b/cli/RA_sample.csv @@ -0,0 +1,4 @@ +ref_id;assets;threats;name;description;existing_controls;current_impact;current_proba;current_level;additional_controls;residual_impact;residual_proba;residual_level;treatment +R.1;dsafa;Data Encrypted for Impact;Ransomware;;ISMS Scope document,Statement of Applicabilty document;Significant;Likely;Low;Risk management policy,Organization overview document,Main policy,Competency matrix;Critical;Very likely;High;open +R.2;;System Shutdown/Reboot;Unavailability;;Information security awareness and traning policy;Significant;Very likely;Medium;Management review plan document,Main policy,ISMS Scope document,Responsibility matrix;Important;Unlikely;Medium;open +R.3;dsasfad;Scheduled Task,Cloud Administration Command;Insider threats;;;Important;Likely;Medium;;--;--;--;open diff --git a/cli/clica.py b/cli/clica.py index 2c7394396..8c2b53a9c 100755 --- a/cli/clica.py +++ b/cli/clica.py @@ -247,11 +247,18 @@ def import_risk_assessment(file, folder, project, name, matrix, create_all): additional_controls = get_unique_parsed_values(df, "additional_controls") batch_create("applied-controls", additional_controls, folder_id) - # sequential post to create the assets if any - - # sequential post to create the controls if any - + df = df.fillna("") # sequential post over the scenarios + for scenario in df.itertuples(): + print( + scenario.ref_id, + scenario.name, + scenario.description, + scenario.current_impact, + scenario.current_proba, + scenario.residual_impact, + scenario.residual_proba, + ) @click.command() From b4b0df963b8df4e7f5642df68991f2e6ede0858b Mon Sep 17 00:00:00 2001 From: Abderrahmane Smimite Date: Sat, 30 Nov 2024 20:41:50 +0100 Subject: [PATCH 09/15] Examples --- cli/README.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 cli/README.md diff --git a/cli/README.md b/cli/README.md new file mode 100644 index 000000000..be0afcfeb --- /dev/null +++ b/cli/README.md @@ -0,0 +1,6 @@ + +## Examples + +```sh +./clica.py import-risk-assessment --file RA_sample.csv --folder "BU 1" --project "Orion" --matrix "4x4 risk matrix from EBIOS-RM" --name example +``` From 8ecf5acbd7f9d4c9581c7feb607a05413e9203c6 Mon Sep 17 00:00:00 2001 From: Abderrahmane Smimite Date: Sat, 30 Nov 2024 22:27:31 +0100 Subject: [PATCH 10/15] wip: matrix reverse map --- backend/core/views.py | 4 ++-- cli/RA_sample.csv | 2 +- cli/clica.py | 22 ++++++++++++++++++++++ 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/backend/core/views.py b/backend/core/views.py index 278c6d53a..e84d2254a 100644 --- a/backend/core/views.py +++ b/backend/core/views.py @@ -718,11 +718,11 @@ def risk_assessment_csv(self, request, pk): "existing_controls", "current_impact", "current_proba", - "current_level", + "current_risk", "additional_controls", "residual_impact", "residual_proba", - "residual_level", + "residual_risk", "treatment", ] writer.writerow(columns) diff --git a/cli/RA_sample.csv b/cli/RA_sample.csv index 27c6048f5..904518c88 100644 --- a/cli/RA_sample.csv +++ b/cli/RA_sample.csv @@ -1,4 +1,4 @@ -ref_id;assets;threats;name;description;existing_controls;current_impact;current_proba;current_level;additional_controls;residual_impact;residual_proba;residual_level;treatment +ref_id;assets;threats;name;description;existing_controls;current_impact;current_proba;current_risk;additional_controls;residual_impact;residual_proba;residual_risk;treatment R.1;dsafa;Data Encrypted for Impact;Ransomware;;ISMS Scope document,Statement of Applicabilty document;Significant;Likely;Low;Risk management policy,Organization overview document,Main policy,Competency matrix;Critical;Very likely;High;open R.2;;System Shutdown/Reboot;Unavailability;;Information security awareness and traning policy;Significant;Very likely;Medium;Management review plan document,Main policy,ISMS Scope document,Responsibility matrix;Important;Unlikely;Medium;open R.3;dsasfad;Scheduled Task,Cloud Administration Command;Insider threats;;;Important;Likely;Medium;;--;--;--;open diff --git a/cli/clica.py b/cli/clica.py index 8c2b53a9c..9d0eabdaf 100755 --- a/cli/clica.py +++ b/cli/clica.py @@ -247,6 +247,26 @@ def import_risk_assessment(file, folder, project, name, matrix, create_all): additional_controls = get_unique_parsed_values(df, "additional_controls") batch_create("applied-controls", additional_controls, folder_id) + res = requests.get(f"{API_URL}/risk-matrices/{matrix_id}", headers=headers) + if res.status_code == 200: + matrix_def = res.json().get("json_definition") + matrix_def = json.loads(matrix_def) + # rprint(matrix_def) + impact_map = dict() + probability_map = dict() + risk_map = dict() + # this can be factored as one map probably + for item in matrix_def["impact"]: + impact_map[item["name"]] = item["id"] + for item in matrix_def["probability"]: + probability_map[item["name"]] = item["id"] + for item in matrix_def["risk"]: + risk_map[item["name"]] = item["id"] + + rprint(risk_map) + rprint(impact_map) + rprint(probability_map) + df = df.fillna("") # sequential post over the scenarios for scenario in df.itertuples(): @@ -256,8 +276,10 @@ def import_risk_assessment(file, folder, project, name, matrix, create_all): scenario.description, scenario.current_impact, scenario.current_proba, + scenario.current_risk, scenario.residual_impact, scenario.residual_proba, + scenario.residual_risk, ) From 090649cc16da2fb778602e2da7eb42f49cf804bb Mon Sep 17 00:00:00 2001 From: Abderrahmane Smimite Date: Sun, 1 Dec 2024 11:19:58 +0100 Subject: [PATCH 11/15] Functional now with partial data --- cli/clica.py | 71 +++++++++++++++++++++++++++++++++----------- cli/requirements.txt | 1 + 2 files changed, 54 insertions(+), 18 deletions(-) diff --git a/cli/clica.py b/cli/clica.py index 9d0eabdaf..d3406e20e 100755 --- a/cli/clica.py +++ b/cli/clica.py @@ -9,6 +9,8 @@ import json from rich import print as rprint, print_json +# from icecream import ic + cli_cfg = dict() auth_data = dict() @@ -128,6 +130,9 @@ def ids_map(model, folder=None): url = f"{API_URL}/{model}/ids/" headers = {"Authorization": f"Token {TOKEN}"} res = requests.get(url, headers=headers, verify=VERIFY_CERTIFICATE) + if res.status_code != 200: + print("something went wrong. check authentication.") + sys.exit(1) if folder: my_map = res.json().get(folder) else: @@ -181,6 +186,7 @@ def batch_create(model, items, folder_id): headers = { "Authorization": f"Token {TOKEN}", } + output = dict() url = f"{API_URL}/{model}/" for item in items: data = { @@ -191,6 +197,9 @@ def batch_create(model, items, folder_id): if res.status_code != 201: print("something went wrong") print(res.json()) + else: + output.update({item: res.json()["id"]}) + return output @click.command() @@ -253,34 +262,60 @@ def import_risk_assessment(file, folder, project, name, matrix, create_all): matrix_def = json.loads(matrix_def) # rprint(matrix_def) impact_map = dict() - probability_map = dict() + proba_map = dict() risk_map = dict() # this can be factored as one map probably for item in matrix_def["impact"]: impact_map[item["name"]] = item["id"] for item in matrix_def["probability"]: - probability_map[item["name"]] = item["id"] + proba_map[item["name"]] = item["id"] for item in matrix_def["risk"]: risk_map[item["name"]] = item["id"] - rprint(risk_map) - rprint(impact_map) - rprint(probability_map) + df = df.fillna("--") + + threats = ids_map("threats", folder) + assets = ids_map("assets", folder) + controls = ids_map("applied-controls", folder) - df = df.fillna("") - # sequential post over the scenarios for scenario in df.itertuples(): - print( - scenario.ref_id, - scenario.name, - scenario.description, - scenario.current_impact, - scenario.current_proba, - scenario.current_risk, - scenario.residual_impact, - scenario.residual_proba, - scenario.residual_risk, - ) + data = { + "ref_id": scenario.ref_id, + "name": scenario.name, + "risk_assessment": ra_id, + } + if scenario.current_impact != "--": + data.update({"current_impact": impact_map.get(scenario.current_impact)}) + if scenario.current_proba != "--": + data.update({"current_proba": proba_map.get(scenario.current_proba)}) + + if scenario.residual_impact != "--": + data.update({"residual_impact": impact_map.get(scenario.residual_impact)}) + if scenario.residual_proba != "--": + data.update({"residual_proba": proba_map.get(scenario.residual_proba)}) + + if scenario.existing_controls != "--": + items = str(scenario.existing_controls).split(",") + data.update( + {"existing_applied_controls": [controls[item] for item in items]} + ) + + if scenario.additional_controls != "--": + items = str(scenario.additional_controls).split(",") + data.update({"applied_controls": [controls[item] for item in items]}) + + if scenario.assets != "--": + items = str(scenario.assets).split(",") + data.update({"assets": [assets[item] for item in items]}) + + if scenario.threats != "--": + items = str(scenario.threats).split(",") + data.update({"threats": [threats[item] for item in items]}) + + res = requests.post(f"{API_URL}/risk-scenarios/", json=data, headers=headers) + if res.status_code != 201: + rprint(res.json()) + rprint(data) @click.command() diff --git a/cli/requirements.txt b/cli/requirements.txt index a857296db..1dbed4d6a 100644 --- a/cli/requirements.txt +++ b/cli/requirements.txt @@ -3,3 +3,4 @@ rich requests click pyyaml +icecream From 1aacf148046dd2a30466d74d2fee11643814ff86 Mon Sep 17 00:00:00 2001 From: Abderrahmane Smimite Date: Sun, 1 Dec 2024 11:27:14 +0100 Subject: [PATCH 12/15] fixup --- cli/clica.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/clica.py b/cli/clica.py index d3406e20e..4d21727a2 100755 --- a/cli/clica.py +++ b/cli/clica.py @@ -7,7 +7,7 @@ import requests import yaml import json -from rich import print as rprint, print_json +from rich import print as rprint # from icecream import ic From 6445f118df656b35f40e375a9944658c07fb7684 Mon Sep 17 00:00:00 2001 From: Abderrahmane Smimite Date: Sun, 1 Dec 2024 12:35:12 +0100 Subject: [PATCH 13/15] manage translations of matrix --- cli/clica.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/cli/clica.py b/cli/clica.py index 4d21727a2..56496b8e7 100755 --- a/cli/clica.py +++ b/cli/clica.py @@ -9,7 +9,7 @@ import json from rich import print as rprint -# from icecream import ic +from icecream import ic cli_cfg = dict() auth_data = dict() @@ -260,17 +260,25 @@ def import_risk_assessment(file, folder, project, name, matrix, create_all): if res.status_code == 200: matrix_def = res.json().get("json_definition") matrix_def = json.loads(matrix_def) - # rprint(matrix_def) + ic(matrix_def) impact_map = dict() proba_map = dict() - risk_map = dict() # this can be factored as one map probably for item in matrix_def["impact"]: impact_map[item["name"]] = item["id"] + if item.get("translations"): + langs = item.get("translations") + for lang in langs: + impact_map[langs[lang]["name"]] = item["id"] for item in matrix_def["probability"]: proba_map[item["name"]] = item["id"] - for item in matrix_def["risk"]: - risk_map[item["name"]] = item["id"] + if item.get("translations"): + langs = item.get("translations") + for lang in langs: + proba_map[langs[lang]["name"]] = item["id"] + + ic(impact_map) + ic(proba_map) df = df.fillna("--") From f79bb8d9c215e99600576da54ebaf09cbbe623ab Mon Sep 17 00:00:00 2001 From: Abderrahmane Smimite Date: Sun, 1 Dec 2024 13:23:51 +0100 Subject: [PATCH 14/15] Fail on matrix labels mismatch --- cli/clica.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/cli/clica.py b/cli/clica.py index 56496b8e7..5a930ea31 100755 --- a/cli/clica.py +++ b/cli/clica.py @@ -260,7 +260,7 @@ def import_risk_assessment(file, folder, project, name, matrix, create_all): if res.status_code == 200: matrix_def = res.json().get("json_definition") matrix_def = json.loads(matrix_def) - ic(matrix_def) + # ic(matrix_def) impact_map = dict() proba_map = dict() # this can be factored as one map probably @@ -292,6 +292,14 @@ def import_risk_assessment(file, folder, project, name, matrix, create_all): "name": scenario.name, "risk_assessment": ra_id, } + if None in [ + impact_map.get(scenario.current_impact), + proba_map.get(scenario.current_proba), + impact_map.get(scenario.residual_impact), + proba_map.get(scenario.residual_proba), + ]: + print("Matrix doesn't match the labels used on your input file") + sys.exit(1) if scenario.current_impact != "--": data.update({"current_impact": impact_map.get(scenario.current_impact)}) if scenario.current_proba != "--": From 263da462d2462f68e9a9c93e6f08b3dee7bbacc1 Mon Sep 17 00:00:00 2001 From: Abderrahmane Smimite Date: Sun, 1 Dec 2024 13:26:55 +0100 Subject: [PATCH 15/15] fixup --- cli/clica.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/clica.py b/cli/clica.py index 5a930ea31..1b72564bf 100755 --- a/cli/clica.py +++ b/cli/clica.py @@ -299,7 +299,7 @@ def import_risk_assessment(file, folder, project, name, matrix, create_all): proba_map.get(scenario.residual_proba), ]: print("Matrix doesn't match the labels used on your input file") - sys.exit(1) + if scenario.current_impact != "--": data.update({"current_impact": impact_map.get(scenario.current_impact)}) if scenario.current_proba != "--":