bp-check/services/ec2.py
2024-08-07 12:39:49 +00:00

193 lines
6.4 KiB
Python

from models import RuleCheckResult
import boto3
# Module-level boto3 clients shared by the check functions in this module.
# They are created once at import time; no explicit region or credentials
# are passed here.
client = boto3.client("ec2")
autoscaling_client = boto3.client("autoscaling")
ssm_client = boto3.client("ssm")
def autoscaling_launch_template():
    """Check that every Auto Scaling group is built from a launch template.

    A group is compliant when it references a launch template either
    directly (top-level ``LaunchTemplate`` key) or through its
    ``MixedInstancesPolicy``. Any other group is non-compliant.

    Returns:
        RuleCheckResult: ``passed`` is True when no group is non-compliant;
        resources are identified by their ``AutoScalingGroupARN``.
    """
    compliant_resources = []
    non_compliant_resources = []

    # DescribeAutoScalingGroups is a paged API, so walk every page rather
    # than inspecting only the first response.
    paginator = autoscaling_client.get_paginator("describe_auto_scaling_groups")
    for page in paginator.paginate():
        for asg in page["AutoScalingGroups"]:
            # "MixedInstancesPolicy" is absent on groups that do not use one;
            # indexing it unconditionally raised KeyError for those groups.
            mixed_policy = asg.get("MixedInstancesPolicy") or {}
            uses_launch_template = (
                "LaunchTemplate" in asg or "LaunchTemplate" in mixed_policy
            )
            if uses_launch_template:
                compliant_resources.append(asg["AutoScalingGroupARN"])
            else:
                non_compliant_resources.append(asg["AutoScalingGroupARN"])

    return RuleCheckResult(
        passed=not non_compliant_resources,
        compliant_resources=compliant_resources,
        non_compliant_resources=non_compliant_resources,
    )
def ec2_ebs_encryption_by_default():
    """Classify EBS volumes by whether they are encrypted.

    Each volume returned by ``describe_volumes`` is compliant when its
    ``Encrypted`` flag is set and non-compliant otherwise.

    NOTE(review): despite the name, this inspects existing volumes rather
    than the account-level "encryption by default" setting; confirm that is
    the intended scope.

    Returns:
        RuleCheckResult: ``passed`` is True when no volume is non-compliant;
        resources are identified by ``VolumeId``.
    """
    compliant_resources = []
    non_compliant_resources = []

    volumes = client.describe_volumes()["Volumes"]
    for volume in volumes:
        # Plain truthiness test instead of comparing against True.
        if volume["Encrypted"]:
            compliant_resources.append(volume["VolumeId"])
        else:
            non_compliant_resources.append(volume["VolumeId"])

    return RuleCheckResult(
        passed=not non_compliant_resources,
        compliant_resources=compliant_resources,
        non_compliant_resources=non_compliant_resources,
    )
def ec2_imdsv2_check():
    """Classify EC2 instances by whether metadata tokens are required.

    Terminated instances are skipped. An instance is compliant when its
    ``MetadataOptions.HttpTokens`` value equals ``"required"``.

    Returns:
        RuleCheckResult: ``passed`` is True when no instance is
        non-compliant; resources are identified by ``InstanceId``.
    """
    passing, failing = [], []

    response = client.describe_instances()
    for group in response["Reservations"]:
        for inst in group["Instances"]:
            if inst["State"]["Name"] == "terminated":
                continue
            tokens_required = inst["MetadataOptions"]["HttpTokens"] == "required"
            bucket = passing if tokens_required else failing
            bucket.append(inst["InstanceId"])

    return RuleCheckResult(
        passed=not failing,
        compliant_resources=passing,
        non_compliant_resources=failing,
    )
def ec2_instance_detailed_monitoring_enabled():
    """Classify EC2 instances by their monitoring state.

    Terminated instances are skipped. An instance is compliant when its
    ``Monitoring.State`` value equals ``"enabled"``.

    Returns:
        RuleCheckResult: ``passed`` is True when no instance is
        non-compliant; resources are identified by ``InstanceId``.
    """
    # Results keyed by the outcome of the monitoring test.
    by_outcome = {True: [], False: []}

    for group in client.describe_instances()["Reservations"]:
        for inst in group["Instances"]:
            if inst["State"]["Name"] == "terminated":
                continue
            monitored = inst["Monitoring"]["State"] == "enabled"
            by_outcome[monitored].append(inst["InstanceId"])

    return RuleCheckResult(
        passed=not by_outcome[False],
        compliant_resources=by_outcome[True],
        non_compliant_resources=by_outcome[False],
    )
def ec2_instance_managed_by_systems_manager():
    """Classify EC2 instances by whether Systems Manager knows about them.

    Terminated instances are skipped. An instance is compliant when its
    ``InstanceId`` appears in the SSM instance-information inventory.

    Returns:
        RuleCheckResult: ``passed`` is True when no instance is
        non-compliant; resources are identified by ``InstanceId``.
    """
    compliant_resources = []
    non_compliant_resources = []

    reservations = client.describe_instances()["Reservations"]

    # DescribeInstanceInformation is a paged API; reading only the first
    # response would report managed instances on later pages as
    # non-compliant. A set keeps the membership test below O(1).
    managed_instance_ids = set()
    paginator = ssm_client.get_paginator("describe_instance_information")
    for page in paginator.paginate():
        for info in page["InstanceInformationList"]:
            # NOTE(review): this is a truthiness test on a status string, so
            # it likely admits every entry regardless of status; confirm
            # whether only a specific status (e.g. "Online") should count.
            if info["PingStatus"]:
                managed_instance_ids.add(info["InstanceId"])

    for reservation in reservations:
        for instance in reservation["Instances"]:
            if instance["State"]["Name"] == "terminated":
                continue
            if instance["InstanceId"] in managed_instance_ids:
                compliant_resources.append(instance["InstanceId"])
            else:
                non_compliant_resources.append(instance["InstanceId"])

    return RuleCheckResult(
        passed=not non_compliant_resources,
        compliant_resources=compliant_resources,
        non_compliant_resources=non_compliant_resources,
    )
def ec2_instance_profile_attached():
    """Classify EC2 instances by whether an IAM instance profile is present.

    Terminated instances are skipped. An instance is compliant when its
    description contains an ``IamInstanceProfile`` key.

    Returns:
        RuleCheckResult: ``passed`` is True when no instance is
        non-compliant; resources are identified by ``InstanceId``.
    """
    with_profile = []
    without_profile = []

    described = client.describe_instances()
    for group in described["Reservations"]:
        for inst in group["Instances"]:
            if inst["State"]["Name"] == "terminated":
                continue
            if "IamInstanceProfile" not in inst:
                without_profile.append(inst["InstanceId"])
                continue
            with_profile.append(inst["InstanceId"])

    return RuleCheckResult(
        passed=not without_profile,
        compliant_resources=with_profile,
        non_compliant_resources=without_profile,
    )
def ec2_no_amazon_key_pair():
    """Classify EC2 instances by whether they were launched with a key pair.

    Terminated instances are skipped. An instance is compliant when its
    description has no ``KeyName`` key, i.e. no key pair is associated with
    it; an instance that carries a ``KeyName`` is non-compliant.

    Returns:
        RuleCheckResult: ``passed`` is True when no instance is
        non-compliant; resources are identified by ``InstanceId``.
    """
    compliant_resources = []
    non_compliant_resources = []

    reservations = client.describe_instances()["Reservations"]
    for reservation in reservations:
        for instance in reservation["Instances"]:
            if instance["State"]["Name"] == "terminated":
                continue
            # The rule is about having *no* key pair: the presence of
            # "KeyName" is the violation (the previous check was inverted).
            if "KeyName" in instance:
                non_compliant_resources.append(instance["InstanceId"])
            else:
                compliant_resources.append(instance["InstanceId"])

    return RuleCheckResult(
        passed=not non_compliant_resources,
        compliant_resources=compliant_resources,
        non_compliant_resources=non_compliant_resources,
    )
def ec2_stopped_instance():
    """Classify EC2 instances by whether they are currently stopped.

    Terminated instances are skipped. An instance whose ``State.Name`` is
    ``"stopped"`` is non-compliant; every other remaining instance is
    compliant.

    Returns:
        RuleCheckResult: ``passed`` is True when no instance is
        non-compliant; resources are identified by ``InstanceId``.
    """
    not_stopped = []
    stopped = []

    for group in client.describe_instances()["Reservations"]:
        for inst in group["Instances"]:
            if inst["State"]["Name"] == "terminated":
                continue
            target = not_stopped if inst["State"]["Name"] != "stopped" else stopped
            target.append(inst["InstanceId"])

    return RuleCheckResult(
        passed=not stopped,
        compliant_resources=not_stopped,
        non_compliant_resources=stopped,
    )
def ec2_token_hop_limit_check():
    """Classify EC2 instances by their metadata PUT response hop limit.

    Terminated instances are skipped. An instance is compliant when its
    ``MetadataOptions.HttpPutResponseHopLimit`` is less than 2.

    Returns:
        RuleCheckResult: ``passed`` is True when no instance is
        non-compliant; resources are identified by ``InstanceId``.
    """
    within_limit = []
    over_limit = []

    described = client.describe_instances()
    for group in described["Reservations"]:
        for inst in group["Instances"]:
            if inst["State"]["Name"] == "terminated":
                continue
            hop_limit_ok = inst["MetadataOptions"]["HttpPutResponseHopLimit"] < 2
            (within_limit if hop_limit_ok else over_limit).append(inst["InstanceId"])

    return RuleCheckResult(
        passed=not over_limit,
        compliant_resources=within_limit,
        non_compliant_resources=over_limit,
    )