diff --git a/backend/core/views.py b/backend/core/views.py index 0f243c4fc..e84d2254a 100644 --- a/backend/core/views.py +++ b/backend/core/views.py @@ -298,6 +298,22 @@ def quality_check_detail(self, request, pk): else: return Response(status=HTTP_403_FORBIDDEN) + @action(detail=False, methods=["get"]) + def ids(self, request): + my_map = dict() + + (viewable_items, _, _) = RoleAssignment.get_accessible_object_ids( + folder=Folder.get_root_folder(), + user=request.user, + object_type=Project, + ) + for item in Project.objects.filter(id__in=viewable_items): + if my_map.get(item.folder.name) is None: + my_map[item.folder.name] = {} + my_map[item.folder.name].update({item.name: item.id}) + + return Response(my_map) + class ThreatViewSet(BaseModelViewSet): """ @@ -318,6 +334,21 @@ def retrieve(self, request, *args, **kwargs): def threats_count(self, request): return Response({"results": threats_count_per_name(request.user)}) + @action(detail=False, methods=["get"]) + def ids(self, request): + my_map = dict() + + (viewable_items, _, _) = RoleAssignment.get_accessible_object_ids( + folder=Folder.get_root_folder(), + user=request.user, + object_type=Threat, + ) + for item in Threat.objects.filter(id__in=viewable_items): + if my_map.get(item.folder.name) is None: + my_map[item.folder.name] = {} + my_map[item.folder.name].update({item.name: item.id}) + return Response(my_map) + class AssetViewSet(BaseModelViewSet): """ @@ -395,6 +426,21 @@ def graph(self, request): {"nodes": nodes, "links": links, "categories": categories, "meta": meta} ) + @action(detail=False, methods=["get"]) + def ids(self, request): + my_map = dict() + + (viewable_items, _, _) = RoleAssignment.get_accessible_object_ids( + folder=Folder.get_root_folder(), + user=request.user, + object_type=Asset, + ) + for item in Asset.objects.filter(id__in=viewable_items): + if my_map.get(item.folder.name) is None: + my_map[item.folder.name] = {} + my_map[item.folder.name].update({item.name: item.id}) + return Response(my_map) + @action(detail=False, name="Get security objectives") def security_objectives(self, request): return Response({"results": Asset.DEFAULT_SECURITY_OBJECTIVES}) @@ -459,6 +505,22 @@ def used(self, request): ) return Response({"results": used_matrices}) + @action(detail=False, methods=["get"]) + def ids(self, request): + my_map = dict() + + (viewable_items, _, _) = RoleAssignment.get_accessible_object_ids( + folder=Folder.get_root_folder(), + user=request.user, + object_type=RiskMatrix, + ) + for item in RiskMatrix.objects.filter(id__in=viewable_items): + if my_map.get(item.folder.name) is None: + my_map[item.folder.name] = {} + my_map[item.folder.name].update({item.name: item.id}) + + return Response(my_map) + class VulnerabilityViewSet(BaseModelViewSet): """ @@ -649,30 +711,46 @@ def risk_assessment_csv(self, request, pk): writer = csv.writer(response, delimiter=";") columns = [ "ref_id", + "assets", "threats", "name", "description", "existing_controls", - "current_level", - "applied_controls", - "residual_level", + "current_impact", + "current_proba", + "current_risk", + "additional_controls", + "residual_impact", + "residual_proba", + "residual_risk", "treatment", ] writer.writerow(columns) - for scenario in risk_assessment.risk_scenarios.all().order_by("created_at"): - applied_controls = ",".join( - [m.csv_value for m in scenario.applied_controls.all()] + for scenario in risk_assessment.risk_scenarios.all().order_by("ref_id"): + additional_controls = ",".join( + [m.name for m in scenario.applied_controls.all()] + ) + existing_controls = ",".join( + [m.name for m in scenario.existing_applied_controls.all()] ) + threats = ",".join([t.name for t in scenario.threats.all()]) + assets = ",".join([t.name for t in scenario.assets.all()]) + row = [ scenario.ref_id, + assets, threats, scenario.name, scenario.description, - scenario.existing_controls, + existing_controls, + scenario.get_current_impact()["name"], + scenario.get_current_proba()["name"], scenario.get_current_risk()["name"], - applied_controls, + additional_controls, + scenario.get_residual_impact()["name"], + scenario.get_residual_proba()["name"], scenario.get_residual_risk()["name"], scenario.treatment, ] @@ -1070,6 +1148,22 @@ def get_timeline_info(self, request): colorMap[domain.name] = next(color_cycle) return Response({"entries": entries, "colorMap": colorMap}) + @action(detail=False, methods=["get"]) + def ids(self, request): + my_map = dict() + + (viewable_items, _, _) = RoleAssignment.get_accessible_object_ids( + folder=Folder.get_root_folder(), + user=request.user, + object_type=AppliedControl, + ) + for item in AppliedControl.objects.filter(id__in=viewable_items): + if my_map.get(item.folder.name) is None: + my_map[item.folder.name] = {} + my_map[item.folder.name].update({item.name: item.id}) + + return Response(my_map) + class PolicyViewSet(AppliedControlViewSet): model = Policy @@ -1501,6 +1595,19 @@ def org_tree(self, request): return Response(tree) + @action(detail=False, methods=["get"]) + def ids(self, request): + my_map = dict() + + (viewable_items, _, _) = RoleAssignment.get_accessible_object_ids( + folder=Folder.get_root_folder(), + user=request.user, + object_type=Folder, + ) + for item in Folder.objects.filter(id__in=viewable_items): + my_map[item.name] = item.id + return Response(my_map) + @action(detail=False, methods=["get"]) def my_assignments(self, request): risk_assessments = RiskAssessment.objects.filter( diff --git a/cli/RA_sample.csv b/cli/RA_sample.csv new file mode 100644 index 000000000..904518c88 --- /dev/null +++ b/cli/RA_sample.csv @@ -0,0 +1,4 @@ +ref_id;assets;threats;name;description;existing_controls;current_impact;current_proba;current_risk;additional_controls;residual_impact;residual_proba;residual_risk;treatment +R.1;dsafa;Data Encrypted for Impact;Ransomware;;ISMS Scope document,Statement of Applicabilty document;Significant;Likely;Low;Risk management policy,Organization overview document,Main policy,Competency matrix;Critical;Very likely;High;open +R.2;;System Shutdown/Reboot;Unavailability;;Information security awareness and traning policy;Significant;Very likely;Medium;Management review plan document,Main policy,ISMS Scope document,Responsibility matrix;Important;Unlikely;Medium;open +R.3;dsasfad;Scheduled Task,Cloud Administration Command;Insider threats;;;Important;Likely;Medium;;--;--;--;open diff --git a/cli/README.md b/cli/README.md new file mode 100644 index 000000000..be0afcfeb --- /dev/null +++ b/cli/README.md @@ -0,0 +1,6 @@ + +## Examples + +```sh +./clica.py import-risk-assessment --file RA_sample.csv --folder "BU 1" --project "Orion" --matrix "4x4 risk matrix from EBIOS-RM" --name example +``` diff --git a/cli/clica.py b/cli/clica.py index 5ecbb1a89..1b72564bf 100755 --- a/cli/clica.py +++ b/cli/clica.py @@ -6,7 +6,10 @@ import pandas as pd import requests import yaml -from rich import print +import json +from rich import print as rprint + +from icecream import ic cli_cfg = dict() auth_data = dict() @@ -81,7 +84,6 @@ def init_config(): def check_auth(): if Path(".tmp.yaml").exists(): - click.echo("Found auth data. Trying them", err=True) with open(".tmp.yaml", "r") as yfile: auth_data = yaml.safe_load(yfile) return auth_data["token"] @@ -123,6 +125,21 @@ def auth(email, password): print(res.json()) +def ids_map(model, folder=None): + my_map = dict() + url = f"{API_URL}/{model}/ids/" + headers = {"Authorization": f"Token {TOKEN}"} + res = requests.get(url, headers=headers, verify=VERIFY_CERTIFICATE) + if res.status_code != 200: + print("something went wrong. check authentication.") + sys.exit(1) + if folder: + my_map = res.json().get(folder) + else: + my_map = res.json() + return my_map + + def _get_folders(): url = f"{API_URL}/folders/" headers = {"Authorization": f"Token {TOKEN}"} @@ -138,9 +155,183 @@ def _get_folders(): @click.command() def get_folders(): """Get folders.""" - GLOBAL_FOLDER_ID, res = _get_folders() - print("GLOBAL_FOLDER_ID: ", GLOBAL_FOLDER_ID) - print(res) + print(json.dumps(ids_map("folders"), ensure_ascii=False)) + + +@click.command() +def get_projects(): + """getting projects as a json""" + print(json.dumps(ids_map("projects"), ensure_ascii=False)) + + +@click.command() +def get_matrices(): + """getting loaded matrix as a json""" + print(json.dumps(ids_map("risk-matrices", folder="Global"), ensure_ascii=False)) + + +def get_unique_parsed_values(df, column_name): + unique_values = df[column_name].dropna().unique() + parsed_values = [] + + for value in unique_values: + value_str = str(value) + split_values = [v.strip() for v in value_str.split(",")] + parsed_values.extend(split_values) + + return set(parsed_values) + + +def batch_create(model, items, folder_id): + headers = { + "Authorization": f"Token {TOKEN}", + } + output = dict() + url = f"{API_URL}/{model}/" + for item in items: + data = { + "folder": folder_id, + "name": item, + } + res = requests.post(url, json=data, headers=headers) + if res.status_code != 201: + print("something went wrong") + print(res.json()) + else: + output.update({item: res.json()["id"]}) + return output + + +@click.command() +@click.option("--file", required=True, help="") +@click.option("--folder", required=True, help="") +@click.option("--project", required=True, help="") +@click.option("--matrix", required=True, help="") +@click.option("--name", required=True, help="") +@click.option( + "--create_all", + required=False, + is_flag=True, + default=True, + help="Create all associated objects (threats, assets)", +) +def import_risk_assessment(file, folder, project, name, matrix, create_all): + """crawl a risk assessment (see template) and create the assoicated objects""" + df = pd.read_csv(file, delimiter=";") + headers = { + "Authorization": f"Token {TOKEN}", + } + folder_id = ids_map("folders").get(folder) + project_id = ids_map("projects", folder=folder).get(project) + matrix_id = ids_map("risk-matrices", folder="Global").get(matrix) + + # post to create risk assessment + data = { + "name": name, + "folder": folder_id, + "project": project_id, + "risk_matrix": matrix_id, + } + res = requests.post( + f"{API_URL}/risk-assessments/", + json=data, + headers=headers, + verify=VERIFY_CERTIFICATE, + ) + ra_id = None + if res.status_code == 201: + ra_id = res.json().get("id") + print("ok") + else: + print("something went wrong.") + print(res.json()) + + if create_all: + threats = get_unique_parsed_values(df, "threats") + batch_create("threats", threats, folder_id) + assets = get_unique_parsed_values(df, "assets") + batch_create("assets", assets, folder_id) + existing_controls = get_unique_parsed_values(df, "existing_controls") + batch_create("applied-controls", existing_controls, folder_id) + additional_controls = get_unique_parsed_values(df, "additional_controls") + batch_create("applied-controls", additional_controls, folder_id) + + res = requests.get(f"{API_URL}/risk-matrices/{matrix_id}", headers=headers) + if res.status_code == 200: + matrix_def = res.json().get("json_definition") + matrix_def = json.loads(matrix_def) + # ic(matrix_def) + impact_map = dict() + proba_map = dict() + # this can be factored as one map probably + for item in matrix_def["impact"]: + impact_map[item["name"]] = item["id"] + if item.get("translations"): + langs = item.get("translations") + for lang in langs: + impact_map[langs[lang]["name"]] = item["id"] + for item in matrix_def["probability"]: + proba_map[item["name"]] = item["id"] + if item.get("translations"): + langs = item.get("translations") + for lang in langs: + proba_map[langs[lang]["name"]] = item["id"] + + ic(impact_map) + ic(proba_map) + + df = df.fillna("--") + + threats = ids_map("threats", folder) + assets = ids_map("assets", folder) + controls = ids_map("applied-controls", folder) + + for scenario in df.itertuples(): + data = { + "ref_id": scenario.ref_id, + "name": scenario.name, + "risk_assessment": ra_id, + } + if None in [ + impact_map.get(scenario.current_impact), + proba_map.get(scenario.current_proba), + impact_map.get(scenario.residual_impact), + proba_map.get(scenario.residual_proba), + ]: + print("Matrix doesn't match the labels used on your input file") + + if scenario.current_impact != "--": + data.update({"current_impact": impact_map.get(scenario.current_impact)}) + if scenario.current_proba != "--": + data.update({"current_proba": proba_map.get(scenario.current_proba)}) + + if scenario.residual_impact != "--": + data.update({"residual_impact": impact_map.get(scenario.residual_impact)}) + if scenario.residual_proba != "--": + data.update({"residual_proba": proba_map.get(scenario.residual_proba)}) + + if scenario.existing_controls != "--": + items = str(scenario.existing_controls).split(",") + data.update( + {"existing_applied_controls": [controls[item] for item in items]} + ) + + if scenario.additional_controls != "--": + items = str(scenario.additional_controls).split(",") + data.update({"applied_controls": [controls[item] for item in items]}) + + if scenario.assets != "--": + items = str(scenario.assets).split(",") + data.update({"assets": [assets[item] for item in items]}) + + if scenario.threats != "--": + items = str(scenario.threats).split(",") + data.update({"threats": [threats[item] for item in items]}) + + res = requests.post(f"{API_URL}/risk-scenarios/", json=data, headers=headers) + if res.status_code != 201: + rprint(res.json()) + rprint(data) @click.command() @@ -172,9 +363,9 @@ def import_assets(file): ) if res.status_code != 201: click.echo("❌ something went wrong", err=True) - print(res.json()) + rprint(res.json()) else: - print(f"✅ {name} created", file=sys.stderr) + rprint(f"✅ {name} created", file=sys.stderr) @click.command() @@ -208,9 +399,9 @@ def import_controls(file): ) if res.status_code != 201: click.echo("❌ something went wrong", err=True) - print(res.json()) + rprint(res.json()) else: - print(f"✅ {name} created", file=sys.stderr) + rprint(f"✅ {name} created", file=sys.stderr) @click.command() @@ -240,9 +431,9 @@ def import_evidences(file): ) if res.status_code != 201: click.echo("❌ something went wrong", err=True) - print(res.json()) + rprint(res.json()) else: - print(f"✅ {row['name']} created", file=sys.stderr) + rprint(f"✅ {row['name']} created", file=sys.stderr) @click.command() @@ -260,13 +451,13 @@ def upload_attachment(file, name): url, headers=headers, params={"name": name}, verify=VERIFY_CERTIFICATE ) data = res.json() - print(data) + rprint(data) if res.status_code != 200: - print(data) - print(f"Error: check credentials or filename.", file=sys.stderr) + rprint(data) + rprint(f"Error: check credentials or filename.", file=sys.stderr) return if not data["results"]: - print(f"Error: No evidence found with name '{name}'", file=sys.stderr) + rprint(f"Error: No evidence found with name '{name}'", file=sys.stderr) return evidence_id = data["results"][0]["id"] @@ -280,16 +471,19 @@ def upload_attachment(file, name): } with open(file, "rb") as f: res = requests.post(url, headers=headers, data=f, verify=VERIFY_CERTIFICATE) - print(res) - print(res.text) + rprint(res) + rprint(res.text) cli.add_command(get_folders) +cli.add_command(get_projects) cli.add_command(auth) cli.add_command(import_assets) cli.add_command(import_controls) cli.add_command(import_evidences) cli.add_command(init_config) cli.add_command(upload_attachment) +cli.add_command(import_risk_assessment) +cli.add_command(get_matrices) if __name__ == "__main__": cli() diff --git a/cli/requirements.txt b/cli/requirements.txt index a857296db..1dbed4d6a 100644 --- a/cli/requirements.txt +++ b/cli/requirements.txt @@ -3,3 +3,4 @@ rich requests click pyyaml +icecream