Initial commit

This commit is contained in:
EC2 Default User
2024-08-05 02:30:34 +00:00
commit 5db67720fd
33 changed files with 1921 additions and 0 deletions

28
services/__init__.py Normal file
View File

@ -0,0 +1,28 @@
from . import (
alb,
api_gw,
rds,
asg,
ec2,
cloudfront,
kms,
codeseries,
cloudwatch,
docdb,
dynamodb,
ecr,
ecs,
efs,
eks,
elasticache,
iam,
_lambda,
tags,
route53,
s3,
secrets_manager,
security_hub,
sns,
vpc,
wafv2,
)

29
services/_lambda.py Normal file
View File

@ -0,0 +1,29 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def lambda_dlq_check():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def lambda_function_public_access_prohibited():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def lambda_function_settings_check():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def lambda_inside_vpc():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

37
services/alb.py Normal file
View File

@ -0,0 +1,37 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def alb_http_drop_invalid_header_enabled():
return RuleCheckResult(
passed=False,
compliant_resources=[],
non_compliant_resources=[],
)
def alb_waf_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def elb_cross_zone_load_balancing_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def elb_deletion_protection_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def elb_logging_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

41
services/api_gw.py Normal file
View File

@ -0,0 +1,41 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def api_gwv2_access_logs_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def api_gwv2_authorization_type_configured():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def api_gw_associated_with_waf():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def api_gw_cache_enabled_and_encrypted():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def api_gw_execution_logging_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def api_gw_xray_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

17
services/asg.py Normal file
View File

@ -0,0 +1,17 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def autoscaling_group_elb_healthcheck_required():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def autoscaling_multiple_az():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

41
services/cloudfront.py Normal file
View File

@ -0,0 +1,41 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def cloudfront_accesslogs_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def cloudfront_associated_with_waf():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def cloudfront_default_root_object_configured():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def cloudfront_no_deprecated_ssl_protocols():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def cloudfront_s3_origin_access_control_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def cloudfront_viewer_policy_https():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

17
services/cloudwatch.py Normal file
View File

@ -0,0 +1,17 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def cw_loggroup_retention_period_check():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def cloudwatch_alarm_settings_check():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

23
services/codeseries.py Normal file
View File

@ -0,0 +1,23 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def codebuild_project_environment_privileged_check():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def codebuild_project_logging_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def codedeploy_auto_rollback_monitor_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

29
services/docdb.py Normal file
View File

@ -0,0 +1,29 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def docdb_cluster_audit_logging_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def docdb_cluster_backup_retention_check():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def docdb_cluster_deletion_protection_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def docdb_cluster_encrypted():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

41
services/dynamodb.py Normal file
View File

@ -0,0 +1,41 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def dynamodb_autoscaling_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def dynamodb_last_backup_recovery_point_created():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def dynamodb_pitr_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def dynamodb_table_deletion_protection_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def dynamodb_table_encrypted_kms():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def dynamodb_table_encryption_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

59
services/ec2.py Normal file
View File

@ -0,0 +1,59 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def autoscaling_launch_template():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def ec2_ebs_encryption_by_default():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def ec2_imdsv2_check():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def ec2_instance_detailed_monitoring_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def ec2_instance_managed_by_systems_manager():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def ec2_instance_profile_attached():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def ec2_no_amazon_key_pair():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def ec2_stopped_instance():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def ec2_token_hop_limit_check():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

29
services/ecr.py Normal file
View File

@ -0,0 +1,29 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def ecr_private_image_scanning_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def ecr_private_lifecycle_policy_configured():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def ecr_private_tag_immutability_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def ecr_kms_encryption_1():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

53
services/ecs.py Normal file
View File

@ -0,0 +1,53 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def ecs_awsvpc_networking_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def ecs_containers_nonprivileged():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def ecs_containers_readonly_access():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def ecs_container_insights_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def ecs_fargate_latest_platform_version():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def ecs_task_definition_log_configuration():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def ecs_task_definition_memory_hard_limit():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def ecs_task_definition_nonroot_user():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

35
services/efs.py Normal file
View File

@ -0,0 +1,35 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def efs_access_point_enforce_root_directory():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def efs_access_point_enforce_user_identity():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def efs_automatic_backups_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def efs_encrypted_check():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def efs_mount_target_public_accessible():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

29
services/eks.py Normal file
View File

@ -0,0 +1,29 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def eks_cluster_logging_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def eks_cluster_secrets_encrypted():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def eks_endpoint_no_public_access():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def eks_secrets_encrypted():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

41
services/elasticache.py Normal file
View File

@ -0,0 +1,41 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def elasticache_auto_minor_version_upgrade_check():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def elasticache_redis_cluster_automatic_backup_check():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def elasticache_repl_grp_auto_failover_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def elasticache_repl_grp_encrypted_at_rest():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def elasticache_repl_grp_encrypted_in_transit():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def elasticache_subnet_group_check():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

23
services/iam.py Normal file
View File

@ -0,0 +1,23 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def iam_policy_no_statements_with_admin_access():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def iam_policy_no_statements_with_full_access():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def iam_role_managed_policy_check():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

11
services/kms.py Normal file
View File

@ -0,0 +1,11 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def cmk_backing_key_rotation_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

95
services/rds.py Normal file
View File

@ -0,0 +1,95 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def aurora_last_backup_recovery_point_created():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def aurora_mysql_backtracking_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def db_instance_backup_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def rds_cluster_auto_minor_version_upgrade_enable():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def rds_cluster_default_admin_check():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def rds_cluster_deletion_protection_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def rds_cluster_encrypted_at_rest():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def rds_cluster_iam_authentication_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def rds_cluster_multi_az_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def rds_db_security_group_not_allowed():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def rds_enhanced_monitoring_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def rds_instance_deletion_protection_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def rds_instance_public_access_check():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def rds_logging_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def rds_snapshot_encrypted():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

11
services/route53.py Normal file
View File

@ -0,0 +1,11 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def route53_query_logging_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

65
services/s3.py Normal file
View File

@ -0,0 +1,65 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def s3_access_point_in_vpc_only():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def s3_bucket_default_lock_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def s3_bucket_level_public_access_prohibited():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def s3_bucket_logging_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def s3_bucket_ssl_requests_only():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def s3_bucket_versioning_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def s3_default_encryption_kms():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def s3_event_notifications_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def s3_last_backup_recovery_point_created():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def s3_lifecycle_policy_check():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

View File

@ -0,0 +1,23 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def secretsmanager_rotation_enabled_check():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def secretsmanager_scheduled_rotation_success_check():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def secretsmanager_secret_periodic_rotation():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

11
services/security_hub.py Normal file
View File

@ -0,0 +1,11 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def securityhub_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

17
services/sns.py Normal file
View File

@ -0,0 +1,17 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def sns_encrypted_kms():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def sns_topic_message_delivery_notification_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

11
services/tags.py Normal file
View File

@ -0,0 +1,11 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def required_tags():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)

257
services/vpc.py Normal file
View File

@ -0,0 +1,257 @@
from models import RuleCheckResult
from pprint import pprint
import boto3
ec2 = boto3.client("ec2")
def ec2_transit_gateway_auto_vpc_attach_disabled():
response = ec2.describe_transit_gateways()
non_compliant_resources = [
resource["TransitGatewayArn"]
for resource in filter(
lambda x: x["Options"]["AutoAcceptSharedAttachments"] == "enable",
response["TransitGateways"],
)
]
compliant_resources = list(
set([resource["TransitGatewayArn"] for resource in response["TransitGateways"]])
- set(non_compliant_resources)
)
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def restricted_ssh():
response = ec2.describe_security_group_rules()
non_compliant_resources = [
f'{resource["GroupId"]} / {resource["SecurityGroupRuleId"]}'
for resource in filter(
lambda x: x["IsEgress"] == False
and x["FromPort"] <= 22
and x["ToPort"] >= 22
and x.get("CidrIpv4") == "0.0.0.0/0",
response["SecurityGroupRules"],
)
]
compliant_resources = list(
set(
[
f'{resource["GroupId"]} / {resource["SecurityGroupRuleId"]}'
for resource in response["SecurityGroupRules"]
]
)
- set(non_compliant_resources)
)
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def restricted_common_ports():
common_ports = [
22, # SSH
80, # HTTP
3306, # MySQL
3389, # RDP
5432, # PostgreSQL
6379, # Redis
11211, # Memcached
]
response = ec2.describe_security_group_rules()
non_compliant_resources = [
f'{resource["GroupId"]} / {resource["SecurityGroupRuleId"]}'
for resource in filter(
lambda x: x["IsEgress"] == False
and x["FromPort"] in common_ports
and x["ToPort"] in common_ports
and x.get("PrefixListId") is None,
response["SecurityGroupRules"],
)
]
compliant_resources = list(
set(
f'{resource["GroupId"]} / {resource["SecurityGroupRuleId"]}'
for resource in response["SecurityGroupRules"]
)
- set(non_compliant_resources)
)
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def subnet_auto_assign_public_ip_disabled():
response = ec2.describe_subnets()
non_compliant_resources = [
resource["SubnetId"]
for resource in filter(lambda x: x["MapPublicIpOnLaunch"], response["Subnets"])
]
compliant_resources = list(
set(resource["SubnetId"] for resource in response["Subnets"])
- set(non_compliant_resources)
)
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def vpc_default_security_group_closed():
response = ec2.describe_security_groups(
Filters=[{"Name": "group-name", "Values": ["default"]}]
)
non_compliant_resources = [
resource["GroupId"]
for resource in filter(
lambda x: x["IpPermissions"] or x["IpPermissionsEgress"],
response["SecurityGroups"],
)
]
compliant_resources = list(
set(resource["GroupId"] for resource in response["SecurityGroups"])
- set(non_compliant_resources)
)
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def vpc_flow_logs_enabled():
response = ec2.describe_flow_logs()
flow_log_enabled_vpcs = [
resource["ResourceId"] for resource in response["FlowLogs"]
]
response = ec2.describe_vpcs()
non_compliant_resources = [
resource["VpcId"]
for resource in filter(
lambda x: x["VpcId"] not in flow_log_enabled_vpcs, response["Vpcs"]
)
]
compliant_resources = list(
set(resource["VpcId"] for resource in response["Vpcs"])
- set(non_compliant_resources)
)
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def vpc_network_acl_unused_check():
response = ec2.describe_network_acls()
non_compliant_resources = [
resource["NetworkAclId"]
for resource in filter(lambda x: not x["Associations"], response["NetworkAcls"])
]
compliant_resources = list(
set(resource["NetworkAclId"] for resource in response["NetworkAcls"])
- set(non_compliant_resources)
)
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def vpc_peering_dns_resolution_check():
response = ec2.describe_vpc_peering_connections()
non_compliant_resources = [
resource["VpcPeeringConnectionId"]
for resource in filter(
lambda x: x["Status"]["Code"] not in ["deleted", "deleting"]
and (
not x["AccepterVpcInfo"].get("PeeringOptions")
or not x["AccepterVpcInfo"]["PeeringOptions"][
"AllowDnsResolutionFromRemoteVpc"
]
or not x["RequesterVpcInfo"]["PeeringOptions"][
"AllowDnsResolutionFromRemoteVpc"
]
),
response["VpcPeeringConnections"],
)
]
compliant_resources = list(
set(
resource["VpcPeeringConnectionId"]
for resource in response["VpcPeeringConnections"]
)
- set(non_compliant_resources)
)
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)
def vpc_sg_open_only_to_authorized_ports():
response = ec2.describe_security_group_rules()
authorized_port = [
# 80
]
non_compliant_resources = [
f'{resource["GroupId"]} / {resource["SecurityGroupRuleId"]}'
for resource in filter(
lambda x: x["IsEgress"] == False
and (x.get("CidrIpv4") == "0.0.0.0/0" or x.get("CidrIpv6") == "::/0")
and x["FromPort"] not in authorized_port
and x["ToPort"] not in authorized_port,
response["SecurityGroupRules"],
)
]
compliant_resources = list(
set(
f'{resource["GroupId"]} / {resource["SecurityGroupRuleId"]}'
for resource in response["SecurityGroupRules"]
)
- set(non_compliant_resources)
)
return RuleCheckResult(
passed=not non_compliant_resources,
compliant_resources=compliant_resources,
non_compliant_resources=non_compliant_resources,
)

29
services/wafv2.py Normal file
View File

@ -0,0 +1,29 @@
from models import RuleCheckResult
import boto3
# client = boto3.client("")
def wafv2_logging_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def wafv2_rulegroup_logging_enabled():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def wafv2_rulegroup_not_empty():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)
def wafv2_webacl_not_empty():
return RuleCheckResult(
passed=False, compliant_resources=[], non_compliant_resources=[]
)