import os, sys, re, stat, errno, json, tempfile, contextlib import yaml, pyaml @contextlib.contextmanager def safe_replacement(path, *open_args, mode=None, xattrs=None, **open_kws): 'Context to atomically create/replace file-path in-place unless errors are raised' path, xattrs = str(path), None if mode is None: try: mode = stat.S_IMODE(os.lstat(path).st_mode) except FileNotFoundError: pass if xattrs is None and getattr(os, 'getxattr', None): # MacOS try: xattrs = dict((k, os.getxattr(path, k)) for k in os.listxattr(path)) except FileNotFoundError: pass except OSError as err: if err.errno != errno.ENOTSUP: raise open_kws.update( delete=False, dir=os.path.dirname(path), prefix=os.path.basename(path) + '.' ) if not open_args: open_kws.setdefault('mode', 'w') with tempfile.NamedTemporaryFile(*open_args, **open_kws) as tmp: try: if mode is not None: os.fchmod(tmp.fileno(), mode) if xattrs: for k, v in xattrs.items(): os.setxattr(path, k, v) yield tmp if not tmp.closed: tmp.flush() try: os.fdatasync(tmp) except AttributeError: pass # MacOS os.rename(tmp.name, path) finally: try: os.unlink(tmp.name) except FileNotFoundError: pass def file_line_iter(src, sep='\0\n', bs=128 * 2**10): 'Generator for src-file chunks, split by any of the separator chars' buff0 = buff = '' while True: eof = len(buff := src.read(bs)) < bs while buff: for n in sorted(buff.find(c) for c in sep): if n >= 0: break else: buff0 += buff; break chunk, buff = buff[:n], buff[n+1:] buff0, chunk = '', buff0 + chunk yield chunk if eof: break if buff0: yield buff0 def main(argv=None, stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr): import argparse, textwrap dd = lambda text: re.sub( r' \t+', ' ', textwrap.dedent(text).strip('\n') + '\n' ).replace('\t', ' ') parser = argparse.ArgumentParser( formatter_class=argparse.RawTextHelpFormatter, description='Process and dump prettified YAML to stdout.') parser.add_argument('path', nargs='?', metavar='path', help='Path to YAML to read (default: use stdin).') parser.add_argument('-r', '--replace', action='store_true', help='Replace specified path with prettified version in-place.') parser.add_argument('-w', '--width', type=int, metavar='chars', help=dd(''' Max line width hint to pass to pyyaml for the dump. Only used to format scalars and collections (e.g. lists).''')) parser.add_argument('-v', '--vspacing', metavar='N[/M][g]', help=dd(''' Custom thresholds for when to add vertical spacing (empty lines), to visually separate items in overly long YAML lists/mappings. "long" means both >split-lines in line-length and has >split-count items. Value has N[/M][g] format, with default being something like 40/2. N = min number of same-indent lines in a section to split. M = min count of values in a list/mapping to split. "g" can be added to clump single-line values at the top of such lists/maps. "s" to split all-onliner blocks of values. Values examples: 20g, 5/1g, 60/4, gs, 10.''')) parser.add_argument('-l', '--lines', action='store_true', help=dd(''' Read input as a list of \\0 (ascii null char) or newline-separated json/yaml "lines", common with loggers or other incremental data dumps. Each input entry will be exported as a separate YAML document (after "---"). Empty or whitespace-only input entries are skipped without errors.''')) parser.add_argument('-q', '--quiet', action='store_true', help='Disable sanity-check on the output and suppress stderr warnings.') opts = parser.parse_args(sys.argv[1:] if argv is None else argv) if opts.replace and not opts.path: parser.error('-r/--replace option can only be used with a file path, not stdin') src = open(opts.path) if opts.path else stdin try: data = list( yaml.safe_load_all(src) if not opts.lines else (yaml.safe_load(chunk) for chunk in file_line_iter(src) if chunk.strip()) ) finally: src.close() pyaml_kwargs = dict() if opts.width: pyaml_kwargs['width'] = opts.width if vspacing := opts.vspacing: if not (m := re.search(r'^(\d+(?:/\d+)?)?([gs]+)?$', vspacing)): parser.error(f'Unrecognized -v/--vspacing spec: {vspacing!r}') vspacing, (vsplit, flags) = dict(), m.groups() if flags: if 's' in flags: vspacing['oneline_split'] = True if 'g' in flags: pyaml_kwargs['sort_dicts'] = pyaml.PYAMLSort.oneline_group if vsplit: lines, _, count = vsplit.strip().strip('/').partition('/') if lines: vspacing['split_lines'] = int(lines.strip()) if count: vspacing['split_count'] = int(count.strip()) if vspacing: pyaml_kwargs['vspacing'] = vspacing if len(data) > 1: ys = pyaml.dump_all(data, **pyaml_kwargs) else: ys = pyaml.dump(data[0], **pyaml_kwargs) # avoids leading "---" if not opts.quiet: try: data_chk = list(yaml.safe_load_all(ys)) try: data_hash = json.dumps(data, sort_keys=True) except: pass # too complex for checking with json else: if json.dumps(data_chk, sort_keys=True) != data_hash: raise AssertionError('Data from before/after pyaml does not match') except Exception as err: p_err = lambda *a,**kw: print(*a, **kw, file=stderr, flush=True) p_err( 'WARNING: Failed to parse produced YAML' ' output back to data, it is likely too complicated for pyaml' ) err = f'[{err.__class__.__name__}] {err}' p_err(' raised error: ' + ' // '.join(map(str.strip, err.split('\n')))) if opts.replace: with safe_replacement(opts.path) as tmp: tmp.write(ys) else: stdout.write(ys)
Memory