You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			339 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			339 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
"""schema is a library for validating Python data structures, such as those
obtained from config-files, forms, external services or command-line
parsing, converted from JSON/YAML (or something else) to Python data-types."""
 | 
						|
 | 
						|
import re
 | 
						|
 | 
						|
# Version string of this module.
__version__ = '0.6.6'

# Names exported by ``from schema import *``.
__all__ = [
    'Schema',
    'And',
    'Or',
    'Regex',
    'Optional',
    'Use',
    'SchemaError',
    'SchemaWrongKeyError',
    'SchemaMissingKeyError',
    'SchemaUnexpectedTypeError',
]
 | 
						|
 | 
						|
 | 
						|
class SchemaError(Exception):
    """Error during Schema validation.

    Two message lists are kept on the instance:

    * ``autos``  -- the messages passed as the first argument.
    * ``errors`` -- the messages passed as the second argument; ``None``
      entries stand for "no message".

    The exception text is the value of :attr:`code` at construction time.
    """

    def __init__(self, autos, errors=None):
        # Anything that is not exactly a ``list`` is wrapped in a
        # one-element list so callers can always concatenate the attributes.
        if type(autos) is not list:
            autos = [autos]
        if type(errors) is not list:
            errors = [errors]
        self.autos = autos
        self.errors = errors
        Exception.__init__(self, self.code)

    @property
    def code(self):
        """Return the message text for this error.

        ``None`` entries and duplicates are dropped from both lists (first
        occurrence wins, order is kept).  If any ``errors`` entries remain
        they are joined with newlines and returned; otherwise the remaining
        ``autos`` entries are joined and returned.
        """
        def _dedupe(messages):
            """Return *messages* as a list without repeated items."""
            already_seen = set()
            kept = []
            for message in messages:
                if message not in already_seen:
                    already_seen.add(message)
                    kept.append(message)
            return kept

        auto_messages = _dedupe(m for m in self.autos if m is not None)
        custom_messages = _dedupe(m for m in self.errors if m is not None)
        chosen = custom_messages if custom_messages else auto_messages
        return '\n'.join(chosen)
 | 
						|
 | 
						|
 | 
						|
class SchemaWrongKeyError(SchemaError):
    """Raised when an unexpected key is detected within the data set being
    validated."""
 | 
						|
 | 
						|
 | 
						|
class SchemaMissingKeyError(SchemaError):
    """Raised when a mandatory key is not found within the data set being
    validated."""
 | 
						|
 | 
						|
 | 
						|
class SchemaUnexpectedTypeError(SchemaError):
    """Raised when a type mismatch is detected within the data set being
    validated."""
 | 
						|
 | 
						|
 | 
						|
class And(object):
    """
    Combine validation directives in AND Boolean fashion: data must pass
    every one of the given sub-schemas, in order.

    Accepted keyword arguments are ``error``, ``schema`` (the class used to
    wrap each sub-schema, defaulting to ``Schema``) and
    ``ignore_extra_keys``.
    """

    def __init__(self, *args, **kw):
        self._args = args
        allowed_keywords = ['error', 'schema', 'ignore_extra_keys']
        assert set(kw).issubset(allowed_keywords)
        self._error = kw.get('error')
        self._ignore_extra_keys = kw.get('ignore_extra_keys', False)
        # An inherited Schema class may be supplied by the caller.
        self._schema = kw.get('schema', Schema)

    def __repr__(self):
        rendered_args = ', '.join(repr(arg) for arg in self._args)
        return '%s(%s)' % (self.__class__.__name__, rendered_args)

    def validate(self, data):
        """
        Run data through every sub-schema in turn; the output of one
        sub-schema is the input of the next.

        :param data: value to validate.
        :return: the value produced by the last sub-schema.
        """
        validators = [
            self._schema(arg, error=self._error,
                         ignore_extra_keys=self._ignore_extra_keys)
            for arg in self._args
        ]
        for validator in validators:
            data = validator.validate(data)
        return data
 | 
						|
 | 
						|
 | 
						|
class Or(And):
    """Combine validation directives in OR Boolean fashion: data must pass
    at least one of the given sub-schemas."""

    def validate(self, data):
        """
        Try each sub-schema in order and return the result of the first one
        that accepts the data.

        :param data: value to validate.
        :return: the value returned by the first sub-schema that validates.
        :raises SchemaError: when no sub-schema accepts the data; the
            messages of the last failure are appended to this error's own.
        """
        # Placeholder used when there are no sub-schemas at all.
        last_failure = SchemaError([], [])
        candidates = [
            self._schema(arg, error=self._error,
                         ignore_extra_keys=self._ignore_extra_keys)
            for arg in self._args
        ]
        for candidate in candidates:
            try:
                return candidate.validate(data)
            except SchemaError as failure:
                last_failure = failure
        auto_messages = ['%r did not validate %r' % (self, data)]
        auto_messages = auto_messages + last_failure.autos
        custom_message = self._error.format(data) if self._error else None
        raise SchemaError(auto_messages,
                          [custom_message] + last_failure.errors)
 | 
						|
 | 
						|
 | 
						|
class Regex(object):
    """
    Enables schema.py to validate string using regular expressions.
    """
    # Readable names for the flag bits.  The list is indexed by position in
    # the 9-digit binary rendering of ``flags`` (most significant bit first),
    # so entry 0 names the highest of the nine bits and entry 8 the lowest.
    NAMES = ['re.ASCII', 're.DEBUG', 're.VERBOSE', 're.UNICODE', 're.DOTALL',
             're.MULTILINE', 're.LOCALE', 're.IGNORECASE', 're.TEMPLATE']

    def __init__(self, pattern_str, flags=0, error=None):
        """
        :param pattern_str: regular expression source, compiled immediately.
        :param flags: ``re`` flags passed to ``re.compile``.
        :param error: optional custom message attached to raised errors.
        """
        self._pattern_str = pattern_str
        flags_list = [Regex.NAMES[i] for i, f in  # Name for each bit
                      enumerate('{0:09b}'.format(flags)) if f != '0']

        # Pre-render the ``, flags=...`` suffix used by __repr__.
        if flags_list:
            self._flags_names = ', flags=' + '|'.join(flags_list)
        else:
            self._flags_names = ''

        self._pattern = re.compile(pattern_str, flags=flags)
        self._error = error

    def __repr__(self):
        return '%s(%r%s)' % (
            self.__class__.__name__, self._pattern_str, self._flags_names
        )

    def validate(self, data):
        """
        Validate data using the compiled regex.

        The pattern is applied with ``search``, so it may match anywhere in
        the data.

        :param data: data to be validated
        :return: the data, unchanged, when the pattern matches.
        :raises SchemaError: when the pattern does not match, or when data
            is of a type the pattern cannot be applied to.
        """
        e = self._error

        try:
            if self._pattern.search(data):
                return data
            else:
                raise SchemaError('%r does not match %r' % (self, data), e)
        except TypeError:
            # Wrap data in a 1-tuple: with a bare tuple as the right operand
            # the ``%`` operator would unpack it, raising TypeError (or
            # printing only the element of a 1-tuple) instead of reporting
            # the offending value.
            raise SchemaError('%r is not string nor buffer' % (data,), e)
 | 
						|
 | 
						|
 | 
						|
class Use(object):
    """
    Wrap a callable so that it transforms the data while the data is being
    validated: the callable's return value becomes the validated value.
    """

    def __init__(self, callable_, error=None):
        assert callable(callable_)
        self._error = error
        self._callable = callable_

    def __repr__(self):
        return '%s(%r)' % (type(self).__name__, self._callable)

    def validate(self, data):
        """
        Call the wrapped callable on data and return its result.

        :param data: value handed to the callable.
        :return: whatever the callable returns.
        :raises SchemaError: when the callable raises; a ``SchemaError``
            raised by the callable has its messages carried over, any other
            exception is described in the automatic message.
        """
        try:
            return self._callable(data)
        except SchemaError as inner:
            auto_messages = [None] + inner.autos
            custom = self._error.format(data) if self._error else None
            raise SchemaError(auto_messages, [custom] + inner.errors)
        except BaseException as failure:
            callable_name = _callable_str(self._callable)
            auto_message = '%s(%r) raised %r' % (callable_name, data, failure)
            custom = self._error.format(data) if self._error else None
            raise SchemaError(auto_message, custom)
 | 
						|
 | 
						|
 | 
						|
COMPARABLE, CALLABLE, VALIDATOR, TYPE, DICT, ITERABLE = range(6)
 | 
						|
 | 
						|
 | 
						|
def _priority(s):
 | 
						|
    """Return priority for a given object."""
 | 
						|
    if type(s) in (list, tuple, set, frozenset):
 | 
						|
        return ITERABLE
 | 
						|
    if type(s) is dict:
 | 
						|
        return DICT
 | 
						|
    if issubclass(type(s), type):
 | 
						|
        return TYPE
 | 
						|
    if hasattr(s, 'validate'):
 | 
						|
        return VALIDATOR
 | 
						|
    if callable(s):
 | 
						|
        return CALLABLE
 | 
						|
    else:
 | 
						|
        return COMPARABLE
 | 
						|
 | 
						|
 | 
						|
class Schema(object):
    """
    Entry point of the library, use this class to instantiate validation
    schema for the data that will be validated.
    """

    def __init__(self, schema, error=None, ignore_extra_keys=False):
        """
        :param schema: the schema object to validate against.
        :param error: optional custom error message; where it is formatted,
            the data is passed as the single positional ``format`` argument.
        :param ignore_extra_keys: when true, dict data may contain keys that
            match no schema key without raising ``SchemaWrongKeyError``.
        """
        self._schema = schema
        self._error = error
        self._ignore_extra_keys = ignore_extra_keys

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, self._schema)

    @staticmethod
    def _dict_key_priority(s):
        """Return priority for a given key object.

        An ``Optional`` key sorts just after a non-optional key of the same
        flavor (its wrapped schema's priority plus 0.5).
        """
        if isinstance(s, Optional):
            return _priority(s._schema) + 0.5
        return _priority(s)

    def validate(self, data):
        """
        Validate data against the schema and return the validated value.

        How the schema is applied depends on its flavor (see ``_priority``):
        containers validate every element, dicts validate key/value pairs,
        classes are ``isinstance`` checks, objects with ``validate`` are
        delegated to, callables must return a true value, and anything else
        is compared with ``==``.

        :param data: value to validate.
        :return: the validated (possibly converted) data.
        :raises SchemaError: when the data does not conform; the subclasses
            ``SchemaMissingKeyError``, ``SchemaWrongKeyError`` and
            ``SchemaUnexpectedTypeError`` are raised for the matching cases.
        """
        # Use the instance's own class so that subclasses of Schema are
        # used for all nested validation.
        Schema = self.__class__
        s = self._schema
        e = self._error
        i = self._ignore_extra_keys
        flavor = _priority(s)
        if flavor == ITERABLE:
            # Data must be an instance of the schema's container type; each
            # element must then satisfy at least one of the schema elements.
            data = Schema(type(s), error=e).validate(data)
            o = Or(*s, error=e, schema=Schema, ignore_extra_keys=i)
            return type(data)(o.validate(d) for d in data)
        if flavor == DICT:
            data = Schema(dict, error=e).validate(data)
            new = type(data)()  # new - is a dict of the validated values
            coverage = set()  # matched schema keys
            # for each key and value find a schema entry matching them, if any
            sorted_skeys = sorted(s, key=self._dict_key_priority)
            for key, value in data.items():
                for skey in sorted_skeys:
                    svalue = s[skey]
                    try:
                        nkey = Schema(skey, error=e).validate(key)
                    except SchemaError:
                        # This schema key does not match the data key;
                        # try the next schema key.
                        pass
                    else:
                        try:
                            nvalue = Schema(svalue, error=e,
                                            ignore_extra_keys=i).validate(value)
                        except SchemaError as x:
                            # Wrap nkey in a 1-tuple: keys may themselves be
                            # tuples, which ``%`` would otherwise unpack.
                            k = "Key '%s' error:" % (nkey,)
                            raise SchemaError([k] + x.autos, [e] + x.errors)
                        else:
                            new[nkey] = nvalue
                            coverage.add(skey)
                            break
            required = set(k for k in s if type(k) is not Optional)
            if not required.issubset(coverage):
                missing_keys = required - coverage
                s_missing_keys = \
                    ', '.join(repr(k) for k in sorted(missing_keys, key=repr))
                raise \
                    SchemaMissingKeyError('Missing keys: ' + s_missing_keys, e)
            if not self._ignore_extra_keys and (len(new) != len(data)):
                wrong_keys = set(data.keys()) - set(new.keys())
                s_wrong_keys = \
                    ', '.join(repr(k) for k in sorted(wrong_keys, key=repr))
                raise \
                    SchemaWrongKeyError(
                        'Wrong keys %s in %r' % (s_wrong_keys, data),
                        e.format(data) if e else None)

            # Apply default-having optionals that haven't been used:
            defaults = set(k for k in s if type(k) is Optional and
                           hasattr(k, 'default')) - coverage
            for default in defaults:
                new[default.key] = default.default

            return new
        if flavor == TYPE:
            if isinstance(data, s):
                return data
            else:
                raise SchemaUnexpectedTypeError(
                    '%r should be instance of %r' % (data, s.__name__),
                    e.format(data) if e else None)
        if flavor == VALIDATOR:
            try:
                return s.validate(data)
            except SchemaError as x:
                raise SchemaError([None] + x.autos, [e] + x.errors)
            except BaseException as x:
                raise SchemaError(
                    '%r.validate(%r) raised %r' % (s, data, x),
                    self._error.format(data) if self._error else None)
        if flavor == CALLABLE:
            f = _callable_str(s)
            try:
                if s(data):
                    return data
            except SchemaError as x:
                raise SchemaError([None] + x.autos, [e] + x.errors)
            except BaseException as x:
                raise SchemaError(
                    '%s(%r) raised %r' % (f, data, x),
                    self._error.format(data) if self._error else None)
            # Reached only when the callable returned a false value.
            raise SchemaError('%s(%r) should evaluate to True' % (f, data), e)
        # COMPARABLE: plain equality check against the schema value.
        if s == data:
            return data
        else:
            raise SchemaError('%r does not match %r' % (s, data),
                              e.format(data) if e else None)
 | 
						|
 | 
						|
 | 
						|
class Optional(Schema):
    """Marker for an optional part of the validation Schema.

    Accepts the same arguments as ``Schema`` plus an optional ``default``
    keyword.  When ``default`` is given, the instance gets ``default`` and
    ``key`` attributes; without it neither attribute is set.
    """

    # Sentinel meaning "no default supplied", so that ``None`` can be used
    # as a real default value.
    _MARKER = object()

    def __init__(self, *args, **kwargs):
        default = kwargs.pop('default', self._MARKER)
        super(Optional, self).__init__(*args, **kwargs)
        if default is self._MARKER:
            return
        # A default needs a static key to be stored under, so the wrapped
        # schema has to be a plain comparable value.
        if _priority(self._schema) != COMPARABLE:
            message = (
                'Optional keys with defaults must have simple, '
                'predictable values, like literal strings or ints. '
                '"%r" is too complex.' % (self._schema,))
            raise TypeError(message)
        self.default = default
        self.key = self._schema
 | 
						|
 | 
						|
 | 
						|
def _callable_str(callable_):
 | 
						|
    if hasattr(callable_, '__name__'):
 | 
						|
        return callable_.__name__
 | 
						|
    return str(callable_)
 |