From 7d5076b570794bf2ee09fa0db6c9b64bf3c74485 Mon Sep 17 00:00:00 2001 From: Juwon Date: Sat, 10 Aug 2024 14:49:34 +0900 Subject: [PATCH] Add feature : ECS bp check --- services/ecs.py | 192 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 183 insertions(+), 9 deletions(-) diff --git a/services/ecs.py b/services/ecs.py index 6e2f04a..d7e4727 100644 --- a/services/ecs.py +++ b/services/ecs.py @@ -2,52 +2,226 @@ from models import RuleCheckResult import boto3 -# client = boto3.client("") +client = boto3.client("ecs") def ecs_awsvpc_networking_enabled(): + compliant_resources = [] + non_compliant_resources = [] + task_definitions = client.list_task_definitions(status="ACTIVE")["taskDefinitionArns"] + latest_task_definitions = {} + + for task_definition in task_definitions: + if latest_task_definitions.get(task_definition.rsplit(":", 1)[0], 0) < int(task_definition.rsplit(":", 1)[1]): + latest_task_definitions[task_definition.rsplit(":", 1)[0]] = int(task_definition.rsplit(":", 1)[1]) + + for task_definition in latest_task_definitions.keys(): + task_definition_arn = f"{task_definition}:{latest_task_definitions[task_definition]}" + task_definition = client.describe_task_definition(taskDefinition=task_definition_arn)["taskDefinition"] + + if task_definition.get("networkMode") == "awsvpc": + compliant_resources.append(task_definition["taskDefinitionArn"]) + else: + non_compliant_resources.append(task_definition["taskDefinitionArn"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, ) def ecs_containers_nonprivileged(): + compliant_resources = [] + non_compliant_resources = [] + task_definitions = client.list_task_definitions(status="ACTIVE")["taskDefinitionArns"] + latest_task_definitions = {} + + for task_definition in task_definitions: + if latest_task_definitions.get(task_definition.rsplit(":", 1)[0], 0) < int(task_definition.rsplit(":", 1)[1]): + latest_task_definitions[task_definition.rsplit(":", 1)[0]] = int(task_definition.rsplit(":", 1)[1]) + + for task_definition in latest_task_definitions.keys(): + task_definition_arn = f"{task_definition}:{latest_task_definitions[task_definition]}" + task_definition = client.describe_task_definition(taskDefinition=task_definition_arn)["taskDefinition"] + containers = task_definition["containerDefinitions"] + + for container in containers: + if container.get("privileged") == True: + non_compliant_resources.append(task_definition["taskDefinitionArn"]) + break + else: + compliant_resources.append(task_definition["taskDefinitionArn"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, ) def ecs_containers_readonly_access(): + compliant_resources = [] + non_compliant_resources = [] + task_definitions = client.list_task_definitions(status="ACTIVE")["taskDefinitionArns"] + latest_task_definitions = {} + + for task_definition in task_definitions: + if latest_task_definitions.get(task_definition.rsplit(":", 1)[0], 0) < int(task_definition.rsplit(":", 1)[1]): + latest_task_definitions[task_definition.rsplit(":", 1)[0]] = int(task_definition.rsplit(":", 1)[1]) + + for task_definition in latest_task_definitions.keys(): + task_definition_arn = f"{task_definition}:{latest_task_definitions[task_definition]}" + task_definition = client.describe_task_definition(taskDefinition=task_definition_arn)["taskDefinition"] + containers = task_definition["containerDefinitions"] + + for container in containers: + if container.get("readonlyRootFilesystem") == False: + non_compliant_resources.append(task_definition["taskDefinitionArn"]) + break + else: + compliant_resources.append(task_definition["taskDefinitionArn"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, ) def ecs_container_insights_enabled(): + compliant_resources = [] + non_compliant_resources = [] + cluster_arns = client.list_clusters()["clusterArns"] + + for cluster_arn in cluster_arns: + clusters = client.describe_clusters(clusters=[cluster_arn], include=["SETTINGS"])["clusters"] + + for cluster in clusters: + settings = cluster["settings"] + + for setting in settings: + if setting["name"] == "containerInsights" and setting["value"] == "enabled": + compliant_resources.append(cluster_arn) + break + else: + non_compliant_resources.append(cluster_arn) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, ) def ecs_fargate_latest_platform_version(): + compliant_resources = [] + non_compliant_resources = [] + cluster_arns = client.list_clusters()["clusterArns"] + + for cluster_arn in cluster_arns: + services = client.list_services(cluster=cluster_arn, launchType="FARGATE")["serviceArns"] + + for service in services: + service = client.describe_services(cluster=cluster_arn, services=[service])["services"][0] + + if service["launchType"] == "FARGATE": + if service["platformVersion"] == "LATEST": + compliant_resources.append(service["serviceArn"]) + else: + non_compliant_resources.append(service["serviceArn"]) + else: + compliant_resources.append(service["serviceArn"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, ) def ecs_task_definition_log_configuration(): + compliant_resources = [] + non_compliant_resources = [] + task_definitions = client.list_task_definitions(status="ACTIVE")["taskDefinitionArns"] + latest_task_definitions = {} + + for task_definition in task_definitions: + if latest_task_definitions.get(task_definition.rsplit(":", 1)[0], 0) < int(task_definition.rsplit(":", 1)[1]): + latest_task_definitions[task_definition.rsplit(":", 1)[0]] = int(task_definition.rsplit(":", 1)[1]) + + for task_definition in latest_task_definitions.keys(): + task_definition_arn = f"{task_definition}:{latest_task_definitions[task_definition]}" + task_definition = client.describe_task_definition(taskDefinition=task_definition_arn)["taskDefinition"] + containers = task_definition["containerDefinitions"] + + for container in containers: + if container.get("logConfiguration") == None: + non_compliant_resources.append(task_definition["taskDefinitionArn"]) + break + else: + compliant_resources.append(task_definition["taskDefinitionArn"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, ) def ecs_task_definition_memory_hard_limit(): + compliant_resources = [] + non_compliant_resources = [] + task_definitions = client.list_task_definitions(status="ACTIVE")["taskDefinitionArns"] + latest_task_definitions = {} + + for task_definition in task_definitions: + if latest_task_definitions.get(task_definition.rsplit(":", 1)[0], 0) < int(task_definition.rsplit(":", 1)[1]): + latest_task_definitions[task_definition.rsplit(":", 1)[0]] = int(task_definition.rsplit(":", 1)[1]) + + for task_definition in latest_task_definitions.keys(): + task_definition_arn = f"{task_definition}:{latest_task_definitions[task_definition]}" + task_definition = client.describe_task_definition(taskDefinition=task_definition_arn)["taskDefinition"] + containers = task_definition["containerDefinitions"] + + for container in containers: + if container.get("memory") == None: + non_compliant_resources.append(task_definition["taskDefinitionArn"]) + break + else: + compliant_resources.append(task_definition["taskDefinitionArn"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, ) def ecs_task_definition_nonroot_user(): + compliant_resources = [] + non_compliant_resources = [] + task_definitions = client.list_task_definitions(status="ACTIVE")["taskDefinitionArns"] + latest_task_definitions = {} + + for task_definition in task_definitions: + if latest_task_definitions.get(task_definition.rsplit(":", 1)[0], 0) < int(task_definition.rsplit(":", 1)[1]): + latest_task_definitions[task_definition.rsplit(":", 1)[0]] = int(task_definition.rsplit(":", 1)[1]) + + for task_definition in latest_task_definitions.keys(): + task_definition_arn = f"{task_definition}:{latest_task_definitions[task_definition]}" + task_definition = client.describe_task_definition(taskDefinition=task_definition_arn)["taskDefinition"] + containers = task_definition["containerDefinitions"] + + for container in containers: + if container.get("user") in [None, "root"]: + non_compliant_resources.append(task_definition["taskDefinitionArn"]) + break + else: + compliant_resources.append(task_definition["taskDefinitionArn"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, )