Add global resources check

This commit is contained in:
skyuecx0630 2024-08-07 20:59:15 +09:00
parent 43d8059c17
commit 2c7f90455d

View File

@ -2,23 +2,35 @@ from models import RuleCheckResult
import boto3 import boto3
client = boto3.client("wafv2", region_name="us-east-1") client = boto3.client("wafv2")
global_client = boto3.client("wafv2", region_name="us-east-1")
cloudfront_client = boto3.client("cloudfront", "us-east-1")
def wafv2_logging_enabled(): def wafv2_logging_enabled():
compliant_resources = [] compliant_resources = []
non_compliant_resources = [] non_compliant_resources = []
webacls = client.list_web_acls(Scope="REGIONAL")["WebACLs"] regional_web_acls = client.list_web_acls(Scope="REGIONAL")["WebACLs"]
cloudfront_web_acls = global_client.list_web_acls(Scope="CLOUDFRONT")["WebACLs"]
for webacl in webacls: for web_acl in regional_web_acls:
print(webacl["ARN"]) try:
configuration = client.get_logging_configuration(ResourceArn=webacl["ARN"]) configuration = client.get_logging_configuration(ResourceArn=web_acl["ARN"])
if configuration["LoggingConfiguration"] != []: compliant_resources.append(web_acl["ARN"])
compliant_resources.append(webacl["ARN"]) except Exception as e:
else: if e.__class__.__name__ == "WAFNonexistentItemException":
non_compliant_resources.append(webacl["ARN"]) non_compliant_resources.append(web_acl["ARN"])
else:
raise e
for web_acl in cloudfront_web_acls:
try:
configuration = global_client.get_logging_configuration(ResourceArn=web_acl["ARN"])
compliant_resources.append(web_acl["ARN"])
except Exception as e:
if e.__class__.__name__ == "WAFNonexistentItemException":
non_compliant_resources.append(web_acl["ARN"])
else:
raise e
return RuleCheckResult( return RuleCheckResult(
passed=not non_compliant_resources, passed=not non_compliant_resources,
@ -30,15 +42,24 @@ def wafv2_logging_enabled():
def wafv2_rulegroup_logging_enabled(): def wafv2_rulegroup_logging_enabled():
compliant_resources = [] compliant_resources = []
non_compliant_resources = [] non_compliant_resources = []
rule_groups = client.list_rule_groups(Scope="REGIONAL")["RuleGroups"] regional_rule_groups = client.list_rule_groups(Scope="REGIONAL")["RuleGroups"]
cloudfront_rule_groups = global_client.list_rule_groups(Scope="CLOUDFRONT")["RuleGroups"]
for rule_group in rule_groups:
for rule_group in regional_rule_groups:
configuration = client.get_rule_group(ARN=rule_group["ARN"]) configuration = client.get_rule_group(ARN=rule_group["ARN"])
if configuration["RuleGroup"]["VisibilityConfig"]["CloudWatchMetricsEnabled"] == True: if configuration["RuleGroup"]["VisibilityConfig"]["CloudWatchMetricsEnabled"] == True:
compliant_resources.append(rule_group["ARN"]) compliant_resources.append(rule_group["ARN"])
else: else:
non_compliant_resources.append(rule_group["ARN"]) non_compliant_resources.append(rule_group["ARN"])
for rule_group in cloudfront_rule_groups:
configuration = global_client.get_rule_group(ARN=rule_group["ARN"])
if configuration["RuleGroup"]["VisibilityConfig"]["CloudWatchMetricsEnabled"] == True:
compliant_resources.append(rule_group["ARN"])
else:
non_compliant_resources.append(rule_group["ARN"])
return RuleCheckResult( return RuleCheckResult(
passed=not non_compliant_resources, passed=not non_compliant_resources,
compliant_resources=compliant_resources, compliant_resources=compliant_resources,
@ -49,15 +70,23 @@ def wafv2_rulegroup_logging_enabled():
def wafv2_rulegroup_not_empty(): def wafv2_rulegroup_not_empty():
compliant_resources = [] compliant_resources = []
non_compliant_resources = [] non_compliant_resources = []
rule_groups = client.list_rule_groups(Scope="REGIONAL")["RuleGroups"] regional_rule_groups = client.list_rule_groups(Scope="REGIONAL")["RuleGroups"]
cloudfront_rule_groups = global_client.list_rule_groups(Scope="CLOUDFRONT")["RuleGroups"]
for rule_group in rule_groups: for rule_group in regional_rule_groups:
configuration = client.get_rule_group(ARN=rule_group["ARN"]) configuration = client.get_rule_group(ARN=rule_group["ARN"])
if len(configuration["RuleGroup"]["Rules"]) > 0: if len(configuration["RuleGroup"]["Rules"]) > 0:
compliant_resources.append(rule_group["ARN"]) compliant_resources.append(rule_group["ARN"])
else: else:
non_compliant_resources.append(rule_group["ARN"]) non_compliant_resources.append(rule_group["ARN"])
for rule_group in cloudfront_rule_groups:
configuration = global_client.get_rule_group(ARN=rule_group["ARN"])
if len(configuration["RuleGroup"]["Rules"]) > 0:
compliant_resources.append(rule_group["ARN"])
else:
non_compliant_resources.append(rule_group["ARN"])
return RuleCheckResult( return RuleCheckResult(
passed=not non_compliant_resources, passed=not non_compliant_resources,
compliant_resources=compliant_resources, compliant_resources=compliant_resources,
@ -68,14 +97,21 @@ def wafv2_rulegroup_not_empty():
def wafv2_webacl_not_empty(): def wafv2_webacl_not_empty():
compliant_resources = [] compliant_resources = []
non_compliant_resources = [] non_compliant_resources = []
webacls = client.list_web_acls(Scope="REGIONAL")["WebACLs"] regional_web_acls = client.list_web_acls(Scope="REGIONAL")["WebACLs"]
cloudfront_web_acls = global_client.list_web_acls(Scope="CLOUDFRONT")["WebACLs"]
for webacl in webacls: for web_acl in regional_web_acls:
response = client.get_web_acl(Id=webacl["Id"], Name=webacl["Name"], Scope="REGIONAL") response = client.get_web_acl(Id=web_acl["Id"], Name=web_acl["Name"], Scope="REGIONAL")
if len(response["WebACL"]["Rules"]) > 0: if len(response["WebACL"]["Rules"]) > 0:
compliant_resources.append(webacl["ARN"]) compliant_resources.append(web_acl["ARN"])
else: else:
non_compliant_resources.append(webacl["ARN"]) non_compliant_resources.append(web_acl["ARN"])
for web_acl in cloudfront_web_acls:
response = global_client.get_web_acl(Id=web_acl["Id"], Name=web_acl["Name"], Scope="CLOUDFRONT")
if len(response["WebACL"]["Rules"]) > 0:
compliant_resources.append(web_acl["ARN"])
else:
non_compliant_resources.append(web_acl["ARN"])
return RuleCheckResult( return RuleCheckResult(
passed=not non_compliant_resources, passed=not non_compliant_resources,