pa_api.xmlapi.types.utils

  1import json
  2import logging
  3import typing
  4from datetime import datetime
  5from types import new_class
  6from typing import Annotated, Any, Optional, TypeVar
  7
  8from pydantic import BaseModel, TypeAdapter
  9from pydantic.functional_validators import (
 10    BeforeValidator,
 11    PlainValidator,
 12)
 13from typing_extensions import Self, TypedDict
 14
 15from pa_api.utils import first
 16from pa_api.xmlapi.utils import el2dict
 17
 18TIME_FORMAT = "%H:%M:%S"
 19DATETIME_FORMAT = f"%Y/%m/%d {TIME_FORMAT}"
 20DATETIME_MS_FORMAT = f"{DATETIME_FORMAT}.%f"
 21NoneType: type = type(None)
 22
 23
 24class XMLBaseModel(BaseModel):
 25    @classmethod
 26    def from_xml(cls, xml) -> Self:
 27        data = first(el2dict(xml).values())
 28        return cls.model_validate(data)
 29
 30
 31def parse_datetime(d):
 32    try:
 33        if d is None or d in ("none", "Unknown"):
 34            return None
 35        try:
 36            return datetime.strptime(d, DATETIME_FORMAT)
 37        except Exception:
 38            return datetime.strptime(d, DATETIME_MS_FORMAT)
 39    except Exception as e:
 40        logging.debug(e)
 41        logging.debug(f"Failed to parse {d} as datetime")
 42        # print(d, type(d))
 43        raise
 44    return d
 45
 46
 47def parse_time(d):
 48    return datetime.strptime(d, TIME_FORMAT).time()
 49
 50
 51# https://docs.pydantic.dev/latest/concepts/types/#custom-types
 52# JobProgress = TypeAliasType('JobProgress', PlainValidator(parse_progress))
 53# Datetime = TypeAliasType(
 54#     "Datetime", Annotated[datetime, PlainValidator(parse_datetime)]
 55# )
 56Datetime = Annotated[datetime, PlainValidator(parse_datetime)]
 57
 58
 59def single_xpath(xml, xpath, parser=None, default=None):
 60    try:
 61        res = xml.xpath(xpath)
 62        res = first(res, None)
 63    except Exception:
 64        return default
 65    if res is None:
 66        return default
 67    if not isinstance(res, str):
 68        res = res.text
 69    if parser:
 70        res = parser(res)
 71    return res
 72
 73
 74pd = parse_datetime
 75sx = single_xpath
 76
 77
 78def mksx(xml):
 79    def single_xpath(xpath, parser=None, default=None):
 80        res = sx(xml, xpath, parser=parser, default=default)
 81        logging.debug(res)
 82        return res
 83
 84    return single_xpath
 85
 86
 87def ensure_list(v: Any) -> typing.List[Any]:
 88    if v is None:
 89        return []
 90    if isinstance(v, dict) and len(v) == 1 and "member" in v:
 91        return ensure_list(v["member"])
 92    if isinstance(v, list):
 93        return v
 94    return [v]
 95
 96
 97# https://docs.pydantic.dev/latest/concepts/types/#generics
 98Element = TypeVar("Element", bound=Any)
 99# Similar to typing.List, but ensure to always return a list
100List = Annotated[typing.List[Element], BeforeValidator(ensure_list)]
101
102
103def xml_text(v: Any):
104    if isinstance(v, dict) and "#text" in v:
105        return v["#text"]
106    return v
107
108
109def ensure_str(v: Any) -> str:
110    if v is None:
111        return ""
112    if isinstance(v, dict):
113        return v["#text"]
114    return v
115
116
117def validate_ip(v: Any) -> str:
118    if v is None:
119        return ""
120    if isinstance(v, dict):
121        return v["@name"]
122    return v
123
124
125String = Annotated[str, BeforeValidator(ensure_str)]
126Bool = Annotated[bool, BeforeValidator(xml_text)]
127Ip = Annotated[str, BeforeValidator(validate_ip)]
128
129
130def _xml2schema(values: list):
131    types: typing.List[Any] = [
132        t.__name__ for t in {type(v) for v in values if not isinstance(v, (dict, list))}
133    ]
134    list_values = [v for v in values if isinstance(v, list)]
135    if list_values:
136        types.append(_xml2schema([e for sublist in list_values for e in sublist]))
137    dict_values = [v for v in values if isinstance(v, dict)]
138    if dict_values:
139        all_keys = {k for d in dict_values for k in d}
140        dict_schema = {
141            k: _xml2schema([d.get(k) for d in dict_values]) for k in all_keys
142        }
143        types.append(dict_schema)
144    if not types:
145        raise Exception("NO TYPE")
146    if len(types) == 1:
147        return types[0]
148    return types
149
150
151def _clean_key(k):
152    return k.replace("@", "").replace("#", "")
153
154
155def _slug(k):
156    return _clean_key(k).replace("-", "_")
157
158
159def _keyastypename(k):
160    return "".join(p.title() for p in _clean_key(k).split("-"))
161
162
163def _schematype(values: list, name: Optional[str] = None) -> type:
164    if not name:
165        name = "Dict"
166    optional = None in values
167    values = [v for v in values if v is not None]
168    schema_types: typing.List[Any] = list(
169        {type(v) for v in values if not isinstance(v, (dict, list))}
170    )
171    list_values = [v for v in values if isinstance(v, list)]
172    if list_values:
173        t = _schematype([e for sublist in list_values for e in sublist], name=name)
174        schema_types.append(t)
175    dict_values = [v for v in values if isinstance(v, dict)]
176    if dict_values:
177        all_keys = {k for d in dict_values for k in d}
178        annotations = {
179            _slug(k): _schematype(
180                [d.get(k) for d in dict_values], name=_keyastypename(k)
181            )
182            for k in all_keys
183        }
184        t = new_class(
185            name,
186            (TypedDict,),
187            None,
188            lambda ns: ns.update({"__annotations__": annotations}),
189        )
190        schema_types.append(t)
191    if not schema_types:
192        return NoneType
193        # raise Exception("NO TYPE")
194
195    final_type = (
196        schema_types[0] if len(schema_types) == 1 else typing.Union[tuple(schema_types)]
197    )
198    if optional:
199        final_type = Optional[final_type]
200    return final_type
201
202
203def xml2schema(xml):
204    """
205    Similar to schematype function:
206    The result is a recursive schema that can be dumped with json.
207    """
208    data = el2dict(xml)
209    return _xml2schema([data])
210
211
212def schematype(data, name: Optional[str] = None):
213    """
214    Recursively parse the data to infer the schema.
215    The schema is returned as a type.
216
217    We can dump the json schema using pydantic:
218    TypeAdapter(schematype({"test": 5})).json_schema()
219    """
220    return _schematype([data], name=name)
221
222
223def xml2schematype(xml):
224    """
225    Same to schematype function but takes an xml Element as parameter
226    """
227    name, data = first(el2dict(xml).items())
228    return schematype(data, name)
229
230
231def jsonschema(data, name=None, indent=4) -> str:
232    ta = TypeAdapter(schematype(data, name))
233    return json.dumps(ta.json_schema(), indent=indent)
234
235
236def xml2jsonschema(data, indent=4) -> str:
237    ta = TypeAdapter(xml2schematype(data))
238    return json.dumps(ta.json_schema(), indent=indent)
TIME_FORMAT = '%H:%M:%S'
DATETIME_FORMAT = '%Y/%m/%d %H:%M:%S'
DATETIME_MS_FORMAT = '%Y/%m/%d %H:%M:%S.%f'
class NoneType:
class XMLBaseModel(pydantic.main.BaseModel):
25class XMLBaseModel(BaseModel):
26    @classmethod
27    def from_xml(cls, xml) -> Self:
28        data = first(el2dict(xml).values())
29        return cls.model_validate(data)

Usage docs: https://docs.pydantic.dev/2.9/concepts/models/

A base class for creating Pydantic models.

Attributes: __class_vars__: The names of the class variables defined on the model. __private_attributes__: Metadata about the private attributes of the model. __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.

__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The core schema of the model.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
    This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
    __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
__pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
__pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.

__pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra]
    is set to `'allow'`.
__pydantic_fields_set__: The names of fields explicitly set during instantiation.
__pydantic_private__: Values of private attributes set on the model instance.
@classmethod
def from_xml(cls, xml) -> typing_extensions.Self:
26    @classmethod
27    def from_xml(cls, xml) -> Self:
28        data = first(el2dict(xml).values())
29        return cls.model_validate(data)
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
def parse_datetime(d):
32def parse_datetime(d):
33    try:
34        if d is None or d in ("none", "Unknown"):
35            return None
36        try:
37            return datetime.strptime(d, DATETIME_FORMAT)
38        except Exception:
39            return datetime.strptime(d, DATETIME_MS_FORMAT)
40    except Exception as e:
41        logging.debug(e)
42        logging.debug(f"Failed to parse {d} as datetime")
43        # print(d, type(d))
44        raise
45    return d
def parse_time(d):
48def parse_time(d):
49    return datetime.strptime(d, TIME_FORMAT).time()
Datetime = typing.Annotated[datetime.datetime, PlainValidator(func=<function parse_datetime>, json_schema_input_type=typing.Any)]
def single_xpath(xml, xpath, parser=None, default=None):
60def single_xpath(xml, xpath, parser=None, default=None):
61    try:
62        res = xml.xpath(xpath)
63        res = first(res, None)
64    except Exception:
65        return default
66    if res is None:
67        return default
68    if not isinstance(res, str):
69        res = res.text
70    if parser:
71        res = parser(res)
72    return res
def pd(d):
32def parse_datetime(d):
33    try:
34        if d is None or d in ("none", "Unknown"):
35            return None
36        try:
37            return datetime.strptime(d, DATETIME_FORMAT)
38        except Exception:
39            return datetime.strptime(d, DATETIME_MS_FORMAT)
40    except Exception as e:
41        logging.debug(e)
42        logging.debug(f"Failed to parse {d} as datetime")
43        # print(d, type(d))
44        raise
45    return d
def sx(xml, xpath, parser=None, default=None):
60def single_xpath(xml, xpath, parser=None, default=None):
61    try:
62        res = xml.xpath(xpath)
63        res = first(res, None)
64    except Exception:
65        return default
66    if res is None:
67        return default
68    if not isinstance(res, str):
69        res = res.text
70    if parser:
71        res = parser(res)
72    return res
def mksx(xml):
79def mksx(xml):
80    def single_xpath(xpath, parser=None, default=None):
81        res = sx(xml, xpath, parser=parser, default=default)
82        logging.debug(res)
83        return res
84
85    return single_xpath
def ensure_list(v: Any) -> List[Any]:
88def ensure_list(v: Any) -> typing.List[Any]:
89    if v is None:
90        return []
91    if isinstance(v, dict) and len(v) == 1 and "member" in v:
92        return ensure_list(v["member"])
93    if isinstance(v, list):
94        return v
95    return [v]
List = typing.Annotated[typing.List[~Element], BeforeValidator(func=<function ensure_list>, json_schema_input_type=PydanticUndefined)]
def xml_text(v: Any):
104def xml_text(v: Any):
105    if isinstance(v, dict) and "#text" in v:
106        return v["#text"]
107    return v
def ensure_str(v: Any) -> str:
110def ensure_str(v: Any) -> str:
111    if v is None:
112        return ""
113    if isinstance(v, dict):
114        return v["#text"]
115    return v
def validate_ip(v: Any) -> str:
118def validate_ip(v: Any) -> str:
119    if v is None:
120        return ""
121    if isinstance(v, dict):
122        return v["@name"]
123    return v
String = typing.Annotated[str, BeforeValidator(func=<function ensure_str>, json_schema_input_type=PydanticUndefined)]
Bool = typing.Annotated[bool, BeforeValidator(func=<function xml_text>, json_schema_input_type=PydanticUndefined)]
Ip = typing.Annotated[str, BeforeValidator(func=<function validate_ip>, json_schema_input_type=PydanticUndefined)]
def xml2schema(xml):
204def xml2schema(xml):
205    """
206    Similar to schematype function:
207    The result is a recursive schema that can be dumped with json.
208    """
209    data = el2dict(xml)
210    return _xml2schema([data])

Similar to schematype function: The result is a recursive schema that can be dumped with json.

def schematype(data, name: Optional[str] = None):
213def schematype(data, name: Optional[str] = None):
214    """
215    Recursively parse the data to infer the schema.
216    The schema is returned as a type.
217
218    We can dump the json schema using pydantic:
219    TypeAdapter(schematype({"test": 5})).json_schema()
220    """
221    return _schematype([data], name=name)

Recursively parse the data to infer the schema. The schema is returned as a type.

We can dump the json schema using pydantic: TypeAdapter(schematype({"test": 5})).json_schema()

def xml2schematype(xml):
224def xml2schematype(xml):
225    """
226    Same to schematype function but takes an xml Element as parameter
227    """
228    name, data = first(el2dict(xml).items())
229    return schematype(data, name)

Same to schematype function but takes an xml Element as parameter

def jsonschema(data, name=None, indent=4) -> str:
232def jsonschema(data, name=None, indent=4) -> str:
233    ta = TypeAdapter(schematype(data, name))
234    return json.dumps(ta.json_schema(), indent=indent)
def xml2jsonschema(data, indent=4) -> str:
237def xml2jsonschema(data, indent=4) -> str:
238    ta = TypeAdapter(xml2schematype(data))
239    return json.dumps(ta.json_schema(), indent=indent)