From ce09ebb98d57007210f8a3f0d58097ba6cdef41d Mon Sep 17 00:00:00 2001 From: Juwon Date: Wed, 7 Aug 2024 14:02:45 +0900 Subject: [PATCH] Add feature : WAFv2 bp check --- services/wafv2.py | 65 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 60 insertions(+), 5 deletions(-) diff --git a/services/wafv2.py b/services/wafv2.py index 2831d00..bc8de08 100644 --- a/services/wafv2.py +++ b/services/wafv2.py @@ -2,28 +2,83 @@ from models import RuleCheckResult import boto3 -# client = boto3.client("") +client = boto3.client("wafv2", region_name="us-east-1") + +cloudfront_client = boto3.client("cloudfront") def wafv2_logging_enabled(): + compliant_resources = [] + non_compliant_resources = [] + webacls = client.list_web_acls(Scope="REGIONAL")["WebACLs"] + + for webacl in webacls: + print(webacl["ARN"]) + configuration = client.get_logging_configuration(ResourceArn=webacl["ARN"]) + if configuration["LoggingConfiguration"] != []: + compliant_resources.append(webacl["ARN"]) + else: + non_compliant_resources.append(webacl["ARN"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, ) def wafv2_rulegroup_logging_enabled(): + compliant_resources = [] + non_compliant_resources = [] + rule_groups = client.list_rule_groups(Scope="REGIONAL")["RuleGroups"] + + for rule_group in rule_groups: + configuration = client.get_rule_group(ARN=rule_group["ARN"]) + if configuration["RuleGroup"]["VisibilityConfig"]["CloudWatchMetricsEnabled"] == True: + compliant_resources.append(rule_group["ARN"]) + else: + non_compliant_resources.append(rule_group["ARN"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, ) def wafv2_rulegroup_not_empty(): + compliant_resources = [] + non_compliant_resources = [] + rule_groups = client.list_rule_groups(Scope="REGIONAL")["RuleGroups"] + + for rule_group in rule_groups: + configuration = client.get_rule_group(ARN=rule_group["ARN"]) + if len(configuration["RuleGroup"]["Rules"]) > 0: + compliant_resources.append(rule_group["ARN"]) + else: + non_compliant_resources.append(rule_group["ARN"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, ) def wafv2_webacl_not_empty(): + compliant_resources = [] + non_compliant_resources = [] + webacls = client.list_web_acls(Scope="REGIONAL")["WebACLs"] + + for webacl in webacls: + response = client.get_web_acl(Id=webacl["Id"], Name=webacl["Name"], Scope="REGIONAL") + if len(response["WebACL"]["Rules"]) > 0: + compliant_resources.append(webacl["ARN"]) + else: + non_compliant_resources.append(webacl["ARN"]) + return RuleCheckResult( - passed=False, compliant_resources=[], non_compliant_resources=[] + passed=not non_compliant_resources, + compliant_resources=compliant_resources, + non_compliant_resources=non_compliant_resources, )