Source code for amazon.ion.writer

# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at:
#
#    http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
# OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the
# License.

"""Provides common functionality for Ion binary and text writers."""

# Python 2/3 compatibility
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from .core import DataEvent, IonEventType, Transition, IonType
from .util import coroutine, Enum


[docs]class WriteEventType(Enum): """Events that can come from an Ion writer co-routine. Attributes: HAS_PENDING: Indicates that the writer has more pending events to yield, and that it should be sent ``None`` as the event to *flush* them out. Generally, this is signalled when an internal buffer has been filled by an input event and needs to be flushed in order to make progress. NEEDS_INPUT: Indicates that the writer has no pending events to yield and that it should be sent a :class:`amazon.ion.core.IonEvent`. COMPLETE: Indicates that the writer has flushed out complete Ion values at the top-level. This is similar to ``NEEDS_INPUT`` except that it signifies point that all data emitted by the writer is in sync with the events given to it. """ HAS_PENDING = 1 NEEDS_INPUT = 2 COMPLETE = 3
NOOP_WRITER_EVENT = DataEvent(WriteEventType.COMPLETE, b'')
[docs]def partial_transition(data, delegate): """Generates a :class:`Transition` that has an event indicating ``HAS_PENDING``.""" return Transition(DataEvent(WriteEventType.HAS_PENDING, data), delegate)
[docs]def validate_scalar_value(value, expected_types): if not isinstance(value, expected_types): raise TypeError('Expected type %s, found %s.' % (expected_types, type(value)))
# To be used when an event of type IonType.NULL is encountered, but its value is not None.
[docs]def illegal_state_null(ion_event): assert ion_event.ion_type is IonType.NULL assert ion_event.value is not None validate_scalar_value(ion_event.value, type(None))
def _serialize_null(ion_event, null_table): return null_table[ion_event.ion_type]
[docs]def serialize_scalar(ion_event, jump_table, null_table): ion_type = ion_event.ion_type if ion_event.value is None: return _serialize_null(ion_event, null_table) if ion_type.is_container: raise TypeError('Expected scalar type in event: %s' % (ion_event,)) return jump_table[ion_type](ion_event)
@coroutine def writer_trampoline(start): """Provides the co-routine trampoline for a writer state machine. The given co-routine is a state machine that yields :class:`Transition` and takes a :class:`Transition` with a :class:`amazon.ion.core.IonEvent` and the co-routine itself. Notes: A writer delimits its logical flush points with ``WriteEventType.COMPLETE``, depending on the configuration, a user may need to send an ``IonEventType.STREAM_END`` to force this to occur. Args: start: The writer co-routine to initially delegate to. Yields: DataEvent: the result of serialization. Receives :class:`amazon.ion.core.IonEvent` to serialize into :class:`DataEvent`. """ trans = Transition(None, start) while True: ion_event = (yield trans.event) if trans.event is None: if ion_event is None: raise TypeError('Cannot start Writer with no event') else: if trans.event.type is WriteEventType.HAS_PENDING and ion_event is not None: raise TypeError('Writer expected to receive no event: %r' % (ion_event,)) if trans.event.type is not WriteEventType.HAS_PENDING and ion_event is None: raise TypeError('Writer expected to receive event') if ion_event is not None and ion_event.event_type is IonEventType.INCOMPLETE: raise TypeError('Writer cannot receive INCOMPLETE event') trans = trans.delegate.send(Transition(ion_event, trans.delegate)) _WRITE_EVENT_HAS_PENDING_EMPTY = DataEvent(WriteEventType.HAS_PENDING, None) def _drain(writer, ion_event): """Drain the writer of its pending write events. Args: writer (Coroutine): A writer co-routine. ion_event (amazon.ion.core.IonEvent): The first event to apply to the writer. Yields: DataEvent: Yields each pending data event. """ result_event = _WRITE_EVENT_HAS_PENDING_EMPTY while result_event.type is WriteEventType.HAS_PENDING: result_event = writer.send(ion_event) ion_event = None yield result_event @coroutine def blocking_writer(writer, output): """Provides an implementation of using the writer co-routine with a file-like object. Args: writer (Coroutine): A writer co-routine. output (BaseIO): The file-like object to pipe events to. Yields: WriteEventType: Yields when no events are pending. Receives :class:`amazon.ion.core.IonEvent` to write to the ``output``. """ result_type = None while True: ion_event = (yield result_type) for result_event in _drain(writer, ion_event): output.write(result_event.data) result_type = result_event.type