Skip to content

Commit

Permalink
Add sts:AssumeRole action when checking roles (#2244)
Browse files Browse the repository at this point in the history
  • Loading branch information
meln1k authored Oct 17, 2024
1 parent 23cd836 commit 3ba74a9
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 6 deletions.
5 changes: 5 additions & 0 deletions fixlib/fixlib/graph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,11 @@ def find_cycle(self) -> Optional[List[EdgeKey]]:
for edge in self.edges(keys=True):
if len(edge) == 3:
key: EdgeKey = edge[2]

# skip iam edges, they're checked by the collector
if key.edge_type == EdgeType.iam:
continue

edges_per_type[key.edge_type].append(edge)
for edges in edges_per_type.values():
typed_graph = self.edge_subgraph(edges)
Expand Down
56 changes: 50 additions & 6 deletions plugins/aws/fix_plugin_aws/access_edges.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from functools import lru_cache
from attr import frozen, define
import networkx
from fix_plugin_aws.resource.base import AwsAccount, AwsResource, GraphBuilder

from typing import Dict, List, Literal, Set, Optional, Tuple, Union, Pattern

from networkx.algorithms.dag import is_directed_acyclic_graph

from fixlib.baseresources import (
PermissionCondition,
PolicySource,
Expand All @@ -21,6 +24,7 @@
from policy_sentry.querying.actions import get_action_data, get_actions_matching_arn
from policy_sentry.querying.all import get_all_actions
from policy_sentry.util.arns import ARN, get_service_from_arn
from fixlib.graph import EdgeKey
import re
import logging

Expand Down Expand Up @@ -63,13 +67,30 @@ def find_allowed_action(policy_document: PolicyDocument, service_prefix: str) ->
return allowed_actions


def find_non_service_actions(resource_arn: str) -> Set[IamAction]:
try:
splitted = resource_arn.split(":")
service_prefix = splitted[2]
if service_prefix == "iam":
resource_type = splitted[5]
resource = resource_type.split("/")[0]
if resource == "role":
return {"sts:AssumeRole"}
except Exception as e:
log.info(f"Error when trying to get non-service actions for ARN {resource_arn}: {e}")
return set()


def find_all_allowed_actions(all_involved_policies: List[PolicyDocument], resource_arn: str) -> Set[IamAction]:
resource_actions = set()
try:
resource_actions = set(get_actions_matching_arn(resource_arn))
except Exception as e:
log.debug(f"Error when trying to get actions matching ARN {resource_arn}: {e}")

if additinal_actions := find_non_service_actions(resource_arn):
resource_actions.update(additinal_actions)

service_prefix = ""
try:
service_prefix = get_service_from_arn(resource_arn)
Expand Down Expand Up @@ -793,15 +814,12 @@ def _get_role_based_policies(self, principal: AwsIamRole) -> List[Tuple[PolicySo

def add_access_edges(self) -> None:

principal_arns = set([p.principal.arn for p in self.principals])

for node in self.builder.nodes(clazz=AwsResource, filter=lambda r: r.arn is not None):

if node.arn in principal_arns:
# do not create cycles
continue

for context in self.principals:
if context.principal.arn == node.arn:
# small graph cycles avoidance optimization
continue

resource_policies: List[Tuple[PolicySource, PolicyDocument]] = []
if isinstance(node, HasResourcePolicy):
Expand All @@ -821,3 +839,29 @@ def add_access_edges(self) -> None:
reported = to_json({"permissions": permissions, "access": access}, strip_nulls=True)

self.builder.add_edge(from_node=context.principal, edge_type=EdgeType.iam, reported=reported, node=node)

all_principal_arns = {p.principal.arn for p in self.principals if p.principal.arn}

# check that there are no cycles in the IAM edges besides the principal -> principal edges
iam_edges_no_double_principal = []
for edge in self.builder.graph.edges(keys=True):
if len(edge) != 3:
continue

# skip non-iam edges
key: EdgeKey = edge[2]
if key.edge_type != EdgeType.iam:
continue

# skip the principal -> principal edges
if key.src.arn in all_principal_arns and key.dst.arn in all_principal_arns:
continue

iam_edges_no_double_principal.append(edge)

# check for loops:
subgraph = self.builder.graph.edge_subgraph(iam_edges_no_double_principal)
if not is_directed_acyclic_graph(subgraph):
cycle = [edge[2] for edge in networkx.algorithms.cycles.find_cycle(subgraph)]
desc = ", ".join(f"{key.edge_type}: {key.src.kdname}-->{key.dst.kdname}" for key in cycle)
log.error(f"IAM graph of account {self.builder.account.arn} is not acyclic! Cycle {desc}")

0 comments on commit 3ba74a9

Please sign in to comment.