Source code for amazon.ion.writer_binary

# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at:
#
#    http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
# OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the
# License.

"""Binary Ion writer with symbol table management."""

# Python 2/3 compatibility
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import six

from .core import ION_STREAM_END_EVENT, IonEventType, IonType, IonEvent, DataEvent, Transition
from .exceptions import IonException
from .symbols import SID_ION_SYMBOL_TABLE, SID_IMPORTS, SHARED_TABLE_TYPE, \
                     SID_NAME, SID_VERSION, SID_MAX_ID, SID_SYMBOLS, LOCAL_TABLE_TYPE, \
                     SymbolTable, _SYSTEM_SYMBOL_TOKENS
from .util import coroutine, Enum, record
from .writer import NOOP_WRITER_EVENT, writer_trampoline, partial_transition, WriteEventType, \
                    _drain
from .writer_binary_raw import _WRITER_EVENT_NEEDS_INPUT_EMPTY, _raw_binary_writer
from .writer_buffer import BufferTree

_IVM = b'\xE0\x01\x00\xEA'


class _SymbolEventType(Enum):
    """Enumeration of events that impact a symbol table.

    Attributes:
        START_LST: Indicates that a local symbol table should be started.
        SYMBOL: Indicates that a symbol should be interned into the local symbol,
            and the resulting symbol ID returned.
        FINISH: Indicates that the local symbol table (if any) should be ended.
    """
    START_LST = 0
    SYMBOL = 1
    FINISH = 2


class _SymbolEvent(record('event_type', ('symbol', None))):
    """Symbol event used by the managed writer coroutine to trigger an action in the symbol
    table coroutine.

    Only events of type SYMBOL should have symbol text associated with them.

    Args:
        event_type (_SymbolEventType): The type of symbol event.
        symbol (Optional[SymbolToken | unicode]): The symbol token or text associated with the event.
    """


def _system_token(sid):
    return _SYSTEM_SYMBOL_TOKENS[sid - 1]

_SYMBOL_EVENT_START_LST = _SymbolEvent(_SymbolEventType.START_LST)
_SYMBOL_EVENT_FINISH = _SymbolEvent(_SymbolEventType.FINISH)

_ION_EVENT_STRUCT_START = IonEvent(IonEventType.CONTAINER_START, IonType.STRUCT)
_ION_EVENT_STREAM_END = ION_STREAM_END_EVENT
_ION_EVENT_CONTAINER_END = IonEvent(IonEventType.CONTAINER_END)
_ION_EVENT_RAW_LST_STRUCT_START = IonEvent(
    IonEventType.CONTAINER_START, IonType.STRUCT, annotations=[_system_token(SID_ION_SYMBOL_TABLE)])
_ION_EVENT_RAW_IMPORTS_LIST_START = IonEvent(
    IonEventType.CONTAINER_START, IonType.LIST, field_name=_system_token(SID_IMPORTS))
_ION_EVENT_RAW_SYMBOLS_LIST_START = IonEvent(
    IonEventType.CONTAINER_START, IonType.LIST, field_name=_system_token(SID_SYMBOLS))


@coroutine
def _symbol_table_coroutine(writer_buffer, imports):

    def start_lst():
        write(_ION_EVENT_RAW_LST_STRUCT_START)
        if imports:
            write(_ION_EVENT_RAW_IMPORTS_LIST_START)
            for imported in imports:
                # TODO The system table could be allowed as the first import.
                if imported.table_type is not SHARED_TABLE_TYPE:
                    # TODO This should probably fail at creation of the managed writer coroutine,
                    # but that currently requires two imports iterations.
                    raise IonException('Only shared tables may be imported.')
                write(_ION_EVENT_STRUCT_START)
                write(IonEvent(
                    IonEventType.SCALAR, IonType.STRING, imported.name, field_name=_system_token(SID_NAME)))
                write(IonEvent(
                    IonEventType.SCALAR, IonType.INT, imported.version, field_name=_system_token(SID_VERSION)))
                write(IonEvent(
                    IonEventType.SCALAR, IonType.INT, imported.max_id, field_name=_system_token(SID_MAX_ID)))
                write(_ION_EVENT_CONTAINER_END)
            write(_ION_EVENT_CONTAINER_END)
        return _WRITER_EVENT_NEEDS_INPUT_EMPTY

    def write_symbol(symbol):
        if symbol is None:
            raise IonException('Illegal state: local symbol event with None symbol.')
        try:
            key = symbol.sid
            symbol_text = symbol.text
            if symbol_text is not None:
                key = symbol_text
        except AttributeError:
            assert isinstance(symbol, six.text_type)
            key = symbol
            symbol_text = symbol
        token = local_symbols.get(key)
        if token is None:
            assert symbol_text is not None
            token = local_symbols.intern(symbol_text)  # This duplicates the 'get' call...
            write(IonEvent(IonEventType.SCALAR, IonType.STRING, token.text))
        return DataEvent(WriteEventType.NEEDS_INPUT, token)

    # TODO support extending the current LST (by importing it)
    # TODO symtab locking?
    local_symbols = None
    symbol_writer = _raw_binary_writer(writer_buffer)
    write = symbol_writer.send
    has_local_symbols = False
    write_result = None
    while True:
        symbol_event, self = (yield write_result)
        if symbol_event.event_type is _SymbolEventType.START_LST:
            write_event = start_lst()
            local_symbols = SymbolTable(LOCAL_TABLE_TYPE, [], imports=imports)
        elif symbol_event.event_type is _SymbolEventType.SYMBOL:
            if local_symbols is None:
                raise IonException('Illegal state: local symbol table not started.')
            if not has_local_symbols:
                write(_ION_EVENT_RAW_SYMBOLS_LIST_START)
                has_local_symbols = True
            write_event = write_symbol(symbol_event.symbol)
        elif symbol_event.event_type is _SymbolEventType.FINISH:
            # If there are no local symbols or imports, there is no need for an explicit LST.
            if has_local_symbols or imports:
                if has_local_symbols:
                    write(_ION_EVENT_CONTAINER_END)  # End the symbols list.
                write(_ION_EVENT_CONTAINER_END)  # End the symbol table struct.
                for partial in _drain(symbol_writer, _ION_EVENT_STREAM_END):
                    yield partial_transition(partial.data, self)
            write_event = NOOP_WRITER_EVENT
        else:
            raise TypeError('Invalid event: %s' % symbol_event)
        write_result = Transition(write_event, self)


@coroutine
def _managed_binary_writer_coroutine(imports):

    def init():
        _value_writer = _raw_binary_writer(BufferTree())
        _symbol_writer = _raw_symbol_writer(BufferTree(), imports)
        return _value_writer, _symbol_writer

    def intern_symbol(sym):
        return symbol_writer.send(_SymbolEvent(_SymbolEventType.SYMBOL, sym)).data

    def intern_symbols(event):
        field_name = event.field_name
        annotations = event.annotations
        if field_name or field_name == '':
            event = event.derive_field_name(intern_symbol(field_name))
        if annotations:
            event = event.derive_annotations(
                [intern_symbol(annotation) for annotation in annotations])
        if event.ion_type is IonType.SYMBOL and event.value is not None:
            event = event.derive_value(intern_symbol(ion_event.value))
        return event

    value_writer, symbol_writer = init()
    write_result = None
    has_written_values = False
    ivm_needed = True
    while True:
        ion_event, self = (yield write_result)
        if ion_event.event_type is IonEventType.VERSION_MARKER:
            if has_written_values:
                # TODO This could be handled by flushing first.
                raise IonException('Unable to write IVM before STREAM_END')
            else:
                if ivm_needed:
                    yield partial_transition(_IVM, self)
                ivm_needed = False
            write_event = _WRITER_EVENT_NEEDS_INPUT_EMPTY
        elif ion_event.event_type is IonEventType.STREAM_END:
            if has_written_values:
                for partial in _drain(symbol_writer, _SYMBOL_EVENT_FINISH):
                    yield partial_transition(partial.data, self)
                for partial in _drain(value_writer, _ION_EVENT_STREAM_END):
                    yield partial_transition(partial.data, self)
                value_writer, symbol_writer = init()
                has_written_values = False
            write_event = NOOP_WRITER_EVENT
            ivm_needed = True
        else:
            # Intern any symbols and delegate to the raw writer.
            if not has_written_values:
                if ivm_needed:
                    yield partial_transition(_IVM, self)
                ivm_needed = False
                symbol_writer.send(_SYMBOL_EVENT_START_LST)
                has_written_values = True
            ion_event = intern_symbols(ion_event)
            write_event = value_writer.send(ion_event)
        write_result = Transition(write_event, self)


def _raw_symbol_writer(writer_buffer, imports):
    """Returns a raw binary symbol table writer co-routine.

    Keyword Args:
        writer_buffer (BufferTree): The buffer in which this writer's values will be stored.
        imports (Optional[Sequence[SymbolTable]]): A list of shared symbol tables to
            be used by this writer.

    Yields:
        DataEvent: serialization events to write out

        Receives :class:`amazon.ion.core.IonEvent`.
    """
    return writer_trampoline(_symbol_table_coroutine(writer_buffer, imports))


[docs]def binary_writer(imports=None): """Returns a binary writer co-routine. Keyword Args: imports (Optional[Sequence[SymbolTable]]): A list of shared symbol tables to be used by this writer. Yields: DataEvent: serialization events to write out Receives :class:`amazon.ion.core.IonEvent`. """ return writer_trampoline(_managed_binary_writer_coroutine(imports))