From 2a92f6ee05e4582513eeed11afd9fa37fdf405e7 Mon Sep 17 00:00:00 2001 From: Juwon Date: Wed, 7 Aug 2024 17:06:38 +0900 Subject: [PATCH] Add feature: EC2 bp check --- services/ec2.py | 153 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 143 insertions(+), 10 deletions(-) diff --git a/services/ec2.py b/services/ec2.py index b0c8422..41b1820 100644 --- a/services/ec2.py +++ b/services/ec2.py @@ -2,58 +2,191 @@ from models import RuleCheckResult import boto3 -# client = boto3.client("") +client = boto3.client("ec2") +autoscaling_client = boto3.client("autoscaling") +ssm_client = boto3.client("ssm") def autoscaling_launch_template(): + compliant_resources = [] + non_compliant_resources = [] + asgs = autoscaling_client.describe_auto_scaling_groups()["AutoScalingGroups"] + + for asg in asgs: + if "LaunchTemplate" in asg["MixedInstancesPolicy"]: + compliant_resources.append(asg["AutoScalingGroupARN"]) + else: + non_compliant_resources.append(asg["AutoScalingGroupARN"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, ) def ec2_ebs_encryption_by_default(): + compliant_resources = [] + non_compliant_resources = [] + ebses = client.describe_volumes()["Volumes"] + + for ebs in ebses: + if ebs["Encrypted"] == True: + compliant_resources.append(ebs["VolumeId"]) + else: + non_compliant_resources.append(ebs["VolumeId"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, ) def ec2_imdsv2_check(): + compliant_resources = [] + non_compliant_resources = [] + reservations = client.describe_instances()["Reservations"] + + for reservation in reservations: + for instance in reservation["Instances"]: + if instance["State"]["Name"] == "terminated": + continue + if instance["MetadataOptions"]["HttpTokens"] == "required": + compliant_resources.append(instance["InstanceId"]) + else: + non_compliant_resources.append(instance["InstanceId"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, ) def ec2_instance_detailed_monitoring_enabled(): + compliant_resources = [] + non_compliant_resources = [] + reservations = client.describe_instances()["Reservations"] + + for reservation in reservations: + for instance in reservation["Instances"]: + if instance["State"]["Name"] == "terminated": + continue + if instance["Monitoring"]["State"] == "enabled": + compliant_resources.append(instance["InstanceId"]) + else: + non_compliant_resources.append(instance["InstanceId"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, ) def ec2_instance_managed_by_systems_manager(): + compliant_resources = [] + non_compliant_resources = [] + reservations = client.describe_instances()["Reservations"] + informations = ssm_client.describe_instance_information()["InstanceInformationList"] + managed_instance_ids = [i["InstanceId"] for i in informations if i["PingStatus"]] + + for reservation in reservations: + for instance in reservation["Instances"]: + if instance["State"]["Name"] == "terminated": + continue + if instance["InstanceId"] in managed_instance_ids: + compliant_resources.append(instance["InstanceId"]) + else: + non_compliant_resources.append(instance["InstanceId"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, ) def ec2_instance_profile_attached(): + compliant_resources = [] + non_compliant_resources = [] + reservations = client.describe_instances()["Reservations"] + + for reservation in reservations: + for instance in reservation["Instances"]: + if instance["State"]["Name"] == "terminated": + continue + if "IamInstanceProfile" in instance: + compliant_resources.append(instance["InstanceId"]) + else: + non_compliant_resources.append(instance["InstanceId"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, ) def ec2_no_amazon_key_pair(): + compliant_resources = [] + non_compliant_resources = [] + reservations = client.describe_instances()["Reservations"] + + for reservation in reservations: + for instance in reservation["Instances"]: + if instance["State"]["Name"] == "terminated": + continue + if "KeyName" in instance: + compliant_resources.append(instance["InstanceId"]) + else: + non_compliant_resources.append(instance["InstanceId"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, ) def ec2_stopped_instance(): + compliant_resources = [] + non_compliant_resources = [] + reservations = client.describe_instances()["Reservations"] + + for reservation in reservations: + for instance in reservation["Instances"]: + if instance["State"]["Name"] == "terminated": + continue + if instance["State"]["Name"] != "stopped": + compliant_resources.append(instance["InstanceId"]) + else: + non_compliant_resources.append(instance["InstanceId"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, ) def ec2_token_hop_limit_check(): + compliant_resources = [] + non_compliant_resources = [] + reservations = client.describe_instances()["Reservations"] + + for reservation in reservations: + for instance in reservation["Instances"]: + if instance["State"]["Name"] == "terminated": + continue + if instance["MetadataOptions"]["HttpPutResponseHopLimit"] < 2: + compliant_resources.append(instance["InstanceId"]) + else: + non_compliant_resources.append(instance["InstanceId"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, )