# Source code for amazon.ion.reader

# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at:
#
#    http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
# OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the
# License.

"""Provides common functionality for Ion binary and text readers."""

# Python 2/3 compatibility
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import six

from collections import deque

import sys

from amazon.ion.symbols import SymbolToken
from .core import DataEvent, IonEventType, Transition
from .core import ION_STREAM_END_EVENT
from .util import coroutine, Enum


class CodePoint(int):
    """An ``int`` subclass representing a single Unicode code point.

    Evaluates as the code point's ordinal in any integer context, while also
    carrying the originally-read character representation in ``char`` and an
    ``is_escaped`` flag recording whether the code point was produced by an
    escape sequence.
    """
    def __init__(self, *args, **kwargs):
        # int is immutable and fully constructed in __new__; __init__ only
        # attaches the mutable bookkeeping attributes, initialized empty.
        self.is_escaped = False
        self.char = None
def _narrow_unichr(code_point):
    """Retrieves the unicode character representing any given code point, in a way that won't break
    on narrow builds.

    This is necessary because the built-in unichr function will fail for ordinals above 0xFFFF on
    narrow builds (UCS2); ordinals above 0xFFFF would require recalculating and combining surrogate
    pairs. This avoids that by retrieving the unicode character that was initially read.

    Args:
        code_point (int|CodePoint): An int or a subclass of int that contains the unicode character
            representing its code point in an attribute named 'char'.
    """
    try:
        # A CodePoint carries the character as originally read; if it occupies
        # more than one code unit (a surrogate pair on a narrow build), return
        # it verbatim rather than recomputing it from the ordinal.
        if len(code_point.char) > 1:
            return code_point.char
    except AttributeError:
        # Plain ints have no 'char' attribute (and CodePoint.char may be None);
        # fall through to unichr below.
        pass
    return six.unichr(code_point)


# True on "narrow" (UCS-2) Python 2 builds, where unichr() cannot produce
# ordinals above 0xFFFF.
_NARROW_BUILD = sys.maxunicode < 0x10ffff
_WIDE_BUILD = not _NARROW_BUILD

# Python 3 and wide Python 2 builds handle the full code point range natively;
# only narrow Python 2 builds need the _narrow_unichr workaround above.
safe_unichr = six.unichr if (six.PY3 or _WIDE_BUILD) else _narrow_unichr
class CodePointArray:
    """A mutable, append-only sequence of Unicode code points.

    Plays the role that ``bytearray()`` plays for binary values, but for text
    values; mutation anywhere other than the end of the sequence is
    deliberately unsupported.
    """
    def __init__(self, initial_bytes=None):
        self.__characters = u''
        if initial_bytes is not None:
            for octet in initial_bytes:
                self.append(octet)

    def append(self, value):
        """Appends a single code point, given as its ordinal."""
        self.__characters += safe_unichr(value)

    def extend(self, values):
        """Appends every code point from a unicode or bytes sequence."""
        if isinstance(values, six.text_type):
            # Text can be concatenated directly.
            self.__characters += values
            return
        assert isinstance(values, six.binary_type)
        for octet in six.iterbytes(values):
            self.append(octet)

    def as_symbol(self):
        """Returns the accumulated text as a :class:`SymbolToken` with no SID."""
        return SymbolToken(self.__characters, sid=None, location=None)

    def as_text(self):
        """Returns the accumulated text as a unicode string."""
        return self.__characters

    def __len__(self):
        return len(self.__characters)

    def __repr__(self):
        return 'CodePointArray(text=%s)' % (self.__characters,)

    __str__ = __repr__

    def insert(self, index, value):
        """Unsupported; code points may only be appended."""
        raise ValueError('Attempted to add code point in middle of sequence.')

    def __setitem__(self, index, value):
        raise ValueError('Attempted to set code point in middle of sequence.')

    def __getitem__(self, index):
        return self.__characters[index]

    def __delitem__(self, index):
        raise ValueError('Attempted to delete from code point sequence.')
# Appended as its own single-byte segment by BufferQueue.mark_eof() and
# detected by *identity* in BufferQueue.is_eof(), so an identical byte arriving
# in real data is never mistaken for end-of-stream.
_EOF = b'\x04'  # End of transmission character.
class BufferQueue(object):
    """A simple circular buffer of buffers.

    Data arrives as whole segments (bytes, or unicode when ``is_unicode``) via
    ``extend`` and is consumed from the front; ``__offset`` tracks how far into
    the leftmost segment reading has progressed.
    """
    def __init__(self, is_unicode=False):
        # FIFO of pending data segments; the leftmost is partially consumed.
        self.__segments = deque()
        # Read position within the leftmost segment.
        self.__offset = 0
        # Total unread code units across all segments.
        self.__size = 0
        # Accumulator used when a read() spans multiple segments.
        self.__data_cls = CodePointArray if is_unicode else bytearray
        if is_unicode:
            self.__chr = safe_unichr
            self.__element_type = six.text_type
        else:
            # Used by unread() to normalize an int back into this queue's
            # element form on Python 2 (where bytes index as 1-char strings).
            self.__chr = chr if six.PY2 else lambda x: x
            self.__element_type = six.binary_type
        # NOTE(review): used in read_byte(); presumably on PY3 indexing a
        # unicode segment yields a 1-char str needing ord(), while byte
        # indexing already yields an int — TODO confirm against six semantics.
        self.__ord = ord if (six.PY3 and is_unicode) else lambda x: x
        # Total code units consumed since creation (never decreases except
        # via unread()).
        self.position = 0
        self.is_unicode = is_unicode

    @staticmethod
    def is_eof(c):
        # True only for the EOF sentinel segment/byte itself.
        return c is _EOF  # Note reference equality, ensuring that the EOF literal is still illegal as part of the data.

    @staticmethod
    def _incompatible_types(element_type, data):
        # Raised when extend()/unread() receive data of the wrong text/binary
        # flavor for this queue.
        raise ValueError('Incompatible input data types. Expected %r, got %r.' % (element_type, type(data)))

    def extend(self, data):
        """Appends a segment of data (of this queue's element type)."""
        # TODO Determine if there are any other accumulation strategies that make sense.
        # TODO Determine if we should use memoryview to avoid copying.
        if not isinstance(data, self.__element_type):
            BufferQueue._incompatible_types(self.__element_type, data)
        self.__segments.append(data)
        self.__size += len(data)

    def mark_eof(self):
        """Appends the EOF sentinel as its own one-byte segment."""
        self.__segments.append(_EOF)
        self.__size += 1

    def read(self, length, skip=False):
        """Consumes the first ``length`` bytes from the accumulator."""
        if length > self.__size:
            raise IndexError(
                'Cannot pop %d bytes, %d bytes in buffer queue' % (length, self.__size))
        self.position += length
        self.__size -= length
        segments = self.__segments
        offset = self.__offset
        data = self.__data_cls()
        while length > 0:
            segment = segments[0]
            segment_off = offset
            segment_len = len(segment)
            segment_rem = segment_len - segment_off
            segment_read_len = min(segment_rem, length)
            if segment_off == 0 and segment_read_len == segment_rem:
                # consume an entire segment
                if skip:
                    segment_slice = self.__element_type()
                else:
                    # The whole segment is returned as-is: no copy needed.
                    segment_slice = segment
            else:
                # Consume a part of the segment.
                if skip:
                    segment_slice = self.__element_type()
                else:
                    segment_slice = segment[segment_off:segment_off + segment_read_len]
            # Any segment after the first is read from its beginning.
            offset = 0
            segment_off += segment_read_len
            if segment_off == segment_len:
                segments.popleft()
                self.__offset = 0
            else:
                self.__offset = segment_off
            if length <= segment_rem and len(data) == 0:
                # Fast path: the request was satisfied by a single segment, so
                # return the slice directly without accumulating into data.
                return segment_slice
            data.extend(segment_slice)
            length -= segment_read_len
        if self.is_unicode:
            return data.as_text()
        else:
            return data

    def read_byte(self):
        """Consumes and returns a single code unit (or the EOF sentinel)."""
        if self.__size < 1:
            raise IndexError('Buffer queue is empty')
        segments = self.__segments
        segment = segments[0]
        segment_len = len(segment)
        offset = self.__offset
        if BufferQueue.is_eof(segment):
            # The sentinel is returned by identity so callers can detect it
            # via is_eof() without it colliding with real 0x04 data.
            octet = _EOF
        else:
            octet = self.__ord(six.indexbytes(segment, offset))
        offset += 1
        if offset == segment_len:
            # Leftmost segment fully consumed; drop it.
            offset = 0
            segments.popleft()
        self.__offset = offset
        self.__size -= 1
        self.position += 1
        return octet

    def unread(self, c):
        """Unread the given character, byte, or code point.

        If this is a unicode buffer and the input is an int or byte, it will be interpreted as an
        ordinal representing a unicode code point.

        If this is a binary buffer, the input must be a byte or int; a unicode character will raise
        an error.
        """
        if self.position < 1:
            raise IndexError('Cannot unread an empty buffer queue.')
        if isinstance(c, six.text_type):
            if not self.is_unicode:
                BufferQueue._incompatible_types(self.is_unicode, c)
        else:
            # Normalize an int/byte into this queue's element form.
            c = self.__chr(c)
        # A code point above 0xFFFF occupies two code units on a narrow build.
        num_code_units = self.is_unicode and len(c) or 1
        if self.__offset == 0:
            # Nothing consumed from the leftmost segment: push c back as a
            # brand-new segment at the front.
            if num_code_units == 1 and six.PY3:
                if self.is_unicode:
                    segment = c
                else:
                    segment = six.int2byte(c)
            else:
                segment = c
            self.__segments.appendleft(segment)
        else:
            # Rewind within the current segment; the data must still be there,
            # so verify it matches what the caller claims to unread.
            self.__offset -= num_code_units

            def verify(ch, idx):
                existing = self.__segments[0][self.__offset + idx]
                if existing != ch:
                    raise ValueError('Attempted to unread %s when %s was expected.' % (ch, existing))
            if num_code_units == 1:
                verify(c, 0)
            else:
                for i in range(num_code_units):
                    verify(c[i], i)
        self.__size += num_code_units
        self.position -= num_code_units

    def skip(self, length):
        """Removes ``length`` bytes and returns the number length still required to skip"""
        if length >= self.__size:
            # Requested more than is buffered: drop everything and report the
            # remainder the caller still needs to skip from future data.
            skip_amount = self.__size
            rem = length - skip_amount
            self.__segments.clear()
            self.__offset = 0
            self.__size = 0
            self.position += skip_amount
        else:
            rem = 0
            self.read(length, skip=True)
        return rem

    def __iter__(self):
        # Drains the queue one code unit at a time.
        while self.__size > 0:
            yield self.read_byte()

    def __len__(self):
        return self.__size
class ReadEventType(Enum):
    """Events that are pushed into an Ion reader co-routine.

    Attributes:
        DATA: Indicates more data for the reader. The expected type is
            :class:`bytes`.
        NEXT: Indicates that the reader should yield the next event.
        SKIP: Indicates that the reader should proceed to the end of the current
            container. This type is not meaningful at the top-level.
    """
    DATA = 0
    NEXT = 1
    SKIP = 2
# Singleton DataEvent instances for the two read event types that carry no
# payload; DATA events are constructed per-buffer via read_data_event().
NEXT_EVENT = DataEvent(ReadEventType.NEXT, None)
SKIP_EVENT = DataEvent(ReadEventType.SKIP, None)
def read_data_event(data):
    """Builds a ``DATA`` read event carrying the given buffer.

    Simple wrapper over the :class:`DataEvent` constructor to wrap a
    :class:`bytes` like with the ``DATA`` :class:`ReadEventType`.

    Args:
        data (bytes|unicode): The data for the event. Bytes are accepted by both
            binary and text readers, while unicode is accepted by text readers
            with is_unicode=True.
    """
    event_type = ReadEventType.DATA
    return DataEvent(event_type, data)
@coroutine
def reader_trampoline(start, allow_flush=False):
    """Provides the co-routine trampoline for a reader state machine.

    The given co-routine is a state machine that yields :class:`Transition` and takes
    a Transition of :class:`amazon.ion.core.DataEvent` and the co-routine itself.

    A reader must start with a ``ReadEventType.NEXT`` event to prime the parser.  In many cases
    this will lead to an ``IonEventType.INCOMPLETE`` being yielded, but not always
    (consider a reader over an in-memory data structure).

    Notes:
        A reader delimits its incomplete parse points with ``IonEventType.INCOMPLETE``.
        Readers also delimit complete parse points with ``IonEventType.STREAM_END``;
        this is similar to the ``INCOMPLETE`` case except that it denotes that a logical
        termination of data is *allowed*.  When these event are received, the only
        valid input event type is a ``ReadEventType.DATA``.

        Generally, ``ReadEventType.NEXT`` is used to get the next parse event, but
        ``ReadEventType.SKIP`` can be used to skip over the current container.
        An internal state machine co-routine can delimit a state change without
        yielding to the caller by yielding ``None`` event, this will cause the
        trampoline to invoke the transition delegate, immediately.

    Args:
        start: The reader co-routine to initially delegate to.
        allow_flush(Optional[bool]): True if this reader supports receiving ``NEXT`` after
            yielding ``INCOMPLETE`` to trigger an attempt to flush pending parse events,
            otherwise False.

    Yields:
        amazon.ion.core.IonEvent: the result of parsing.

        Receives :class:`DataEvent` to parse into :class:`amazon.ion.core.IonEvent`.
    """
    data_event = yield
    if data_event is None or data_event.type is not ReadEventType.NEXT:
        raise TypeError('Reader must be started with NEXT')
    trans = Transition(None, start)
    while True:
        # Hand the incoming event to the current delegate; it may hand back a
        # new delegate (a state change) along with an optional parse event.
        trans = trans.delegate.send(Transition(data_event, trans.delegate))
        data_event = None
        if trans.event is not None:
            # Only yield if there is an event.
            data_event = (yield trans.event)
            if trans.event.event_type.is_stream_signal:
                # After INCOMPLETE/STREAM_END, the caller must normally supply
                # DATA; NEXT is tolerated only to flush a flush-capable reader
                # stalled at INCOMPLETE.
                if data_event.type is not ReadEventType.DATA:
                    if not allow_flush or not (trans.event.event_type is IonEventType.INCOMPLETE
                                               and data_event.type is ReadEventType.NEXT):
                        raise TypeError('Reader expected data: %r' % (data_event,))
            else:
                # A real parse event was yielded; the caller should ask for
                # more events (NEXT/SKIP), not push data.
                if data_event.type is ReadEventType.DATA:
                    raise TypeError('Reader did not expect data')
            if data_event.type is ReadEventType.DATA and len(data_event.data) == 0:
                raise ValueError('Empty data not allowed')
            # SKIP only makes sense inside a container (or at its start).
            if trans.event.depth == 0 \
                    and trans.event.event_type is not IonEventType.CONTAINER_START \
                    and data_event.type is ReadEventType.SKIP:
                raise TypeError('Cannot skip at the top-level')


# NOTE(review): 8196 looks like a typo for 8192 (8 KiB); it is harmless since
# it only affects read chunking, but worth confirming before "fixing".
_DEFAULT_BUFFER_SIZE = 8196


@coroutine
def blocking_reader(reader, input, buffer_size=_DEFAULT_BUFFER_SIZE):
    """Provides an implementation of using the reader co-routine with a file-like object.

    Args:
        reader(Coroutine): A reader co-routine.
        input(BaseIO): The file-like object to read from.
        buffer_size(Optional[int]): The optional buffer size to use.
    """
    ion_event = None
    while True:
        read_event = (yield ion_event)
        ion_event = reader.send(read_event)
        # While the reader signals it needs (or can accept) more input, feed it
        # chunks from the file-like object.
        while ion_event is not None and ion_event.event_type.is_stream_signal:
            data = input.read(buffer_size)
            if len(data) == 0:
                # End of file.
                if ion_event.event_type is IonEventType.INCOMPLETE:
                    # Give the reader a chance to flush pending parse events.
                    ion_event = reader.send(NEXT_EVENT)
                    continue
                else:
                    yield ION_STREAM_END_EVENT
                    return
            ion_event = reader.send(read_data_event(data))