diff --git a/services/api_gw.py b/services/api_gw.py index aec3c94..5ae5266 100644 --- a/services/api_gw.py +++ b/services/api_gw.py @@ -2,181 +2,188 @@ from models import RuleCheckResult import boto3 -v2_client = boto3.client('apigatewayv2') -v1_client = boto3.client('apigateway') -wafv2_client = boto3.client('wafv2') +v1_client = boto3.client("apigateway") +v2_client = boto3.client("apigatewayv2") + def api_gwv2_access_logs_enabled(): apis = v2_client.get_apis() - compliant_resources=[] - non_compliant_resources=[] - if 'Items' in apis: - for api in apis['Items']: - stages = v2_client.get_stages( - ApiId=api['ApiId'], - ) - if 'Items' not in stages: continue - noncompliant_stage_names = [ - stage['StageName'] - for stage in stages['Items'] - if 'AccessLogSettings' not in stage - ] - if noncompliant_stage_names: - identity = api['Name'] + " | " + ", ".join(noncompliant_stage_names) - non_compliant_resources.append(identity) - else: - compliant_resources.append(api['name']) + compliant_resources = [] + non_compliant_resources = [] + + for api in apis["Items"]: + stages = v2_client.get_stages( + ApiId=api["ApiId"], + ) + + non_compliant_resources += [ + f"{api['Name']} / {stage['StageName']}" + for stage in stages["Items"] + if "AccessLogSettings" not in stage + ] + + compliant_resources += list( + set([f"{api['Name']} / {stage['StageName']}" for stage in stages["Items"]]) + - set(non_compliant_resources) + ) + return RuleCheckResult( passed=not non_compliant_resources, compliant_resources=compliant_resources, - non_compliant_resources=non_compliant_resources + non_compliant_resources=non_compliant_resources, ) def api_gwv2_authorization_type_configured(): apis = v2_client.get_apis() - compliant_resources=[] - non_compliant_resources=[] - if 'Items' in apis: - for api in apis['Items']: - response = v2_client.get_routes( - ApiId=api['ApiId'], - ) - noncomplaint_routes = [ - route['RouteKey'] - for route in response['Items'] - if route['AuthorizationType'] == "NONE" - ] - if noncomplaint_routes: - identity = api['Name'] + " | " + ', '.join(noncomplaint_routes) - non_compliant_resources.append(identity) - else: - compliant_resources.append(api['Name']) + compliant_resources = [] + non_compliant_resources = [] + + for api in apis["Items"]: + response = v2_client.get_routes( + ApiId=api["ApiId"], + ) + + non_compliant_resources += [ + f"{api['Name']} / {route['RouteKey']}" + for route in response["Items"] + if route["AuthorizationType"] == "NONE" + ] + + compliant_resources += list( + set([f"{api['Name']} / {route['RouteKey']}" for route in response["Items"]]) + - set(non_compliant_resources) + ) + return RuleCheckResult( passed=not non_compliant_resources, compliant_resources=compliant_resources, - non_compliant_resources=non_compliant_resources + non_compliant_resources=non_compliant_resources, ) def api_gw_associated_with_waf(): apis = v1_client.get_rest_apis() - compliant_resources=[] - non_compliant_resources=[] - if 'items' in apis: - for api in apis['items']: - stages = v1_client.get_stages( - restApiId=api['id'], - ) - if 'item' in stages: - for stage in stages['item']: - stage_arn = f"arn:aws:apigateway:{v1_client.meta.region_name}::/restapis/{api['id']}/stages/{stage['stageName']}" - response = wafv2_client.get_web_acl_for_resource( - ResourceArn=stage_arn - ) - if 'WebACL' in response: compliant_resources.append(stage_arn) - else: non_compliant_resources.append(stage_arn) + compliant_resources = [] + non_compliant_resources = [] + + for api in apis["items"]: + stages = v1_client.get_stages( + restApiId=api["id"], + ) + + for stage in stages["item"]: + stage_arn = f"arn:aws:apigateway:{v1_client.meta.region_name}::/restapis/{api['id']}/stages/{stage['stageName']}" + + if "webAclArn" in stage: + compliant_resources.append(stage_arn) + else: + non_compliant_resources.append(stage_arn) + return RuleCheckResult( passed=not non_compliant_resources, compliant_resources=compliant_resources, - non_compliant_resources=non_compliant_resources + non_compliant_resources=non_compliant_resources, ) def api_gw_cache_enabled_and_encrypted(): apis = v1_client.get_rest_apis() - compliant_resources=[] - non_compliant_resources=[] - if 'items' in apis: - for api in apis['items']: - stages = v1_client.get_stages( - restApiId=api['id'], + compliant_resources = [] + non_compliant_resources = [] + + for api in apis["items"]: + stages = v1_client.get_stages( + restApiId=api["id"], + ) + + non_compliant_resources += [ + f"arn:aws:apigateway:{v1_client.meta.region_name}::/restapis/{api['id']}/stages/{stage['stageName']}" + for stage in stages["item"] + if not "*/*" in stage["methodSettings"] + or ( + not stage["methodSettings"]["*/*"]["cachingEnabled"] + or not stage["methodSettings"]["*/*"]["cacheDataEncrypted"] ) - if 'item' in stages: - non_compliant_resources.extend([ + ] + compliant_resources += list( + set( + [ f"arn:aws:apigateway:{v1_client.meta.region_name}::/restapis/{api['id']}/stages/{stage['stageName']}" - for stage in stages['item'] - if not stage['methodSettings'] - ]) - non_compliant_resources.extend([ - f"arn:aws:apigateway:{v1_client.meta.region_name}::/restapis/{api['id']}/stages/{stage['stageName']}" - for stage in stages['item'] - for _,settings in stage['methodSettings'].items() - if not settings['cachingEnabled'] - or not settings['cacheDataEncrypted'] - ]) - compliant_resources.extend([ - f"arn:aws:apigateway:{v1_client.meta.region_name}::/restapis/{api['id']}/stages/{stage['stageName']}" - for stage in stages['item'] - for _,settings in stage['methodSettings'].items() - if settings['cachingEnabled'] - and settings['cacheDataEncrypted'] - ]) + for stage in stages["item"] + ] + ) + - set(non_compliant_resources) + ) + return RuleCheckResult( passed=not non_compliant_resources, compliant_resources=compliant_resources, - non_compliant_resources=non_compliant_resources + non_compliant_resources=non_compliant_resources, ) def api_gw_execution_logging_enabled(): apis = v1_client.get_rest_apis() - compliant_resources=[] - non_compliant_resources=[] - if 'items' in apis: - for api in apis['items']: - stages = v1_client.get_stages( - restApiId=api['id'], + compliant_resources = [] + non_compliant_resources = [] + for api in apis["items"]: + stages = v1_client.get_stages( + restApiId=api["id"], + ) + + non_compliant_resources += [ + f"arn:aws:apigateway:{v1_client.meta.region_name}::/restapis/{api['id']}/stages/{stage['stageName']}" + for stage in stages["item"] + if not "*/*" in stage["methodSettings"] + or ( + not "loggingLevel" in stage["methodSettings"]["*/*"] + or stage["methodSettings"]["*/*"]["loggingLevel"] == "OFF" ) - if 'item' in stages: - non_compliant_resources.extend([ + ] + compliant_resources += list( + set( + [ f"arn:aws:apigateway:{v1_client.meta.region_name}::/restapis/{api['id']}/stages/{stage['stageName']}" - for stage in stages['item'] - if not stage['methodSettings'] - ]) - non_compliant_resources.extend([ - f"arn:aws:apigateway:{v1_client.meta.region_name}::/restapis/{api['id']}/stages/{stage['stageName']}" - for stage in stages['item'] - for _,settings in stage['methodSettings'].items() - if not (settings['loggingLevel'] == "INFO" - or settings['loggingLevel'] == "ERROR") - ]) - compliant_resources.extend([ - f"arn:aws:apigateway:{v1_client.meta.region_name}::/restapis/{api['id']}/stages/{stage['stageName']}" - for stage in stages['item'] - for _,settings in stage['methodSettings'].items() - if settings['loggingLevel'] == "INFO" - or settings['loggingLevel'] == "ERROR" - ]) + for stage in stages["item"] + ] + ) + - set(non_compliant_resources) + ) + return RuleCheckResult( passed=not non_compliant_resources, compliant_resources=compliant_resources, - non_compliant_resources=non_compliant_resources + non_compliant_resources=non_compliant_resources, ) def api_gw_xray_enabled(): apis = v1_client.get_rest_apis() - compliant_resources=[] - non_compliant_resources=[] - if 'items' in apis: - for api in apis['items']: - stages = v1_client.get_stages( - restApiId=api['id'], + compliant_resources = [] + non_compliant_resources = [] + for api in apis["items"]: + stages = v1_client.get_stages( + restApiId=api["id"], + ) + + non_compliant_resources += [ + f"arn:aws:apigateway:{v1_client.meta.region_name}::/restapis/{api['id']}/stages/{stage['stageName']}" + for stage in stages["item"] + if not stage["tracingEnabled"] + ] + compliant_resources += list( + set( + [ + f"arn:aws:apigateway:{v1_client.meta.region_name}::/restapis/{api['id']}/stages/{stage['stageName']}" + for stage in stages["item"] + ] ) - if 'item' in stages: - non_compliant_resources.extend([ - f"arn:aws:apigateway:{v1_client.meta.region_name}::/restapis/{api['id']}/stages/{stage['stageName']}" - for stage in stages['item'] - if not stage['tracingEnabled'] - ]) - compliant_resources.extend([ - f"arn:aws:apigateway:{v1_client.meta.region_name}::/restapis/{api['id']}/stages/{stage['stageName']}" - for stage in stages['item'] - if stage['tracingEnabled'] - ]) + - set(non_compliant_resources) + ) + return RuleCheckResult( passed=not non_compliant_resources, compliant_resources=compliant_resources, - non_compliant_resources=non_compliant_resources + non_compliant_resources=non_compliant_resources, )