Skip to content

Commit

Permalink
PAPP-35185: adding init file
Browse files Browse the repository at this point in the history
  • Loading branch information
tapishj-splunk committed Jan 28, 2025
1 parent 69e4a12 commit aa732da
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 29 deletions.
Empty file added __init__.py
Empty file.
58 changes: 29 additions & 29 deletions ciscosecurefirewall_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def _get_token(self, action_result):
self.refresh_count += 1
self.headers[REFRESH_TOKEN_KEY] = self._state[REFRESH_TOKEN_KEY]
self.headers[TOKEN_KEY] = self._state[TOKEN_KEY]
ret_val, headers = self.__make_rest_call("post", REFRESH_ENDPOINT, action_result, headers_only=True, first_try=False)
ret_val, headers = self._make_rest_call("post", REFRESH_ENDPOINT, action_result, headers_only=True, first_try=False)
if not phantom.is_fail(ret_val):
self.token = headers.get(TOKEN_KEY)
self.headers[TOKEN_KEY] = self.token
Expand All @@ -180,7 +180,7 @@ def _get_token(self, action_result):
self.debug_print("Fetching a new token")
self.headers.pop(REFRESH_TOKEN_KEY, None)
auth = requests.auth.HTTPBasicAuth(self.username, self.password)
ret_val, headers = self.__make_rest_call("post", TOKEN_ENDPOINT, action_result, headers_only=True, first_try=True, auth=auth)
ret_val, headers = self._make_rest_call("post", TOKEN_ENDPOINT, action_result, headers_only=True, first_try=True, auth=auth)
if phantom.is_fail(ret_val):
self.debug_print(f"Error {ret_val} while generating token with response {headers}")
self._reset_state_file()
Expand Down Expand Up @@ -281,7 +281,7 @@ def _process_response(self, r, action_result):

return RetVal(action_result.set_status(phantom.APP_ERROR, msg), None)

def __make_rest_call(self, method, resource, action_result, json_body=None, headers_only=False, first_try=True, params=None, auth=None):
def _make_rest_call(self, method, resource, action_result, json_body=None, headers_only=False, first_try=True, params=None, auth=None):
"""
This method makes a REST call to the API
"""
Expand All @@ -306,7 +306,7 @@ def __make_rest_call(self, method, resource, action_result, json_body=None, head
return action_result.get_status(), None

self.debug_print(f"Running url that failed because of token error {resource}")
return self.__make_rest_call(method, resource, action_result, json_body, headers_only, first_try=False)
return self._make_rest_call(method, resource, action_result, json_body, headers_only, first_try=False)

message = "Error from server. Status Code: {0} Data from server: {1}".format(
result.status_code, result.text.replace("{", "{{").replace("}", "}}")
Expand All @@ -333,7 +333,7 @@ def _handle_test_connectivity(self, param: Dict[str, Any]) -> bool:
self.save_progress("Testing connectivity")

url = GET_HOSTS_ENDPOINT.format(domain_id="default")
ret_val, _ = self.__make_rest_call("get", url, action_result)
ret_val, _ = self._make_rest_call("get", url, action_result)
if phantom.is_fail(ret_val):
self.save_progress("Connectivity test failed")
return action_result.get_status()
Expand All @@ -349,7 +349,7 @@ def get_network_objects_of_type(self, object_type, domain_uuid, action_result, n
params = {"limit": limit}
while True:
params["offset"] = offset
ret_val, response = self.__make_rest_call("get", url, action_result, params=params)
ret_val, response = self._make_rest_call("get", url, action_result, params=params)
if phantom.is_fail(ret_val):
return action_result.get_status()

Expand Down Expand Up @@ -399,7 +399,7 @@ def _handle_list_network_objects(self, param: Dict[str, Any]) -> bool:

def get_network_object(self, domain_id: int, object_id: int) -> Tuple[bool, Dict[str, Any]]:
url = NETWORK_OBJECT_ID_ENDPOINT.format(domain_id=domain_id, type="networks", object_id=object_id)
ret_val, response = self.__make_rest_call("get", url, self)
ret_val, response = self._make_rest_call("get", url, self)
return ret_val, response

def _handle_create_network_object(self, param: Dict[str, Any]) -> bool:
Expand All @@ -413,7 +413,7 @@ def _handle_create_network_object(self, param: Dict[str, Any]) -> bool:
domain_uuid = self.get_domain_id(param.get("domain_name"))
url = NETWORK_OBJECTS_ENDPOINT.format(domain_id=domain_uuid, type=object_type.lower() + "s")

ret_val, response = self.__make_rest_call("post", url, action_result, json_body=payload)
ret_val, response = self._make_rest_call("post", url, action_result, json_body=payload)
if phantom.is_fail(ret_val):
return action_result.get_status()

Expand All @@ -439,7 +439,7 @@ def _handle_update_network_object(self, param: Dict[str, Any]) -> bool:

url = NETWORK_OBJECT_ID_ENDPOINT.format(domain_id=domain_uuid, type=object_type.lower() + "s", object_id=object_id)

ret_val, response = self.__make_rest_call("put", url, action_result, json_body=payload)
ret_val, response = self._make_rest_call("put", url, action_result, json_body=payload)
if phantom.is_fail(ret_val):
return action_result.get_status()

Expand All @@ -457,7 +457,7 @@ def _handle_delete_network_object(self, param: Dict[str, Any]) -> bool:
domain_uuid = self.get_domain_id(param.get("domain_name"))

url = NETWORK_OBJECT_ID_ENDPOINT.format(domain_id=domain_uuid, type=object_type.lower() + "s", object_id=object_id)
ret_val, response = self.__make_rest_call("delete", url, action_result)
ret_val, response = self._make_rest_call("delete", url, action_result)
if phantom.is_fail(ret_val):
return action_result.get_status()

Expand Down Expand Up @@ -493,7 +493,7 @@ def _handle_get_network_groups(self, param: Dict[str, Any]) -> bool:
params = {"limit": limit, "expanded": True}
while True:
params["offset"] = offset
ret_val, response = self.__make_rest_call("get", url, action_result, params=params)
ret_val, response = self._make_rest_call("get", url, action_result, params=params)
if phantom.is_fail(ret_val):
return action_result.get_status()

Expand Down Expand Up @@ -533,7 +533,7 @@ def _handle_create_network_group(self, param: Dict[str, Any]) -> bool:

url = NETWORK_GROUPS_ENDPOINT.format(domain_id=domain_uuid)

ret_val, response = self.__make_rest_call("post", url, action_result, json_body=payload)
ret_val, response = self._make_rest_call("post", url, action_result, json_body=payload)
if phantom.is_fail(ret_val):
return action_result.get_status()

Expand All @@ -544,7 +544,7 @@ def _handle_create_network_group(self, param: Dict[str, Any]) -> bool:

def get_network_group(self, domain_uuid, group_id):
url = NETWORK_GROUPS_ID_ENDPOINT.format(domain_id=domain_uuid, group_id=group_id)
ret_val, response = self.__make_rest_call("get", url, self)
ret_val, response = self._make_rest_call("get", url, self)
return ret_val, response

def _handle_update_network_group(self, param: Dict[str, Any]) -> bool:
Expand Down Expand Up @@ -573,7 +573,7 @@ def _handle_update_network_group(self, param: Dict[str, Any]) -> bool:
resp["objects"] = objects

update_url = NETWORK_GROUPS_ID_ENDPOINT.format(domain_id=domain_uuid, group_id=group_id)
ret_val, response = self.__make_rest_call("put", update_url, action_result, json_body=resp)
ret_val, response = self._make_rest_call("put", update_url, action_result, json_body=resp)
if phantom.is_fail(ret_val):
return action_result.get_status()

Expand All @@ -591,7 +591,7 @@ def _handle_delete_network_group(self, param):
domain_uuid = self.get_domain_id(param.get("domain_name"))

update_url = NETWORK_GROUPS_ID_ENDPOINT.format(domain_id=domain_uuid, group_id=group_id)
ret_val, response = self.__make_rest_call("delete", update_url, action_result)
ret_val, response = self._make_rest_call("delete", update_url, action_result)
if phantom.is_fail(ret_val):
return action_result.get_status()

Expand All @@ -612,7 +612,7 @@ def _handle_get_access_policies(self, param):
params = {"limit": limit}
while True:
params["offset"] = offset
ret_val, response = self.__make_rest_call("get", url, action_result, params=params)
ret_val, response = self._make_rest_call("get", url, action_result, params=params)
if phantom.is_fail(ret_val):
return action_result.get_status()

Expand Down Expand Up @@ -647,7 +647,7 @@ def _handle_create_access_policy(self, param: Dict[str, Any]) -> bool:
payload["description"] = param["description"]

url = ACCESS_POLICY_ENDPOINT.format(domain_id=domain_uuid)
ret_val, response = self.__make_rest_call("post", url, action_result, json_body=payload)
ret_val, response = self._make_rest_call("post", url, action_result, json_body=payload)
if phantom.is_fail(ret_val):
return action_result.get_status()

Expand All @@ -658,7 +658,7 @@ def _handle_create_access_policy(self, param: Dict[str, Any]) -> bool:

def get_access_policy(self, domain_uuid, policy_id):
url = ACCESS_POLICY_ID_ENDPOINT.format(domain_id=domain_uuid, policy_id=policy_id)
ret_val, response = self.__make_rest_call("get", url, self)
ret_val, response = self._make_rest_call("get", url, self)
return ret_val, response

def _handle_update_access_policy(self, param):
Expand Down Expand Up @@ -692,7 +692,7 @@ def _handle_update_access_policy(self, param):

url = ACCESS_POLICY_ID_ENDPOINT.format(domain_id=domain_uuid, policy_id=policy_id)
print(f"payload is {payload}")
ret_val, response = self.__make_rest_call("put", url, action_result, json_body=payload)
ret_val, response = self._make_rest_call("put", url, action_result, json_body=payload)
if phantom.is_fail(ret_val):
return action_result.get_status()
print(f"updated policy with {response}")
Expand All @@ -710,7 +710,7 @@ def _handle_delete_access_policy(self, param: Dict[str, Any]) -> bool:
policy_id = param["policy_id"]

url = ACCESS_POLICY_ID_ENDPOINT.format(domain_id=domain_uuid, policy_id=policy_id)
ret_val, response = self.__make_rest_call("delete", url, action_result)
ret_val, response = self._make_rest_call("delete", url, action_result)
if phantom.is_fail(ret_val):
return action_result.get_status()

Expand Down Expand Up @@ -744,7 +744,7 @@ def _handle_get_access_rules(self, param: Dict[str, Any]) -> bool:
params = {"limit": limit}
while True:
params["offset"] = offset
ret_val, response = self.__make_rest_call("get", url, action_result, params=params)
ret_val, response = self._make_rest_call("get", url, action_result, params=params)
if phantom.is_fail(ret_val):
return action_result.get_status()

Expand Down Expand Up @@ -815,7 +815,7 @@ def _handle_create_access_rules(self, param: Dict[str, Any]) -> bool:
rule_payload["destinationNetworks"]["objects"] = destination_networks_objects

url = ACCESS_RULES_ENDPOINT.format(domain_id=domain_uuid, policy_id=policy_id)
ret_val, response = self.__make_rest_call("post", url, action_result, json_body=rule_payload)
ret_val, response = self._make_rest_call("post", url, action_result, json_body=rule_payload)
if phantom.is_fail(ret_val):
return action_result.get_status()

Expand All @@ -826,7 +826,7 @@ def _handle_create_access_rules(self, param: Dict[str, Any]) -> bool:

def get_access_control_rule(self, domain_id: str, policy_id: str, rule_id: str) -> Tuple[bool, Dict[str, Any]]:
url = ACCESS_RULES_ID_ENDPOINT.format(domain_id=domain_id, policy_id=policy_id, rule_id=rule_id)
ret_val, response = self.__make_rest_call("get", url, self)
ret_val, response = self._make_rest_call("get", url, self)
return ret_val, response

def _handle_update_access_rule(self, param: Dict[str, Any]) -> bool:
Expand Down Expand Up @@ -889,7 +889,7 @@ def _handle_update_access_rule(self, param: Dict[str, Any]) -> bool:
rule_payload["destinationNetworks"] = {"objects": filtered_destination_networks}

url = ACCESS_RULES_ID_ENDPOINT.format(domain_id=domain_uuid, policy_id=policy_id, rule_id=rule_id)
ret_val, response = self.__make_rest_call("put", url, action_result, json_body=rule_payload)
ret_val, response = self._make_rest_call("put", url, action_result, json_body=rule_payload)

if phantom.is_fail(ret_val):
return action_result.get_status()
Expand All @@ -909,7 +909,7 @@ def _handle_delete_access_rule(self, param: Dict[str, Any]) -> bool:
policy_id = param["policy_id"]

url = ACCESS_RULES_ID_ENDPOINT.format(domain_id=domain_uuid, policy_id=policy_id, rule_id=rule_id)
ret_val, response = self.__make_rest_call("delete", url, action_result)
ret_val, response = self._make_rest_call("delete", url, action_result)
if phantom.is_fail(ret_val):
return action_result.get_status()

Expand All @@ -931,7 +931,7 @@ def _handle_list_devices(self, param: Dict[str, Any]) -> bool:
params = {"limit": limit}
while True:
params["offset"] = offset
ret_val, response = self.__make_rest_call("get", url, action_result, params=params)
ret_val, response = self._make_rest_call("get", url, action_result, params=params)
if phantom.is_fail(ret_val):
return action_result.get_status()

Expand Down Expand Up @@ -962,7 +962,7 @@ def get_deployable_devices(self, domain_id: str) -> Tuple[bool, Any]:
params = {"limit": limit, "expanded": True}
while True:
params["offset"] = offset
ret_val, response = self.__make_rest_call("get", url, self, params=params)
ret_val, response = self._make_rest_call("get", url, self, params=params)
if phantom.is_fail(ret_val):
return phantom.APP_ERROR, []

Expand Down Expand Up @@ -1022,7 +1022,7 @@ def _handle_deploy_devices(self, param: Dict[str, Any]) -> bool:
url = DEPLOY_DEVICES_ENDPOINT.format(domain_id=domain_uuid)
body = {"type": "DeploymentRequest", "version": "0", "forceDeploy": True, "ignoreWarning": True, "deviceList": devices_to_deploy}

ret_val, response = self.__make_rest_call("post", url, action_result, body)
ret_val, response = self._make_rest_call("post", url, action_result, body)

if phantom.is_fail(ret_val):
return action_result.get_status()
Expand All @@ -1041,7 +1041,7 @@ def _handle_get_deployment_status(self, param: Dict[str, Any]) -> bool:
deployment_id = param["deployment_id"]

url = DEPLOYMENT_STATUS_ENDPOINT.format(domain_id=domain_uuid, task_id=deployment_id)
ret_val, response = self.__make_rest_call("get", url, action_result)
ret_val, response = self._make_rest_call("get", url, action_result)
if phantom.is_fail(ret_val):
return action_result.get_status()

Expand Down

0 comments on commit aa732da

Please sign in to comment.