#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: awsenergylabelercli.py
#
# Copyright 2022 Theodoor Scholte, Costas Tyfoxylos, Jenda Brands
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
#
"""
Main code for awsenergylabelercli.
.. _Google Python Style Guide:
http://google.github.io/styleguide/pyguide.html
"""
import argparse
import json
import logging
import logging.config
import hashlib
import os
import coloredlogs
from yaspin import yaspin
from awsenergylabelerlib import (validate_regions,
validate_account_ids,
AwsAccount,
EnergyLabeler,
DestinationPath,
SecurityHub,
ACCOUNT_THRESHOLDS,
ZONE_THRESHOLDS,
DEFAULT_SECURITY_HUB_FILTER,
DEFAULT_SECURITY_HUB_FRAMEWORKS,
AWS_FOUNDATIONAL_SECURITY_FRAMEWORK,
CIS_AWS_FOUNDATION_FRAMEWORK,
PCI_DSS_FRAMEWORK,
ALL_ZONE_EXPORT_TYPES,
ZONE_METRIC_EXPORT_TYPES,
ALL_ACCOUNT_EXPORT_TYPES,
ACCOUNT_METRIC_EXPORT_TYPES,
InvalidFrameworks,
InvalidAccountListProvided,
InvalidRegionListProvided)
from ._version import __version__ as cli_version
from .awsenergylabelercliexceptions import MissingRequiredArguments, MutuallyExclusiveArguments
from .entities import MetadataEntry
from .validators import (account_thresholds_config,
aws_account_id,
character_delimited_list_variable,
environment_variable_boolean,
default_environment_variable,
get_mutually_exclusive_args,
json_string,
positive_integer,
security_hub_region,
valid_local_file,
zone_thresholds_config,
OverridingArgument)
__author__ = '''Costas Tyfoxylos <ctyfoxylos@schubergphilis.com>'''
__docformat__ = '''google'''
__date__ = '''11-11-2021'''
__copyright__ = '''Copyright 2022, Costas Tyfoxylos'''
__credits__ = ["Theodoor Scholte", "Costas Tyfoxylos", "Jenda Brands"]
__license__ = '''MIT'''
__maintainer__ = '''Costas Tyfoxylos'''
__email__ = '''<ctyfoxylos@schubergphilis.com>'''
__status__ = '''Development''' # "Prototype", "Development", "Production".
# This is the main prefix used for logging
LOGGER_BASENAME = '''awsenergylabelercli'''
LOGGER = logging.getLogger(LOGGER_BASENAME)
LOGGER.addHandler(logging.NullHandler())
SUPPRESSED_FINDINGS_QUERY = {'WorkflowStatus': [{'Value': 'SUPPRESSED',
'Comparison': 'EQUALS'}]}
RESOLVED_FINDINGS_QUERY = lambda x: {'UpdatedAt': [{'DateRange': {'Value': x, 'Unit': 'DAYS'}}], # noqa
'WorkflowStatus': [{'Value': 'RESOLVED', 'Comparison': 'EQUALS'}]}
[docs]
def get_parser():
"""Constructs the parser with all the arguments and returns it."""
# https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser(description='''A cli to label accounts and security zones with energy labels based
on Security Hub findings.''')
parser.add_argument('--log-config',
'-l',
action=default_environment_variable('AWS_LABELER_LOG_CONFIG'),
dest='logger_config',
help='The location of the logging config json file')
parser.add_argument('--log-level',
'-L',
help='Provide the log level. Defaults to info.',
dest='log_level',
action=default_environment_variable('AWS_LABELER_LOG_LEVEL'),
default='info',
choices=['debug',
'info',
'warning',
'error',
'critical'])
parser.add_argument('--region',
'-r',
action=default_environment_variable('AWS_LABELER_REGION'),
type=security_hub_region,
required=True,
help='The home AWS region, default is looking into the environment for either '
'"AWS_LABELER_REGION" or "AWS_DEFAULT_REGION" variables.')
parser.add_argument('--organizations-zone-name',
'-o',
action=default_environment_variable('AWS_LABELER_ORGANIZATIONS_ZONE_NAME'),
help='The name of the Organizations Zone to label. Implies access to organizations api in aws.'
'Mutually exclusive with --single-account-id argument and --audit-zone-name.')
parser.add_argument('--audit-zone-name',
'-z',
action=default_environment_variable('AWS_LABELER_AUDIT_ZONE_NAME'),
help='The name of the Audit Zone to label. Does not need access to organizations api in aws, '
'retrieves accounts from security hub, will not report on the audit account itself.'
'Mutually exclusive with --single-account-id argument and --organizations-zone-name.')
parser.add_argument('--single-account-id',
'-s',
type=aws_account_id,
action=default_environment_variable('AWS_LABELER_SINGLE_ACCOUNT_ID'),
help='Run the labeler on a single account. '
'Mutually exclusive with --organizations-zone-name and '
'--audit-zone-name argument.')
parser.add_argument('--frameworks',
'-f',
default=os.environ.get('AWS_LABELER_FRAMEWORKS', DEFAULT_SECURITY_HUB_FRAMEWORKS),
type=character_delimited_list_variable,
help=f"The list of applicable frameworks: ['{AWS_FOUNDATIONAL_SECURITY_FRAMEWORK}', "
f"'{CIS_AWS_FOUNDATION_FRAMEWORK}', '{PCI_DSS_FRAMEWORK}'], "
f"default={list(DEFAULT_SECURITY_HUB_FRAMEWORKS)}. "
"Setting the flag with an empty string argument will set no frameworks for filters.")
parser.add_argument('--allowed-account-ids',
'-a',
action=default_environment_variable('AWS_LABELER_ALLOWED_ACCOUNT_IDS'),
type=character_delimited_list_variable,
help='A list of AWS Account IDs for which an energy label will be produced. '
'Mutually exclusive with --denied-account-ids and --single-account-id arguments.')
parser.add_argument('--denied-account-ids',
'-d',
action=default_environment_variable('AWS_LABELER_DENIED_ACCOUNT_IDS'),
type=character_delimited_list_variable,
help='A list of AWS Account IDs that will be excluded from producing the energy label. '
'Mutually exclusive with --allowed-account-ids and --single-account-id arguments.')
parser.add_argument('--allowed-regions',
'-ar',
action=default_environment_variable('AWS_LABELER_ALLOWED_REGIONS'),
type=character_delimited_list_variable,
help='A list of AWS regions included in producing the energy label.'
'Mutually exclusive with --denied-regions argument.')
parser.add_argument('--denied-regions',
'-dr',
action=default_environment_variable('AWS_LABELER_DENIED_REGIONS'),
type=character_delimited_list_variable,
help='A list of AWS regions excluded from producing the energy label.'
'Mutually exclusive with --allowed-regions argument.')
parser.add_argument('--export-path',
'-p',
action=default_environment_variable('AWS_LABELER_EXPORT_PATH'),
help='Exports a snapshot of chosen data in '
'JSON formatted files to the specified directory or S3 location.')
parser.add_argument('--export-metrics-only',
'-e',
dest='export_all',
action='store_false',
help='Exports metrics/statistics without sensitive findings data if set, in JSON formatted '
'files to the specified directory or S3 location, default is export all data.')
parser.add_argument('--to-json',
'-j',
action='store_true',
default=environment_variable_boolean(os.environ.get('AWS_LABELER_TO_JSON', False)),
help='Return the report in json format.')
parser.add_argument('--report-closed-findings-days',
'-rd',
action='store',
required=False,
default=os.environ.get('AWS_LABELER_REPORT_CLOSED_FINDINGS_DAYS'),
type=positive_integer,
help='If set the report will contain info on the number of findings that were closed during the'
' provided days count')
parser.add_argument('--report-suppressed-findings',
'-rs',
action='store_true',
default=environment_variable_boolean(os.environ.get('AWS_LABELER_REPORT_SUPPRESSED_FINDINGS',
False)),
help='If set the report will contain info on the number of suppressed findings')
parser.add_argument('--account-thresholds',
'-at',
type=account_thresholds_config,
default=os.environ.get('AWS_LABELER_ACCOUNT_THRESHOLDS'),
help='If set the account thresholds will be used instead of the default ones. Usage of this '
'option will be reported on the report output and the metadata file upon export.')
parser.add_argument('--zone-thresholds',
'-zt',
type=zone_thresholds_config,
default=os.environ.get('AWS_LABELER_ZONE_THRESHOLDS'),
help='If set the zone thresholds will be used instead of the default ones. Usage of this option'
' will be reported on the report output and the metadata file upon export.')
parser.add_argument('--security-hub-query-filter',
'-sf',
type=json_string,
default=os.environ.get('AWS_LABELER_SECURITY_HUB_QUERY_FILTER'),
help='If set, this filter will be used instead of the default built in. Usage of this option '
'will be reported on the report output and the metadata file upon export. Usage of the '
'allowed ips and denied ips options will still affect the filter as well as the default '
'set frameworks. If no framework filtering is needed the built in default frameworks can '
'be overriden by calling the "-f" option with "" as an argument.')
parser.add_argument('--disable-spinner',
'-ds',
action='store_true',
default=environment_variable_boolean(os.environ.get('AWS_LABELER_DISABLE_SPINNER',
False)),
help='If set spinner will be disabled on the CLI.')
parser.add_argument('--disable-banner',
'-db',
action='store_true',
default=environment_variable_boolean(os.environ.get('AWS_LABELER_DISABLE_BANNER',
False)),
help='If set banner will be disabled on the CLI.')
parser.add_argument('--validate-metadata-file',
'-vm',
action=OverridingArgument,
type=valid_local_file,
help='Validates a metadata file. If this argument is set any other argument is '
'effectively disregarded and only the file provided is processed.')
parser.add_argument('--version',
'-v',
action=OverridingArgument,
nargs=0,
help='Prints the version of the tool. If this argument is set any other argument is effectively'
' disregarded. ')
parser.set_defaults(export_all=True)
return parser
[docs]
def calculate_file_hash(binary_contents):
"""Calculates a hex digest of binary contents.
Args:
binary_contents: The binary object to calculate the hex digest of.
Returns:
The calculated hex digest of the binary object.
"""
hash_object = hashlib.sha256()
hash_object.update(binary_contents)
return hash_object.hexdigest()
[docs]
def get_arguments(arguments=None): # noqa: MC0001
"""
Gets us the cli arguments.
Returns the args as parsed from the argsparser.
"""
parser = get_parser()
args = parser.parse_args(arguments)
if args.version:
parser.exit(0, cli_version)
if args.validate_metadata_file:
# if overriding argument is set then we do not check any other arguments and we exit straight
# after validating the argument
validate_metadata_file(args.validate_metadata_file, parser)
# Since mutual exclusive cannot work with environment variables we need to check explicitly for all pairs of
# mutual relations that are not allowed.
if all([args.allowed_account_ids, args.denied_account_ids]):
parser.error('argument --allowed-account-ids/-a: not allowed with argument --denied-account-ids/-d')
if all([args.allowed_regions, args.denied_regions]):
parser.error('argument --allowed-regions/-ar: not allowed with argument --denied-regions/-dr')
export_metrics_set = environment_variable_boolean(os.environ.get('AWS_LABELER_EXPORT_ONLY_METRICS'))
if export_metrics_set:
args.export_all = False
exclusive_args = [args.organizations_zone_name, args.audit_zone_name, args.single_account_id]
try:
_ = get_mutually_exclusive_args(*exclusive_args, required=True)
except MissingRequiredArguments:
parser.error('one of the arguments --organizations-zone-name/-o --audit-zone-name/-z '
'--single-account-id/-s is required')
except MutuallyExclusiveArguments:
parser.error('arguments --organizations-zone-name/-o --audit-zone-name/-z '
'--single-account-id/-s are mutually exclusive')
exclusive_args = [args.allowed_account_ids, args.denied_account_ids, args.single_account_id]
try:
_ = get_mutually_exclusive_args(*exclusive_args)
except MutuallyExclusiveArguments:
parser.error('arguments --allowed-account-ids/-a --denied-account-ids/-d --single-account-id/-s are '
'mutually exclusive')
try:
SecurityHub.validate_frameworks(args.frameworks)
except InvalidFrameworks:
parser.error(f'{args.frameworks} are not valid supported security hub frameworks. Currently supported '
f'are {SecurityHub.frameworks}')
for argument in ['allowed_account_ids', 'denied_account_ids']:
try:
_ = validate_account_ids(getattr(args, argument))
except InvalidAccountListProvided:
parser.error(f'{getattr(args, argument)} contains invalid account ids.')
for argument in ['allowed_regions', 'denied_regions']:
try:
_ = validate_regions(getattr(args, argument))
except InvalidRegionListProvided:
parser.error(f'{getattr(args, argument)} contains invalid regions.')
if args.export_path and not DestinationPath(args.export_path).is_valid():
parser.error(f'{args.export_path} is an invalid export location. Example --export-path '
f'/a/directory or --export-path s3://mybucket location')
return args
[docs]
def setup_logging(level, config_file=None):
"""Sets up the logging.
Args:
level: At which level do we log
config_file: Configuration to use
"""
# This will configure the logging, if the user has set a config file.
# If there's no config file, logging will default to stdout.
if config_file:
try:
with open(config_file, encoding='utf-8') as conf_file:
configuration = json.loads(conf_file.read())
logging.config.dictConfig(configuration)
except ValueError:
print(f'File "{config_file}" is not valid json, cannot continue.')
raise SystemExit(1) from None
except FileNotFoundError:
print(f'File "{config_file}" does not exist or cannot be read, cannot continue.')
raise SystemExit(1) from None
else:
coloredlogs.install(level=level.upper())
[docs]
def wait_for_findings(method_name, method_argument, log_level, finding_type=None, disable_spinner=False):
"""If log level is not debug shows a spinner while the callable provided gets security hub findings.
Args:
method_name: The method to execute while waiting.
method_argument: The argument to pass to the method.
log_level: The log level as set by the user.
finding_type: The type of the finding to use for the helping message.
disable_spinner: The spinner will be disabled while retrieving the findings.
Returns:
findings: A list of security hub findings as retrieved by the callable.
"""
try:
if all([log_level != 'debug', not disable_spinner]):
with yaspin(text=f"Please wait while retrieving Security Hub{f' {finding_type} ' if finding_type else ' '}"
f"findings...", color="yellow") as spinner:
findings = method_name(method_argument) if method_argument else method_name()
spinner.ok("✅")
else:
findings = method_name(method_argument) if method_argument else method_name()
except Exception as msg:
LOGGER.error(msg)
raise SystemExit(1) from None
return findings
# pylint: disable=too-many-arguments,too-many-locals
[docs]
def get_zone_reporting_data(zone_name,
region,
frameworks,
allowed_account_ids,
denied_account_ids,
allowed_regions,
denied_regions,
export_all_data_flag,
report_closed_findings_days,
report_suppressed_findings,
account_thresholds,
zone_thresholds,
security_hub_query_filter,
log_level,
zone_type,
metadata,
disable_spinner):
"""Gets the reporting data for an organizations zone.
Args:
zone_name: The name of the security zone.
region: The home region of AWS.
frameworks: The frameworks to include in scoring.
allowed_account_ids: The allowed account ids for landing zone inclusion if any.
denied_account_ids: The allowed account ids for landing zone exclusion if any.
allowed_regions: The allowed regions for security hub if any.
denied_regions: The denied regions for security hub if any.
export_all_data_flag: If set all data is going to be exported, else only basic reporting.
report_closed_findings_days:
report_suppressed_findings:
account_thresholds:
zone_thresholds:
log_level: The log level set.
zone_type: The type of zone to label.
metadata: The metadata of the execution, provided to be enriched by the active ones.
disable_spinner: The spinner will be disabled while retrieving the findings.
Returns:
report_data, exporter_arguments
"""
labeler = EnergyLabeler(zone_name=zone_name,
region=region,
account_thresholds=account_thresholds or ACCOUNT_THRESHOLDS,
zone_thresholds=zone_thresholds or ZONE_THRESHOLDS,
security_hub_filter=security_hub_query_filter or DEFAULT_SECURITY_HUB_FILTER,
frameworks=frameworks,
allowed_account_ids=allowed_account_ids,
denied_account_ids=denied_account_ids,
allowed_regions=allowed_regions,
denied_regions=denied_regions,
zone_type=zone_type)
wait_for_findings(EnergyLabeler.security_hub_findings.fget, labeler, log_level, disable_spinner=disable_spinner)
report_data = [['Zone:', labeler.zone.name],
['Zone Security Score:', labeler.zone_energy_label.label],
['Zone Percentage Coverage:', labeler.zone_energy_label.coverage],
['Labeled Accounts Measured:', labeler.labeled_accounts_energy_label.accounts_measured]]
if labeler.zone_energy_label.best_label != labeler.zone_energy_label.worst_label:
report_data.extend([['Best Account Security Score:', labeler.zone_energy_label.best_label],
['Worst Account Security Score:', labeler.zone_energy_label.worst_label]])
if report_closed_findings_days:
LOGGER.warning('Reporting on resolved findings is not functional yet.')
# query_filter = labeler.security_hub.calculate_query_filter(RESOLVED_FINDINGS_QUERY(
# report_closed_findings_days),
# allowed_account_ids=allowed_account_ids,
# denied_account_ids=denied_account_ids,
# frameworks=frameworks)
# resolved_findings = wait_for_findings(labeler.security_hub.get_findings,
# query_filter,
# log_level,
# 'resolved',
# disable_spinner=disable_spinner)
# report_data.append([f'Resolved Findings Last {report_closed_findings_days} Days:', len(resolved_findings)])
if report_suppressed_findings:
query_filter = labeler.security_hub.calculate_query_filter(SUPPRESSED_FINDINGS_QUERY,
allowed_account_ids=allowed_account_ids,
denied_account_ids=denied_account_ids,
frameworks=frameworks)
suppressed_findings = wait_for_findings(labeler.security_hub.get_findings,
query_filter,
log_level,
'suppressed',
disable_spinner=disable_spinner)
metadata.add_entry(MetadataEntry(title='Suppressed Findings:',
value=str(len(suppressed_findings)),
is_report_entry=True))
metadata.add_entry(MetadataEntry(title='Enabled Security Hub Integrations:',
value=labeler.security_hub.enabled_products,
is_report_entry=False))
export_types = ALL_ZONE_EXPORT_TYPES if export_all_data_flag else ZONE_METRIC_EXPORT_TYPES
exporter_arguments = {'export_types': export_types,
'name': labeler.zone.name,
'energy_label': labeler.zone_energy_label.label,
'security_hub_findings': labeler.security_hub_findings,
'labeled_accounts': labeler.zone_labeled_accounts,
'metadata': metadata}
return report_data, exporter_arguments
# pylint: disable=too-many-arguments,too-many-locals
[docs]
def get_account_reporting_data(account_id,
region,
frameworks,
allowed_regions,
denied_regions,
export_all_data_flag,
report_closed_findings_days,
report_suppressed_findings,
account_thresholds,
security_hub_query_filter,
log_level,
metadata,
disable_spinner):
"""Gets the reporting data for a single account.
Args:
account_id: The ID of the account to get reporting on.
region: The home region of AWS.
frameworks: The frameworks to include in scoring.
allowed_regions: The allowed regions for security hub if any.
denied_regions: The denied regions for security hub if any.
export_all_data_flag: If set all data is going to be exported, else only basic reporting.
report_closed_findings_days: The number of days to report the resolved findings for.
report_suppressed_findings: A flag to report on suppressed findings or not.
account_thresholds: The account thresholds to apply.
security_hub_query_filter: The security hub filter to apply.
log_level: The log level set.
metadata: The metadata of the execution, provided to be enriched by the active ones.
disable_spinner: The spinner will be disabled while retrieving the findings.
Returns:
report_data, exporter_arguments
"""
account = AwsAccount(account_id, account_thresholds or ACCOUNT_THRESHOLDS, 'Not Retrieved')
security_hub = SecurityHub(region=region,
allowed_regions=allowed_regions,
denied_regions=denied_regions)
query_filter = SecurityHub.calculate_query_filter(security_hub_query_filter or DEFAULT_SECURITY_HUB_FILTER,
allowed_account_ids=[account_id],
denied_account_ids=None,
frameworks=frameworks)
unfiltered_findings = wait_for_findings(security_hub.get_findings, query_filter, log_level,
disable_spinner=disable_spinner)
security_hub_findings = security_hub.filter_findings_by_frameworks(unfiltered_findings, frameworks)
account.calculate_energy_label(security_hub_findings)
report_data = [['Account ID:', account.id],
['Account Security Score:', account.energy_label.label],
['Number Of Critical Findings:', account.energy_label.number_of_critical_findings],
['Number Of High Findings:', account.energy_label.number_of_high_findings],
['Number Of Medium Findings:', account.energy_label.number_of_medium_findings],
['Number Of Low Findings:', account.energy_label.number_of_low_findings],
['Max Days Open:', account.energy_label.max_days_open]]
if report_closed_findings_days:
LOGGER.warning('Reporting on resolved findings is not functional yet.')
# query_filter = SecurityHub.calculate_query_filter(RESOLVED_FINDINGS_QUERY(report_closed_findings_days),
# allowed_account_ids=[account_id],
# denied_account_ids=None,
# frameworks=frameworks)
# resolved_findings = wait_for_findings(security_hub.get_findings,
# query_filter,
# log_level,
# 'resolved',
# disable_spinner=disable_spinner)
# report_data.append([f'Resolved Findings Last {report_closed_findings_days} Days:', len(resolved_findings)])
if report_suppressed_findings:
query_filter = SecurityHub.calculate_query_filter(SUPPRESSED_FINDINGS_QUERY,
allowed_account_ids=[account_id],
denied_account_ids=None,
frameworks=frameworks)
suppressed_findings = wait_for_findings(security_hub.get_findings, query_filter, log_level, 'suppressed',
disable_spinner=disable_spinner)
metadata.add_entry(MetadataEntry(title='Suppressed Findings:',
value=str(len(suppressed_findings)),
is_report_entry=True))
metadata.add_entry(MetadataEntry(title='Enabled Security Hub Integrations:',
value=security_hub.enabled_products,
is_report_entry=False))
export_types = ALL_ACCOUNT_EXPORT_TYPES if export_all_data_flag else ACCOUNT_METRIC_EXPORT_TYPES
exporter_arguments = {'export_types': export_types,
'name': account.id,
'energy_label': account.energy_label.label,
'security_hub_findings': security_hub_findings,
'labeled_accounts': account,
'metadata': metadata}
return report_data, exporter_arguments