
from models import RuleCheckResult
import boto3
import json
# Module-level boto3 clients shared by every check in this file; both are
# built once at import time.  `iam_client` is not referenced by any of the
# checks visible here.
client, iam_client = boto3.client("lambda"), boto3.client("iam")
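def lambda_dlq_check():
    """Flag Lambda functions that have no dead-letter queue configured.

    Walks every page of ``ListFunctions``.  The API returns a bounded page
    (50 functions by default) plus a ``NextMarker``; a single un-paginated
    call silently ignores the rest of the account and would report
    ``passed=True`` for functions the check never looked at.

    A function is compliant when its ``DeadLetterConfig`` carries a
    non-empty ``TargetArn``; the mere presence of the key is not used as
    evidence, so a cleared config is not counted as compliant.

    Returns:
        RuleCheckResult with ``passed`` True iff every function has a DLQ,
        and the function ARNs split into compliant / non-compliant lists.
    """
    compliant_resource = []
    non_compliant_resources = []
    paginator = client.get_paginator("list_functions")
    for page in paginator.paginate():
        for function in page["Functions"]:
            arn = function["FunctionArn"]
            dlq = function.get("DeadLetterConfig") or {}
            if dlq.get("TargetArn"):
                compliant_resource.append(arn)
            else:
                non_compliant_resources.append(arn)
    return RuleCheckResult(
        passed=not non_compliant_resources,
        compliant_resources=compliant_resource,
        non_compliant_resources=non_compliant_resources,
    )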
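def _grants_public_access(statement):
    """Return True if *statement* is an ``Allow`` whose principal is a wildcard.

    ``Principal`` is inspected after ``json.loads``, so it is either the
    string ``"*"`` or a dict such as ``{"AWS": "*"}`` / ``{"AWS": ["*", ...]}``.
    Comparing it against the JSON text ``'{"AWS": "*"}'`` (as the original
    check did) can never match a parsed dict, so dict-form public grants
    were silently missed.  Statement conditions are not evaluated: a
    conditioned wildcard grant is still reported.
    """
    if statement.get("Effect") != "Allow":
        return False
    principal = statement.get("Principal")
    if isinstance(principal, str):
        return principal in ("*", "")
    if isinstance(principal, dict):
        aws = principal.get("AWS", [])
        if isinstance(aws, str):
            aws = [aws]
        return any(entry in ("*", "") for entry in aws)
    return False


def lambda_function_public_access_prohibited():
    """Flag Lambda functions whose resource-based policy grants access to everyone.

    Walks every page of ``ListFunctions`` (the API pages its output, so a
    single call only covers the first page) and fetches each function's
    resource policy with ``GetPolicy``.

    A function is non-compliant when any ``Allow`` statement names a
    wildcard principal (see ``_grants_public_access``).  A function without
    a resource-based policy -- ``GetPolicy`` raises
    ``ResourceNotFoundException`` -- grants nothing to anyone and is treated
    as compliant; the original counted it non-compliant, which produced
    false findings for every function that had never been shared.

    Returns:
        RuleCheckResult with ``passed`` True iff no function is publicly
        invocable, and the function ARNs split into compliant /
        non-compliant lists.

    Raises:
        botocore ClientError subclasses other than
        ``ResourceNotFoundException`` propagate unchanged.
    """
    compliant_resource = []
    non_compliant_resources = []
    paginator = client.get_paginator("list_functions")
    for page in paginator.paginate():
        for function in page["Functions"]:
            arn = function["FunctionArn"]
            try:
                response = client.get_policy(FunctionName=function["FunctionName"])
            except client.exceptions.ResourceNotFoundException:
                # No resource policy attached: there is no grant to be public.
                compliant_resource.append(arn)
                continue
            policy = json.loads(response["Policy"])
            statements = policy.get("Statement", [])
            # IAM JSON allows a single statement object in place of a list.
            if isinstance(statements, dict):
                statements = [statements]
            if any(_grants_public_access(s) for s in statements):
                non_compliant_resources.append(arn)
            else:
                compliant_resource.append(arn)
    return RuleCheckResult(
        passed=not non_compliant_resources,
        compliant_resources=compliant_resource,
        non_compliant_resources=non_compliant_resources,
    )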
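def lambda_function_settings_check(*, default_timeout=3, default_memory_size=128):
    """Flag Lambda functions still running with the console/API default settings.

    A function is non-compliant when its ``Timeout`` equals
    ``default_timeout`` or its ``MemorySize`` equals ``default_memory_size``,
    i.e. at least one of the two was never deliberately sized.  The defaults
    match what Lambda assigns when neither value is specified (3 s, 128 MB);
    they are keyword-only so existing zero-argument callers are unaffected.

    Walks every page of ``ListFunctions``; a single un-paginated call only
    sees the first page and would report ``passed=True`` for the rest.

    Args:
        default_timeout: timeout value (seconds) treated as "unset".
        default_memory_size: memory value (MB) treated as "unset".

    Returns:
        RuleCheckResult with ``passed`` True iff every function has both
        values changed from the defaults, and the ARNs in each bucket.
    """
    compliant_resource = []
    non_compliant_resources = []
    paginator = client.get_paginator("list_functions")
    for page in paginator.paginate():
        for function in page["Functions"]:
            arn = function["FunctionArn"]
            at_default = (
                function["Timeout"] == default_timeout
                or function["MemorySize"] == default_memory_size
            )
            if at_default:
                non_compliant_resources.append(arn)
            else:
                compliant_resource.append(arn)
    return RuleCheckResult(
        passed=not non_compliant_resources,
        compliant_resources=compliant_resource,
        non_compliant_resources=non_compliant_resources,
    )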
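def lambda_inside_vpc():
    """Flag Lambda functions that are not attached to a VPC.

    Walks every page of ``ListFunctions``; a single un-paginated call only
    covers the first page and would report ``passed=True`` for functions
    the check never examined.

    A function is compliant only when its ``VpcConfig`` names a VPC
    (non-empty ``VpcId``) or at least one subnet.  NOTE(review): the API
    returns a ``VpcConfig`` with an empty ``VpcId`` and empty subnet /
    security-group lists for functions that are not in a VPC, so testing
    ``"VpcConfig" in function`` (the original check) marks non-VPC functions
    as compliant -- confirm against a function in the target account.

    Returns:
        RuleCheckResult with ``passed`` True iff every function is in a VPC,
        and the function ARNs split into compliant / non-compliant lists.
    """
    compliant_resource = []
    non_compliant_resources = []
    paginator = client.get_paginator("list_functions")
    for page in paginator.paginate():
        for function in page["Functions"]:
            arn = function["FunctionArn"]
            vpc = function.get("VpcConfig") or {}
            if vpc.get("VpcId") or vpc.get("SubnetIds"):
                compliant_resource.append(arn)
            else:
                non_compliant_resources.append(arn)
    return RuleCheckResult(
        passed=not non_compliant_resources,
        compliant_resources=compliant_resource,
        non_compliant_resources=non_compliant_resources,
    )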