# NOTE(review): this file arrived as a whitespace-mangled "git format-patch"
# email (mail header and unified-diff markers collapsed onto a few physical
# lines).  Below is the reconstructed post-patch services/s3.py with review
# fixes applied; the mail/diff wrapper carried no code content of its own.
"""Best-practice compliance checks for Amazon S3, AWS Config style.

Each check scans the account's buckets (or access points) and returns a
RuleCheckResult listing compliant and non-compliant resource ARNs; a rule
passes only when no non-compliant resource was found.
"""

from models import RuleCheckResult
import boto3

client = boto3.client("s3")
sts_client = boto3.client("sts")
s3control_client = boto3.client("s3control")
backup_client = boto3.client("backup")


def _bucket_arn(bucket_name):
    """Return the ARN of an S3 bucket given its name."""
    return f"arn:aws:s3:::{bucket_name}"


def _result(compliant_resources, non_compliant_resources):
    """Build a RuleCheckResult; the rule passes iff nothing is non-compliant."""
    return RuleCheckResult(
        passed=not non_compliant_resources,
        compliant_resources=compliant_resources,
        non_compliant_resources=non_compliant_resources,
    )


def _error_code(exc):
    """AWS error code carried by a botocore ClientError ('' when absent)."""
    return getattr(exc, "response", {}).get("Error", {}).get("Code", "")


def s3_access_point_in_vpc_only():
    """Check that every S3 access point only accepts traffic from a VPC."""
    compliant_resources = []
    non_compliant_resources = []
    account_id = sts_client.get_caller_identity()["Account"]
    # Fix: list_access_points caps each response at 1000 entries — paginate
    # instead of reading a single page.
    paginator = s3control_client.get_paginator("list_access_points")
    for page in paginator.paginate(AccountId=account_id):
        for access_point in page["AccessPointList"]:
            if access_point["NetworkOrigin"] == "VPC":
                compliant_resources.append(access_point["AccessPointArn"])
            else:
                non_compliant_resources.append(access_point["AccessPointArn"])
    return _result(compliant_resources, non_compliant_resources)


def s3_bucket_default_lock_enabled():
    """Check that every bucket has an Object Lock configuration."""
    compliant_resources = []
    non_compliant_resources = []
    for bucket in client.list_buckets()["Buckets"]:
        arn = _bucket_arn(bucket["Name"])
        try:
            client.get_object_lock_configuration(Bucket=bucket["Name"])
        except client.exceptions.ClientError as e:
            # Fix: this error code has no modelled exception class, so boto3
            # raises a plain ClientError; the original comparison against
            # e.__class__.__name__ could therefore re-raise instead of
            # recording the bucket.  Match on the AWS error code instead.
            if _error_code(e) != "ObjectLockConfigurationNotFoundError":
                raise
            non_compliant_resources.append(arn)
        else:
            compliant_resources.append(arn)
    return _result(compliant_resources, non_compliant_resources)


def s3_bucket_level_public_access_prohibited():
    """Check that every bucket enables all four public-access-block settings."""
    compliant_resources = []
    non_compliant_resources = []
    for bucket in client.list_buckets()["Buckets"]:
        arn = _bucket_arn(bucket["Name"])
        try:
            configuration = client.get_public_access_block(Bucket=bucket["Name"])[
                "PublicAccessBlockConfiguration"
            ]
        except client.exceptions.ClientError as e:
            # Fix: buckets that never had a public-access block raise
            # NoSuchPublicAccessBlockConfiguration; the original crashed here.
            # No configuration means public access is not blocked.
            if _error_code(e) != "NoSuchPublicAccessBlockConfiguration":
                raise
            non_compliant_resources.append(arn)
            continue
        if all(configuration.values()):
            compliant_resources.append(arn)
        else:
            non_compliant_resources.append(arn)
    return _result(compliant_resources, non_compliant_resources)


def s3_bucket_logging_enabled():
    """Check that server access logging is enabled on every bucket."""
    compliant_resources = []
    non_compliant_resources = []
    for bucket in client.list_buckets()["Buckets"]:
        arn = _bucket_arn(bucket["Name"])
        # get_bucket_logging omits "LoggingEnabled" when logging is off.
        if "LoggingEnabled" in client.get_bucket_logging(Bucket=bucket["Name"]):
            compliant_resources.append(arn)
        else:
            non_compliant_resources.append(arn)
    return _result(compliant_resources, non_compliant_resources)


def s3_bucket_ssl_requests_only():
    """Check that every bucket's policy constrains aws:SecureTransport."""
    compliant_resources = []
    non_compliant_resources = []
    for bucket in client.list_buckets()["Buckets"]:
        arn = _bucket_arn(bucket["Name"])
        try:
            policy = client.get_bucket_policy(Bucket=bucket["Name"])["Policy"]
        except client.exceptions.ClientError as e:
            # Fix: buckets without a policy raise NoSuchBucketPolicy; the
            # original crashed here.  No policy means SSL is not enforced.
            if _error_code(e) != "NoSuchBucketPolicy":
                raise
            non_compliant_resources.append(arn)
            continue
        # NOTE(review): substring test only — it does not verify an explicit
        # Deny on "aws:SecureTransport": "false".  Kept from the original;
        # parse the policy JSON if false positives matter.
        if "aws:SecureTransport" in policy:
            compliant_resources.append(arn)
        else:
            non_compliant_resources.append(arn)
    return _result(compliant_resources, non_compliant_resources)


def s3_bucket_versioning_enabled():
    """Check that versioning is Enabled (not Suspended/unset) on every bucket."""
    compliant_resources = []
    non_compliant_resources = []
    for bucket in client.list_buckets()["Buckets"]:
        arn = _bucket_arn(bucket["Name"])
        response = client.get_bucket_versioning(Bucket=bucket["Name"])
        if response.get("Status") == "Enabled":
            compliant_resources.append(arn)
        else:
            non_compliant_resources.append(arn)
    return _result(compliant_resources, non_compliant_resources)


def s3_default_encryption_kms():
    """Check that every bucket's default encryption is KMS-based."""
    compliant_resources = []
    non_compliant_resources = []
    for bucket in client.list_buckets()["Buckets"]:
        arn = _bucket_arn(bucket["Name"])
        configuration = client.get_bucket_encryption(Bucket=bucket["Name"])[
            "ServerSideEncryptionConfiguration"
        ]
        algorithm = configuration["Rules"][0]["ApplyServerSideEncryptionByDefault"][
            "SSEAlgorithm"
        ]
        # Fix: "aws:kms:dsse" (dual-layer SSE-KMS) is also KMS-managed.
        if algorithm in ("aws:kms", "aws:kms:dsse"):
            compliant_resources.append(arn)
        else:
            non_compliant_resources.append(arn)
    return _result(compliant_resources, non_compliant_resources)


def s3_event_notifications_enabled():
    """Check that every bucket has at least one event-notification target."""
    notification_keys = (
        "LambdaFunctionConfigurations",
        "QueueConfigurations",
        "TopicConfigurations",
        "EventBridgeConfiguration",  # Fix: EventBridge delivery also counts.
    )
    compliant_resources = []
    non_compliant_resources = []
    for bucket in client.list_buckets()["Buckets"]:
        arn = _bucket_arn(bucket["Name"])
        configuration = client.get_bucket_notification_configuration(
            Bucket=bucket["Name"]
        )
        if any(key in configuration for key in notification_keys):
            compliant_resources.append(arn)
        else:
            non_compliant_resources.append(arn)
    return _result(compliant_resources, non_compliant_resources)


def s3_last_backup_recovery_point_created():
    """Check that every bucket has at least one AWS Backup recovery point."""
    compliant_resources = []
    non_compliant_resources = []
    for bucket in client.list_buckets()["Buckets"]:
        arn = _bucket_arn(bucket["Name"])
        backups = backup_client.list_recovery_points_by_resource(ResourceArn=arn)
        if backups["RecoveryPoints"]:
            compliant_resources.append(arn)
        else:
            non_compliant_resources.append(arn)
    return _result(compliant_resources, non_compliant_resources)


def s3_lifecycle_policy_check():
    """Check that every bucket has a lifecycle configuration."""
    compliant_resources = []
    non_compliant_resources = []
    for bucket in client.list_buckets()["Buckets"]:
        arn = _bucket_arn(bucket["Name"])
        try:
            client.get_bucket_lifecycle_configuration(Bucket=bucket["Name"])
        except client.exceptions.ClientError as e:
            # Fix: match on the AWS error code (see
            # s3_bucket_default_lock_enabled) rather than the exception class
            # name, which is a plain ClientError for unmodelled codes.
            if _error_code(e) != "NoSuchLifecycleConfiguration":
                raise
            non_compliant_resources.append(arn)
        else:
            compliant_resources.append(arn)
    return _result(compliant_resources, non_compliant_resources)