2024-08-14 01:05:06 +00:00
|
|
|
from models import RuleCheckResult, RuleChecker
|
|
|
|
from functools import cached_property
|
2024-08-05 02:30:34 +00:00
|
|
|
import boto3
|
|
|
|
|
|
|
|
|
2024-08-14 01:05:06 +00:00
|
|
|
class EFSRuleChecker(RuleChecker):
|
|
|
|
def __init__(self):
|
|
|
|
self.client = boto3.client("efs")
|
|
|
|
self.ec2_client = boto3.client("ec2")
|
2024-08-05 02:30:34 +00:00
|
|
|
|
2024-08-14 01:05:06 +00:00
|
|
|
@cached_property
|
|
|
|
def access_points(self):
|
|
|
|
return self.client.describe_access_points()["AccessPoints"]
|
2024-08-05 02:30:34 +00:00
|
|
|
|
2024-08-14 01:05:06 +00:00
|
|
|
@cached_property
|
|
|
|
def file_systems(self):
|
|
|
|
return self.client.describe_file_systems()["FileSystems"]
|
2024-08-06 05:34:45 +00:00
|
|
|
|
2024-08-14 01:05:06 +00:00
|
|
|
def efs_access_point_enforce_root_directory(self):
|
|
|
|
compliant_resource = []
|
|
|
|
non_compliant_resources = []
|
2024-08-06 05:34:45 +00:00
|
|
|
|
2024-08-14 01:05:06 +00:00
|
|
|
for access_point in self.access_points:
|
|
|
|
if access_point["RootDirectory"]["Path"] != "/":
|
|
|
|
compliant_resource.append(access_point["AccessPointArn"])
|
|
|
|
else:
|
|
|
|
non_compliant_resources.append(access_point["AccessPointArn"])
|
2024-08-05 02:30:34 +00:00
|
|
|
|
2024-08-14 01:05:06 +00:00
|
|
|
return RuleCheckResult(
|
|
|
|
passed=not non_compliant_resources,
|
|
|
|
compliant_resources=compliant_resource,
|
|
|
|
non_compliant_resources=non_compliant_resources,
|
|
|
|
)
|
2024-08-05 02:30:34 +00:00
|
|
|
|
2024-08-14 01:05:06 +00:00
|
|
|
def efs_access_point_enforce_user_identity(self):
|
|
|
|
compliant_resource = []
|
|
|
|
non_compliant_resources = []
|
2024-08-06 05:34:45 +00:00
|
|
|
|
2024-08-14 01:05:06 +00:00
|
|
|
for access_point in self.access_points:
|
|
|
|
if "PosixUser" in access_point:
|
|
|
|
compliant_resource.append(access_point["AccessPointArn"])
|
|
|
|
else:
|
|
|
|
non_compliant_resources.append(access_point["AccessPointArn"])
|
2024-08-06 05:34:45 +00:00
|
|
|
|
2024-08-14 01:05:06 +00:00
|
|
|
return RuleCheckResult(
|
|
|
|
passed=not non_compliant_resources,
|
|
|
|
compliant_resources=compliant_resource,
|
|
|
|
non_compliant_resources=non_compliant_resources,
|
|
|
|
)
|
2024-08-05 02:30:34 +00:00
|
|
|
|
2024-08-14 01:05:06 +00:00
|
|
|
def efs_automatic_backups_enabled(self):
|
|
|
|
compliant_resource = []
|
|
|
|
non_compliant_resources = []
|
2024-08-05 02:30:34 +00:00
|
|
|
|
2024-08-14 01:05:06 +00:00
|
|
|
for file_system in self.file_systems:
|
|
|
|
response = self.client.describe_backup_policy(
|
|
|
|
FileSystemId=file_system["FileSystemId"]
|
|
|
|
)
|
2024-08-06 05:34:45 +00:00
|
|
|
|
2024-08-14 01:05:06 +00:00
|
|
|
if response["BackupPolicy"]["Status"] == "ENABLED":
|
|
|
|
compliant_resource.append(file_system["FileSystemArn"])
|
2024-08-06 05:34:45 +00:00
|
|
|
else:
|
2024-08-14 01:05:06 +00:00
|
|
|
non_compliant_resources.append(file_system["FileSystemArn"])
|
|
|
|
|
|
|
|
return RuleCheckResult(
|
|
|
|
passed=not non_compliant_resources,
|
|
|
|
compliant_resources=compliant_resource,
|
|
|
|
non_compliant_resources=non_compliant_resources,
|
|
|
|
)
|
|
|
|
|
|
|
|
def efs_encrypted_check(self):
|
|
|
|
compliant_resource = []
|
|
|
|
non_compliant_resources = []
|
|
|
|
|
|
|
|
for file_system in self.file_systems:
|
|
|
|
if file_system["Encrypted"]:
|
2024-08-06 05:34:45 +00:00
|
|
|
compliant_resource.append(file_system["FileSystemArn"])
|
2024-08-14 01:05:06 +00:00
|
|
|
else:
|
|
|
|
non_compliant_resources.append(file_system["FileSystemArn"])
|
|
|
|
|
|
|
|
return RuleCheckResult(
|
|
|
|
passed=not non_compliant_resources,
|
|
|
|
compliant_resources=compliant_resource,
|
|
|
|
non_compliant_resources=non_compliant_resources,
|
|
|
|
)
|
|
|
|
|
|
|
|
def efs_mount_target_public_accessible(self):
|
|
|
|
compliant_resource = []
|
|
|
|
non_compliant_resources = []
|
|
|
|
|
|
|
|
for file_system in self.file_systems:
|
|
|
|
mount_targets = self.client.describe_mount_targets(
|
|
|
|
FileSystemId=file_system["FileSystemId"]
|
|
|
|
)["MountTargets"]
|
|
|
|
|
|
|
|
for mount_target in mount_targets:
|
|
|
|
subnet_id = mount_target["SubnetId"]
|
|
|
|
routes = self.ec2_client.describe_route_tables(
|
|
|
|
Filters=[{"Name": "association.subnet-id", "Values": [subnet_id]}]
|
|
|
|
)["RouteTables"][0]["Routes"]
|
|
|
|
|
|
|
|
for route in routes:
|
|
|
|
if (
|
|
|
|
"DestinationCidrBlock" in route
|
|
|
|
and route["DestinationCidrBlock"] == "0.0.0.0/0"
|
|
|
|
and "GatewayId" in route
|
|
|
|
and route["GatewayId"].startswith("igw-")
|
|
|
|
):
|
|
|
|
non_compliant_resources.append(file_system["FileSystemArn"])
|
|
|
|
break
|
|
|
|
|
|
|
|
non_compliant_resources = list(set(non_compliant_resources))
|
|
|
|
compliant_resource = list(
|
|
|
|
set(compliant_resource) - set(non_compliant_resources)
|
|
|
|
)
|
|
|
|
|
|
|
|
return RuleCheckResult(
|
|
|
|
passed=not non_compliant_resources,
|
|
|
|
compliant_resources=compliant_resource,
|
|
|
|
non_compliant_resources=non_compliant_resources,
|
|
|
|
)
|
2024-08-06 05:34:45 +00:00
|
|
|
|
|
|
|
|
2024-08-14 01:05:06 +00:00
|
|
|
rule_checker = EFSRuleChecker
|