145 lines
5.1 KiB
Python
145 lines
5.1 KiB
Python
from models import RuleCheckResult, RuleChecker
|
|
from functools import cached_property
|
|
import boto3
|
|
|
|
|
|
class WAFv2RuleChecker(RuleChecker):
|
|
def __init__(self):
|
|
self.client = boto3.client("wafv2")
|
|
self.global_client = boto3.client("wafv2", region_name="us-east-1")
|
|
|
|
@cached_property
|
|
def regional_web_acls(self):
|
|
return self.client.list_web_acls(Scope="REGIONAL")["WebACLs"]
|
|
|
|
@cached_property
|
|
def cloudfront_web_acls(self):
|
|
return self.global_client.list_web_acls(Scope="CLOUDFRONT")["WebACLs"]
|
|
|
|
@cached_property
|
|
def regional_rule_groups(self):
|
|
rule_groups = self.client.list_rule_groups(Scope="REGIONAL")["RuleGroups"]
|
|
return [
|
|
self.client.get_rule_group(ARN=rule_group["ARN"])["RuleGroup"]
|
|
for rule_group in rule_groups
|
|
]
|
|
|
|
@cached_property
|
|
def cloudfront_rule_groups(self):
|
|
rule_groups = self.global_client.list_rule_groups(Scope="CLOUDFRONT")[
|
|
"RuleGroups"
|
|
]
|
|
return [
|
|
self.global_client.get_rule_group(ARN=rule_group["ARN"])["RuleGroup"]
|
|
for rule_group in rule_groups
|
|
]
|
|
|
|
def wafv2_logging_enabled(self):
|
|
compliant_resources = []
|
|
non_compliant_resources = []
|
|
|
|
for web_acl in self.regional_web_acls:
|
|
try:
|
|
configuration = self.client.get_logging_configuration(
|
|
ResourceArn=web_acl["ARN"]
|
|
)
|
|
compliant_resources.append(web_acl["ARN"])
|
|
except Exception as e:
|
|
if e.__class__.__name__ == "WAFNonexistentItemException":
|
|
non_compliant_resources.append(web_acl["ARN"])
|
|
else:
|
|
raise e
|
|
|
|
for web_acl in self.cloudfront_web_acls:
|
|
try:
|
|
configuration = self.global_client.get_logging_configuration(
|
|
ResourceArn=web_acl["ARN"]
|
|
)
|
|
compliant_resources.append(web_acl["ARN"])
|
|
except Exception as e:
|
|
if e.__class__.__name__ == "WAFNonexistentItemException":
|
|
non_compliant_resources.append(web_acl["ARN"])
|
|
else:
|
|
raise e
|
|
|
|
return RuleCheckResult(
|
|
passed=not non_compliant_resources,
|
|
compliant_resources=compliant_resources,
|
|
non_compliant_resources=non_compliant_resources,
|
|
)
|
|
|
|
def wafv2_rulegroup_logging_enabled(self):
|
|
compliant_resources = []
|
|
non_compliant_resources = []
|
|
|
|
for rule_group in self.regional_rule_groups:
|
|
if rule_group["VisibilityConfig"]["CloudWatchMetricsEnabled"] == True:
|
|
compliant_resources.append(rule_group["ARN"])
|
|
else:
|
|
non_compliant_resources.append(rule_group["ARN"])
|
|
|
|
for rule_group in self.cloudfront_rule_groups:
|
|
if rule_group["VisibilityConfig"]["CloudWatchMetricsEnabled"] == True:
|
|
compliant_resources.append(rule_group["ARN"])
|
|
else:
|
|
non_compliant_resources.append(rule_group["ARN"])
|
|
|
|
return RuleCheckResult(
|
|
passed=not non_compliant_resources,
|
|
compliant_resources=compliant_resources,
|
|
non_compliant_resources=non_compliant_resources,
|
|
)
|
|
|
|
def wafv2_rulegroup_not_empty(self):
|
|
compliant_resources = []
|
|
non_compliant_resources = []
|
|
|
|
for rule_group in self.regional_rule_groups:
|
|
if len(rule_group["Rules"]) > 0:
|
|
compliant_resources.append(rule_group["ARN"])
|
|
else:
|
|
non_compliant_resources.append(rule_group["ARN"])
|
|
|
|
for rule_group in self.cloudfront_rule_groups:
|
|
if len(rule_group["Rules"]) > 0:
|
|
compliant_resources.append(rule_group["ARN"])
|
|
else:
|
|
non_compliant_resources.append(rule_group["ARN"])
|
|
|
|
return RuleCheckResult(
|
|
passed=not non_compliant_resources,
|
|
compliant_resources=compliant_resources,
|
|
non_compliant_resources=non_compliant_resources,
|
|
)
|
|
|
|
def wafv2_webacl_not_empty(self):
|
|
compliant_resources = []
|
|
non_compliant_resources = []
|
|
|
|
for web_acl in self.regional_web_acls:
|
|
response = self.client.get_web_acl(
|
|
Id=web_acl["Id"], Name=web_acl["Name"], Scope="REGIONAL"
|
|
)
|
|
if len(response["WebACL"]["Rules"]) > 0:
|
|
compliant_resources.append(web_acl["ARN"])
|
|
else:
|
|
non_compliant_resources.append(web_acl["ARN"])
|
|
|
|
for web_acl in self.cloudfront_web_acls:
|
|
response = self.global_client.get_web_acl(
|
|
Id=web_acl["Id"], Name=web_acl["Name"], Scope="CLOUDFRONT"
|
|
)
|
|
if len(response["WebACL"]["Rules"]) > 0:
|
|
compliant_resources.append(web_acl["ARN"])
|
|
else:
|
|
non_compliant_resources.append(web_acl["ARN"])
|
|
|
|
return RuleCheckResult(
|
|
passed=not non_compliant_resources,
|
|
compliant_resources=compliant_resources,
|
|
non_compliant_resources=non_compliant_resources,
|
|
)
|
|
|
|
|
|
rule_checker = WAFv2RuleChecker
|