pa_api.xmlapi.types.utils
1import json 2import logging 3import typing 4from datetime import datetime 5from types import new_class 6from typing import TYPE_CHECKING, Annotated, Any, Optional, TypeVar 7 8from pydantic import ( 9 BaseModel, 10 TypeAdapter, 11) 12from pydantic.functional_validators import ( 13 BeforeValidator, 14 PlainValidator, 15) 16from typing_extensions import Self, TypedDict 17 18from pa_api.utils import first 19from pa_api.xmlapi.utils import el2dict 20 21TIME_FORMAT = "%H:%M:%S" 22DATETIME_FORMAT = f"%Y/%m/%d {TIME_FORMAT}" 23DATETIME_MS_FORMAT = f"{DATETIME_FORMAT}.%f" 24NoneType: type = type(None) 25 26if TYPE_CHECKING: 27 from pa_api.xmlapi.clients import Client 28 29 30class XMLBaseModel(BaseModel): 31 # raw_xml: Optional[Any] = None 32 33 # @model_validator(mode="before") 34 # @classmethod 35 # def _get_raw_xml(cls, data, info: ValidationInfo): 36 # if isinstance(info.context, dict): 37 # raw_xml = info.context.get("raw_xml") 38 # if raw_xml is not None: 39 # data["raw_xml"] = raw_xml 40 # return data 41 42 # @classmethod 43 # def from_xml(cls, xml) -> Self: 44 # data = first(el2dict(xml).values()) 45 # return cls.model_validate(data, context={"raw_xml": xml}) 46 47 # _client: Optional["Client"] 48 # def bind_client(self, client: Optional["Client"]): 49 # self._client = client 50 # return self 51 52 # @model_validator(mode="after") 53 # def _auto_bind_client(self, info: ValidationInfo): 54 # client = None 55 # if isinstance(info.context, dict): 56 # client = info.context.get("client") 57 # self.bind_client(client) 58 # return self 59 60 @classmethod 61 def from_xml(cls, xml) -> Self: 62 # print(etree_tostring(xml)) 63 data = first(el2dict(xml).values()) 64 # print(data) 65 # context = {} 66 # if client is not None: 67 # context["client"] = client 68 return cls.model_validate(data) # , context=context 69 70 71class ObjectBaseModel(XMLBaseModel): 72 def _remove_member(self, subpath, client: "Client", member: str, rulebase=None): 73 """ 74 Remove the member from destination. 75 76 NOTE: Rulebase information is required for panorama 77 """ 78 subpath = subpath.strip("/") 79 rule_xpath = self.get_xpath(rulebase) 80 # panorama_rule_xpath = f"/config/devices/entry/vsys/entry/rulebase/security/rules/entry[@uuid='{self.uuid}']" 81 member_xpath = f"{rule_xpath}/{subpath}/member[text()='{member}']" 82 return client.configuration.delete(member_xpath) 83 84 85def parse_datetime(d): 86 try: 87 if d is None or d in ("none", "Unknown", "(null)"): 88 return None 89 try: 90 return datetime.strptime(d, DATETIME_FORMAT) 91 except Exception: 92 return datetime.strptime(d, DATETIME_MS_FORMAT) 93 except Exception as e: 94 logging.debug(e) 95 logging.debug(f"Failed to parse {d} as datetime") 96 # print(d, type(d)) 97 raise 98 return d 99 100 101def parse_time(d): 102 return datetime.strptime(d, TIME_FORMAT).time() 103 104 105# https://docs.pydantic.dev/latest/concepts/types/#custom-types 106# JobProgress = TypeAliasType('JobProgress', PlainValidator(parse_progress)) 107# Datetime = TypeAliasType( 108# "Datetime", Annotated[datetime, PlainValidator(parse_datetime)] 109# ) 110Datetime = Annotated[datetime, PlainValidator(parse_datetime)] 111 112 113def single_xpath(xml, xpath, parser=None, default=None): 114 try: 115 res = xml.xpath(xpath) 116 res = first(res, None) 117 except Exception: 118 return default 119 if res is None: 120 return default 121 if not isinstance(res, str): 122 res = res.text 123 if parser: 124 res = parser(res) 125 return res 126 127 128pd = parse_datetime 129sx = single_xpath 130 131 132def mksx(xml): 133 def single_xpath(xpath, parser=None, default=None): 134 res = sx(xml, xpath, parser=parser, default=default) 135 logging.debug(res) 136 return res 137 138 return single_xpath 139 140 141def ensure_list(v: Any) -> typing.List[Any]: 142 if v is None: 143 return [] 144 if isinstance(v, dict) and len(v) == 1 and "member" in v: 145 return ensure_list(v["member"]) 146 if isinstance(v, list): 147 return v 148 return [v] 149 150 151# https://docs.pydantic.dev/latest/concepts/types/#generics 152Element = TypeVar("Element", bound=Any) 153# Similar to typing.List, but ensure to always return a list 154List = Annotated[typing.List[Element], BeforeValidator(ensure_list)] 155 156 157def xml_text(v: Any): 158 if isinstance(v, dict) and "#text" in v: 159 return v["#text"] 160 return v 161 162 163def ensure_str(v: Any) -> str: 164 if v is None: 165 return "" 166 if isinstance(v, dict): 167 text = v.get("#text") 168 if text: 169 return text 170 lines = v.get("line") 171 if lines is not None: 172 if isinstance(lines, str): 173 return lines 174 return "\n".join(lines) 175 raise Exception(f"Cannot convert value to string: {v}") 176 return v 177 178 179def validate_ip(v: Any) -> str: 180 if v is None: 181 return "" 182 if isinstance(v, dict): 183 return v["@name"] 184 return v 185 186 187String = Annotated[str, BeforeValidator(ensure_str)] 188Bool = Annotated[bool, BeforeValidator(xml_text)] 189Ip = Annotated[str, BeforeValidator(validate_ip)] 190 191 192def _xml2schema(values: list): 193 types: typing.List[Any] = [ 194 t.__name__ for t in {type(v) for v in values if not isinstance(v, (dict, list))} 195 ] 196 list_values = [v for v in values if isinstance(v, list)] 197 if list_values: 198 types.append(_xml2schema([e for sublist in list_values for e in sublist])) 199 dict_values = [v for v in values if isinstance(v, dict)] 200 if dict_values: 201 all_keys = {k for d in dict_values for k in d} 202 dict_schema = { 203 k: _xml2schema([d.get(k) for d in dict_values]) for k in all_keys 204 } 205 types.append(dict_schema) 206 if not types: 207 raise Exception("NO TYPE") 208 if len(types) == 1: 209 return types[0] 210 return types 211 212 213def _clean_key(k): 214 return k.replace("@", "").replace("#", "") 215 216 217def _slug(k): 218 return _clean_key(k).replace("-", "_") 219 220 221def _keyastypename(k): 222 return "".join(p.title() for p in _clean_key(k).split("-")) 223 224 225def _schematype(values: list, name: Optional[str] = None) -> type: 226 if not name: 227 name = "Dict" 228 optional = None in values 229 values = [v for v in values if v is not None] 230 schema_types: typing.List[Any] = list( 231 {type(v) for v in values if not isinstance(v, (dict, list))} 232 ) 233 list_values = [v for v in values if isinstance(v, list)] 234 if list_values: 235 t = _schematype([e for sublist in list_values for e in sublist], name=name) 236 schema_types.append(t) 237 dict_values = [v for v in values if isinstance(v, dict)] 238 if dict_values: 239 all_keys = {k for d in dict_values for k in d} 240 annotations = { 241 _slug(k): _schematype( 242 [d.get(k) for d in dict_values], name=_keyastypename(k) 243 ) 244 for k in all_keys 245 } 246 t = new_class( 247 name, 248 (TypedDict,), 249 None, 250 lambda ns: ns.update({"__annotations__": annotations}), 251 ) 252 schema_types.append(t) 253 if not schema_types: 254 return NoneType 255 # raise Exception("NO TYPE") 256 257 final_type = ( 258 schema_types[0] if len(schema_types) == 1 else typing.Union[tuple(schema_types)] 259 ) 260 if optional: 261 final_type = Optional[final_type] 262 return final_type 263 264 265def xml2schema(xml): 266 """ 267 Similar to schematype function: 268 The result is a recursive schema that can be dumped with json. 269 """ 270 data = el2dict(xml) 271 return _xml2schema([data]) 272 273 274def schematype(data, name: Optional[str] = None): 275 """ 276 Recursively parse the data to infer the schema. 277 The schema is returned as a type. 278 279 We can dump the json schema using pydantic: 280 TypeAdapter(schematype({"test": 5})).json_schema() 281 """ 282 return _schematype([data], name=name) 283 284 285def xml2schematype(xml): 286 """ 287 Same to schematype function but takes an xml Element as parameter 288 """ 289 name, data = first(el2dict(xml).items()) 290 return schematype(data, name) 291 292 293def jsonschema(data, name=None, indent=4) -> str: 294 ta = TypeAdapter(schematype(data, name)) 295 return json.dumps(ta.json_schema(), indent=indent) 296 297 298def xml2jsonschema(data, indent=4) -> str: 299 ta = TypeAdapter(xml2schematype(data)) 300 return json.dumps(ta.json_schema(), indent=indent)
31class XMLBaseModel(BaseModel): 32 # raw_xml: Optional[Any] = None 33 34 # @model_validator(mode="before") 35 # @classmethod 36 # def _get_raw_xml(cls, data, info: ValidationInfo): 37 # if isinstance(info.context, dict): 38 # raw_xml = info.context.get("raw_xml") 39 # if raw_xml is not None: 40 # data["raw_xml"] = raw_xml 41 # return data 42 43 # @classmethod 44 # def from_xml(cls, xml) -> Self: 45 # data = first(el2dict(xml).values()) 46 # return cls.model_validate(data, context={"raw_xml": xml}) 47 48 # _client: Optional["Client"] 49 # def bind_client(self, client: Optional["Client"]): 50 # self._client = client 51 # return self 52 53 # @model_validator(mode="after") 54 # def _auto_bind_client(self, info: ValidationInfo): 55 # client = None 56 # if isinstance(info.context, dict): 57 # client = info.context.get("client") 58 # self.bind_client(client) 59 # return self 60 61 @classmethod 62 def from_xml(cls, xml) -> Self: 63 # print(etree_tostring(xml)) 64 data = first(el2dict(xml).values()) 65 # print(data) 66 # context = {} 67 # if client is not None: 68 # context["client"] = client 69 return cls.model_validate(data) # , context=context
!!! abstract "Usage Documentation" Models
A base class for creating Pydantic models.
Attributes:
__class_vars__: The names of the class variables defined on the model.
__private_attributes__: Metadata about the private attributes of the model.
__signature__: The synthesized __init__
[Signature
][inspect.Signature] of the model.
__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The core schema of the model.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
__args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
__pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
__pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
__pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra]
is set to `'allow'`.
__pydantic_fields_set__: The names of fields explicitly set during instantiation.
__pydantic_private__: Values of private attributes set on the model instance.
72class ObjectBaseModel(XMLBaseModel): 73 def _remove_member(self, subpath, client: "Client", member: str, rulebase=None): 74 """ 75 Remove the member from destination. 76 77 NOTE: Rulebase information is required for panorama 78 """ 79 subpath = subpath.strip("/") 80 rule_xpath = self.get_xpath(rulebase) 81 # panorama_rule_xpath = f"/config/devices/entry/vsys/entry/rulebase/security/rules/entry[@uuid='{self.uuid}']" 82 member_xpath = f"{rule_xpath}/{subpath}/member[text()='{member}']" 83 return client.configuration.delete(member_xpath)
!!! abstract "Usage Documentation" Models
A base class for creating Pydantic models.
Attributes:
__class_vars__: The names of the class variables defined on the model.
__private_attributes__: Metadata about the private attributes of the model.
__signature__: The synthesized __init__
[Signature
][inspect.Signature] of the model.
__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The core schema of the model.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
__args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
__pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
__pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
__pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra]
is set to `'allow'`.
__pydantic_fields_set__: The names of fields explicitly set during instantiation.
__pydantic_private__: Values of private attributes set on the model instance.
Configuration for the model, should be a dictionary conforming to [ConfigDict
][pydantic.config.ConfigDict].
Inherited Members
86def parse_datetime(d): 87 try: 88 if d is None or d in ("none", "Unknown", "(null)"): 89 return None 90 try: 91 return datetime.strptime(d, DATETIME_FORMAT) 92 except Exception: 93 return datetime.strptime(d, DATETIME_MS_FORMAT) 94 except Exception as e: 95 logging.debug(e) 96 logging.debug(f"Failed to parse {d} as datetime") 97 # print(d, type(d)) 98 raise 99 return d
114def single_xpath(xml, xpath, parser=None, default=None): 115 try: 116 res = xml.xpath(xpath) 117 res = first(res, None) 118 except Exception: 119 return default 120 if res is None: 121 return default 122 if not isinstance(res, str): 123 res = res.text 124 if parser: 125 res = parser(res) 126 return res
86def parse_datetime(d): 87 try: 88 if d is None or d in ("none", "Unknown", "(null)"): 89 return None 90 try: 91 return datetime.strptime(d, DATETIME_FORMAT) 92 except Exception: 93 return datetime.strptime(d, DATETIME_MS_FORMAT) 94 except Exception as e: 95 logging.debug(e) 96 logging.debug(f"Failed to parse {d} as datetime") 97 # print(d, type(d)) 98 raise 99 return d
114def single_xpath(xml, xpath, parser=None, default=None): 115 try: 116 res = xml.xpath(xpath) 117 res = first(res, None) 118 except Exception: 119 return default 120 if res is None: 121 return default 122 if not isinstance(res, str): 123 res = res.text 124 if parser: 125 res = parser(res) 126 return res
164def ensure_str(v: Any) -> str: 165 if v is None: 166 return "" 167 if isinstance(v, dict): 168 text = v.get("#text") 169 if text: 170 return text 171 lines = v.get("line") 172 if lines is not None: 173 if isinstance(lines, str): 174 return lines 175 return "\n".join(lines) 176 raise Exception(f"Cannot convert value to string: {v}") 177 return v
266def xml2schema(xml): 267 """ 268 Similar to schematype function: 269 The result is a recursive schema that can be dumped with json. 270 """ 271 data = el2dict(xml) 272 return _xml2schema([data])
Similar to schematype function: The result is a recursive schema that can be dumped with json.
275def schematype(data, name: Optional[str] = None): 276 """ 277 Recursively parse the data to infer the schema. 278 The schema is returned as a type. 279 280 We can dump the json schema using pydantic: 281 TypeAdapter(schematype({"test": 5})).json_schema() 282 """ 283 return _schematype([data], name=name)
Recursively parse the data to infer the schema. The schema is returned as a type.
We can dump the json schema using pydantic: TypeAdapter(schematype({"test": 5})).json_schema()
286def xml2schematype(xml): 287 """ 288 Same to schematype function but takes an xml Element as parameter 289 """ 290 name, data = first(el2dict(xml).items()) 291 return schematype(data, name)
Same to schematype function but takes an xml Element as parameter