diff options
Diffstat (limited to 'libs/dynaconf/vendor/ruamel/yaml/composer.py')
-rw-r--r-- | libs/dynaconf/vendor/ruamel/yaml/composer.py | 238 |
1 files changed, 238 insertions, 0 deletions
diff --git a/libs/dynaconf/vendor/ruamel/yaml/composer.py b/libs/dynaconf/vendor/ruamel/yaml/composer.py new file mode 100644 index 000000000..96e67a7a9 --- /dev/null +++ b/libs/dynaconf/vendor/ruamel/yaml/composer.py @@ -0,0 +1,238 @@ +# coding: utf-8 + +from __future__ import absolute_import, print_function + +import warnings + +from .error import MarkedYAMLError, ReusedAnchorWarning +from .compat import utf8, nprint, nprintf # NOQA + +from .events import ( + StreamStartEvent, + StreamEndEvent, + MappingStartEvent, + MappingEndEvent, + SequenceStartEvent, + SequenceEndEvent, + AliasEvent, + ScalarEvent, +) +from .nodes import MappingNode, ScalarNode, SequenceNode + +if False: # MYPY + from typing import Any, Dict, Optional, List # NOQA + +__all__ = ['Composer', 'ComposerError'] + + +class ComposerError(MarkedYAMLError): + pass + + +class Composer(object): + def __init__(self, loader=None): + # type: (Any) -> None + self.loader = loader + if self.loader is not None and getattr(self.loader, '_composer', None) is None: + self.loader._composer = self + self.anchors = {} # type: Dict[Any, Any] + + @property + def parser(self): + # type: () -> Any + if hasattr(self.loader, 'typ'): + self.loader.parser + return self.loader._parser + + @property + def resolver(self): + # type: () -> Any + # assert self.loader._resolver is not None + if hasattr(self.loader, 'typ'): + self.loader.resolver + return self.loader._resolver + + def check_node(self): + # type: () -> Any + # Drop the STREAM-START event. + if self.parser.check_event(StreamStartEvent): + self.parser.get_event() + + # If there are more documents available? + return not self.parser.check_event(StreamEndEvent) + + def get_node(self): + # type: () -> Any + # Get the root node of the next document. + if not self.parser.check_event(StreamEndEvent): + return self.compose_document() + + def get_single_node(self): + # type: () -> Any + # Drop the STREAM-START event. + self.parser.get_event() + + # Compose a document if the stream is not empty. + document = None # type: Any + if not self.parser.check_event(StreamEndEvent): + document = self.compose_document() + + # Ensure that the stream contains no more documents. + if not self.parser.check_event(StreamEndEvent): + event = self.parser.get_event() + raise ComposerError( + 'expected a single document in the stream', + document.start_mark, + 'but found another document', + event.start_mark, + ) + + # Drop the STREAM-END event. + self.parser.get_event() + + return document + + def compose_document(self): + # type: (Any) -> Any + # Drop the DOCUMENT-START event. + self.parser.get_event() + + # Compose the root node. + node = self.compose_node(None, None) + + # Drop the DOCUMENT-END event. + self.parser.get_event() + + self.anchors = {} + return node + + def compose_node(self, parent, index): + # type: (Any, Any) -> Any + if self.parser.check_event(AliasEvent): + event = self.parser.get_event() + alias = event.anchor + if alias not in self.anchors: + raise ComposerError( + None, None, 'found undefined alias %r' % utf8(alias), event.start_mark + ) + return self.anchors[alias] + event = self.parser.peek_event() + anchor = event.anchor + if anchor is not None: # have an anchor + if anchor in self.anchors: + # raise ComposerError( + # "found duplicate anchor %r; first occurrence" + # % utf8(anchor), self.anchors[anchor].start_mark, + # "second occurrence", event.start_mark) + ws = ( + '\nfound duplicate anchor {!r}\nfirst occurrence {}\nsecond occurrence ' + '{}'.format((anchor), self.anchors[anchor].start_mark, event.start_mark) + ) + warnings.warn(ws, ReusedAnchorWarning) + self.resolver.descend_resolver(parent, index) + if self.parser.check_event(ScalarEvent): + node = self.compose_scalar_node(anchor) + elif self.parser.check_event(SequenceStartEvent): + node = self.compose_sequence_node(anchor) + elif self.parser.check_event(MappingStartEvent): + node = self.compose_mapping_node(anchor) + self.resolver.ascend_resolver() + return node + + def compose_scalar_node(self, anchor): + # type: (Any) -> Any + event = self.parser.get_event() + tag = event.tag + if tag is None or tag == u'!': + tag = self.resolver.resolve(ScalarNode, event.value, event.implicit) + node = ScalarNode( + tag, + event.value, + event.start_mark, + event.end_mark, + style=event.style, + comment=event.comment, + anchor=anchor, + ) + if anchor is not None: + self.anchors[anchor] = node + return node + + def compose_sequence_node(self, anchor): + # type: (Any) -> Any + start_event = self.parser.get_event() + tag = start_event.tag + if tag is None or tag == u'!': + tag = self.resolver.resolve(SequenceNode, None, start_event.implicit) + node = SequenceNode( + tag, + [], + start_event.start_mark, + None, + flow_style=start_event.flow_style, + comment=start_event.comment, + anchor=anchor, + ) + if anchor is not None: + self.anchors[anchor] = node + index = 0 + while not self.parser.check_event(SequenceEndEvent): + node.value.append(self.compose_node(node, index)) + index += 1 + end_event = self.parser.get_event() + if node.flow_style is True and end_event.comment is not None: + if node.comment is not None: + nprint( + 'Warning: unexpected end_event commment in sequence ' + 'node {}'.format(node.flow_style) + ) + node.comment = end_event.comment + node.end_mark = end_event.end_mark + self.check_end_doc_comment(end_event, node) + return node + + def compose_mapping_node(self, anchor): + # type: (Any) -> Any + start_event = self.parser.get_event() + tag = start_event.tag + if tag is None or tag == u'!': + tag = self.resolver.resolve(MappingNode, None, start_event.implicit) + node = MappingNode( + tag, + [], + start_event.start_mark, + None, + flow_style=start_event.flow_style, + comment=start_event.comment, + anchor=anchor, + ) + if anchor is not None: + self.anchors[anchor] = node + while not self.parser.check_event(MappingEndEvent): + # key_event = self.parser.peek_event() + item_key = self.compose_node(node, None) + # if item_key in node.value: + # raise ComposerError("while composing a mapping", + # start_event.start_mark, + # "found duplicate key", key_event.start_mark) + item_value = self.compose_node(node, item_key) + # node.value[item_key] = item_value + node.value.append((item_key, item_value)) + end_event = self.parser.get_event() + if node.flow_style is True and end_event.comment is not None: + node.comment = end_event.comment + node.end_mark = end_event.end_mark + self.check_end_doc_comment(end_event, node) + return node + + def check_end_doc_comment(self, end_event, node): + # type: (Any, Any) -> None + if end_event.comment and end_event.comment[1]: + # pre comments on an end_event, no following to move to + if node.comment is None: + node.comment = [None, None] + assert not isinstance(node, ScalarEvent) + # this is a post comment on a mapping node, add as third element + # in the list + node.comment.append(end_event.comment[1]) + end_event.comment[1] = None |