import os, sys, io, re, string, warnings, enum, pathlib, collections as cs
import yaml
PYAMLSort = enum.Enum('PYAMLSort', 'none keys oneline_group')
class PYAMLDumper(yaml.dumper.SafeDumper):
class str_ext(str): __slots__ = 'ext',
pyaml_anchor_decode = None # imported from unidecode module when needed
pyaml_sort_dicts = pyaml_repr_unknown = None
def __init__( self, *args, sort_dicts=None, force_embed=True,
string_val_style=None, anchor_len_max=40, repr_unknown=False, **kws ):
self.pyaml_force_embed = force_embed
self.pyaml_string_val_style = string_val_style
self.pyaml_anchor_len_max = anchor_len_max
self.pyaml_repr_unknown = repr_unknown
if isinstance(sort_dicts, PYAMLSort):
if sort_dicts is sort_dicts.none: kws['sort_keys'] = False
elif sort_dicts is sort_dicts.keys: kws['sort_keys'] = True
else: self.pyaml_sort_dicts, kws['sort_keys'] = sort_dicts, False
elif sort_dicts is not None: kws['sort_keys'] = sort_dicts # for compatibility
return super().__init__(*args, **kws)
@staticmethod
def pyaml_transliterate(s):
if unidecode_missing := not all(ord(c) < 128 for c in s):
if (unidecode := PYAMLDumper.pyaml_anchor_decode) is None:
try: from unidecode import unidecode
except ImportError: unidecode = False
PYAMLDumper.pyaml_anchor_decode = unidecode
if unidecode: unidecode_missing, s = None, unidecode(s)
return re.sub(r'[^-_a-z0-9]+', '_', s.lower()), unidecode_missing
def anchor_node(self, node, hints=list()):
if node in self.anchors:
if self.anchors[node] is None and not self.pyaml_force_embed:
if hints:
nid, uc = self.pyaml_transliterate('_-_'.join(h.value for h in hints))
if len(nid) > (n := self.pyaml_anchor_len_max - 9) + 9:
nid = f'{nid[:n//2]}-_-{nid[-n//2:]}_{self.generate_anchor(node)}'
elif uc is True: nid = f'{nid}_{self.generate_anchor(node)}'
else: nid = self.generate_anchor(node)
self.anchors[node] = nid
else:
self.anchors[node] = None
if isinstance(node, yaml.nodes.SequenceNode):
for item in node.value: self.anchor_node(item)
elif isinstance(node, yaml.nodes.MappingNode):
for key, value in node.value:
self.anchor_node(key)
self.anchor_node(value, hints=hints+[key])
def serialize_node(self, node, parent, index):
if self.pyaml_force_embed: self.anchors[node] = self.serialized_nodes.clear()
return super().serialize_node(node, parent, index)
def expect_block_sequence(self):
self.increase_indent(flow=False, indentless=False)
self.state = self.expect_first_block_sequence_item
def expect_block_sequence_item(self, first=False):
if not first and isinstance(self.event, yaml.events.SequenceEndEvent):
self.indent = self.indents.pop()
self.state = self.states.pop()
else:
self.write_indent()
self.write_indicator('-', True, indention=True)
self.states.append(self.expect_block_sequence_item)
self.expect_node(sequence=True)
def check_simple_key(self):
res = super().check_simple_key()
if self.analysis: self.analysis.allow_flow_plain = False
return res
def choose_scalar_style(self, _re1=re.compile(r':(\s|$)')):
if self.states[-1] == self.expect_block_mapping_simple_value:
# Mapping keys - disable overriding string style, strip comments
if self.pyaml_string_val_style: self.event.style = 'plain'
if isinstance(self.analysis.scalar, self.str_ext):
self.analysis.scalar = str(self.event.value)
# Do default thing for complicated stuff
if self.event.style != 'plain': return super().choose_scalar_style()
# Make sure style isn't overidden for strings like list/mapping items
if (s := self.event.value).startswith('- ') or _re1.search(s): return "'"
# Returned style=None picks write_plain in Emitter.process_scalar
def write_indicator(self, indicator, *args, **kws):
if indicator == '...': return # presumably it's useful somewhere, but don't care
super().write_indicator(indicator, *args, **kws)
def represent_str(self, data):
if not (style := self.pyaml_string_val_style):
if '\n' in data[:-1]:
style = 'literal'
for line in data.splitlines():
if len(line) > self.best_width: break
else: style = '|'
return yaml.representer.ScalarNode('tag:yaml.org,2002:str', data, style=style)
def represent_mapping_sort_oneline(self, kv):
key, value = kv
if not value or isinstance(value, (int, float)): v = 1
elif isinstance(value, str) and '\n' not in value: v = 1
else: v = 2
if isinstance(key, (int, float)): k = 1
elif isinstance(key, str): k = 2
elif key is None: k = 4
else: k, key = 3, f'{type(key)}\0{key}' # best-effort sort for all other types
return v, k, key
def represent_mapping(self, tag, mapping, *args, **kws):
if self.pyaml_sort_dicts is PYAMLSort.oneline_group:
try:
mapping = dict(sorted( mapping.items(),
key=self.represent_mapping_sort_oneline ))
except TypeError: pass # for subtype comparison fails
return super().represent_mapping(tag, mapping, *args, **kws)
def represent_undefined(self, data):
if isinstance(data, tuple) and hasattr(data, '_make') and hasattr(data, '_asdict'):
return self.represent_dict(data._asdict()) # assuming namedtuple
if isinstance(data, cs.abc.Mapping): return self.represent_dict(data) # dict-like
if type(data).__class__.__module__ == 'enum':
node = self.represent_data(data.value)
node.value = self.str_ext(node.value)
node.value.ext = f'# {data.__class__.__name__}.{data.name}'
return node
if hasattr(type(data), '__dataclass_fields__'):
try: import dataclasses as dcs
except ImportError: pass # can still be something else
else: return self.represent_dict(dcs.asdict(data))
try: # this is for numpy arrays, and the likes
if not callable(getattr(data, 'tolist', None)): raise AttributeError
except: pass # can raise other errors with custom types
else: return self.represent_data(data.tolist())
if self.pyaml_repr_unknown: # repr value as a short oneliner
if isinstance(n := self.pyaml_repr_unknown, bool): n = 50
if len(s := repr(data).replace('\n', '⏎')) > n + 10:
if (m := re.search(r' at (0x[0-9a-f]+>)$', s)) and n > len(m[0]):
s = s[:n-len(m[0])] + f' ~[{n:,d}/{len(s):,d}]~ ' + m[1]
else: s = s[:n] + f' ...[{n:,d}/{len(s):,d}]'
cls, node = data.__class__, self.represent_data(s)
if (st := f'{cls.__module__}.{cls.__name__}') in s: st = 'value'
node.value = (s := self.str_ext(s)); s.ext = f'# python {st}'; return node
return super().represent_undefined(data) # will raise RepresenterError
def write_ext(self, func, text, *args, **kws):
# Emitter write-funcs extension to append comments to values
if ext := getattr(text, 'ext', None):
# Commented values are enums/class-reprs and such, which shouldn't be split
if args: args = [False, *args[1:]]
else: kws['split'] = False
getattr(super(), f'write_{func}')(text, *args, **kws)
if ext: super().write_plain(ext, split=False)
write_folded = lambda s,v,*a,**kw: s.write_ext('folded', v, *a, **kw)
write_literal = lambda s,v,*a,**kw: s.write_ext('literal', v, *a, **kw)
write_single_quoted = lambda s,v,*a,**kw: s.write_ext('single_quoted', v, *a, **kw)
write_double_quoted = lambda s,v,*a,**kw: s.write_ext('double_quoted', v, *a, **kw)
write_plain = lambda s,v,split=True: s.write_ext('plain', v, split)
# Unsafe was a separate class in <23.x versions, left here for compatibility
UnsafePYAMLDumper = PYAMLDumper
add_representer = PYAMLDumper.add_representer
add_representer( bool,
lambda s,o: s.represent_scalar('tag:yaml.org,2002:bool', ['no', 'yes'][o]) )
add_representer( type(None),
lambda s,o: s.represent_scalar('tag:yaml.org,2002:null', '') )
add_representer(str, PYAMLDumper.represent_str)
add_representer(cs.defaultdict, PYAMLDumper.represent_dict)
add_representer(cs.OrderedDict, PYAMLDumper.represent_dict)
add_representer(set, PYAMLDumper.represent_list)
add_representer(type(pathlib.Path('')), lambda cls,o: cls.represent_data(str(o)))
add_representer(None, PYAMLDumper.represent_undefined)
def dump_add_vspacing( yaml_str,
split_lines=40, split_count=2, oneline_group=False, oneline_split=False ):
'''Add some newlines to separate overly long YAML lists/mappings.
"long" means both >split_lines in length and has >split_count items.
oneline_group - don't split consecutive oneliner list/map items.
oneline_split - split long list/map consisting only of oneliner values.'''
def _add_vspacing(lines):
a = a_seq = ind_re = ind_re_sub = has_sub = None
blocks, item_lines = list(), list()
for n, line in enumerate(lines):
if ind_re is None and (m := re.match(r'( *)([^# ].?)', line)):
ind_re = re.compile(m[1] + r'\S')
lines.append(f'{m[1]}.') # for last add_vspacing
if ind_re_sub:
if ind_re_sub.match(line): has_sub = True; continue
if n - a > split_lines and (block := lines[a:n]):
if a_seq: block.insert(0, lines[a-1].replace('- ', ' ', 1))
blocks.append((a, n, _add_vspacing(block)[a_seq:]))
ind_re_sub = None
if ind_re.match(line): item_lines.append(n)
if m := re.match(r'( *)(- )?\S.*:(\s|$)', line):
a, a_seq, ind_re_sub = n+1, bool(m[2]), re.compile(m[1] + ' ')
if ( split_items := len(lines) > split_lines and
len(item_lines) > split_count and (oneline_split or has_sub) ):
for n in item_lines:
try:
if ( oneline_group and ind_re
and ind_re.match(lines[n-1].lstrip('\n'))
and ind_re.match(lines[n+1].lstrip('\n')) ): continue
except IndexError: continue
lines[n] = f'\n{lines[n]}'
for a, b, block in reversed(blocks): lines[a:b] = block
if ind_re: lines.pop()
if split_items: lines.append('')
return lines
yaml_str = '\n'.join(_add_vspacing(yaml_str.splitlines()))
return re.sub(r'\n\n+', '\n\n', yaml_str.strip() + '\n')
def dump( data, dst=None, safe=None, force_embed=True, vspacing=True,
string_val_style=None, sort_dicts=None, multiple_docs=False, width=100,
repr_unknown=False, **pyyaml_kws ):
'''Serialize data as pretty-YAML to specified dst file-like object,
or return as str with dst=str (default) or encoded to bytes with dst=bytes.'''
if safe is not None:
cat = DeprecationWarning if not safe else UserWarning
warnings.warn( 'pyaml module "safe" arg/keyword is ignored as implicit'
' safe=maybe-true?, as of pyaml >= 23.x', category=cat, stacklevel=2 )
if sort_dicts is not None and not isinstance(sort_dicts, PYAMLSort):
warnings.warn( 'Using pyaml module sort_dicts as boolean is deprecated as of'
' pyaml >= 23.x - translated to sort_keys PyYAML keyword, use that instead',
DeprecationWarning, stacklevel=2 )
if stream := pyyaml_kws.pop('stream', None):
if dst is not None and stream is not dst:
raise TypeError( 'Using different pyaml dst='
' and pyyaml stream= options at the same time is not supported' )
dst = stream
elif dst is None: dst = str # old default
buff = io.StringIO()
Dumper = lambda *a,**kw: PYAMLDumper( *a, **kw,
force_embed=force_embed, string_val_style=string_val_style,
sort_dicts=sort_dicts, repr_unknown=repr_unknown )
if not multiple_docs: data = [data]
else: pyyaml_kws.setdefault('explicit_start', True)
yaml.dump_all( data, buff, Dumper=Dumper, width=width,
default_flow_style=False, allow_unicode=True, **pyyaml_kws )
buff = buff.getvalue()
if vspacing not in [None, False]:
if vspacing is True: vspacing = dict()
elif not isinstance(vspacing, dict):
warnings.warn(
'Unsupported pyaml "vspacing" parameter type:'
f' [{vspacing.__class__.__name__}] {vspacing}\n'
'As of pyaml >= 23.x it should be either True or keywords-dict'
' for pyaml_add_vspacing, and any other values are ignored,'
' enabling default vspacing behavior.', DeprecationWarning, stacklevel=2 )
vspacing = dict()
if sort_dicts is PYAMLSort.oneline_group: vspacing.setdefault('oneline_group', True)
buff = dump_add_vspacing(buff, **vspacing)
if dst is bytes: return buff.encode()
elif dst is str: return buff
else:
try: dst.write(b'') # tests if dst is str- or bytestream
except: dst.write(buff)
else: dst.write(buff.encode())
# Simpler pyaml.dump() aliases
def dump_all(data, *dump_args, **dump_kws):
'Alias to dump(list, multiple_docs=True) for API compatibility with pyyaml'
return dump(data, *dump_args, multiple_docs=True, **dump_kws)
def dumps(data, **dump_kws):
'Alias to dump() for API compatibility with stdlib conventions'
return dump(data, **dump_kws)
def pprint(*data, **dump_kws):
'Similar to how print() works, with any number of arguments and stdout-default'
dst = dump_kws.pop('file', dump_kws.pop('dst', sys.stdout))
if len(data) == 1: data, = data
dump(data, dst=dst, **dump_kws)
def debug(*data, **dump_kws):
'Same as pprint, but also repr-printing any non-yaml types'
pprint(*data, repr_unknown=True, **dump_kws)
_p = lambda *a,_p=print,**kw: _p(*a, **kw, flush=True) # to use here for debug
p = print = pprint