bp-check/services/rds.py

279 lines
9.2 KiB
Python

from models import RuleCheckResult
import datetime
from dateutil.tz import tzlocal
import boto3
client = boto3.client("rds")
backup_client = boto3.client("backup")
ec2_client = boto3.client("ec2")
def aurora_last_backup_recovery_point_created():
compliant_resources = []
non_compliant_resources = []
clusters = client.describe_db_clusters()["DBClusters"]
for cluster in clusters:
recovery_points = backup_client.list_recovery_points_by_resource(ResourceArn=cluster["DBClusterArn"])[
"RecoveryPoints"
]
recovery_point_creation_dates = sorted([i["CreationDate"] for i in recovery_points])
if datetime.datetime.now(tz=tzlocal()) - recovery_point_creation_dates[-1] < datetime.timedelta(days=1):
compliant_resources.append(cluster["DBClusterArn"])
else:
non_compliant_resources.append(cluster["DBClusterArn"])
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def aurora_mysql_backtracking_enabled():
compliant_resources = []
non_compliant_resources = []
clusters = client.describe_db_clusters()["DBClusters"]
for cluster in clusters:
if cluster["Engine"] == "aurora-mysql" and cluster.get("EarliestBacktrackTime", None) == None:
non_compliant_resources.append(cluster["DBClusterArn"])
else:
compliant_resources.append(cluster["DBClusterArn"])
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def db_instance_backup_enabled():
compliant_resources = []
non_compliant_resources = []
clusters = client.describe_db_clusters()["DBClusters"]
for cluster in clusters:
if "BackupRetentionPeriod" in cluster:
compliant_resources.append(cluster["DBClusterArn"])
else:
non_compliant_resources.append(cluster["DBClusterArn"])
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def rds_cluster_auto_minor_version_upgrade_enable():
compliant_resources = []
non_compliant_resources = []
clusters = client.describe_db_clusters()["DBClusters"]
for cluster in clusters:
if cluster["Engine"] == "docdb" or cluster.get("AutoMinorVersionUpgrade"):
compliant_resources.append(cluster["DBClusterArn"])
else:
non_compliant_resources.append(cluster["DBClusterArn"])
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def rds_cluster_default_admin_check():
compliant_resources = []
non_compliant_resources = []
clusters = client.describe_db_clusters()["DBClusters"]
for cluster in clusters:
if cluster["MasterUsername"] not in ["admin", "postgres"]:
compliant_resources.append(cluster["DBClusterArn"])
else:
non_compliant_resources.append(cluster["DBClusterArn"])
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def rds_cluster_deletion_protection_enabled():
compliant_resources = []
non_compliant_resources = []
clusters = client.describe_db_clusters()["DBClusters"]
for cluster in clusters:
if cluster["DeletionProtection"]:
compliant_resources.append(cluster["DBClusterArn"])
else:
non_compliant_resources.append(cluster["DBClusterArn"])
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def rds_cluster_encrypted_at_rest():
compliant_resources = []
non_compliant_resources = []
clusters = client.describe_db_clusters()["DBClusters"]
for cluster in clusters:
if cluster["StorageEncrypted"]:
compliant_resources.append(cluster["DBClusterArn"])
else:
non_compliant_resources.append(cluster["DBClusterArn"])
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def rds_cluster_iam_authentication_enabled():
compliant_resources = []
non_compliant_resources = []
clusters = client.describe_db_clusters()["DBClusters"]
for cluster in clusters:
if cluster["Engine"] == "docdb" or cluster.get("IAMDatabaseAuthenticationEnabled"):
compliant_resources.append(cluster["DBClusterArn"])
else:
non_compliant_resources.append(cluster["DBClusterArn"])
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def rds_cluster_multi_az_enabled():
compliant_resources = []
non_compliant_resources = []
clusters = client.describe_db_clusters()["DBClusters"]
for cluster in clusters:
if len(cluster.get("AvailabilityZones", [])) > 1:
compliant_resources.append(cluster["DBClusterArn"])
else:
non_compliant_resources.append(cluster["DBClusterArn"])
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def rds_db_security_group_not_allowed():
compliant_resources = []
non_compliant_resources = []
clusters = client.describe_db_clusters()["DBClusters"]
security_groups = ec2_client.describe_security_groups()["SecurityGroups"]
default_security_group_ids = [i["GroupId"] for i in security_groups if i["GroupName"] == "default"]
for cluster in clusters:
db_security_groups = [i["VpcSecurityGroupId"] for i in cluster["VpcSecurityGroups"] if i["Status"] == "active"]
for default_security_group_id in default_security_group_ids:
if default_security_group_id in db_security_groups:
non_compliant_resources.append(cluster["DBClusterArn"])
break
else:
compliant_resources.append(cluster["DBClusterArn"])
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def rds_enhanced_monitoring_enabled():
compliant_resources = []
non_compliant_resources = []
instances = client.describe_db_instances()["DBInstances"]
for instance in instances:
if instance.get("MonitoringInterval", 0):
compliant_resources.append(instance["DBInstanceArn"])
else:
non_compliant_resources.append(instance["DBInstanceArn"])
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def rds_instance_public_access_check():
compliant_resources = []
non_compliant_resources = []
instances = client.describe_db_instances()["DBInstances"]
for instance in instances:
if instance["PubliclyAccessible"]:
non_compliant_resources.append(instance["DBInstanceArn"])
else:
compliant_resources.append(instance["DBInstanceArn"])
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def rds_logging_enabled():
compliant_resources = []
non_compliant_resources = []
clusters = client.describe_db_clusters()["DBClusters"]
logs_for_engine = {
"aurora-mysql": ["audit", "error", "general", "slowquery"],
"aurora-postgresql": ["postgresql"],
"docdb": ["audit", "profiler"]
}
for cluster in clusters:
if sorted(cluster["EnabledCloudwatchLogsExports"]) == logs_for_engine.get(cluster["Engine"]):
compliant_resources.append(cluster["DBClusterArn"])
else:
non_compliant_resources.append(cluster["DBClusterArn"])
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def rds_snapshot_encrypted():
compliant_resources = []
non_compliant_resources = []
cluster_snapshots = client.describe_db_cluster_snapshots()["DBClusterSnapshots"]
for snapshot in cluster_snapshots:
if snapshot.get("StorageEncrypted") == True:
compliant_resources.append(snapshot["DBClusterSnapshotArn"])
else:
non_compliant_resources.append(snapshot["DBClusterSnapshotArn"])
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)