Source code for awsenergylabelercli.validators

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: validators.py
#
# Copyright 2022 Theodoor Scholte, Costas Tyfoxylos, Jenda Brands
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to
#  deal in the Software without restriction, including without limitation the
#  rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
#  sell copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
#  all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
#  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
#  DEALINGS IN THE SOFTWARE.
#

"""
Main code for validators.

.. _Google Python Style Guide:
   http://google.github.io/styleguide/pyguide.html

"""

import argparse
import json
import logging
import os
import re
from argparse import ArgumentTypeError
from pathlib import Path

from schema import SchemaUnexpectedTypeError, SchemaError
from awsenergylabelerlib import (is_valid_account_id,
                                 is_valid_region,
                                 SECURITY_HUB_ACTIVE_REGIONS)
from awsenergylabelerlib.schemas import account_thresholds_schema, zone_thresholds_schema

from .awsenergylabelercliexceptions import MutuallyExclusiveArguments, MissingRequiredArguments

__author__ = '''Costas Tyfoxylos <ctyfoxylos@schubergphilis.com>'''
__docformat__ = '''google'''
__date__ = '''08-04-2022'''
__copyright__ = '''Copyright 2022, Costas Tyfoxylos'''
__credits__ = ["Theodoor Scholte", "Costas Tyfoxylos", "Jenda Brands"]
__license__ = '''MIT'''
__maintainer__ = '''Costas Tyfoxylos'''
__email__ = '''<ctyfoxylos@schubergphilis.com>'''
__status__ = '''Development'''  # "Prototype", "Development", "Production".

# This is the main prefix used for logging
LOGGER_BASENAME = '''validators'''
LOGGER = logging.getLogger(LOGGER_BASENAME)
LOGGER.addHandler(logging.NullHandler())


[docs] def aws_account_id(account_id): """Setting a type for an account id argument.""" if not is_valid_account_id(account_id): raise ArgumentTypeError(f'Account id {account_id} provided does not seem to be valid.') return account_id
[docs] def security_hub_region(region): """Setting a type for a security hub region.""" if not is_valid_region(region): raise ArgumentTypeError(f'Region {region} provided does not seem to be valid, valid regions are ' f'{SECURITY_HUB_ACTIVE_REGIONS}.') return region
[docs] def character_delimited_list_variable(value): """Support for environment variables with characters delimiting a list of value.""" delimiting_characters = '[,|\\s]' result = [entry for entry in re.split(delimiting_characters, str(value)) if entry] if len(result) == 1: return result[0] return result
[docs] def environment_variable_boolean(value): """Parses an environment variable as a boolean. Args: value: The value of the environment variable. Returns: True if environment variable is one of the supported values, False otherwise. """ if value in [True, 't', 'T', 'true', 'True', 1, '1', 'TRUE']: return True return False
[docs] def positive_integer(value): """Casts an argument to an int and validates that it is a positive number. Args: value: The value to cast. Returns: The positive integer. Raises: ArgumentTypeError: If the argument cannot be cast or if it is a negative number. """ if value is None: return value try: num_value = int(value) except ValueError: num_value = -1 if num_value <= 0: raise ArgumentTypeError(f'{value} is an invalid positive int value') return num_value
[docs] def get_mutually_exclusive_args(*args, required=False): """Test if multiple mutually exclusive arguments are provided.""" set_arguments = [arg for arg in args if arg] if len(set_arguments) > 1: raise MutuallyExclusiveArguments(*set_arguments) if required and not any(set_arguments): raise MissingRequiredArguments() return args
[docs] def default_environment_variable(variable_name): """Closure to pass the variable name to the inline custom Action. Args: variable_name: The variable to look up as environment variable. Returns: The Action object. """ class DefaultEnvVar(argparse.Action): """Default Environment Variable.""" def __init__(self, *args, **kwargs): if variable_name in os.environ: kwargs['default'] = os.environ[variable_name] if kwargs.get('required') and kwargs.get('default'): kwargs['required'] = False super().__init__(*args, **kwargs) def __call__(self, parser, namespace, values, option_string=None): setattr(namespace, self.dest, values) return DefaultEnvVar
[docs] def json_string(value): """Validates that the provided argument is a valid json string. Args: value: The string to load as json Returns: The json object on success Raises: ArgumentTypeError on error. """ if value is None: return None try: json_value = json.loads(value) except ValueError: raise ArgumentTypeError(f'{value} is an invalid json string.') from None return json_value
[docs] def account_thresholds_config(value): """Validates that the provided string value is an account thresholds configuration. Args: value: The value to validate for an account thresholds configuration. Returns: A valid account configuration. """ config = json_string(value) try: config = account_thresholds_schema.validate(config) except (SchemaUnexpectedTypeError, SchemaError): raise ArgumentTypeError( f'Provided configuration {value} is an invalid accounts thresholds configuration.') from None return config
[docs] def zone_thresholds_config(value): """Validates that the provided string value is a zone thresholds configuration. Args: value: The value to validate for a zone thresholds configuration. Returns: A valid zone configuration. """ config = json_string(value) try: config = zone_thresholds_schema.validate(config) except (SchemaUnexpectedTypeError, SchemaError): raise ArgumentTypeError( f'Provided configuration {value} is an invalid zone thresholds configuration.') from None return config
[docs] class OverridingArgument(argparse.Action): """Argument that if set will disable all other arguments that are set as required.""" def __call__(self, parser, namespace, values, option_string=None): # If we get here, it means that the argument is set so any other argument that has been configured as required # will have it's required attribute disabled due to this overriding argument being called. for argument in parser._actions: # noqa if argument.required: # this will not log as the logger is set up up after the parsing of arguments. Message is left as # documentation and can be turned into a print statement for debugging. LOGGER.info(f'Argument {argument.dest} is required, overriding that to not required due to argument ' f'{self.dest} set as overriding argument which will disable all other required arguments.') argument.required = False # if we get here there has been an argument provided so to support flag arguments if no actual value has been # provided we set the argument to True. Assumption is that the argument has been configured with nargs=0. values = True if not values else values setattr(namespace, self.dest, values)
[docs] def valid_local_file(local_path): """Validates an argparse argument to be an existing local file. Args: local_path: The path provided as an argument. Returns: The local path if the file exists. Raises: ArgumentTypeError: If the file does not exist. """ path_file = Path(local_path) if not path_file.exists(): raise ArgumentTypeError(f'Local file path "{local_path}" provided, does not exist.') return path_file.resolve()