diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ciscosecurefirewall_connector.py b/ciscosecurefirewall_connector.py index 13d9d03..9379c49 100644 --- a/ciscosecurefirewall_connector.py +++ b/ciscosecurefirewall_connector.py @@ -168,7 +168,7 @@ def _get_token(self, action_result): self.refresh_count += 1 self.headers[REFRESH_TOKEN_KEY] = self._state[REFRESH_TOKEN_KEY] self.headers[TOKEN_KEY] = self._state[TOKEN_KEY] - ret_val, headers = self.__make_rest_call("post", REFRESH_ENDPOINT, action_result, headers_only=True, first_try=False) + ret_val, headers = self._make_rest_call("post", REFRESH_ENDPOINT, action_result, headers_only=True, first_try=False) if not phantom.is_fail(ret_val): self.token = headers.get(TOKEN_KEY) self.headers[TOKEN_KEY] = self.token @@ -180,7 +180,7 @@ def _get_token(self, action_result): self.debug_print("Fetching a new token") self.headers.pop(REFRESH_TOKEN_KEY, None) auth = requests.auth.HTTPBasicAuth(self.username, self.password) - ret_val, headers = self.__make_rest_call("post", TOKEN_ENDPOINT, action_result, headers_only=True, first_try=True, auth=auth) + ret_val, headers = self._make_rest_call("post", TOKEN_ENDPOINT, action_result, headers_only=True, first_try=True, auth=auth) if phantom.is_fail(ret_val): self.debug_print(f"Error {ret_val} while generating token with response {headers}") self._reset_state_file() @@ -281,7 +281,7 @@ def _process_response(self, r, action_result): return RetVal(action_result.set_status(phantom.APP_ERROR, msg), None) - def __make_rest_call(self, method, resource, action_result, json_body=None, headers_only=False, first_try=True, params=None, auth=None): + def _make_rest_call(self, method, resource, action_result, json_body=None, headers_only=False, first_try=True, params=None, auth=None): """ This method makes a REST call to the API """ @@ -306,7 +306,7 @@ def __make_rest_call(self, method, resource, action_result, json_body=None, head return action_result.get_status(), None self.debug_print(f"Running url that failed because of token error {resource}") - return self.__make_rest_call(method, resource, action_result, json_body, headers_only, first_try=False) + return self._make_rest_call(method, resource, action_result, json_body, headers_only, first_try=False) message = "Error from server. Status Code: {0} Data from server: {1}".format( result.status_code, result.text.replace("{", "{{").replace("}", "}}") @@ -333,7 +333,7 @@ def _handle_test_connectivity(self, param: Dict[str, Any]) -> bool: self.save_progress("Testing connectivity") url = GET_HOSTS_ENDPOINT.format(domain_id="default") - ret_val, _ = self.__make_rest_call("get", url, action_result) + ret_val, _ = self._make_rest_call("get", url, action_result) if phantom.is_fail(ret_val): self.save_progress("Connectivity test failed") return action_result.get_status() @@ -349,7 +349,7 @@ def get_network_objects_of_type(self, object_type, domain_uuid, action_result, n params = {"limit": limit} while True: params["offset"] = offset - ret_val, response = self.__make_rest_call("get", url, action_result, params=params) + ret_val, response = self._make_rest_call("get", url, action_result, params=params) if phantom.is_fail(ret_val): return action_result.get_status() @@ -399,7 +399,7 @@ def _handle_list_network_objects(self, param: Dict[str, Any]) -> bool: def get_network_object(self, domain_id: int, object_id: int) -> Tuple[bool, Dict[str, Any]]: url = NETWORK_OBJECT_ID_ENDPOINT.format(domain_id=domain_id, type="networks", object_id=object_id) - ret_val, response = self.__make_rest_call("get", url, self) + ret_val, response = self._make_rest_call("get", url, self) return ret_val, response def _handle_create_network_object(self, param: Dict[str, Any]) -> bool: @@ -413,7 +413,7 @@ def _handle_create_network_object(self, param: Dict[str, Any]) -> bool: domain_uuid = self.get_domain_id(param.get("domain_name")) url = NETWORK_OBJECTS_ENDPOINT.format(domain_id=domain_uuid, type=object_type.lower() + "s") - ret_val, response = self.__make_rest_call("post", url, action_result, json_body=payload) + ret_val, response = self._make_rest_call("post", url, action_result, json_body=payload) if phantom.is_fail(ret_val): return action_result.get_status() @@ -439,7 +439,7 @@ def _handle_update_network_object(self, param: Dict[str, Any]) -> bool: url = NETWORK_OBJECT_ID_ENDPOINT.format(domain_id=domain_uuid, type=object_type.lower() + "s", object_id=object_id) - ret_val, response = self.__make_rest_call("put", url, action_result, json_body=payload) + ret_val, response = self._make_rest_call("put", url, action_result, json_body=payload) if phantom.is_fail(ret_val): return action_result.get_status() @@ -457,7 +457,7 @@ def _handle_delete_network_object(self, param: Dict[str, Any]) -> bool: domain_uuid = self.get_domain_id(param.get("domain_name")) url = NETWORK_OBJECT_ID_ENDPOINT.format(domain_id=domain_uuid, type=object_type.lower() + "s", object_id=object_id) - ret_val, response = self.__make_rest_call("delete", url, action_result) + ret_val, response = self._make_rest_call("delete", url, action_result) if phantom.is_fail(ret_val): return action_result.get_status() @@ -493,7 +493,7 @@ def _handle_get_network_groups(self, param: Dict[str, Any]) -> bool: params = {"limit": limit, "expanded": True} while True: params["offset"] = offset - ret_val, response = self.__make_rest_call("get", url, action_result, params=params) + ret_val, response = self._make_rest_call("get", url, action_result, params=params) if phantom.is_fail(ret_val): return action_result.get_status() @@ -533,7 +533,7 @@ def _handle_create_network_group(self, param: Dict[str, Any]) -> bool: url = NETWORK_GROUPS_ENDPOINT.format(domain_id=domain_uuid) - ret_val, response = self.__make_rest_call("post", url, action_result, json_body=payload) + ret_val, response = self._make_rest_call("post", url, action_result, json_body=payload) if phantom.is_fail(ret_val): return action_result.get_status() @@ -544,7 +544,7 @@ def _handle_create_network_group(self, param: Dict[str, Any]) -> bool: def get_network_group(self, domain_uuid, group_id): url = NETWORK_GROUPS_ID_ENDPOINT.format(domain_id=domain_uuid, group_id=group_id) - ret_val, response = self.__make_rest_call("get", url, self) + ret_val, response = self._make_rest_call("get", url, self) return ret_val, response def _handle_update_network_group(self, param: Dict[str, Any]) -> bool: @@ -573,7 +573,7 @@ def _handle_update_network_group(self, param: Dict[str, Any]) -> bool: resp["objects"] = objects update_url = NETWORK_GROUPS_ID_ENDPOINT.format(domain_id=domain_uuid, group_id=group_id) - ret_val, response = self.__make_rest_call("put", update_url, action_result, json_body=resp) + ret_val, response = self._make_rest_call("put", update_url, action_result, json_body=resp) if phantom.is_fail(ret_val): return action_result.get_status() @@ -591,7 +591,7 @@ def _handle_delete_network_group(self, param): domain_uuid = self.get_domain_id(param.get("domain_name")) update_url = NETWORK_GROUPS_ID_ENDPOINT.format(domain_id=domain_uuid, group_id=group_id) - ret_val, response = self.__make_rest_call("delete", update_url, action_result) + ret_val, response = self._make_rest_call("delete", update_url, action_result) if phantom.is_fail(ret_val): return action_result.get_status() @@ -612,7 +612,7 @@ def _handle_get_access_policies(self, param): params = {"limit": limit} while True: params["offset"] = offset - ret_val, response = self.__make_rest_call("get", url, action_result, params=params) + ret_val, response = self._make_rest_call("get", url, action_result, params=params) if phantom.is_fail(ret_val): return action_result.get_status() @@ -647,7 +647,7 @@ def _handle_create_access_policy(self, param: Dict[str, Any]) -> bool: payload["description"] = param["description"] url = ACCESS_POLICY_ENDPOINT.format(domain_id=domain_uuid) - ret_val, response = self.__make_rest_call("post", url, action_result, json_body=payload) + ret_val, response = self._make_rest_call("post", url, action_result, json_body=payload) if phantom.is_fail(ret_val): return action_result.get_status() @@ -658,7 +658,7 @@ def _handle_create_access_policy(self, param: Dict[str, Any]) -> bool: def get_access_policy(self, domain_uuid, policy_id): url = ACCESS_POLICY_ID_ENDPOINT.format(domain_id=domain_uuid, policy_id=policy_id) - ret_val, response = self.__make_rest_call("get", url, self) + ret_val, response = self._make_rest_call("get", url, self) return ret_val, response def _handle_update_access_policy(self, param): @@ -692,7 +692,7 @@ def _handle_update_access_policy(self, param): url = ACCESS_POLICY_ID_ENDPOINT.format(domain_id=domain_uuid, policy_id=policy_id) print(f"payload is {payload}") - ret_val, response = self.__make_rest_call("put", url, action_result, json_body=payload) + ret_val, response = self._make_rest_call("put", url, action_result, json_body=payload) if phantom.is_fail(ret_val): return action_result.get_status() print(f"updated policy with {response}") @@ -710,7 +710,7 @@ def _handle_delete_access_policy(self, param: Dict[str, Any]) -> bool: policy_id = param["policy_id"] url = ACCESS_POLICY_ID_ENDPOINT.format(domain_id=domain_uuid, policy_id=policy_id) - ret_val, response = self.__make_rest_call("delete", url, action_result) + ret_val, response = self._make_rest_call("delete", url, action_result) if phantom.is_fail(ret_val): return action_result.get_status() @@ -744,7 +744,7 @@ def _handle_get_access_rules(self, param: Dict[str, Any]) -> bool: params = {"limit": limit} while True: params["offset"] = offset - ret_val, response = self.__make_rest_call("get", url, action_result, params=params) + ret_val, response = self._make_rest_call("get", url, action_result, params=params) if phantom.is_fail(ret_val): return action_result.get_status() @@ -815,7 +815,7 @@ def _handle_create_access_rules(self, param: Dict[str, Any]) -> bool: rule_payload["destinationNetworks"]["objects"] = destination_networks_objects url = ACCESS_RULES_ENDPOINT.format(domain_id=domain_uuid, policy_id=policy_id) - ret_val, response = self.__make_rest_call("post", url, action_result, json_body=rule_payload) + ret_val, response = self._make_rest_call("post", url, action_result, json_body=rule_payload) if phantom.is_fail(ret_val): return action_result.get_status() @@ -826,7 +826,7 @@ def _handle_create_access_rules(self, param: Dict[str, Any]) -> bool: def get_access_control_rule(self, domain_id: str, policy_id: str, rule_id: str) -> Tuple[bool, Dict[str, Any]]: url = ACCESS_RULES_ID_ENDPOINT.format(domain_id=domain_id, policy_id=policy_id, rule_id=rule_id) - ret_val, response = self.__make_rest_call("get", url, self) + ret_val, response = self._make_rest_call("get", url, self) return ret_val, response def _handle_update_access_rule(self, param: Dict[str, Any]) -> bool: @@ -889,7 +889,7 @@ def _handle_update_access_rule(self, param: Dict[str, Any]) -> bool: rule_payload["destinationNetworks"] = {"objects": filtered_destination_networks} url = ACCESS_RULES_ID_ENDPOINT.format(domain_id=domain_uuid, policy_id=policy_id, rule_id=rule_id) - ret_val, response = self.__make_rest_call("put", url, action_result, json_body=rule_payload) + ret_val, response = self._make_rest_call("put", url, action_result, json_body=rule_payload) if phantom.is_fail(ret_val): return action_result.get_status() @@ -909,7 +909,7 @@ def _handle_delete_access_rule(self, param: Dict[str, Any]) -> bool: policy_id = param["policy_id"] url = ACCESS_RULES_ID_ENDPOINT.format(domain_id=domain_uuid, policy_id=policy_id, rule_id=rule_id) - ret_val, response = self.__make_rest_call("delete", url, action_result) + ret_val, response = self._make_rest_call("delete", url, action_result) if phantom.is_fail(ret_val): return action_result.get_status() @@ -931,7 +931,7 @@ def _handle_list_devices(self, param: Dict[str, Any]) -> bool: params = {"limit": limit} while True: params["offset"] = offset - ret_val, response = self.__make_rest_call("get", url, action_result, params=params) + ret_val, response = self._make_rest_call("get", url, action_result, params=params) if phantom.is_fail(ret_val): return action_result.get_status() @@ -962,7 +962,7 @@ def get_deployable_devices(self, domain_id: str) -> Tuple[bool, Any]: params = {"limit": limit, "expanded": True} while True: params["offset"] = offset - ret_val, response = self.__make_rest_call("get", url, self, params=params) + ret_val, response = self._make_rest_call("get", url, self, params=params) if phantom.is_fail(ret_val): return phantom.APP_ERROR, [] @@ -1022,7 +1022,7 @@ def _handle_deploy_devices(self, param: Dict[str, Any]) -> bool: url = DEPLOY_DEVICES_ENDPOINT.format(domain_id=domain_uuid) body = {"type": "DeploymentRequest", "version": "0", "forceDeploy": True, "ignoreWarning": True, "deviceList": devices_to_deploy} - ret_val, response = self.__make_rest_call("post", url, action_result, body) + ret_val, response = self._make_rest_call("post", url, action_result, body) if phantom.is_fail(ret_val): return action_result.get_status() @@ -1041,7 +1041,7 @@ def _handle_get_deployment_status(self, param: Dict[str, Any]) -> bool: deployment_id = param["deployment_id"] url = DEPLOYMENT_STATUS_ENDPOINT.format(domain_id=domain_uuid, task_id=deployment_id) - ret_val, response = self.__make_rest_call("get", url, action_result) + ret_val, response = self._make_rest_call("get", url, action_result) if phantom.is_fail(ret_val): return action_result.get_status()