pa_api.xmlapi.types.utils

  1import json
  2import logging
  3import typing
  4from datetime import datetime
  5from types import new_class
  6from typing import TYPE_CHECKING, Annotated, Any, Optional, TypeVar
  7
  8from pydantic import (
  9    BaseModel,
 10    TypeAdapter,
 11)
 12from pydantic.functional_validators import (
 13    BeforeValidator,
 14    PlainValidator,
 15)
 16from typing_extensions import Self, TypedDict
 17
 18from pa_api.utils import first
 19from pa_api.xmlapi.utils import el2dict
 20
 21TIME_FORMAT = "%H:%M:%S"
 22DATETIME_FORMAT = f"%Y/%m/%d {TIME_FORMAT}"
 23DATETIME_MS_FORMAT = f"{DATETIME_FORMAT}.%f"
 24NoneType: type = type(None)
 25
 26if TYPE_CHECKING:
 27    from pa_api.xmlapi.clients import Client
 28
 29
 30class XMLBaseModel(BaseModel):
 31    # raw_xml: Optional[Any] = None
 32
 33    # @model_validator(mode="before")
 34    # @classmethod
 35    # def _get_raw_xml(cls, data, info: ValidationInfo):
 36    #     if isinstance(info.context, dict):
 37    #         raw_xml = info.context.get("raw_xml")
 38    #         if raw_xml is not None:
 39    #             data["raw_xml"] = raw_xml
 40    #     return data
 41
 42    # @classmethod
 43    # def from_xml(cls, xml) -> Self:
 44    #     data = first(el2dict(xml).values())
 45    #     return cls.model_validate(data, context={"raw_xml": xml})
 46
 47    # _client: Optional["Client"]
 48    # def bind_client(self, client: Optional["Client"]):
 49    #     self._client = client
 50    #     return self
 51
 52    # @model_validator(mode="after")
 53    # def _auto_bind_client(self, info: ValidationInfo):
 54    #     client = None
 55    #     if isinstance(info.context, dict):
 56    #         client = info.context.get("client")
 57    #     self.bind_client(client)
 58    #     return self
 59
 60    @classmethod
 61    def from_xml(cls, xml) -> Self:
 62        # print(etree_tostring(xml))
 63        data = first(el2dict(xml).values())
 64        # print(data)
 65        # context = {}
 66        # if client is not None:
 67        #     context["client"] = client
 68        return cls.model_validate(data)  # , context=context
 69
 70
 71class ObjectBaseModel(XMLBaseModel):
 72    def _remove_member(self, subpath, client: "Client", member: str, rulebase=None):
 73        """
 74        Remove the member from destination.
 75
 76        NOTE: Rulebase information is required for panorama
 77        """
 78        subpath = subpath.strip("/")
 79        rule_xpath = self.get_xpath(rulebase)
 80        # panorama_rule_xpath = f"/config/devices/entry/vsys/entry/rulebase/security/rules/entry[@uuid='{self.uuid}']"
 81        member_xpath = f"{rule_xpath}/{subpath}/member[text()='{member}']"
 82        return client.configuration.delete(member_xpath)
 83
 84
 85def parse_datetime(d):
 86    try:
 87        if d is None or d in ("none", "Unknown", "(null)"):
 88            return None
 89        try:
 90            return datetime.strptime(d, DATETIME_FORMAT)
 91        except Exception:
 92            return datetime.strptime(d, DATETIME_MS_FORMAT)
 93    except Exception as e:
 94        logging.debug(e)
 95        logging.debug(f"Failed to parse {d} as datetime")
 96        # print(d, type(d))
 97        raise
 98    return d
 99
100
101def parse_time(d):
102    return datetime.strptime(d, TIME_FORMAT).time()
103
104
105# https://docs.pydantic.dev/latest/concepts/types/#custom-types
106# JobProgress = TypeAliasType('JobProgress', PlainValidator(parse_progress))
107# Datetime = TypeAliasType(
108#     "Datetime", Annotated[datetime, PlainValidator(parse_datetime)]
109# )
110Datetime = Annotated[datetime, PlainValidator(parse_datetime)]
111
112
113def single_xpath(xml, xpath, parser=None, default=None):
114    try:
115        res = xml.xpath(xpath)
116        res = first(res, None)
117    except Exception:
118        return default
119    if res is None:
120        return default
121    if not isinstance(res, str):
122        res = res.text
123    if parser:
124        res = parser(res)
125    return res
126
127
128pd = parse_datetime
129sx = single_xpath
130
131
132def mksx(xml):
133    def single_xpath(xpath, parser=None, default=None):
134        res = sx(xml, xpath, parser=parser, default=default)
135        logging.debug(res)
136        return res
137
138    return single_xpath
139
140
141def ensure_list(v: Any) -> typing.List[Any]:
142    if v is None:
143        return []
144    if isinstance(v, dict) and len(v) == 1 and "member" in v:
145        return ensure_list(v["member"])
146    if isinstance(v, list):
147        return v
148    return [v]
149
150
151# https://docs.pydantic.dev/latest/concepts/types/#generics
152Element = TypeVar("Element", bound=Any)
153# Similar to typing.List, but ensure to always return a list
154List = Annotated[typing.List[Element], BeforeValidator(ensure_list)]
155
156
157def xml_text(v: Any):
158    if isinstance(v, dict) and "#text" in v:
159        return v["#text"]
160    return v
161
162
163def ensure_str(v: Any) -> str:
164    if v is None:
165        return ""
166    if isinstance(v, dict):
167        text = v.get("#text")
168        if text:
169            return text
170        lines = v.get("line")
171        if lines is not None:
172            if isinstance(lines, str):
173                return lines
174            return "\n".join(lines)
175        raise Exception(f"Cannot convert value to string: {v}")
176    return v
177
178
179def validate_ip(v: Any) -> str:
180    if v is None:
181        return ""
182    if isinstance(v, dict):
183        return v["@name"]
184    return v
185
186
187String = Annotated[str, BeforeValidator(ensure_str)]
188Bool = Annotated[bool, BeforeValidator(xml_text)]
189Ip = Annotated[str, BeforeValidator(validate_ip)]
190
191
192def _xml2schema(values: list):
193    types: typing.List[Any] = [
194        t.__name__ for t in {type(v) for v in values if not isinstance(v, (dict, list))}
195    ]
196    list_values = [v for v in values if isinstance(v, list)]
197    if list_values:
198        types.append(_xml2schema([e for sublist in list_values for e in sublist]))
199    dict_values = [v for v in values if isinstance(v, dict)]
200    if dict_values:
201        all_keys = {k for d in dict_values for k in d}
202        dict_schema = {
203            k: _xml2schema([d.get(k) for d in dict_values]) for k in all_keys
204        }
205        types.append(dict_schema)
206    if not types:
207        raise Exception("NO TYPE")
208    if len(types) == 1:
209        return types[0]
210    return types
211
212
213def _clean_key(k):
214    return k.replace("@", "").replace("#", "")
215
216
217def _slug(k):
218    return _clean_key(k).replace("-", "_")
219
220
221def _keyastypename(k):
222    return "".join(p.title() for p in _clean_key(k).split("-"))
223
224
225def _schematype(values: list, name: Optional[str] = None) -> type:
226    if not name:
227        name = "Dict"
228    optional = None in values
229    values = [v for v in values if v is not None]
230    schema_types: typing.List[Any] = list(
231        {type(v) for v in values if not isinstance(v, (dict, list))}
232    )
233    list_values = [v for v in values if isinstance(v, list)]
234    if list_values:
235        t = _schematype([e for sublist in list_values for e in sublist], name=name)
236        schema_types.append(t)
237    dict_values = [v for v in values if isinstance(v, dict)]
238    if dict_values:
239        all_keys = {k for d in dict_values for k in d}
240        annotations = {
241            _slug(k): _schematype(
242                [d.get(k) for d in dict_values], name=_keyastypename(k)
243            )
244            for k in all_keys
245        }
246        t = new_class(
247            name,
248            (TypedDict,),
249            None,
250            lambda ns: ns.update({"__annotations__": annotations}),
251        )
252        schema_types.append(t)
253    if not schema_types:
254        return NoneType
255        # raise Exception("NO TYPE")
256
257    final_type = (
258        schema_types[0] if len(schema_types) == 1 else typing.Union[tuple(schema_types)]
259    )
260    if optional:
261        final_type = Optional[final_type]
262    return final_type
263
264
265def xml2schema(xml):
266    """
267    Similar to schematype function:
268    The result is a recursive schema that can be dumped with json.
269    """
270    data = el2dict(xml)
271    return _xml2schema([data])
272
273
274def schematype(data, name: Optional[str] = None):
275    """
276    Recursively parse the data to infer the schema.
277    The schema is returned as a type.
278
279    We can dump the json schema using pydantic:
280    TypeAdapter(schematype({"test": 5})).json_schema()
281    """
282    return _schematype([data], name=name)
283
284
285def xml2schematype(xml):
286    """
287    Same to schematype function but takes an xml Element as parameter
288    """
289    name, data = first(el2dict(xml).items())
290    return schematype(data, name)
291
292
293def jsonschema(data, name=None, indent=4) -> str:
294    ta = TypeAdapter(schematype(data, name))
295    return json.dumps(ta.json_schema(), indent=indent)
296
297
298def xml2jsonschema(data, indent=4) -> str:
299    ta = TypeAdapter(xml2schematype(data))
300    return json.dumps(ta.json_schema(), indent=indent)
TIME_FORMAT = '%H:%M:%S'
DATETIME_FORMAT = '%Y/%m/%d %H:%M:%S'
DATETIME_MS_FORMAT = '%Y/%m/%d %H:%M:%S.%f'
class NoneType:
class XMLBaseModel(pydantic.main.BaseModel):
31class XMLBaseModel(BaseModel):
32    # raw_xml: Optional[Any] = None
33
34    # @model_validator(mode="before")
35    # @classmethod
36    # def _get_raw_xml(cls, data, info: ValidationInfo):
37    #     if isinstance(info.context, dict):
38    #         raw_xml = info.context.get("raw_xml")
39    #         if raw_xml is not None:
40    #             data["raw_xml"] = raw_xml
41    #     return data
42
43    # @classmethod
44    # def from_xml(cls, xml) -> Self:
45    #     data = first(el2dict(xml).values())
46    #     return cls.model_validate(data, context={"raw_xml": xml})
47
48    # _client: Optional["Client"]
49    # def bind_client(self, client: Optional["Client"]):
50    #     self._client = client
51    #     return self
52
53    # @model_validator(mode="after")
54    # def _auto_bind_client(self, info: ValidationInfo):
55    #     client = None
56    #     if isinstance(info.context, dict):
57    #         client = info.context.get("client")
58    #     self.bind_client(client)
59    #     return self
60
61    @classmethod
62    def from_xml(cls, xml) -> Self:
63        # print(etree_tostring(xml))
64        data = first(el2dict(xml).values())
65        # print(data)
66        # context = {}
67        # if client is not None:
68        #     context["client"] = client
69        return cls.model_validate(data)  # , context=context

!!! abstract "Usage Documentation" Models

A base class for creating Pydantic models.

Attributes: __class_vars__: The names of the class variables defined on the model. __private_attributes__: Metadata about the private attributes of the model. __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.

__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The core schema of the model.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
    This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
    __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
__pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
__pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.

__pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.

__pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra]
    is set to `'allow'`.
__pydantic_fields_set__: The names of fields explicitly set during instantiation.
__pydantic_private__: Values of private attributes set on the model instance.
@classmethod
def from_xml(cls, xml) -> typing_extensions.Self:
61    @classmethod
62    def from_xml(cls, xml) -> Self:
63        # print(etree_tostring(xml))
64        data = first(el2dict(xml).values())
65        # print(data)
66        # context = {}
67        # if client is not None:
68        #     context["client"] = client
69        return cls.model_validate(data)  # , context=context
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ObjectBaseModel(XMLBaseModel):
72class ObjectBaseModel(XMLBaseModel):
73    def _remove_member(self, subpath, client: "Client", member: str, rulebase=None):
74        """
75        Remove the member from destination.
76
77        NOTE: Rulebase information is required for panorama
78        """
79        subpath = subpath.strip("/")
80        rule_xpath = self.get_xpath(rulebase)
81        # panorama_rule_xpath = f"/config/devices/entry/vsys/entry/rulebase/security/rules/entry[@uuid='{self.uuid}']"
82        member_xpath = f"{rule_xpath}/{subpath}/member[text()='{member}']"
83        return client.configuration.delete(member_xpath)

!!! abstract "Usage Documentation" Models

A base class for creating Pydantic models.

Attributes: __class_vars__: The names of the class variables defined on the model. __private_attributes__: Metadata about the private attributes of the model. __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.

__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The core schema of the model.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
    This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
    __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
__pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
__pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.

__pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.

__pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra]
    is set to `'allow'`.
__pydantic_fields_set__: The names of fields explicitly set during instantiation.
__pydantic_private__: Values of private attributes set on the model instance.
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
XMLBaseModel
from_xml
def parse_datetime(d):
86def parse_datetime(d):
87    try:
88        if d is None or d in ("none", "Unknown", "(null)"):
89            return None
90        try:
91            return datetime.strptime(d, DATETIME_FORMAT)
92        except Exception:
93            return datetime.strptime(d, DATETIME_MS_FORMAT)
94    except Exception as e:
95        logging.debug(e)
96        logging.debug(f"Failed to parse {d} as datetime")
97        # print(d, type(d))
98        raise
99    return d
def parse_time(d):
102def parse_time(d):
103    return datetime.strptime(d, TIME_FORMAT).time()
Datetime = typing.Annotated[datetime.datetime, PlainValidator(func=<function parse_datetime>, json_schema_input_type=typing.Any)]
def single_xpath(xml, xpath, parser=None, default=None):
114def single_xpath(xml, xpath, parser=None, default=None):
115    try:
116        res = xml.xpath(xpath)
117        res = first(res, None)
118    except Exception:
119        return default
120    if res is None:
121        return default
122    if not isinstance(res, str):
123        res = res.text
124    if parser:
125        res = parser(res)
126    return res
def pd(d):
86def parse_datetime(d):
87    try:
88        if d is None or d in ("none", "Unknown", "(null)"):
89            return None
90        try:
91            return datetime.strptime(d, DATETIME_FORMAT)
92        except Exception:
93            return datetime.strptime(d, DATETIME_MS_FORMAT)
94    except Exception as e:
95        logging.debug(e)
96        logging.debug(f"Failed to parse {d} as datetime")
97        # print(d, type(d))
98        raise
99    return d
def sx(xml, xpath, parser=None, default=None):
114def single_xpath(xml, xpath, parser=None, default=None):
115    try:
116        res = xml.xpath(xpath)
117        res = first(res, None)
118    except Exception:
119        return default
120    if res is None:
121        return default
122    if not isinstance(res, str):
123        res = res.text
124    if parser:
125        res = parser(res)
126    return res
def mksx(xml):
133def mksx(xml):
134    def single_xpath(xpath, parser=None, default=None):
135        res = sx(xml, xpath, parser=parser, default=default)
136        logging.debug(res)
137        return res
138
139    return single_xpath
def ensure_list(v: Any) -> List[Any]:
142def ensure_list(v: Any) -> typing.List[Any]:
143    if v is None:
144        return []
145    if isinstance(v, dict) and len(v) == 1 and "member" in v:
146        return ensure_list(v["member"])
147    if isinstance(v, list):
148        return v
149    return [v]
List = typing.Annotated[typing.List[~Element], BeforeValidator(func=<function ensure_list>, json_schema_input_type=PydanticUndefined)]
def xml_text(v: Any):
158def xml_text(v: Any):
159    if isinstance(v, dict) and "#text" in v:
160        return v["#text"]
161    return v
def ensure_str(v: Any) -> str:
164def ensure_str(v: Any) -> str:
165    if v is None:
166        return ""
167    if isinstance(v, dict):
168        text = v.get("#text")
169        if text:
170            return text
171        lines = v.get("line")
172        if lines is not None:
173            if isinstance(lines, str):
174                return lines
175            return "\n".join(lines)
176        raise Exception(f"Cannot convert value to string: {v}")
177    return v
def validate_ip(v: Any) -> str:
180def validate_ip(v: Any) -> str:
181    if v is None:
182        return ""
183    if isinstance(v, dict):
184        return v["@name"]
185    return v
String = typing.Annotated[str, BeforeValidator(func=<function ensure_str>, json_schema_input_type=PydanticUndefined)]
Bool = typing.Annotated[bool, BeforeValidator(func=<function xml_text>, json_schema_input_type=PydanticUndefined)]
Ip = typing.Annotated[str, BeforeValidator(func=<function validate_ip>, json_schema_input_type=PydanticUndefined)]
def xml2schema(xml):
266def xml2schema(xml):
267    """
268    Similar to schematype function:
269    The result is a recursive schema that can be dumped with json.
270    """
271    data = el2dict(xml)
272    return _xml2schema([data])

Similar to schematype function: The result is a recursive schema that can be dumped with json.

def schematype(data, name: Optional[str] = None):
275def schematype(data, name: Optional[str] = None):
276    """
277    Recursively parse the data to infer the schema.
278    The schema is returned as a type.
279
280    We can dump the json schema using pydantic:
281    TypeAdapter(schematype({"test": 5})).json_schema()
282    """
283    return _schematype([data], name=name)

Recursively parse the data to infer the schema. The schema is returned as a type.

We can dump the json schema using pydantic: TypeAdapter(schematype({"test": 5})).json_schema()

def xml2schematype(xml):
286def xml2schematype(xml):
287    """
288    Same to schematype function but takes an xml Element as parameter
289    """
290    name, data = first(el2dict(xml).items())
291    return schematype(data, name)

Same to schematype function but takes an xml Element as parameter

def jsonschema(data, name=None, indent=4) -> str:
294def jsonschema(data, name=None, indent=4) -> str:
295    ta = TypeAdapter(schematype(data, name))
296    return json.dumps(ta.json_schema(), indent=indent)
def xml2jsonschema(data, indent=4) -> str:
299def xml2jsonschema(data, indent=4) -> str:
300    ta = TypeAdapter(xml2schematype(data))
301    return json.dumps(ta.json_schema(), indent=indent)