from __future__ import annotations
import logging
import xml.etree.ElementTree as etree
from typing import Any
from typing import Dict
from typing import Mapping
from typing import Optional
from typing import Sequence
from typing import Union
from common.validators import UpdateType
from importer.namespaces import Tag
from importer.nursery import get_nursery
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class ParserError(Exception):
    """
    Exception type for parser failures.

    Adds no behaviour on top of :class:`Exception`; it exists so callers can
    catch parser-specific errors by type.
    """
class InvalidDataError(Exception):
    """
    Exception type for invalid data.

    Adds no behaviour on top of :class:`Exception`; it exists so callers can
    catch data-validation errors by type.
    """
class ElementParser:
    """
    Base class for element specific parsers.

    ElementParser classes use introspection to build a lookup table of child
    element parsers to their output JSON field name.

    This allows 2 options for adding child elements to a Parent element.

    Option 1:

    .. code:: python

        class ChildElement(ElementParser):
            tag = Tag("child", prefix="ns")
            field = TextElement("field")

        class ParentElement(ElementParser):
            tag = Tag("parent", prefix="ns")
            child = ChildElement()

    Option 2:

    .. code:: python

        class ParentElement(ElementParser):
            tag = Tag("parent", prefix="ns")

        @ParentElement.register_child("child")
        class ChildElement(ElementParser):
            tag = Tag("child", prefix="ns")
            some_field = TextElement("field")

    When handling XML such as:

    .. code:: xml

        <ns:parent>
            <ns:child id="2">
                <ns:field>Text</ns:field>
            </ns:child>
        </ns:parent>

    This class will build a JSON object in `self.data` with
    the following structure (attribute values are copied from the element
    unchanged, so they stay strings):

    .. code:: json

        {"child": {"id": "2", "field": "Text"}}
    """

    record_code: str
    """
    The type id of this model's type family in the TARIC specification.

    This number groups together a number of different models into 'records'.
    Where two models share a record code, they are conceptually expressing
    different properties of the same logical model.

    In theory each :class:`~common.transactions.Transaction` should only contain
    models with a single :attr:`record_code` (but differing
    :attr:`subrecord_code`.)
    """

    subrecord_code: str
    """
    The type id of this model in the TARIC specification. The
    :attr:`subrecord_code` when combined with the :attr:`record_code` uniquely
    identifies the type within the specification.

    The subrecord code gives the intended order for models in a transaction,
    with comparatively smaller subrecord codes needing to come before larger
    ones.
    """

    tag: Optional[Tag] = None
    extra_fields: Sequence[str] = tuple()
    data: Union[Dict[str, Any], Any]

    def __init__(
        self,
        tag: Optional[Tag] = None,
        many: bool = False,
        depth: int = 1,
    ):
        """
        Set up parser state.

        :param tag: Tag this parser matches; when falsy the class-level
            ``tag`` is left in place.
        :param many: When true, the parent collects each parsed value into a
            list under the field name instead of overwriting it.
        :param depth: Stored on the instance; not read elsewhere in this class.
        """
        self.child = None
        self.parent: Optional[ElementParser] = None
        self.data = dict()
        self.depth = depth
        self.many = many
        self.text = None
        self.started = False
        if tag:
            self.tag = tag

    @property
    def _field_lookup(self) -> Dict[ElementParser, str]:
        """
        Map each child parser to the field name its data is stored under.

        Only parsers declared directly on this instance's class are found
        (``self.__class__.__dict__`` is inspected, not inherited attributes),
        plus anything added through :meth:`register_child`. The table is
        rebuilt on every access.
        """
        field_lookup = {
            parser: field
            for field, parser in self.__class__.__dict__.items()
            if isinstance(parser, ElementParser)
        }
        field_lookup.update(getattr(self, "_additional_components", {}))
        return field_lookup

    def is_parser_for_element(
        self,
        parser: ElementParser,
        element: etree.Element,
    ) -> bool:
        """Check if the parser matches the element."""
        return parser.tag == element.tag

    def get_parser(self, element: etree.Element) -> Optional[ElementParser]:
        """Return the first child parser matching ``element``, or ``None``."""
        for parser in self._field_lookup:
            if self.is_parser_for_element(parser, element):
                return parser
        return None

    def start(self, element: etree.Element, parent: ElementParser = None):
        """
        Handle the start of an XML tag. The tag may not yet have all of its
        children.

        We have a few cases where there are tags nested within a tag of the same name.

        Example:

        .. code:: xml

            <oub:additional.code>
                <oub:additional.code.sid>00000001</oub:additional.code.sid>
                <oub:additional.code.type.id>A</oub:additional.code.type.id>
                <oub:additional.code>AAA</oub:additional.code>
                <oub:validity.start.date>2021-01-01</oub:validity.start.date>
            </oub:additional.code>

        In this case matching on tags is not enough and so we also need to keep
        track of whether this parser is already parsing an element. If it is, we
        don't want to select any child parsers. If it is not, we know that this
        is an element that this parser should be parsing.
        """
        self.parent = parent

        if not self.started:
            # First start event: this is the element this parser owns.
            self.data = dict()
            self.started = True
        else:
            # if the tag matches one of the child elements of this element, get the
            # parser for that element
            if not self.child:
                self.child = self.get_parser(element)

            # if currently in a child element, delegate to the child parser
            if self.child:
                self.child.start(element, self)

    def end(self, element: etree.Element):
        """
        Handle the end of an XML tag.

        While a child parser is active the event is delegated to it; once the
        child has finished its own element, its data is stored on this parser
        under the child's field name. Otherwise, if the element is this
        parser's own, its text and attributes are captured and
        :meth:`clean` then :meth:`validate` are run.

        :raises NotImplementedError: if a child parser is both ``many`` and
            declares ``extra_fields``.
        """
        # if currently in a child element, delegate to the child parser
        if self.child:
            self.child.end(element)

            # leaving the child element, so stop delegating
            if not self.child.started and self.is_parser_for_element(
                self.child,
                element,
            ):
                field_name = self._field_lookup[self.child]
                if self.child.many and self.child.extra_fields:
                    raise NotImplementedError("Many child parsers with extra_fields")

                if self.child.many:
                    self.data.setdefault(field_name, []).append(self.child.data)
                elif self.child.extra_fields:
                    # The child produced a sequence: first item goes under the
                    # field name, the rest under each extra field name.
                    for index, sub_field_name in enumerate(
                        [field_name, *self.child.extra_fields],
                    ):
                        self.data[sub_field_name] = self.child.data[index]
                else:
                    self.data[field_name] = self.child.data

                self.child = None

        # leaving this element, so marshal the data
        elif self.is_parser_for_element(self, element):
            # Always reassign: parser instances are reused across elements, so
            # an empty element must not inherit the previous element's text.
            self.text = element.text.strip() if element.text else None
            self.data.update(element.attrib.items())
            self.started = False
            self.clean()
            self.validate()

    def clean(self):
        """Clean up data."""

    def validate(self):
        """Validate data."""

    @classmethod
    def register_child(cls, name, *args, **kwargs):
        """
        Return a class decorator that registers a child parser under ``name``.

        The decorated class is instantiated with ``*args``/``**kwargs`` and the
        instance is added to ``cls._additional_components``; the decorated
        class itself is returned unchanged.
        """
        # NOTE(review): ``hasattr`` also sees an inherited registry, so a
        # subclass without its own registry adds to its parent's dict --
        # confirm whether that sharing is intended.
        if not hasattr(cls, "_additional_components"):
            cls._additional_components = {}

        def wraps(parser):
            cls._additional_components[parser(*args, **kwargs)] = name
            return parser

        return wraps
class ValueElementMixin:
    """Provides a convenient way to define a parser for elements that contain
    only a text value and have no attributes or children."""

    native_type: type
    """The Python type that most closely matches the type of the XML element."""

    def clean(self):
        """Replace ``self.data`` with ``self.text`` converted by ``native_type``."""
        # Cooperative call: the next class in the MRO must define ``clean``.
        super().clean()
        self.data = self.native_type(self.text)
class ConstantElement(ValueElementMixin, ElementParser):
    """
    Represents an element that is always a constant value in the XML.

    The actual value is ignored and not put into the database. The value
    specified in the constructor will be put back into the XML.
    """

    # Without a ``native_type`` the inherited ``ValueElementMixin.clean`` would
    # raise ``AttributeError`` as soon as such an element is parsed; the text
    # is kept as a plain string.
    native_type = str

    def __init__(
        self,
        tag: Tag,
        value: str,  # pylint: disable=unused-argument
    ) -> None:
        """
        :param tag: Tag this parser matches.
        :param value: Accepted for interface compatibility; not used here.
        """
        super().__init__(tag)
class TextElement(ValueElementMixin, ElementParser):
    """
    Represents an element which contains a text value.

    .. code-block:: XML

        <msg:record.code>Example Text</msg:record.code>
    """

    native_type = str
class IntElement(ValueElementMixin, ElementParser):
    """
    Represents an element which contains an integer value.

    .. code-block:: XML

        <msg:record.code>430</msg:record.code>
    """

    native_type = int

    def __init__(
        self,
        *args,
        format: str = "FM99999999999999999999",  # pylint: disable=unused-argument
        **kwargs,
    ):
        """
        :param args: Positional arguments forwarded to the base parser.
        :param format: Accepted but not used by the parser.
        :param kwargs: Keyword arguments (e.g. ``many``, ``depth``) forwarded to
            the base parser, matching what ``BooleanElement`` accepts.
        """
        super().__init__(*args, **kwargs)
class BooleanElement(ValueElementMixin, ElementParser):
    """
    Represents an element which contains a true or false value.

    The actual value in the XML by default is assumed to be a 1 for True and a 0
    for False. This can be customised by passing in different values.

    .. code-block:: XML

        <msg:some.value>1</msg:some.value>
        <msg:some.value>0</msg:some.value>
    """

    native_type = bool

    def __init__(self, *args, true_value: str = "1", false_value: str = "0", **kwargs):
        """
        :param true_value: Element text that is parsed as ``True``.
        :param false_value: Element text that is parsed as ``False``.
        :param args: Positional arguments forwarded to the base parser.
        :param kwargs: Keyword arguments forwarded to the base parser.
        """
        self.true_value = true_value
        self.false_value = false_value
        super().__init__(*args, **kwargs)

    def clean(self):
        """Set ``self.data`` to ``True``, ``False``, or ``None`` for any other text."""
        # Deliberately does not call ``super().clean()``: ``bool(text)`` would be
        # true for any non-empty string, including the false marker.
        if self.text == self.true_value:
            self.data = True
        elif self.text == self.false_value:
            self.data = False
        else:
            self.data = None
class RangeLowerElement(TextElement):
    """Represents an element that is the lower part of a range."""
class RangeUpperElement(TextElement):
    """Represents an element that is the upper part of a range."""
class CompoundElement(ValueElementMixin, ElementParser):
    """
    Represents an element in XML that is actually a concatenation of one or more
    logical values and separators.

    The separator by default is assumed to be a pipe character. The parsed data
    will always contain a tuple that is the size of the number of expected
    fields (the original field and any extras) – if less than the specified
    number of separators occur the rightmost fields will have value ``None``.

    .. code-block:: XML

        <msg:some.value>one|two|three</msg:some.value>
    """

    native_type = tuple

    def __init__(
        self,
        tag: Tag,
        *extra_fields: str,
        separator: str = "|",
    ):
        """
        :param tag: Tag this parser matches.
        :param extra_fields: Names of the additional fields carried after the
            first value in the element text.
        :param separator: String that separates the values in the text.
        """
        super().__init__(tag)
        self.extra_fields = extra_fields
        self.separator = separator

    def clean(self):
        """Split the text into one value per expected field, padding with ``None``."""
        expected = len(self.extra_fields) + 1
        # ``self.text`` is ``None`` for an element with no text; treat it like
        # an empty string instead of failing on ``None.split``.
        text = self.text or ""
        # Cap the number of splits so surplus separators stay in the last part.
        parts = text.split(self.separator, len(self.extra_fields))
        padding = [None] * (expected - len(parts))
        self.data = self.native_type([*parts, *padding])
class ValidityMixin:
    """Parse validity start and end dates."""

    valid_between_lower = RangeLowerElement(Tag("validity.start.date"))
    valid_between_upper = RangeUpperElement(Tag("validity.end.date"))

    def clean(self):
        """
        Fold the parsed lower/upper values into one ``valid_between`` dict.

        The two values are removed from ``self.data`` and, if either was
        present, stored as ``{"lower": ..., "upper": ...}`` (only the keys that
        were found) under a name built from the lower field's name with its
        last ``__``-separated segment replaced by ``valid_between``.

        :raises KeyError: if either range parser is missing from
            ``self._field_lookup``.
        """
        super().clean()

        # ``_field_lookup`` is rebuilt on each access, so read it once.
        lookup = self._field_lookup
        lower_name = lookup[self.valid_between_lower]
        upper_name = lookup[self.valid_between_upper]

        valid_between = {}
        if lower_name in self.data:
            valid_between["lower"] = self.data.pop(lower_name)
        if upper_name in self.data:
            valid_between["upper"] = self.data.pop(upper_name)

        if valid_between:
            # Keep any ``prefix__`` segments of the lower field name.
            *field_names, _ = lower_name.split("__")
            real_name = "__".join([*field_names, "valid_between"])
            self.data[real_name] = valid_between
class ValidityStartMixin:
    """Parse validity start date."""

    validity_start = TextElement(Tag("validity.start.date"))
class Writable:
    """
    A parser which implements the Writable interface can write its changes to
    the database.

    Not all TARIC3 elements correspond to database entities (particularly simple
    text elements, but also envelopes and app.messages).
    """

    nursery = get_nursery()

    def _dispatch(self, data: Dict[str, Any], transaction_id: int, update_type):
        """Tag ``data`` with ``update_type`` and submit it to the nursery."""
        # ``data`` is updated in place, so the caller's dict gains the
        # ``update_type`` key as well.
        data.update(update_type=update_type)
        dispatch_object = {
            "data": data,
            "tag": self.tag.name,
            "transaction_id": transaction_id,
        }
        self.nursery.submit(dispatch_object)

    def create(self, data: Dict[str, Any], transaction_id: int):
        """
        Preps the given data as a create record and submits it to the nursery
        for processing.

        :param data: Record data; mutated in place to add ``update_type``.
        :param transaction_id: Passed through in the dispatched object.
        """
        # Send the raw enum value, matching ``update`` and ``delete``.
        self._dispatch(data, transaction_id, UpdateType.CREATE.value)

    def update(self, data: Dict[str, Any], transaction_id: int):
        """
        Preps the given data as an update record and submits it to the nursery.

        :param data: Record data; mutated in place to add ``update_type``.
        :param transaction_id: Passed through in the dispatched object.
        """
        self._dispatch(data, transaction_id, UpdateType.UPDATE.value)

    def delete(self, data: Dict[str, Any], transaction_id: int):
        """
        Preps the given data as a delete record and submits it to the nursery.

        :param data: Record data; mutated in place to add ``update_type``.
        :param transaction_id: Passed through in the dispatched object.
        """
        self._dispatch(data, transaction_id, UpdateType.DELETE.value)