From 2c7f90455d04019f085bc300ed000b70a44c13d1 Mon Sep 17 00:00:00 2001
From: skyuecx0630 <48788794+skyuecx0630@users.noreply.github.com>
Date: Wed, 7 Aug 2024 20:59:15 +0900
Subject: [PATCH] Add global resources check

---
 services/wafv2.py | 76 ++++++++++++++++++++++++++++++++++-------------
 1 file changed, 56 insertions(+), 20 deletions(-)

diff --git a/services/wafv2.py b/services/wafv2.py
index ce7455f..6fc4292 100644
--- a/services/wafv2.py
+++ b/services/wafv2.py
@@ -2,23 +2,35 @@ from models import RuleCheckResult
 import boto3
 
 
-client = boto3.client("wafv2", region_name="us-east-1")
-
-cloudfront_client = boto3.client("cloudfront", "us-east-1")
+client = boto3.client("wafv2")
+global_client = boto3.client("wafv2", region_name="us-east-1")
 
 
 def wafv2_logging_enabled():
     compliant_resources = []
     non_compliant_resources = []
-    webacls = client.list_web_acls(Scope="REGIONAL")["WebACLs"]
+    regional_web_acls = client.list_web_acls(Scope="REGIONAL")["WebACLs"]
+    cloudfront_web_acls = global_client.list_web_acls(Scope="CLOUDFRONT")["WebACLs"]
 
-    for webacl in webacls:
-        print(webacl["ARN"])
-        configuration = client.get_logging_configuration(ResourceArn=webacl["ARN"])
-        if configuration["LoggingConfiguration"] != []:
-            compliant_resources.append(webacl["ARN"])
-        else:
-            non_compliant_resources.append(webacl["ARN"])
+    for web_acl in regional_web_acls:
+        try:
+            configuration = client.get_logging_configuration(ResourceArn=web_acl["ARN"])
+            compliant_resources.append(web_acl["ARN"])
+        except Exception as e:
+            if e.__class__.__name__ == "WAFNonexistentItemException":
+                non_compliant_resources.append(web_acl["ARN"])
+            else:
+                raise e
+
+    for web_acl in cloudfront_web_acls:
+        try:
+            configuration = global_client.get_logging_configuration(ResourceArn=web_acl["ARN"])
+            compliant_resources.append(web_acl["ARN"])
+        except Exception as e:
+            if e.__class__.__name__ == "WAFNonexistentItemException":
+                non_compliant_resources.append(web_acl["ARN"])
+            else:
+                raise e
 
     return RuleCheckResult(
         passed=not non_compliant_resources,
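Review bot: The `except Exception as e` / `e.__class__.__name__ == "WAFNonexistentItemException"` / `raise e` pattern in both new loops is brittle: it catches every error (throttling, access denied, a missing region) before string-matching the class name, and `raise e` adds a misleading extra frame where a bare `raise` would not; the modeled error is available as `client.exceptions.WAFNonexistentItemException` and only that should be caught. The two loops are identical apart from the client and scope, and neither `list_web_acls` call follows `NextMarker`, so an account whose web ACLs exceed one page of results is only partially examined and the rule can report `passed=True` with unchecked ACLs — the `list_rule_groups` calls in the next two hunks and `list_web_acls` in the last have the same gap. `configuration` is assigned in both loops but never read. A comment on the new `global_client` line would help the next reader: `# CLOUDFRONT-scoped WAF resources are served only from us-east-1`. The sketch below replaces the loop bodies; the `return RuleCheckResult(...)` tail continues outside the hunk.
```python
def _web_acls(waf, scope):
    """Yield every web ACL summary in *scope*, following NextMarker until the listing is exhausted."""
    kwargs = {"Scope": scope}
    while True:
        page = waf.list_web_acls(**kwargs)
        yield from page["WebACLs"]
        if not page.get("NextMarker"):
            return
        kwargs["NextMarker"] = page["NextMarker"]


def _logging_enabled(waf, arn):
    """True when *arn* has a logging configuration; the modeled not-found error means it has none."""
    try:
        waf.get_logging_configuration(ResourceArn=arn)
    except waf.exceptions.WAFNonexistentItemException:
        return False
    return True


def wafv2_logging_enabled():
    compliant_resources = []
    non_compliant_resources = []
    # CLOUDFRONT-scoped resources are served only from us-east-1, which is what global_client is pinned to.
    for waf, scope in ((client, "REGIONAL"), (global_client, "CLOUDFRONT")):
        for web_acl in _web_acls(waf, scope):
            arn = web_acl["ARN"]
            if _logging_enabled(waf, arn):
                compliant_resources.append(arn)
            else:
                non_compliant_resources.append(arn)
    # … unchanged: return RuleCheckResult(...) …
```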
@@ -30,15 +42,24 @@ def wafv2_logging_enabled():
 def wafv2_rulegroup_logging_enabled():
     compliant_resources = []
     non_compliant_resources = []
-    rule_groups = client.list_rule_groups(Scope="REGIONAL")["RuleGroups"]
+    regional_rule_groups = client.list_rule_groups(Scope="REGIONAL")["RuleGroups"]
+    cloudfront_rule_groups = global_client.list_rule_groups(Scope="CLOUDFRONT")["RuleGroups"]
 
-    for rule_group in rule_groups:
+
+    for rule_group in regional_rule_groups:
         configuration = client.get_rule_group(ARN=rule_group["ARN"])
         if configuration["RuleGroup"]["VisibilityConfig"]["CloudWatchMetricsEnabled"] == True:
             compliant_resources.append(rule_group["ARN"])
         else:
             non_compliant_resources.append(rule_group["ARN"])
 
+    for rule_group in cloudfront_rule_groups:
+        configuration = global_client.get_rule_group(ARN=rule_group["ARN"])
+        if configuration["RuleGroup"]["VisibilityConfig"]["CloudWatchMetricsEnabled"] == True:
+            compliant_resources.append(rule_group["ARN"])
+        else:
+            non_compliant_resources.append(rule_group["ARN"])
+
     return RuleCheckResult(
         passed=not non_compliant_resources,
         compliant_resources=compliant_resources,
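Review bot: `if configuration["RuleGroup"]["VisibilityConfig"]["CloudWatchMetricsEnabled"] == True:` in both the kept and the added loop compares a boolean against `True` with `==`; `if configuration["RuleGroup"]["VisibilityConfig"]["CloudWatchMetricsEnabled"]:` is the idiom and reads the same. The added loop repeats the regional one line for line except for `global_client`; iterating `((client, regional_rule_groups), (global_client, cloudfront_rule_groups))` once keeps the classification in one place so a later fix lands in both scopes. The inserted `+` blank line before `for rule_group in regional_rule_groups:` leaves two consecutive blank lines inside the function body, which the file otherwise uses only between top-level definitions. The function's `return RuleCheckResult(...)` continues past the hunk.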
@@ -49,15 +70,23 @@ def wafv2_rulegroup_logging_enabled():
 def wafv2_rulegroup_not_empty():
     compliant_resources = []
     non_compliant_resources = []
-    rule_groups = client.list_rule_groups(Scope="REGIONAL")["RuleGroups"]
+    regional_rule_groups = client.list_rule_groups(Scope="REGIONAL")["RuleGroups"]
+    cloudfront_rule_groups = global_client.list_rule_groups(Scope="CLOUDFRONT")["RuleGroups"]
 
-    for rule_group in rule_groups:
+    for rule_group in regional_rule_groups:
         configuration = client.get_rule_group(ARN=rule_group["ARN"])
         if len(configuration["RuleGroup"]["Rules"]) > 0:
             compliant_resources.append(rule_group["ARN"])
         else:
             non_compliant_resources.append(rule_group["ARN"])
 
+    for rule_group in cloudfront_rule_groups:
+        configuration = global_client.get_rule_group(ARN=rule_group["ARN"])
+        if len(configuration["RuleGroup"]["Rules"]) > 0:
+            compliant_resources.append(rule_group["ARN"])
+        else:
+            non_compliant_resources.append(rule_group["ARN"])
+
     return RuleCheckResult(
         passed=not non_compliant_resources,
         compliant_resources=compliant_resources,
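Review bot: `if len(configuration["RuleGroup"]["Rules"]) > 0:` in both loops is the non-idiomatic spelling of `if configuration["RuleGroup"]["Rules"]:`; `len(response["WebACL"]["Rules"]) > 0` in the next hunk has the same shape. With this change `wafv2_rulegroup_not_empty` and `wafv2_rulegroup_logging_enabled` each call `get_rule_group` for every rule group in both scopes, so one run of the checks makes two describe calls per rule group; presumably fine at current scale, but worth a comment if the rule set grows, since these calls share the service's API throttling budget. The `return RuleCheckResult(...)` continues outside the hunk.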
@@ -68,14 +97,21 @@ def wafv2_rulegroup_not_empty(): def wafv2_webacl_not_empty(): compliant_resources = [] non_compliant_resources = [] - webacls = client.list_web_acls(Scope="REGIONAL")["WebACLs"] + regional_web_acls = client.list_web_acls(Scope="REGIONAL")["WebACLs"] + cloudfront_web_acls = global_client.list_web_acls(Scope="CLOUDFRONT")["WebACLs"] - for webacl in webacls: - response = client.get_web_acl(Id=webacl["Id"], Name=webacl["Name"], Scope="REGIONAL") + for web_acl in regional_web_acls: + response = client.get_web_acl(Id=web_acl["Id"], Name=web_acl["Name"], Scope="REGIONAL") if len(response["WebACL"]["Rules"]) > 0: - compliant_resources.append(webacl["ARN"]) + compliant_resources.append(web_acl["ARN"]) else: - non_compliant_resources.append(webacl["ARN"]) + non_compliant_resources.append(web_acl["ARN"]) + for web_acl in cloudfront_web_acls: + response = global_client.get_web_acl(Id=web_acl["Id"], Name=web_acl["Name"], Scope="CLOUDFRONT") + if len(response["WebACL"]["Rules"]) > 0: + compliant_resources.append(web_acl["ARN"]) + else: + non_compliant_resources.append(web_acl["ARN"]) return RuleCheckResult( passed=not non_compliant_resources,
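Review bot: The scope literal is now written twice per loop (`list_web_acls(Scope="CLOUDFRONT")` and `get_web_acl(..., Scope="CLOUDFRONT")`) and must stay paired with `global_client`, the only client pinned to the region that serves CLOUDFRONT scope; nothing in the code ties the literal to the client, so a future edit that changes one but not the other would presumably fail at runtime or, if the scope were swapped to REGIONAL on `global_client`, quietly evaluate us-east-1 regional ACLs under the CloudFront label. Drawing `(waf, scope)` from a single pair list and passing `scope` to both calls, as suggested for the other functions, removes that drift. The function's `return RuleCheckResult(...)` continues outside the hunk.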