Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #167 #172

Merged
merged 2 commits into from
Dec 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 60 additions & 46 deletions services/s3/drivers/S3Bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,48 +17,67 @@ def __init__(self, bucket, s3Client):

self.init()

def policyAllowsPublicRead (self, policy_document):
# Check if the policy allows public read access
# Check if the policy allows public read access
response = None
policy_allows_public_read = False
if policy_document:
iam = boto3.client('iam')
response = iam.simulate_principal_policy(
PolicySourceArn=f"arn:aws:s3:::{self.bucket}",
PolicyInputList=[policy_document],
ActionNames=['s3:GetObject', 's3:ListBucket'],
ResourceArns=[f"arn:aws:s3:::{self.bucket}", f"arn:aws:s3:::{self.bucket}/*"],
PermissionsBoundarySet=[],
ContextEntries=[],
ResourceHandlingOption='SINGLE_SERVICE_MULTI_RESOURCE'
)

if response and response['EvaluationResults'][0]['EvalDecision'] == 'allowed':
policy_allows_public_read = True
return policy_allows_public_read

def policyAllowsPublicWrite (self, policy_document):
# Check if the policy allows public read access
# Check if the policy allows public read access
response = None
policy_allows_public_write = True
if policy_document:
iam = boto3.client('iam')
response = iam.simulate_principal_policy(
PolicySourceArn=f"arn:aws:s3:::{self.bucket}",
PolicyInputList=[policy_document],
ActionNames=['s3:PutObject'],
ResourceArns=[f"arn:aws:s3:::{self.bucket}", f"arn:aws:s3:::{self.bucket}/*"],
PermissionsBoundarySet=[],
ContextEntries=[],
ResourceHandlingOption='SINGLE_SERVICE_MULTI_RESOURCE'
)

if response and response['EvaluationResults'][0]['EvalDecision'] == 'allowed':
policy_allows_public_write = True
return policy_allows_public_write
def policyAllowsPublicRead(self, policy_document):
"""
Check if the policy allows public read access

Args:
policy_document (str): The bucket policy as a string

Returns:
bool: True if policy allows public read access, False otherwise
"""
if not policy_document:
return False

try:
policy = json.loads(policy_document)

# Check for public read in policy
for statement in policy['Statement']:
principal = statement.get('Principal', {})
if (principal == '*' or principal.get('AWS') == '*') and \
statement['Effect'] == 'Allow' and \
any(action in ['s3:GetObject', 's3:GetObjectVersion', 's3:ListBucket']
for action in (statement.get('Action', []) if isinstance(statement.get('Action', []), list)
else [statement.get('Action', [])])):
return True

return False

except (json.JSONDecodeError, KeyError):
return False

def policyAllowsPublicWrite(self, policy_document):
"""
Check if the policy allows public write access

Args:
policy_document (str): The bucket policy as a string

Returns:
bool: True if policy allows public write access, False otherwise
"""
if not policy_document:
return False

try:
policy = json.loads(policy_document)

# Check for public write in policy
for statement in policy['Statement']:
principal = statement.get('Principal', {})
if (principal == '*' or principal.get('AWS') == '*') and \
statement['Effect'] == 'Allow' and \
any(action in ['s3:PutObject', 's3:DeleteObject']
for action in (statement.get('Action', []) if isinstance(statement.get('Action', []), list)
else [statement.get('Action', [])])):
return True

return False

except (json.JSONDecodeError, KeyError):
return False

def getBucketPolicy(self):
try:
Expand Down Expand Up @@ -133,12 +152,10 @@ def _checkAccess(self):
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'NoSuchAcl':
return None


# check if S3 bucket has prohibited public reads
policy = self.getBucketPolicy()


if (public_policy_restricted or not self.policyAllowsPublicRead(policy)) and (public_acl_restricted or not self.aclAllowsPublicRead(bucket_acl)):
self.results['PublicReadAccessBlock'] = [1, 'Prohibited']
else:
Expand All @@ -150,7 +167,6 @@ def _checkAccess(self):
else:
self.results['PublicWriteAccessBlock'] = [-1, 'NotProhibited']


def _checkMfaDeleteAndVersioning(self):
self.results['MFADelete'] = [-1, 'Off']
self.results['BucketVersioning'] = [-1, 'Off']
Expand Down Expand Up @@ -197,8 +213,6 @@ def _checkBucketReplication(self):
if e.response['Error']['Code'] == 'ReplicationConfigurationNotFoundError':
self.results['BucketReplication'] = [-1, 'Off']



def _checkLifecycle(self):
self.results['BucketLifecycle'] = [1, 'On']
try:
Expand Down
Loading