diff --git a/services/ecr.py b/services/ecr.py index 93b0254..00701a6 100644 --- a/services/ecr.py +++ b/services/ecr.py @@ -1,29 +1,82 @@ from models import RuleCheckResult import boto3 +import botocore -# client = boto3.client("") +client = boto3.client("ecr") def ecr_private_image_scanning_enabled(): + repositories = client.describe_repositories() + compliant_resource = [] + non_compliant_resources = [] + + for repository in repositories["repositories"]: + if repository["imageScanningConfiguration"]["scanOnPush"] == True: + compliant_resource.append(repository["repositoryArn"]) + else: + non_compliant_resources.append(repository["repositoryArn"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resource, + non_compliant_resources=non_compliant_resources, ) def ecr_private_lifecycle_policy_configured(): + repositories = client.describe_repositories() + compliant_resource = [] + non_compliant_resources = [] + + for repository in repositories["repositories"]: + try: + response = client.get_lifecycle_policy( + registryId=repository["registryId"], + repositoryName=repository["repositoryName"], + ) + compliant_resource.append(repository["repositoryArn"]) + except botocore.errorfactory.LifecyclePolicyNotFoundException: + non_compliant_resources.append(repository["repositoryArn"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resource, + non_compliant_resources=non_compliant_resources, ) def ecr_private_tag_immutability_enabled(): + repositories = client.describe_repositories() + compliant_resource = [] + non_compliant_resources = [] + + for repository in repositories["repositories"]: + if repository["imageTagMutability"] == "IMMUTABLE": + compliant_resource.append(repository["repositoryArn"]) + else: + non_compliant_resources.append(repository["repositoryArn"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resource, + non_compliant_resources=non_compliant_resources, ) def ecr_kms_encryption_1(): + repositories = client.describe_repositories() + compliant_resource = [] + non_compliant_resources = [] + + for repository in repositories["repositories"]: + if repository["encryptionConfiguration"]["encryptionType"] == "KMS": + compliant_resource.append(repository["repositoryArn"]) + else: + non_compliant_resources.append(repository["repositoryArn"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resource, + non_compliant_resources=non_compliant_resources, )