From 0f8da2246402671f44424e7f91a9555bb3c43eb3 Mon Sep 17 00:00:00 2001 From: Khushi-Sardhara Date: Thu, 29 Jun 2023 11:32:51 +0530 Subject: [PATCH 1/2] refactor: adding lint fix for group helper --- Access/group_helper.py | 368 +++++++++++++++++++++++++++++------------ 1 file changed, 261 insertions(+), 107 deletions(-) diff --git a/Access/group_helper.py b/Access/group_helper.py index c5222af4..edfcae3f 100644 --- a/Access/group_helper.py +++ b/Access/group_helper.py @@ -1,14 +1,20 @@ -from Access.models import GroupAccessMapping, User, GroupV2, MembershipV2, AccessV2 -from Access import helpers, views_helper, notifications, accessrequest_helper -from django.db import transaction +""" This file contains helper functions for group request """ + import datetime import logging +import json +from django.db import transaction +from Access.models import GroupAccessMapping, User, GroupV2, MembershipV2, AccessV2 +from Access import ( + helpers, + views_helper, + notifications, + accessrequest_helper, + background_task_manager, +) from Access.views_helper import execute_group_access -from enigma_automation.settings import MAIL_APPROVER_GROUPS, PERMISSION_CONSTANTS +from enigma_automation.settings import MAIL_APPROVER_GROUPS from . import helpers as helper -from Access.background_task_manager import revoke_request -import json - from .helpers import get_available_access_type logger = logging.getLogger(__name__) @@ -67,6 +73,8 @@ class GroupAccessExistsException(Exception): + """An exception that can be raised when a group access already exists.""" + def __init__(self): self.message = "Group Access Exists" super().__init__(self.message) @@ -110,6 +118,7 @@ def __init__(self): def create_group(request): + """It takes a request object as an argument and handle the creation of a group""" base_datetime_prefix = datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S") try: data = request.POST @@ -122,8 +131,8 @@ def create_group(request): ) if data.getlist("selectedUserList[]"): group_members = data.getlist("selectedUserList[]") - except Exception as e: - logger.exception(e) + except Exception as exc: + logger.exception(exc) logger.error("Error in Create New Group request.") context = {} context["error"] = { @@ -183,6 +192,7 @@ def create_group(request): def get_generic_access(group_mapping): + """group_mapping (dictionary): A dictionary that maps groups to specific access permissions""" access_details = {} access_module = helpers.get_available_access_module_from_tag( group_mapping.access.access_tag @@ -196,6 +206,11 @@ def get_generic_access(group_mapping): def get_group_access_list(auth_user, group_name): + """ + auth_user: The authenticated user who is trying to access the group access list. + + group_name: The name of the group for which we want to retrieve the access list. + """ context = {} group = GroupV2.get_active_group_by_name(group_name) @@ -253,6 +268,9 @@ def get_group_access_list(auth_user, group_name): def get_role_based_on_membership(group_detail): + """ + group_detail: information about a group, including the membership status of a user. + """ for user in group_detail["userList"]: if user["is_owner"]: user["role"] = "Owner" @@ -262,6 +280,9 @@ def get_role_based_on_membership(group_detail): def update_owners(request, group_name): + """ + group_name (string): represents the name of a group. + """ context = {} group = GroupV2.get_active_group_by_name(group_name) if not group: @@ -295,8 +316,6 @@ def update_owners(request, group_name): group_members = group.get_all_approved_members().exclude(user=auth_user.user) - # we will only get data["owners"] as owners who are checked in UI - # (exluding disabled checkbox owner who requested the change) with transaction.atomic(): for membership in group_members: if membership.user.email in data["owners"]: @@ -317,6 +336,9 @@ def update_owners(request, group_name): def getGroupMembers(groupMembers): + """ + groupMembers: List containing details of the members of a group. + """ return [ { "name": member.user.name, @@ -330,15 +352,23 @@ def getGroupMembers(groupMembers): def approve_new_group_request(auth_user, group_id): + """ + auth_user: The authenticated user who is approving the new group request. + + group_id: A unique identifier for a group that a user is requesting + to join. + """ try: group = GroupV2.get_pending_group(group_id=group_id) - except Exception as e: - logger.error("Error in approveNewGroup request, Unable to fetch group" + str(e)) + except Exception as exc: + logger.error( + "Error in approveNewGroup request, Unable to fetch group "+ str(exc) + ) context = {} context["error"] = INTERNAL_ERROR_MESSAGE["msg"] return context if not group: - logger.error("No pending group found, Unable to fetch group" + str(e)) + logger.error("No pending group found, Unable to fetch group") context = {} context["error"] = REQUEST_NOT_FOUND_ERROR return context @@ -348,49 +378,51 @@ def approve_new_group_request(auth_user, group_id): context = {} context["error"] = SELF_APPROVAL_ERROR return context - else: - context = {} - context["msg"] = REQUEST_PROCESSING.format(requestId=group_id) - with transaction.atomic(): - group.approve(approved_by=auth_user.user) - group.approve_all_pending_users(approved_by=auth_user.user) - initial_members = group.get_all_members() - initial_member_names = [user.user.name for user in initial_members] - try: - notifications.send_new_group_approved_notification( - group=group, - group_id=group_id, - initial_member_names=initial_member_names, - ) - except Exception as e: - logger.exception(e) - logger.error( - "Group approved, but Error in sending group approval notification" - ) + context = {} + context["msg"] = REQUEST_PROCESSING.format(requestId=group_id) + with transaction.atomic(): + group.approve(approved_by=auth_user.user) + group.approve_all_pending_users(approved_by=auth_user.user) + initial_members = group.get_all_members() + initial_member_names = [user.user.name for user in initial_members] + try: + notifications.send_new_group_approved_notification( + group=group, + group_id=group_id, + initial_member_names=initial_member_names, + ) + except Exception as exc: + logger.exception(exc) + logger.error( + "Group approved, but Error in sending group approval notification" + ) + logger.debug( + "Approved group creation for - " + + group_id + + " - Approver=" + + auth_user.username + ) + if initial_members: logger.debug( - "Approved group creation for - " + "Members added to group - " + group_id - + " - Approver=" - + auth_user.username + + " =" + + ", ".join(initial_member_names) ) - if initial_members: - logger.debug( - "Members added to group - " - + group_id - + " =" - + ", ".join(initial_member_names) - ) - return context - except Exception as e: - logger.exception(e) + return context + except Exception as exc: + logger.exception(exc) logger.error("Error in Approving New Group request.") context = {} - context["error"] = GROUP_APPROVAL_ERROR + str(e) + context["error"] = GROUP_APPROVAL_ERROR + str(exc) return context def get_user_group(request, group_name): + """ + group_name (string) : represents the name of the user group. + """ try: context = {} group = GroupV2.get_approved_group_by_name(group_name=group_name) @@ -416,22 +448,28 @@ def get_user_group(request, group_name): context["groupMembers"] = group_members context["groupName"] = group_name return context - except Exception as e: - logger.exception(e) + except Exception as exc: + logger.exception(exc) logger.error("Error in Add New User to Group request.") context = {} context["error"] = { "error_msg": ERROR_LOADING_PAGE["error_msg"], - "msg": ERROR_LOADING_PAGE["msg"].format(exception=str(e)), + "msg": ERROR_LOADING_PAGE["msg"].format(exception=str(exc)), } return context def get_users_from_groupmembers(group_members): + """ + group_members: list or a collection of users who belong to a particular group. + """ return [member.user for member in group_members] def add_user_to_group(request): + """ + The function adds a user to a group. + """ try: base_datetime_prefix = datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S") data = request.POST @@ -499,11 +537,11 @@ def add_user_to_group(request): + request.user.username ) users_added[user.email] = membership_id - except Exception as e: + except Exception as exc: logger.debug( "Error adding User: %s could not be added to the group, Exception: %s ", user.email, - str(e), + str(exc), ) user_not_added.append(user.email) if group.needsAccessApprove: @@ -543,8 +581,8 @@ def add_user_to_group(request): } return context - except Exception as e: - logger.exception(e) + except Exception as exc: + logger.exception(exc) logger.error("Error in Add New User to Group request.") context = {} context["error"] = { @@ -555,6 +593,9 @@ def add_user_to_group(request): def _check_if_members_in_group(group, selected_members): + """ + The function checks if selected members are in a given group. + """ group_members_email = group.get_approved_and_pending_member_emails() duplicate_request_emails = set(selected_members).intersection( set(group_members_email) @@ -562,27 +603,36 @@ def _check_if_members_in_group(group, selected_members): return duplicate_request_emails -def _check_if_members_in_group(group, selected_members): - group_members_email = group.get_approved_and_pending_member_emails() - duplicate_request_emails = set(selected_members).intersection( - set(group_members_email) - ) - return duplicate_request_emails +def is_user_in_group(user_email, group_members_email): + """ + The function checks if a given user email is a member of a group of email addresses. + user_email: The email address of the user we are checking for membership in a group. -def is_user_in_group(user_email, group_members_email): + group_members_email: A list of email addresses representing the members of a group. + """ return user_email in group_members_email -def accept_member(auth_user, requestId, shouldRender=True): +def accept_member(auth_user, requestId): + """ + The function accepts a member with a given request ID and renders the result if specified. + + auth_user: represents the authenticated user who is trying to accept a + membership request. + + requestId: identifier for a specific request that is + being processed or acted upon within the function. + + """ try: membership = MembershipV2.get_membership(membership_id=requestId) if not membership: logger.error("Error request not found OR Invalid request type") context = {} - context["error"] = REQUEST_NOT_FOUND_ERROR + str(e) + context["error"] = REQUEST_NOT_FOUND_ERROR return context - except Exception as e: + except Exception: context["error"] = { "error_msg": INTERNAL_ERROR_MESSAGE["error_msg"], "msg": INTERNAL_ERROR_MESSAGE["msg"], @@ -626,15 +676,23 @@ def accept_member(auth_user, requestId, shouldRender=True): + auth_user.username ) return context - except Exception as e: - logger.exception(e) + except Exception as exc: + logger.exception(exc) logger.error("Error in Accept of New Member in Group request.") context = {} - context["error"] = APPROVAL_ERROR + str(e) + context["error"] = APPROVAL_ERROR + str(exc) return context def get_group_access(form_data, auth_user): + """ + Takes in form data and an authenticated user and returns group + access information. + + form_data: contains the data collected from the form. + + auth_user: user object that represents the currently authenticated user. + """ data = dict(form_data.lists()) group_name = data["groupName"][0] context = {} @@ -674,6 +732,11 @@ def get_group_access(form_data, auth_user): def save_group_access_request(form_data, auth_user): + """ + form_data: contains the data collected from the form. + + auth_user: user object that represents the currently authenticated user. + """ json_response = _validate_group_access_request(form_data, auth_user) if json_response: return json_response @@ -710,8 +773,8 @@ def save_group_access_request(form_data, auth_user): + datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S") ) with transaction.atomic(): - for labelIndex, access_label in enumerate(module_access_labels): - request_id = request_id + "_" + str(labelIndex) + for label_index, access_label in enumerate(module_access_labels): + request_id = request_id + "_" + str(label_index) try: group_access_create_error = _create_group_access_mapping( group=group, @@ -725,26 +788,35 @@ def save_group_access_request(form_data, auth_user): error_msg = "Duplicate request found" + json.dumps(access_label) logger.info(f"{error_msg}") json_response["status"] = { - "title": GROUP_REQUEST_SUCCESS_MSG["title"].format( - access_tag=access_tag - ), - "msg": GROUP_REQUEST_SUCCESS_MSG["msg"] + "title": GROUP_REQUEST_SUCCESS_MSG["title"].format(access_tag=access_tag), + "msg": GROUP_REQUEST_SUCCESS_MSG["msg"], } - # email_destination = access_module.get_approvers() - # member_list = group.get_all_approved_members() - # notifications.send_group_access_add_email( - # destination=email_destination, - # group_name=group_name, - # requester=auth_user.user.email, - # request_id=request_id, - # member_list=member_list, - # ) + return json_response def _create_group_access_mapping( group, user, request_id, access_tag, access_label, access_reason ): + """ + This function creates a mapping between a group, user, request ID, access tag, access label, and + access reason. + + group: The group for which the access mapping is being created. + + user: The user for whom the group access mapping is being created. + + request_id: The unique identifier for the access request being created. + + access_tag: Access tag is a string that represents the type of access being granted to the + user for the group. + + access_label: Access label refers to the name or label given to a specific type of access or + permission granted to a user or group. + + access_reason: The access_reason parameter is likely a string that describes the reason or + justification for granting the user access to the specified group. + """ access = AccessV2.get(access_tag=access_tag, access_label=access_label) if not access: access = AccessV2.create(access_tag=access_tag, access_label=access_label) @@ -760,49 +832,88 @@ def _create_group_access_mapping( def _validate_group_access_request(form_data, auth_user): + """ + The function validates a group access request using form data and the authenticated user. + + form_data: The form data is a dictionary that contains the information submitted by the user + in a form. + + auth_user: The authenticated user object, which contains information about the user making + the request + """ json_response = {} if not form_data: json_response["error"] = { "error_msg": GROUP_REQUEST_ERR_MSG["error_msg"], - "msg": GROUP_REQUEST_ERR_MSG["msg"] + "msg": GROUP_REQUEST_ERR_MSG["msg"], } - logger.debug("Tried a direct Access to groupAccessRequest by-" + auth_user.username) + logger.debug( + "Tried a direct Access to groupAccessRequest by-" + auth_user.username + ) return json_response - if not form_data.get("groupName") or not form_data.get("access_tag") or not form_data.get("accessReason"): + if ( + not form_data.get("groupName") + or not form_data.get("access_tag") + or not form_data.get("accessReason") + ): json_response["error"] = { "error_msg": GROUP_REQUEST_EMPTY_FORM_ERR_MSG["error_msg"], - "msg": GROUP_REQUEST_EMPTY_FORM_ERR_MSG["msg"] + "msg": GROUP_REQUEST_EMPTY_FORM_ERR_MSG["msg"], } return json_response def validate_group_access_create_request(group, auth_user): + """ + group: used to determine whether the authenticated user has permission to + create a new access request for that group. + + auth_user: used to check if the user has the necessary permissions to create a group access + """ json_response = {} if not group: json_response["error"] = { "error_msg": GROUP_REQUEST_NO_GROUP_ERR_MSG["error_msg"], - "msg": GROUP_REQUEST_NO_GROUP_ERR_MSG["msg"] + "msg": GROUP_REQUEST_NO_GROUP_ERR_MSG["msg"], } return json_response if not auth_user.user.is_allowed_admin_actions_on_group(group): json_response["error"] = { "error_msg": NON_OWNER_PERMISSION_DENIED_ERROR["error_msg"], - "msg": NON_OWNER_PERMISSION_DENIED_ERROR["msg"] + "msg": NON_OWNER_PERMISSION_DENIED_ERROR["msg"], } return json_response def revoke_user_access(user, access, revoker, decline_message): + """ + Revokes access of the user. + + user: The user whose access needs to be revoked. + access: The access parameter refers to the type of access that the user has, which is being + revoked. + + revoker: The revoker parameter refers to the user who is revoking the access of another user. + + decline_message: The decline_message parameter is a string that represents the reason for + revoking the user's access. + """ user_identity = user.get_active_identity(access.access_tag) user_identity.decline_non_approved_access_mapping(access, decline_message) access_mapping = user_identity.get_granted_access_mapping(access).first() if not access_mapping: return False - revoke_request(access_mapping, revoker) + background_task_manager.revoke_request(access_mapping, revoker) + def remove_member(request, auth_user): + """ + This function takes in a request and an authenticated user and removes the user from a group. + + auth_user: authenticated user who is making the request to remove a member. + """ try: membership_id = request.POST.get("membershipId") if not membership_id: @@ -842,26 +953,41 @@ def remove_member(request, auth_user): with transaction.atomic(): for access in accesses: - revoke_user_access(user, access, request.user.user, "User removed from the group") + revoke_user_access( + user, access, request.user.user, "User removed from the group" + ) membership.revoke_membership() return {"message": "Successfully removed user from group"} -def access_exist_in_other_groups_of_user(membership, group, access): - other_memberships = ( - membership.user.get_all_approved_memberships() - .exclude(group=membership.group) +def access_exist_in_other_groups_of_user(membership, access): + """ + The function checks if a user has a specific access in other groups they belong to. + + membership: It is a list of dictionaries containing information about the user's group + memberships. + + group: Identifier representing a specific group within a larger system. + + access: The access level that we want to check for existence in other groups of the user. + """ + other_memberships = membership.user.get_all_approved_memberships().exclude( + group=membership.group ) - for membership in other_memberships: - if membership.group.check_access_exist(access): + for member_ship in other_memberships: + if member_ship.group.check_access_exist(access): return True return False def revoke_access_from_group(request): + """ + The function "revoke_access_from_group" takes a request object as input and revokes access + from a group in a system. + """ try: request_id = request.POST.get("request_id") if not request_id: @@ -872,13 +998,16 @@ def revoke_access_from_group(request): if not mapping: logger.debug("Group Access Mapping not found in the database") return {"error": GROUP_ACCESS_MAPPING_NOT_FOUND} - except Exception as e: - logger.exception(str(e)) + except Exception as exc: + logger.exception(str(exc)) return {"error": ERROR_MESSAGE} group = mapping.group auth_user = request.user - if not (auth_user.user.has_permission("ALLOW_USER_OFFBOARD") or group.member_is_owner(auth_user.user)): + if not ( + auth_user.user.has_permission("ALLOW_USER_OFFBOARD") + or group.member_is_owner(auth_user.user) + ): return {"error": USER_UNAUTHORIZED_MESSAGE} revoke_access_memberships = [] @@ -889,7 +1018,12 @@ def revoke_access_from_group(request): with transaction.atomic(): for membership in revoke_access_memberships: - revoke_user_access(membership.user, mapping.access, auth_user.user, "Access revoked for the group") + revoke_user_access( + membership.user, + mapping.access, + auth_user.user, + "Access revoked for the group", + ) mapping.mark_revoked(auth_user.user) @@ -897,6 +1031,11 @@ def revoke_access_from_group(request): def get_selected_users_by_email(user_emails): + """ + Takes a list of user emails as input and returns selected users based on those emails. + + user_emails: A list of email addresses of users + """ selected_users = User.get_users_by_emails(emails=user_emails) selected_users_email = {user.email: user for user in selected_users} not_found_emails = [ @@ -904,17 +1043,16 @@ def get_selected_users_by_email(user_emails): ] if len(not_found_emails) == 1: - raise User.DoesNotExist( - "User with email {} does not exist".format(not_found_emails) - ) - elif len(not_found_emails) > 1: - raise User.DoesNotExist( - "Users with email {} are not found".format(not_found_emails) - ) + raise User.DoesNotExist(f"User with email {not_found_emails} does not exist") + if len(not_found_emails) > 1: + raise User.DoesNotExist(f"User with email {not_found_emails} are not found") return selected_users def get_group_status_list(selected_list): + """ + selected_list: The selected_list parameter is a list of groups. + """ status_list = [] for status in MembershipV2.STATUS: if status[0] not in selected_list: @@ -924,6 +1062,9 @@ def get_group_status_list(selected_list): def get_group_member_access_type(selected_list): + """ + selected_list: The selected_list parameter is a list of groups. + """ access_type = [] all_types = get_available_access_type() for type in all_types: @@ -934,6 +1075,9 @@ def get_group_member_access_type(selected_list): def get_user_states(selected_list): + """ + selected_list: A list of states selected by the user. + """ user_state = [] for state in User.USER_STATUS_CHOICES: current_state = state[1].capitalize() @@ -944,6 +1088,9 @@ def get_user_states(selected_list): def get_user_current_state(): + """ + The function is used to retrieve the current state of the user. + """ current_state = [] for state in User.USER_STATUS_CHOICES: current_state.append(state[1].capitalize()) @@ -952,6 +1099,10 @@ def get_user_current_state(): def get_access_types(group_mappings): + """ + group_mappings: group_mappings is a dictionary that maps group names to a list of access + types. + """ status_list = [] for group_mapping in group_mappings: @@ -966,6 +1117,9 @@ def get_access_types(group_mappings): def get_group_member_role_list(selected_list): + """ + selected_list: A list of group members that have been selected + """ roles = ["Member", "Owner"] role_list = [] for role in roles: From 026ccfd1370131bc798c1776d2d735b6eca14d71 Mon Sep 17 00:00:00 2001 From: Khushi-Sardhara Date: Thu, 29 Jun 2023 12:02:20 +0530 Subject: [PATCH 2/2] refactor: adding lint fix for group helper --- Access/group_helper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Access/group_helper.py b/Access/group_helper.py index edfcae3f..627e4cf6 100644 --- a/Access/group_helper.py +++ b/Access/group_helper.py @@ -614,7 +614,7 @@ def is_user_in_group(user_email, group_members_email): return user_email in group_members_email -def accept_member(auth_user, requestId): +def accept_member(auth_user, requestId, shouldRender=True): """ The function accepts a member with a given request ID and renders the result if specified.