r"""
This module contains wrappers for Python serialization modules for
common formats that make it easier to serialize/deserialize WA
Plain Old Data structures (serializable WA classes implement
``to_pod()``/``from_pod()`` methods for converting between POD
structures and Python class instances).

The modifications to standard serialization procedures are:

    - mappings are deserialized as ``OrderedDict``\ 's rather than standard
      Python ``dict``\ 's. This allows for cleaner syntax in certain parts
      of WA configuration (e.g. values to be written to files can be specified
      as a dict, and they will be written in the order specified in the config).
    - regular expressions are automatically encoded/decoded. This allows for
      configuration values to be transparently specified as strings or regexes
      in the POD config.

This module exports the "wrapped" versions of serialization libraries,
and this should be imported and used instead of importing the libraries
directly. i.e. ::

    from wa.utils.serializer import yaml
    pod = yaml.load(fh)

instead of ::

    import yaml
    pod = yaml.load(fh)

It's also possible to use the serializer directly::

    from wa.utils import serializer
    pod = serializer.load(fh)

This can also be used to ``dump()`` POD structures. By default,
``dump()`` will produce JSON, but the ``fmt`` parameter may be used to
specify an alternative format (``yaml`` or ``python``; note that dumping
to ``python`` currently raises ``NotImplementedError``). ``load()`` also
assumes JSON unless ``fmt`` says otherwise; ``read_pod()`` and
``write_pod()`` guess the format from the file extension when ``fmt`` is
not given.

"""
# pylint: disable=unused-argument

import os
import re
import json as _json
from collections import OrderedDict
from datetime import datetime

import yaml as _yaml
import dateutil.parser
from past.builtins import basestring

from wa.framework.exception import SerializerSyntaxError
from wa.utils.misc import isiterable
from wa.utils.types import regex_type, none_type, level, cpu_mask


__all__ = [
    'json',
    'yaml',
    'read_pod',
    'dump',
    'load',
    'is_pod',
    'POD_TYPES',
]

# Exact types (see ``is_pod()``) that are considered Plain Old Data.
POD_TYPES = [
    list,
    tuple,
    dict,
    set,
    basestring,
    str,
    int,
    float,
    bool,
    OrderedDict,
    datetime,
    regex_type,
    none_type,
    level,
    cpu_mask,
]


def _error_message(exc):
    """
    Return a printable message for the exception ``exc``.

    ``BaseException.message`` only exists on Python 2; when it is missing
    (or empty) the ``str()`` of the exception is used instead.
    """
    return getattr(exc, 'message', None) or str(exc)


class WAJSONEncoder(_json.JSONEncoder):
    """
    JSON encoder that stores WA-specific values as prefixed strings
    (``REGEX:``, ``DATET:``, ``LEVEL:``, ``CPUMASK:``) so that
    ``WAJSONDecoder`` can restore them.
    """

    def default(self, obj):  # pylint: disable=method-hidden
        if isinstance(obj, regex_type):
            return 'REGEX:{}:{}'.format(obj.flags, obj.pattern)
        elif isinstance(obj, datetime):
            return 'DATET:{}'.format(obj.isoformat())
        elif isinstance(obj, level):
            return 'LEVEL:{}:{}'.format(obj.name, obj.value)
        elif isinstance(obj, cpu_mask):
            return 'CPUMASK:{}'.format(obj.mask())
        else:
            return _json.JSONEncoder.default(self, obj)


class WAJSONDecoder(_json.JSONDecoder):
    """
    JSON decoder that turns the prefixed strings written by
    ``WAJSONEncoder`` back into the corresponding objects.
    """

    def decode(self, s, **kwargs):
        """
        Decode the JSON document ``s`` and post-process the result.

        Mappings are returned as ``OrderedDict``\\ s and sequences as lists;
        every string found at any nesting depth is passed through the
        prefix-based object parser. Documents whose top-level value is not
        an object (e.g. a list or a plain string) are handled as well.
        """
        d = _json.JSONDecoder.decode(self, s, **kwargs)

        def try_parse_object(v):
            # Only strings can carry an encoded object; everything else is
            # returned untouched.
            if isinstance(v, basestring):
                if v.startswith('REGEX:'):
                    # The pattern may itself contain ':', so split at most twice.
                    _, flags, pattern = v.split(':', 2)
                    return re.compile(pattern, int(flags or 0))
                elif v.startswith('DATET:'):
                    _, pattern = v.split(':', 1)
                    return dateutil.parser.parse(pattern)
                elif v.startswith('LEVEL:'):
                    # NOTE(review): ``value`` is passed on as a string here --
                    # confirm ``level`` accepts that.
                    _, name, value = v.split(':', 2)
                    return level(name, value)
                elif v.startswith('CPUMASK:'):
                    _, value = v.split(':', 1)
                    return cpu_mask(value)

            return v

        def load_objects(v):
            # Strings are checked first so that the iterable branch below can
            # never recurse into the characters of a string.
            if isinstance(v, basestring):
                return try_parse_object(v)
            if hasattr(v, 'items'):
                return OrderedDict((k, load_objects(i)) for k, i in v.items())
            if isiterable(v):
                return [load_objects(i) for i in v]
            return try_parse_object(v)

        return load_objects(d)


class json(object):
    """
    Drop-in replacement for the parts of the standard ``json`` module used
    by WA, wired up to ``WAJSONEncoder``/``WAJSONDecoder``.

    ``load()``/``loads()`` raise ``SerializerSyntaxError`` instead of
    ``ValueError`` when the input cannot be decoded.
    """

    @staticmethod
    def dump(o, wfh, indent=4, *args, **kwargs):
        return _json.dump(o, wfh, cls=WAJSONEncoder, indent=indent,
                          *args, **kwargs)

    @staticmethod
    def dumps(o, indent=4, *args, **kwargs):
        return _json.dumps(o, cls=WAJSONEncoder, indent=indent,
                           *args, **kwargs)

    @staticmethod
    def load(fh, *args, **kwargs):
        try:
            return _json.load(fh, cls=WAJSONDecoder,
                              object_pairs_hook=OrderedDict,
                              *args, **kwargs)
        except ValueError as e:
            raise SerializerSyntaxError(_error_message(e))

    @staticmethod
    def loads(s, *args, **kwargs):
        try:
            return _json.loads(s, cls=WAJSONDecoder,
                               object_pairs_hook=OrderedDict,
                               *args, **kwargs)
        except ValueError as e:
            raise SerializerSyntaxError(_error_message(e))


_mapping_tag = _yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
_regex_tag = 'tag:wa:regex'
_level_tag = 'tag:wa:level'
_cpu_mask_tag = 'tag:wa:cpu_mask'

# Loader used by ``yaml.load()`` when the caller does not supply one. Recent
# PyYAML releases require an explicit loader; ``FullLoader`` is preferred when
# the installed PyYAML provides it, with ``Loader`` as the fallback for older
# releases. The constructors registered below use ``add_constructor()``'s
# default loader set, which is expected to include whichever is picked here.
_yaml_loader = getattr(_yaml, 'FullLoader', _yaml.Loader)


def _wa_dict_representer(dumper, data):
    """Represent an ``OrderedDict`` as a plain YAML mapping."""
    # An iterator over the items is passed rather than the mapping itself;
    # presumably this is what keeps PyYAML from re-sorting the keys.
    return dumper.represent_mapping(_mapping_tag, iter(data.items()))


def _wa_regex_representer(dumper, data):
    """Represent a compiled regex as a ``<flags>:<pattern>`` scalar."""
    text = '{}:{}'.format(data.flags, data.pattern)
    return dumper.represent_scalar(_regex_tag, text)


def _wa_level_representer(dumper, data):
    """Represent a ``level`` as a ``<name>:<value>`` scalar."""
    # Uses ``data.value``, the same attribute WAJSONEncoder reads and the one
    # the ``level(name, value)`` constructor call below suggests.
    text = '{}:{}'.format(data.name, data.value)
    return dumper.represent_scalar(_level_tag, text)


def _wa_cpu_mask_representer(dumper, data):
    """Represent a ``cpu_mask`` by the result of its ``mask()`` method."""
    return dumper.represent_scalar(_cpu_mask_tag, data.mask())


def _wa_dict_constructor(loader, node):
    """
    Construct an ``OrderedDict`` from a YAML mapping node.

    Raises ``ValueError`` if the same key appears more than once.
    """
    pairs = loader.construct_pairs(node)
    seen_keys = set()
    for k, _ in pairs:
        if k in seen_keys:
            raise ValueError('Duplicate entry: {}'.format(k))
        seen_keys.add(k)
    return OrderedDict(pairs)


def _wa_regex_constructor(loader, node):
    """Rebuild a compiled regex from a ``<flags>:<pattern>`` scalar."""
    value = loader.construct_scalar(node)
    # The pattern may contain ':', so only the first one is a separator.
    flags, pattern = value.split(':', 1)
    return re.compile(pattern, int(flags or 0))


def _wa_level_constructor(loader, node):
    """Rebuild a ``level`` from a ``<name>:<value>`` scalar."""
    value = loader.construct_scalar(node)
    name, value = value.split(':', 1)
    return level(name, value)


def _wa_cpu_mask_constructor(loader, node):
    """Rebuild a ``cpu_mask`` from its scalar representation."""
    value = loader.construct_scalar(node)
    return cpu_mask(value)


_yaml.add_representer(OrderedDict, _wa_dict_representer)
_yaml.add_representer(regex_type, _wa_regex_representer)
_yaml.add_representer(level, _wa_level_representer)
_yaml.add_representer(cpu_mask, _wa_cpu_mask_representer)
_yaml.add_constructor(_mapping_tag, _wa_dict_constructor)
_yaml.add_constructor(_regex_tag, _wa_regex_constructor)
_yaml.add_constructor(_level_tag, _wa_level_constructor)
_yaml.add_constructor(_cpu_mask_tag, _wa_cpu_mask_constructor)


class yaml(object):
    """
    Drop-in replacement for the parts of PyYAML used by WA, relying on the
    representers/constructors registered by this module.

    ``load()``/``loads()`` raise ``SerializerSyntaxError`` instead of
    ``yaml.YAMLError`` when the input cannot be parsed.
    """

    @staticmethod
    def dump(o, wfh, *args, **kwargs):
        return _yaml.dump(o, wfh, *args, **kwargs)

    @staticmethod
    def load(fh, *args, **kwargs):
        # SECURITY: the default loader is not a "safe" loader and may
        # construct arbitrary Python objects; only use this on trusted input.
        # A loader passed by the caller (positionally or by keyword) wins.
        if not args and 'Loader' not in kwargs:
            kwargs['Loader'] = _yaml_loader
        try:
            return _yaml.load(fh, *args, **kwargs)
        except _yaml.YAMLError as e:
            # ``problem_mark`` is only present on some errors and may be None.
            mark = getattr(e, 'problem_mark', None)
            lineno = mark.line if mark is not None else None
            raise SerializerSyntaxError(_error_message(e), lineno)

    loads = load


class python(object):
    """
    Loader for PODs written as Python source: the source is executed and the
    resulting global names (minus ``__dunder__`` ones) form the POD.
    """

    @staticmethod
    def dump(o, wfh, *args, **kwargs):
        raise NotImplementedError()

    @classmethod
    def load(cls, fh, *args, **kwargs):
        return cls.loads(fh.read())

    @staticmethod
    def loads(s, *args, **kwargs):
        pod = {}
        try:
            # SECURITY: this executes ``s`` as Python code; only use this on
            # trusted input.
            exec(s, pod)  # pylint: disable=exec-used
        except SyntaxError as e:
            raise SerializerSyntaxError(e.msg, e.lineno)
        # Drop names such as ``__builtins__`` that ``exec`` adds to the
        # namespace.
        return OrderedDict(
            (k, v) for k, v in pod.items() if not k.startswith('__')
        ) if isinstance(pod, OrderedDict) else {
            k: v for k, v in pod.items() if not k.startswith('__')
        }


def read_pod(source, fmt=None):
    """
    Read a POD from ``source``, which is either a path or an open file handle.

    When ``fmt`` is ``None`` the format is guessed from the file extension,
    so a handle without a ``name`` is only accepted together with ``fmt``.

    Raises ``ValueError`` if ``source`` is neither a path nor a usable handle.
    """
    if isinstance(source, str):
        with open(source) as fh:
            return _read_pod(fh, fmt)
    elif hasattr(source, 'read') and (hasattr(source, 'name') or fmt):
        return _read_pod(source, fmt)
    else:
        message = 'source must be a path or an open file handle; got {}'
        raise ValueError(message.format(type(source)))


def write_pod(pod, dest, fmt=None):
    """
    Write ``pod`` to ``dest``, which is either a path or an open file handle.

    When ``fmt`` is ``None`` the format is guessed from the file extension,
    so a handle without a ``name`` is only accepted together with ``fmt``.

    Raises ``ValueError`` if ``dest`` is neither a path nor a usable handle.
    """
    if isinstance(dest, str):
        with open(dest, 'w') as wfh:
            return _write_pod(pod, wfh, fmt)
    elif hasattr(dest, 'write') and (hasattr(dest, 'name') or fmt):
        return _write_pod(pod, dest, fmt)
    else:
        message = 'dest must be a path or an open file handle; got {}'
        raise ValueError(message.format(type(dest)))


def dump(o, wfh, fmt='json', *args, **kwargs):
    """
    Serialize ``o`` to the open file handle ``wfh`` using the serializer
    selected by ``fmt`` (``json``, ``yaml``, ``python`` or ``py``); extra
    arguments are forwarded to that serializer's ``dump()``.

    Raises ``ValueError`` for an unknown ``fmt``.
    """
    serializer = {'yaml': yaml,
                  'json': json,
                  'python': python,
                  'py': python,
                  }.get(fmt)
    if serializer is None:
        raise ValueError('Unknown serialization format: "{}"'.format(fmt))
    serializer.dump(o, wfh, *args, **kwargs)


def load(s, fmt='json', *args, **kwargs):
    """
    Load a POD from ``s`` (a path or an open file handle) in format ``fmt``.

    This is a thin wrapper around ``read_pod()``; the extra positional and
    keyword arguments are accepted but not used.
    """
    return read_pod(s, fmt=fmt)


def _read_pod(fh, fmt=None):
    """
    Load a POD from the open handle ``fh``; ``fmt`` defaults to the
    lower-cased extension of ``fh.name``.

    Raises ``ValueError`` for an unknown format.
    """
    if fmt is None:
        fmt = os.path.splitext(fh.name)[1].lower().strip('.')
    if fmt == 'yaml':
        return yaml.load(fh)
    elif fmt == 'json':
        return json.load(fh)
    elif fmt in ('py', 'python'):  # 'python' is accepted as in dump()
        return python.load(fh)
    else:
        raise ValueError('Unknown format "{}": {}'.format(fmt, getattr(fh, 'name', '')))


def _write_pod(pod, wfh, fmt=None):
    """
    Write ``pod`` to the open handle ``wfh``; ``fmt`` defaults to the
    lower-cased extension of ``wfh.name``.

    Raises ``ValueError`` for an unknown format, and for the Python format,
    which cannot be written.
    """
    if fmt is None:
        fmt = os.path.splitext(wfh.name)[1].lower().strip('.')
    if fmt == 'yaml':
        return yaml.dump(pod, wfh)
    elif fmt == 'json':
        return json.dump(pod, wfh)
    elif fmt in ('py', 'python'):  # 'python' is accepted as in dump()
        raise ValueError('Serializing to Python is not supported')
    else:
        raise ValueError('Unknown format "{}": {}'.format(fmt, getattr(wfh, 'name', '')))


def is_pod(obj):
    """
    Return ``True`` if ``obj`` is Plain Old Data: its exact type is listed in
    ``POD_TYPES`` and, for mappings and other iterables, the same holds
    recursively for every key, value and element.
    """
    # The exact type is compared on purpose: subclasses of POD types are not
    # treated as POD.
    if type(obj) not in POD_TYPES:  # pylint: disable=unidiomatic-typecheck
        return False
    if hasattr(obj, 'items'):
        return all(is_pod(k) and is_pod(v) for k, v in obj.items())
    if isiterable(obj):
        return all(is_pod(v) for v in obj)
    return True