bp-check/services/ec2.py

159 lines
5.3 KiB
Python
Raw Permalink Normal View History

2024-08-14 01:05:06 +00:00
from models import RuleCheckResult, RuleChecker
from functools import cached_property
2024-08-05 02:30:34 +00:00
import boto3
2024-08-14 01:05:06 +00:00
def _partition_by_compliance(resources, id_key, is_compliant):
    """Split resources into compliant / non-compliant IDs and wrap the result.

    Args:
        resources: Iterable of dicts describing AWS resources.
        id_key: Key whose value identifies each resource in the result lists.
        is_compliant: Callable that takes one resource dict and returns a
            truthy value when that resource satisfies the rule.

    Returns:
        A RuleCheckResult whose ``passed`` flag is True only when no resource
        was classified as non-compliant (so an empty input passes).
    """
    compliant_resources = []
    non_compliant_resources = []
    for resource in resources:
        if is_compliant(resource):
            compliant_resources.append(resource[id_key])
        else:
            non_compliant_resources.append(resource[id_key])

    return RuleCheckResult(
        passed=not non_compliant_resources,
        compliant_resources=compliant_resources,
        non_compliant_resources=non_compliant_resources,
    )


class EC2RuleChecker(RuleChecker):
    """Rule checks for EC2 instances and EBS volumes.

    Each ``ec2_*`` method returns a RuleCheckResult listing the IDs of the
    resources that satisfy the rule and of those that violate it.
    """

    def __init__(self):
        self.client = boto3.client("ec2")
        self.ssm_client = boto3.client("ssm")

    @cached_property
    def instances(self):
        """Return every EC2 instance whose state is not ``terminated``.

        The list is fetched on first access and cached on the checker, so all
        instance-based rules share a single listing.
        """
        # Walk every page: a single describe_instances() call only yields the
        # first page when the response is paginated.
        paginator = self.client.get_paginator("describe_instances")
        valid_instances = []
        for page in paginator.paginate():
            for reservation in page["Reservations"]:
                valid_instances.extend(
                    instance
                    for instance in reservation["Instances"]
                    if instance["State"]["Name"] != "terminated"
                )
        return valid_instances

    def ec2_ebs_encryption_by_default(self):
        """Classify every EBS volume by its ``Encrypted`` flag.

        Returns:
            RuleCheckResult keyed by ``VolumeId``; encrypted volumes are
            compliant.
        """
        paginator = self.client.get_paginator("describe_volumes")
        volumes = (
            volume
            for page in paginator.paginate()
            for volume in page["Volumes"]
        )
        return _partition_by_compliance(
            volumes,
            "VolumeId",
            lambda volume: volume["Encrypted"],
        )

    def ec2_imdsv2_check(self):
        """Classify instances by whether metadata tokens are required.

        Returns:
            RuleCheckResult keyed by ``InstanceId``; an instance is compliant
            when ``MetadataOptions.HttpTokens`` equals ``"required"``.
        """
        return _partition_by_compliance(
            self.instances,
            "InstanceId",
            lambda instance: instance["MetadataOptions"]["HttpTokens"] == "required",
        )

    def ec2_instance_detailed_monitoring_enabled(self):
        """Classify instances by whether monitoring is in the ``enabled`` state.

        Returns:
            RuleCheckResult keyed by ``InstanceId``.
        """
        return _partition_by_compliance(
            self.instances,
            "InstanceId",
            lambda instance: instance["Monitoring"]["State"] == "enabled",
        )

    def ec2_instance_managed_by_systems_manager(self):
        """Classify instances by whether SSM reports instance information.

        Returns:
            RuleCheckResult keyed by ``InstanceId``; an instance is compliant
            when its ID appears in the SSM instance information listing.
        """
        # Paginate: SSM returns this listing in pages, and reading only the
        # first one would mark managed instances as non-compliant.
        paginator = self.ssm_client.get_paginator("describe_instance_information")
        # A set gives O(1) membership tests in the classification below.
        # NOTE(review): the filter only tests that PingStatus is truthy, so any
        # non-empty status counts as managed - confirm whether a specific
        # status value was intended.
        managed_instance_ids = {
            info["InstanceId"]
            for page in paginator.paginate()
            for info in page["InstanceInformationList"]
            if info["PingStatus"]
        }
        return _partition_by_compliance(
            self.instances,
            "InstanceId",
            lambda instance: instance["InstanceId"] in managed_instance_ids,
        )

    def ec2_instance_profile_attached(self):
        """Classify instances by the presence of an ``IamInstanceProfile`` entry.

        Returns:
            RuleCheckResult keyed by ``InstanceId``.
        """
        return _partition_by_compliance(
            self.instances,
            "InstanceId",
            lambda instance: "IamInstanceProfile" in instance,
        )

    def ec2_no_amazon_key_pair(self):
        """Classify instances by the absence of a ``KeyName`` entry.

        Returns:
            RuleCheckResult keyed by ``InstanceId``; instances that carry a
            ``KeyName`` are non-compliant.
        """
        return _partition_by_compliance(
            self.instances,
            "InstanceId",
            lambda instance: "KeyName" not in instance,
        )

    def ec2_stopped_instance(self):
        """Classify instances by whether they are in the ``stopped`` state.

        Returns:
            RuleCheckResult keyed by ``InstanceId``; stopped instances are
            non-compliant.
        """
        return _partition_by_compliance(
            self.instances,
            "InstanceId",
            lambda instance: instance["State"]["Name"] != "stopped",
        )

    def ec2_token_hop_limit_check(self):
        """Classify instances by their metadata PUT response hop limit.

        Returns:
            RuleCheckResult keyed by ``InstanceId``; an instance is compliant
            when ``MetadataOptions.HttpPutResponseHopLimit`` is below 2.
        """
        return _partition_by_compliance(
            self.instances,
            "InstanceId",
            lambda instance: instance["MetadataOptions"]["HttpPutResponseHopLimit"] < 2,
        )
# Module-level handle to the checker class (the class itself, not an instance).
rule_checker = EC2RuleChecker