From d377addf7bafeb79f722afe4785825b3d65139ae Mon Sep 17 00:00:00 2001 From: Juwon Date: Tue, 6 Aug 2024 15:36:50 +0900 Subject: [PATCH] Add feature: EKS bp check --- services/eks.py | 55 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 47 insertions(+), 8 deletions(-) diff --git a/services/eks.py b/services/eks.py index 87017e9..0fd18d3 100644 --- a/services/eks.py +++ b/services/eks.py @@ -2,28 +2,67 @@ from models import RuleCheckResult import boto3 -# client = boto3.client("") +client = boto3.client("eks") def eks_cluster_logging_enabled(): + clusters = client.list_clusters()["clusters"] + compliant_resource = [] + non_compliant_resources = [] + + for cluster in clusters: + response = client.describe_cluster(name=cluster)["cluster"] + if ( + len(response["logging"]["clusterLogging"][0]["types"]) == 5 + and response["logging"]["clusterLogging"][0]["enabled"] == True + ): + compliant_resource.append(response["arn"]) + else: + non_compliant_resources.append(response["arn"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resource, + non_compliant_resources=non_compliant_resources, ) def eks_cluster_secrets_encrypted(): + clusters = client.list_clusters()["clusters"] + compliant_resource = [] + non_compliant_resources = [] + + for cluster in clusters: + response = client.describe_cluster(name=cluster)["cluster"] + if ( + "encryptionConfig" in response + and "secrets" in response["encryptionConfig"][0]["resources"] + ): + compliant_resource.append(response["arn"]) + else: + non_compliant_resources.append(response["arn"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resource, + non_compliant_resources=non_compliant_resources, ) def eks_endpoint_no_public_access(): - return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] - ) + clusters = client.list_clusters()["clusters"] + compliant_resource = [] + non_compliant_resources = [] + for cluster in clusters: + response = client.describe_cluster(name=cluster)["cluster"] + if response["resourcesVpcConfig"]["endpointPublicAccess"] == False: + compliant_resource.append(response["arn"]) + else: + non_compliant_resources.append(response["arn"]) -def eks_secrets_encrypted(): return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resource, + non_compliant_resources=non_compliant_resources, )