bp-check/services/efs.py

119 lines
3.9 KiB
Python
Raw Normal View History

2024-08-05 02:30:34 +00:00
from models import RuleCheckResult
import boto3
2024-08-06 05:34:45 +00:00
# Module-level boto3 clients, created once at import time with no explicit
# region or credentials passed.
# EFS client used by the checks in this module.
client = boto3.client("efs")
# EC2 client used to look up the route tables of mount target subnets.
ec2_client = boto3.client("ec2")
2024-08-05 02:30:34 +00:00
def efs_access_point_enforce_root_directory():
    """Check that every EFS access point enforces a non-root directory.

    An access point is compliant when its ``RootDirectory.Path`` is anything
    other than ``"/"``; otherwise it is non-compliant.

    Returns:
        RuleCheckResult: ``passed`` is True when no access point is
        non-compliant. Both resource lists contain access point ARNs.
    """
    compliant_resource = []
    non_compliant_resources = []

    # A single describe_access_points call returns only one page of results,
    # so walk every page to avoid silently skipping access points.
    paginator = client.get_paginator("describe_access_points")
    for page in paginator.paginate():
        for access_point in page["AccessPoints"]:
            if access_point["RootDirectory"]["Path"] != "/":
                compliant_resource.append(access_point["AccessPointArn"])
            else:
                non_compliant_resources.append(access_point["AccessPointArn"])

    return RuleCheckResult(
        passed=not non_compliant_resources,
        compliant_resources=compliant_resource,
        non_compliant_resources=non_compliant_resources,
    )
def efs_access_point_enforce_user_identity():
    """Check that every EFS access point enforces a POSIX user identity.

    An access point is compliant when its description contains a
    ``PosixUser`` entry; otherwise it is non-compliant.

    Returns:
        RuleCheckResult: ``passed`` is True when no access point is
        non-compliant. Both resource lists contain access point ARNs.
    """
    compliant_resource = []
    non_compliant_resources = []

    # A single describe_access_points call returns only one page of results,
    # so walk every page to avoid silently skipping access points.
    paginator = client.get_paginator("describe_access_points")
    for page in paginator.paginate():
        for access_point in page["AccessPoints"]:
            if "PosixUser" in access_point:
                compliant_resource.append(access_point["AccessPointArn"])
            else:
                non_compliant_resources.append(access_point["AccessPointArn"])

    return RuleCheckResult(
        passed=not non_compliant_resources,
        compliant_resources=compliant_resource,
        non_compliant_resources=non_compliant_resources,
    )
def efs_automatic_backups_enabled():
    """Check that every EFS file system has automatic backups enabled.

    A file system is compliant when its backup policy status is exactly
    ``"ENABLED"``. Any other status, or the absence of a backup policy,
    makes it non-compliant.

    Returns:
        RuleCheckResult: ``passed`` is True when no file system is
        non-compliant. Both resource lists contain file system ARNs.
    """
    compliant_resource = []
    non_compliant_resources = []

    # describe_file_systems is paginated; iterate all pages so file systems
    # beyond the first page are evaluated too.
    paginator = client.get_paginator("describe_file_systems")
    for page in paginator.paginate():
        for file_system in page["FileSystems"]:
            try:
                response = client.describe_backup_policy(
                    FileSystemId=file_system["FileSystemId"]
                )
            except client.exceptions.PolicyNotFound:
                # No backup policy exists for this file system: report it as
                # non-compliant instead of aborting the whole check.
                status = None
            else:
                status = response["BackupPolicy"]["Status"]

            if status == "ENABLED":
                compliant_resource.append(file_system["FileSystemArn"])
            else:
                non_compliant_resources.append(file_system["FileSystemArn"])

    return RuleCheckResult(
        passed=not non_compliant_resources,
        compliant_resources=compliant_resource,
        non_compliant_resources=non_compliant_resources,
    )
def efs_encrypted_check():
    """Check that every EFS file system is encrypted.

    A file system is compliant when its ``Encrypted`` flag is true;
    otherwise it is non-compliant.

    Returns:
        RuleCheckResult: ``passed`` is True when no file system is
        non-compliant. Both resource lists contain file system ARNs.
    """
    compliant_resource = []
    non_compliant_resources = []

    # describe_file_systems is paginated; iterate all pages so file systems
    # beyond the first page are evaluated too.
    paginator = client.get_paginator("describe_file_systems")
    for page in paginator.paginate():
        for file_system in page["FileSystems"]:
            if file_system["Encrypted"]:
                compliant_resource.append(file_system["FileSystemArn"])
            else:
                non_compliant_resources.append(file_system["FileSystemArn"])

    return RuleCheckResult(
        passed=not non_compliant_resources,
        compliant_resources=compliant_resource,
        non_compliant_resources=non_compliant_resources,
    )
def _efs_route_tables_for_mount_target(mount_target):
    """Return the route tables that apply to a mount target's subnet."""
    subnet_id = mount_target["SubnetId"]
    route_tables = ec2_client.describe_route_tables(
        Filters=[{"Name": "association.subnet-id", "Values": [subnet_id]}]
    )["RouteTables"]
    if route_tables:
        return route_tables

    # A subnet without an explicit association uses the VPC's main route
    # table, which the subnet-id filter does not return.
    vpc_id = mount_target.get("VpcId")
    if vpc_id is None:
        vpc_id = ec2_client.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0][
            "VpcId"
        ]
    return ec2_client.describe_route_tables(
        Filters=[
            {"Name": "vpc-id", "Values": [vpc_id]},
            {"Name": "association.main", "Values": ["true"]},
        ]
    )["RouteTables"]


def _efs_mount_target_is_public(mount_target):
    """Return True if the mount target's subnet routes 0.0.0.0/0 to an IGW."""
    return any(
        route.get("DestinationCidrBlock") == "0.0.0.0/0"
        and route.get("GatewayId", "").startswith("igw-")
        for route_table in _efs_route_tables_for_mount_target(mount_target)
        for route in route_table["Routes"]
    )


def efs_mount_target_public_accessible():
    """Check that no EFS mount target sits in a publicly routed subnet.

    A file system is non-compliant when at least one of its mount targets is
    in a subnet whose route table sends ``0.0.0.0/0`` to an internet gateway
    (a ``GatewayId`` starting with ``igw-``). It is compliant when none of
    its mount targets are. Each file system is classified exactly once, so
    it never appears in both lists. File systems without any mount target
    are not reported in either list.

    Returns:
        RuleCheckResult: ``passed`` is True when no file system is
        non-compliant. Both resource lists contain file system ARNs.
    """
    compliant_resource = []
    non_compliant_resources = []

    file_system_pages = client.get_paginator("describe_file_systems").paginate()
    mount_target_paginator = client.get_paginator("describe_mount_targets")

    for file_system_page in file_system_pages:
        for file_system in file_system_page["FileSystems"]:
            mount_targets = [
                mount_target
                for page in mount_target_paginator.paginate(
                    FileSystemId=file_system["FileSystemId"]
                )
                for mount_target in page["MountTargets"]
            ]
            if not mount_targets:
                # Nothing to evaluate; leave the file system unreported.
                continue

            # any() stops at the first public mount target, so no further
            # route table lookups are made once the verdict is known.
            if any(_efs_mount_target_is_public(mt) for mt in mount_targets):
                non_compliant_resources.append(file_system["FileSystemArn"])
            else:
                compliant_resource.append(file_system["FileSystemArn"])

    return RuleCheckResult(
        passed=not non_compliant_resources,
        compliant_resources=compliant_resource,
        non_compliant_resources=non_compliant_resources,
    )