bp-check/services/elasticache.py

116 lines
4.0 KiB
Python
Raw Permalink Normal View History

2024-08-14 01:05:06 +00:00
from models import RuleCheckResult, RuleChecker
from functools import cached_property
2024-08-05 02:30:34 +00:00
import boto3
2024-08-14 01:05:06 +00:00
class ElastiCacheRuleChecker(RuleChecker):
    """Best-practice rule checks for Amazon ElastiCache resources.

    Every ``elasticache_*`` method evaluates one rule against all cache
    clusters or all replication groups visible to the client and returns a
    ``RuleCheckResult`` listing the ARNs that satisfy and violate the rule.
    """

    def __init__(self):
        """Create the boto3 ElastiCache client shared by all checks."""
        self.client = boto3.client("elasticache")

    def _describe_all(self, operation_name, result_key):
        """Collect the items of every page of a paginated ``describe_*`` call.

        Args:
            operation_name: Client operation to paginate, e.g.
                ``"describe_cache_clusters"``.
            result_key: Key of the item list inside each response page.

        Returns:
            A single list with the items of all pages, in API order.
        """
        # A bare describe_* call returns only the first page of results, so
        # accounts with many resources would be checked only partially.
        items = []
        for page in self.client.get_paginator(operation_name).paginate():
            items.extend(page[result_key])
        return items

    @staticmethod
    def _evaluate(resources, is_compliant):
        """Split resources by a predicate and wrap them in a RuleCheckResult.

        Args:
            resources: Iterable of resource description dicts; each must
                contain an ``"ARN"`` key.
            is_compliant: Callable taking one resource dict and returning a
                truthy value when the resource satisfies the rule.

        Returns:
            A ``RuleCheckResult`` whose ``passed`` flag is true only when no
            resource is non-compliant (an empty input therefore passes).
        """
        compliant_resources = []
        non_compliant_resources = []
        for resource in resources:
            if is_compliant(resource):
                compliant_resources.append(resource["ARN"])
            else:
                non_compliant_resources.append(resource["ARN"])

        return RuleCheckResult(
            passed=not non_compliant_resources,
            compliant_resources=compliant_resources,
            non_compliant_resources=non_compliant_resources,
        )

    @cached_property
    def clusters(self):
        """All cache clusters, fetched once per instance and then cached."""
        return self._describe_all("describe_cache_clusters", "CacheClusters")

    @cached_property
    def replication_groups(self):
        """All replication groups, fetched once per instance and then cached."""
        return self._describe_all(
            "describe_replication_groups", "ReplicationGroups"
        )

    def elasticache_auto_minor_version_upgrade_check(self):
        """Pass clusters that have ``AutoMinorVersionUpgrade`` turned on."""
        return self._evaluate(
            self.clusters,
            lambda cluster: bool(cluster["AutoMinorVersionUpgrade"]),
        )

    def elasticache_redis_cluster_automatic_backup_check(self):
        """Pass replication groups that report a ``SnapshottingClusterId``.

        NOTE(review): the presence of ``SnapshottingClusterId`` is used as the
        signal that automatic backups are configured. The AWS Config managed
        rule of the same name compares ``SnapshotRetentionLimit`` with a
        threshold instead -- confirm which semantics are intended.
        """
        return self._evaluate(
            self.replication_groups,
            lambda group: "SnapshottingClusterId" in group,
        )

    def elasticache_repl_grp_auto_failover_enabled(self):
        """Pass replication groups whose ``AutomaticFailover`` is "enabled"."""
        return self._evaluate(
            self.replication_groups,
            lambda group: group["AutomaticFailover"] == "enabled",
        )

    def elasticache_repl_grp_encrypted_at_rest(self):
        """Pass replication groups with at-rest encryption enabled."""
        return self._evaluate(
            self.replication_groups,
            lambda group: bool(group["AtRestEncryptionEnabled"]),
        )

    def elasticache_repl_grp_encrypted_in_transit(self):
        """Pass replication groups with in-transit encryption enabled."""
        return self._evaluate(
            self.replication_groups,
            lambda group: bool(group["TransitEncryptionEnabled"]),
        )

    def elasticache_subnet_group_check(self):
        """Pass clusters that are not placed in the "default" subnet group."""
        return self._evaluate(
            self.clusters,
            lambda cluster: cluster["CacheSubnetGroupName"] != "default",
        )
# Module-level handle to the checker class itself (the class, not an instance).
# NOTE(review): presumably looked up by name by the code that loads the
# service modules -- confirm against the caller.
rule_checker = ElastiCacheRuleChecker