pa_api.xmlapi

 1from . import types
 2from .client import XMLApi
 3from .clients import Client
 4from .exceptions import ServerError
 5
 6__all__ = [
 7    "Client",
 8    "XMLApi",
 9    "types",
10]
class Client(pa_api.xmlapi.clients.base.BaseXMLApiClient):
15class Client(BaseXMLApiClient):
16    operation: Operation
17    configuration: Config
18    commit: Commit
19    logs: Log
20    misc: Misc
21
22    def _post_init(self):
23        self.configuration = Config(self)
24        self.operation = Operation(self)
25        self.commit = Commit(self)
26        self.logs = Log(self)
27        self.misc = Misc(self)
28
29    # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/export-files-api
30    # https://knowledgebase.paloaltonetworks.com/KCSArticleDetail?id=kA10g000000ClaOCAS#:~:text=From%20the%20GUI%2C%20go%20to%20Device%20%3E%20Setup,%3E%20scp%20export%20configuration%20%5Btab%20for%20command%20help%5D
31    def _export_request(
32        self,
33        category,
34        method="GET",
35        params=None,
36        verify=None,
37        stream=None,
38        timeout=None,
39    ):
40        if params is None:
41            params = {}
42        params = {"category": category, **params}
43        return self._request(
44            "export",
45            method=method,
46            params=params,
47            verify=verify,
48            parse=False,
49            stream=stream,
50            timeout=timeout,
51        ).content
52
53    def export_configuration(
54        self,
55        verify=None,
56        timeout=None,
57    ) -> Element:
58        return self._export_request(
59            category="configuration",
60            verify=verify,
61            timeout=timeout,
62        )
63
64    def export_device_state(
65        self,
66        verify=None,
67        timeout=None,
68    ) -> Element:
69        return self._export_request(
70            category="device-state",
71            verify=verify,
72            timeout=timeout,
73        )
74
75    # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/get-version-info-api
76    def api_version(self):
77        return el2dict(
78            self._request(
79                "version",
80                method="POST",
81            ).xpath(".//result")[0]
82        )["result"]
83
84    def _check_is_panorama(self) -> bool:
85        return self.configuration.check_is_panorama()
86
87    @property
88    def ispanorama(self):
89        if self._ispanorama is None:
90            self._ispanorama = self._check_is_panorama()
91        return self._ispanorama
operation: pa_api.xmlapi.clients.operations.operation.Operation
configuration: pa_api.xmlapi.clients.config.Config
commit: pa_api.xmlapi.clients.commit.Commit
logs: pa_api.xmlapi.clients.logs.Log
misc: pa_api.xmlapi.clients.misc.Misc
def export_configuration( self, verify=None, timeout=None) -> <cyfunction Element at 0x7f4bd79982b0>:
53    def export_configuration(
54        self,
55        verify=None,
56        timeout=None,
57    ) -> Element:
58        return self._export_request(
59            category="configuration",
60            verify=verify,
61            timeout=timeout,
62        )
def export_device_state( self, verify=None, timeout=None) -> <cyfunction Element at 0x7f4bd79982b0>:
64    def export_device_state(
65        self,
66        verify=None,
67        timeout=None,
68    ) -> Element:
69        return self._export_request(
70            category="device-state",
71            verify=verify,
72            timeout=timeout,
73        )
def api_version(self):
76    def api_version(self):
77        return el2dict(
78            self._request(
79                "version",
80                method="POST",
81            ).xpath(".//result")[0]
82        )["result"]
ispanorama
87    @property
88    def ispanorama(self):
89        if self._ispanorama is None:
90            self._ispanorama = self._check_is_panorama()
91        return self._ispanorama
class XMLApi:
  25class XMLApi:
  26    def __init__(
  27        self,
  28        host=None,
  29        api_key=None,
  30        ispanorama=None,
  31        target=None,
  32        verify=False,
  33        timeout=None,
  34        logger=None,
  35    ):
  36        env_host, env_apikey = get_credentials_from_env()
  37        host = host or env_host
  38        api_key = api_key or env_apikey
  39        if not host:
  40            raise Exception("Missing Host")
  41        if not api_key:
  42            raise Exception("Missing API Key")
  43        host, _, _ = clean_url_host(host)
  44
  45        default_params = {}
  46        if target:
  47            default_params["target"] = target
  48
  49        self._host = host
  50        self._api_key = api_key
  51        self._url = f"{host}/api"
  52        self._verify = verify
  53        self._timeout = timeout
  54        self._ispanorama = ispanorama
  55        self._default_params = default_params
  56        self.logger = logger or logging
  57
  58    def _request(
  59        self,
  60        type,
  61        method="GET",
  62        vsys=None,
  63        params=None,
  64        remove_blank_text=True,
  65        verify=None,
  66        parse=True,
  67        stream=None,
  68        timeout=None,
  69    ):
  70        if verify is None:
  71            verify = self._verify
  72        if timeout is None:
  73            timeout = self._timeout
  74        headers = {"X-PAN-KEY": self._api_key}
  75        params = {**self._default_params, **(params or {})}
  76        return raw_request(
  77            self._url,
  78            type,
  79            method,
  80            vsys=vsys,
  81            params=params,
  82            headers=headers,
  83            remove_blank_text=remove_blank_text,
  84            verify=verify,
  85            logger=self.logger,
  86            parse=parse,
  87            stream=stream,
  88            timeout=timeout,
  89        )
  90
  91    # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/export-files-api
  92    # https://knowledgebase.paloaltonetworks.com/KCSArticleDetail?id=kA10g000000ClaOCAS#:~:text=From%20the%20GUI%2C%20go%20to%20Device%20%3E%20Setup,%3E%20scp%20export%20configuration%20%5Btab%20for%20command%20help%5D
  93    def _export_request(
  94        self,
  95        category,
  96        method="GET",
  97        params=None,
  98        verify=None,
  99        stream=None,
 100        timeout=None,
 101    ):
 102        if params is None:
 103            params = {}
 104        params = {"category": category, **params}
 105        return self._request(
 106            "export",
 107            method=method,
 108            params=params,
 109            verify=verify,
 110            parse=False,
 111            stream=stream,
 112            timeout=timeout,
 113        ).content
 114
 115    def export_configuration(
 116        self,
 117        verify=None,
 118        timeout=None,
 119    ) -> Element:
 120        return self._export_request(
 121            category="configuration",
 122            verify=verify,
 123            timeout=timeout,
 124        )
 125
 126    def export_device_state(
 127        self,
 128        verify=None,
 129        timeout=None,
 130    ) -> Element:
 131        return self._export_request(
 132            category="device-state",
 133            verify=verify,
 134            timeout=timeout,
 135        )
 136
 137    def _conf_request(
 138        self,
 139        xpath,
 140        action="get",
 141        method="GET",
 142        vsys=None,
 143        params=None,
 144        remove_blank_text=True,
 145        verify=None,
 146        timeout=None,
 147    ) -> Element:
 148        if params is None:
 149            params = {}
 150        params = {"action": action, "xpath": xpath, **params}
 151        return self._request(
 152            "config",
 153            method=method,
 154            vsys=vsys,
 155            params=params,
 156            remove_blank_text=remove_blank_text,
 157            verify=verify,
 158            timeout=timeout,
 159        )
 160
 161    def _op_request(
 162        self,
 163        cmd,
 164        method="POST",
 165        vsys=None,
 166        params=None,
 167        remove_blank_text=True,
 168        verify=None,
 169        timeout=None,
 170    ) -> Element:
 171        if params is None:
 172            params = {}
 173        params = {"cmd": cmd, **params}
 174        return self._request(
 175            "op",
 176            method=method,
 177            vsys=vsys,
 178            params=params,
 179            remove_blank_text=remove_blank_text,
 180            verify=verify,
 181            timeout=timeout,
 182        )
 183
 184    def _commit_request(
 185        self,
 186        cmd,
 187        method="POST",
 188        params=None,
 189        remove_blank_text=True,
 190        verify=None,
 191        timeout=None,
 192    ):
 193        if params is None:
 194            params = {}
 195        params = {"cmd": cmd, **params}
 196        return self._request(
 197            "commit",
 198            method=method,
 199            params=params,
 200            remove_blank_text=remove_blank_text,
 201            verify=verify,
 202            timeout=timeout,
 203        )
 204
 205    # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/get-started-with-the-pan-os-xml-api/get-your-api-key
 206    def generate_apikey(self, username, password: str) -> str:
 207        """
 208        Generate a new API-Key for the user connected.
 209        """
 210        params = {"user": username, "password": password}
 211        return self._request(
 212            "keygen",
 213            method="POST",
 214            params=params,
 215        ).xpath(".//key/text()")[0]
 216
 217    # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/get-version-info-api
 218    def api_version(self):
 219        return el2dict(
 220            self._request(
 221                "version",
 222                method="POST",
 223            ).xpath(".//result")[0]
 224        )["result"]
 225
 226    def configuration(
 227        self,
 228        xpath,
 229        action="get",
 230        method="GET",
 231        params=None,
 232        remove_blank_text=True,
 233    ):
 234        return self._conf_request(
 235            xpath,
 236            action=action,
 237            method=method,
 238            params=params,
 239            remove_blank_text=remove_blank_text,
 240        )
 241
 242    def operation(
 243        self,
 244        cmd,
 245        method="POST",
 246        params=None,
 247        remove_blank_text=True,
 248    ):
 249        return self._op_request(
 250            cmd,
 251            method=method,
 252            params=params,
 253            remove_blank_text=remove_blank_text,
 254        )
 255
 256    def _check_is_panorama(self) -> bool:
 257        try:
 258            self.configuration("/config/panorama/vsys")
 259            return False
 260        except Exception:
 261            return True
 262
 263    @property
 264    def ispanorama(self):
 265        if self._ispanorama is None:
 266            self._ispanorama = self._check_is_panorama()
 267        return self._ispanorama
 268
 269    def get_tree(self, extended=False) -> Element:
 270        """
 271        Return the running configuration
 272        The differences with `running_config` are not known
 273        """
 274        tree = get_tree(
 275            self._host, self._api_key, verify=self._verify, logger=self.logger
 276        )
 277        if extended:
 278            self._extend_tree_information(tree)
 279        return tree
 280
 281    def _get_rule_use(self, device_group, position, rule_type, number: int = 200):
 282        results = []
 283        for i in range(100):
 284            cmd = _get_rule_use_cmd(
 285                device_group,
 286                position,
 287                rule_type,
 288                i * number,
 289                number,
 290            )
 291            res = self._op_request(cmd).xpath("result")[0]
 292            total_count = int(res.attrib["total-count"])
 293            results.extend(res.xpath("entry"))
 294            if len(results) >= total_count:
 295                break
 296        return results
 297
 298    def get_rule_use(self, tree=None, max_threads: Optional[int] = None):
 299        if tree is None:
 300            tree = self.get_tree()
 301        device_groups = tree.xpath("devices/*/device-group/*/@name")
 302        positions = ("pre", "post")
 303        # rule_types = tuple({x.tag for x in tree.xpath(
 304        # "devices/*/device-group/*"
 305        # "/*[self::post-rulebase or self::pre-rulebase]/*")})
 306        rule_types = ("security", "pbf", "nat", "application-override")
 307        args_list = list(product(device_groups, positions, rule_types))
 308
 309        def func(args):
 310            return self._get_rule_use(*args)
 311
 312        threads = len(args_list)
 313        threads = min(max_threads or threads, threads)
 314        with Pool(len(args_list)) as pool:
 315            data = pool.map(func, args_list)
 316        return [entry for entry_list in data for entry in entry_list]
 317
 318    def _get_rule_hit_count(self, device_group, rulebase, rule_type):
 319        cmd = (
 320            "<show><rule-hit-count><device-group>"
 321            f"<entry name='{device_group}'><{rulebase}><entry name='{rule_type}'>"
 322            f"<rules><all/></rules></entry></{rulebase}></entry>"
 323            "</device-group></rule-hit-count></show>"
 324        )
 325        res = self._op_request(cmd)
 326        entries = res.xpath(".//rules/entry") or []
 327        # return entries
 328        return [(device_group, rulebase, rule_type, e) for e in entries]
 329
 330    def get_rule_hit_count(self, tree=None, max_threads=None):
 331        if tree is None:
 332            tree = self.get_tree()
 333        device_groups = tree.xpath("devices/*/device-group/*/@name")
 334        rulebases = ("pre-rulebase", "post-rulebase")
 335        rule_types = ("security", "pbf", "nat", "application-override")
 336        args_list = list(product(device_groups, rulebases, rule_types))
 337
 338        def func(args):
 339            return self._get_rule_hit_count(*args)
 340
 341        threads = len(args_list)
 342        threads = min(max_threads or threads, threads)
 343        with Pool(len(args_list)) as pool:
 344            data = pool.map(func, args_list)
 345        return [entry for entry_list in data for entry in entry_list]
 346
 347    def _extend_tree_information(
 348        self,
 349        tree,
 350        extended=None,
 351        max_threads=None,
 352    ):
 353        """
 354        Incorporate usage statistics into the configuration.
 355        tree: the configuration as a XML object
 356        extended: rule-use data (if not provided, the function will retrieve them automatically)
 357        """
 358        if extended is None:
 359            extended = self.get_rule_use(tree, max_threads=max_threads)
 360        rules = tree.xpath(
 361            ".//device-group/entry/"
 362            "*[self::pre-rulebase or self::post-rulebase]/*/rules/entry[@uuid]",
 363        )
 364        ext_dict = {x.attrib.get("uuid"): x for x in extended}
 365        rules_dict = {x.attrib["uuid"]: x for x in rules}
 366        for ext, rule in map_dicts(ext_dict, rules_dict):
 367            extend_element(rule, ext)
 368            # NOTE: Do not use rule.extend(ext)
 369            # => This is causing duplicates entries
 370        return tree, extended
 371
 372    def get(self, xpath: str):
 373        """
 374        This will retrieve the xml definition based on the xpath
 375        The xpath doesn't need to be exact
 376        and can select multiple values at once.
 377        Still, it must at least speciy /config at is begining
 378        """
 379        return self._conf_request(xpath, action="show", method="GET")
 380
 381    def delete(self, xpath: str):
 382        """
 383        This will REMOVE the xml definition at the provided xpath.
 384        The xpath must be exact.
 385        """
 386        return self._conf_request(
 387            xpath,
 388            action="delete",
 389            method="DELETE",
 390        )
 391
 392    def create(self, xpath: str, xml_definition):
 393        """
 394        This will ADD the xml definition
 395        INSIDE the element at the provided xpath.
 396        The xpath must be exact.
 397        """
 398        # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/configuration-api/set-configuration
 399        params = {"element": xml_definition}
 400        return self._conf_request(
 401            xpath,
 402            action="set",
 403            method="POST",
 404            params=params,
 405        )
 406
 407    def update(self, xpath: str, xml_definition):
 408        """
 409        This will REPLACE the xml definition
 410        INSTEAD of the element at the provided xpath
 411        The xpath must be exact.
 412        Nb: We can pull the whole config, update it locally,
 413        and push the final result
 414        """
 415        # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/configuration-api/set-configuration
 416        params = {"element": xml_definition}
 417        return self._conf_request(
 418            xpath,
 419            action="edit",
 420            method="POST",
 421            params=params,
 422        )
 423
 424    def revert_changes(self, skip_validated: bool = False):
 425        """
 426        Revert all the changes made on Panorama.
 427        NOTE:
 428        - This only applies on non-commited changes.
 429        - This revert everything (not scoped by users)
 430
 431        skip_validated: Do not revert changes that were validated
 432        """
 433        skip = "<skip-validate>yes</skip-validate>" if skip_validated else ""
 434        cmd = f"<revert><config>{skip}</config></revert>"
 435        return self._op_request(cmd)
 436
 437    def validate_changes(self):
 438        """
 439        Validated all the changes currently made
 440        """
 441        cmd = "<validate><full></full></validate>"
 442        return self._op_request(cmd)
 443
 444    def _raw_get_push_scope(self, admin=None):
 445        """
 446        Gives detailed information about pending changes
 447        (e.g. xpath, owner, action, ...)
 448        """
 449        filter = f"<admin><member>{admin}</member></admin>" if admin else ""
 450        cmd = f"<show><config><push-scope>{filter}</push-scope></config></show>"
 451        return self._op_request(cmd)
 452
 453    def get_push_scope_devicegroups(self, admin=None):
 454        """
 455        Gives detailed information about pending changes
 456        (e.g. xpath, owner, action, ...)
 457        """
 458        scope = self._raw_get_push_scope(admin=admin)
 459        return list(set(scope.xpath(".//objects/entry[@loc-type='device-group']/@loc")))
 460
 461    def uncommited_changes(self):
 462        """
 463        Gives detailed information about pending changes
 464        (e.g. xpath, owner, action, ...)
 465        """
 466        cmd = "<show><config><list><changes></changes></list></config></show>"
 467        return self._op_request(cmd)
 468
 469    def uncommited_changes_summary(self, admin=None):
 470        """
 471        Only gives the concern device groups
 472        """
 473        admin = (
 474            f"<partial><admin><member>{admin}</member></admin></partial>"
 475            if admin
 476            else ""
 477        )
 478        cmd = f"<show><config><list><change-summary>{admin}</change-summary></list></config></show>"
 479        return self._op_request(cmd)
 480
 481    def pending_changes(self):
 482        """
 483        Result content is either 'yes' or 'no'
 484        """
 485        cmd = "<check><pending-changes></pending-changes></check>"
 486        return self._op_request(cmd)
 487
 488    def save_config(self, name):
 489        """
 490        Create a named snapshot of the current configuration
 491        """
 492        cmd = f"<save><config><to>{name}</to></config></save>"
 493        return "\n".join(self._op_request(cmd).xpath(".//result/text()"))
 494
 495    def save_device_state(self):
 496        """
 497        Create a snapshot of the current device state
 498        """
 499        cmd = "<save><device-state></device-state></save>"
 500        return "\n".join(self._op_request(cmd).xpath(".//result/text()"))
 501
 502    def get_named_configuration(self, name):
 503        """
 504        Get the configuration from a named snapshot as an XML object
 505        """
 506        cmd = f"<show><config><saved>{name}</saved></config></show>"
 507        return self._op_request(cmd, remove_blank_text=False).xpath("./result/config")[
 508            0
 509        ]
 510
 511    def candidate_config(self) -> Element:
 512        """
 513        Get the configuration to be commited as an XML object
 514        """
 515        cmd = "<show><config><candidate></candidate></config></show>"
 516        return self._op_request(cmd, remove_blank_text=False)
 517
 518    def running_config(self) -> Element:
 519        """
 520        Get the current running configuration as an XML object
 521        """
 522        cmd = "<show><config><running></running></config></show>"
 523        return self._op_request(cmd, remove_blank_text=False)
 524
 525    def _raw_get_jobs(self, job_ids: Union[None, str, List[str]] = None) -> Element:
 526        """
 527        Get information of job(s) as an XML object.
 528        Retrieve all jobs by default.
 529
 530        If job_id is provided, then only retrieve the job requested.
 531        """
 532        filter = "<all></all>"
 533        if job_ids:
 534            if isinstance(job_ids, str):
 535                job_ids = [job_ids]
 536            filter = "".join(f"<id>{j}</id>" for j in job_ids)
 537        cmd = f"<show><jobs>{filter}</jobs></show>"
 538        return self._op_request(cmd)
 539
 540    def get_jobs(self, job_ids: Union[None, str, List[str]] = None) -> List[types.Job]:
 541        """
 542        Get information of job(s)
 543        Retrieve all jobs by default.
 544
 545        If job_id is provided, then only retrieve the job requested.
 546        """
 547        job_xmls = self._raw_get_jobs(job_ids).xpath(".//job")
 548        transformed = (types.Job.from_xml(x) for x in job_xmls)
 549        return [j for j in transformed if j]
 550
 551    def get_job(self, job_id) -> types.Job:
 552        """
 553        Get information of job(s)
 554        Retrieve all jobs by default.
 555
 556        If job_id is provided, then only retrieve the job requested.
 557        """
 558        return self.get_jobs(job_id)[0]
 559
 560    def _raw_get_versions(self) -> Element:
 561        """
 562        Get the versions informations as a XML object.
 563        """
 564        cmd = "<request><system><software><check></check></software></system></request>"
 565        return self.operation(cmd)
 566
 567    def get_versions(self) -> List[types.SoftwareVersion]:
 568        """
 569        Get the versions informations
 570        """
 571        res = self._raw_get_versions()
 572        return [
 573            types.SoftwareVersion.from_xml(entry)
 574            for entry in res.xpath(".//sw-updates/versions/entry")
 575        ]
 576
 577    def wait_job_completion(self, job_id: str, waiter=None) -> types.Job:
 578        """
 579        Block until the job complete.
 580
 581        job_id: the job to wait upon
 582        waiter: a generator that yield when a new query must be done.
 583                see `wait` function (the default waiter) for an example
 584        """
 585        if not waiter:
 586            waiter = wait()
 587        for _ in waiter:
 588            job = self.get_job(job_id)
 589            if job.progress >= 100:
 590                return job
 591            self.logger.info(f"Job {job_id} progress: {job.progress}")
 592        raise Exception("Timeout while waiting for job completion")
 593
 594    def raw_get_pending_jobs(self):
 595        """
 596        Get all the jobs that are pending as a XML object
 597        """
 598        cmd = "<show><jobs><pending></pending></jobs></show>"
 599        return self._op_request(cmd)
 600
 601    def commit_changes(self, force: bool = False):
 602        """
 603        Commit all changes
 604        """
 605        cmd = "<commit>{}</commit>".format("<force></force>" if force else "")
 606        return self._commit_request(cmd)
 607
 608    def _lock_cmd(self, cmd, vsys, no_exception=False) -> bool:
 609        """
 610        Utility function for commands that tries to manipulate the lock
 611        on Panorama.
 612        """
 613        try:
 614            result = "".join(self._op_request(cmd, vsys=vsys).itertext())
 615            self.logger.debug(result)
 616        except Exception as e:
 617            if no_exception:
 618                self.logger.error(e)
 619                return False
 620            raise
 621        return True
 622
 623    # https://github.com/PaloAltoNetworks/pan-os-python/blob/a6b018e3864ff313fed36c3804394e2c92ca87b3/panos/base.py#L4459
 624    def add_config_lock(self, comment=None, vsys="shared", no_exception=False) -> bool:
 625        comment = f"<comment>{comment}</comment>" if comment else ""
 626        cmd = f"<request><config-lock><add>{comment}</add></config-lock></request>"
 627        return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
 628
 629    def remove_config_lock(self, vsys="shared", no_exception=False) -> bool:
 630        cmd = "<request><config-lock><remove></remove></config-lock></request>"
 631        return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
 632
 633    def add_commit_lock(self, comment=None, vsys="shared", no_exception=False) -> bool:
 634        comment = f"<comment>{comment}</comment>" if comment else ""
 635        cmd = f"<request><commit-lock><add>{comment}</add></commit-lock></request>"
 636        return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
 637
 638    def remove_commit_lock(self, vsys="shared", no_exception=False) -> bool:
 639        cmd = "<request><commit-lock><remove></remove></commit-lock></request>"
 640        return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
 641
 642    def set_ha_status(self, active: bool = True, target: Optional[str] = None):
 643        """
 644        Activate or Deactivate (suspend) the HA pair.
 645
 646        """
 647        status = "<functional></functional>" if active else "<suspend></suspend>"
 648        cmd = f"<request><high-availability><state>{status}</state></high-availability></request>"
 649        params = {"target": target} if target else None
 650        return self._op_request(cmd, params=params).xpath(".//result/text()")[0]
 651
 652    def set_ha_preemption(self, active=True, target=None):
 653        """
 654        NOT WORKING:
 655        There is currently no way to deactivate the preemption using the API.
 656        """
 657        raise Exception("set_ha_preemption not implementend")
 658
 659    def _raw_get_ha_info(self, state_only=False, target=None) -> Element:
 660        """
 661        Get the current state of a HA pair as a XML object.
 662        """
 663        filter = "<state></state>" if state_only else "<all></all>"
 664        cmd = f"<show><high-availability>{filter}</high-availability></show>"
 665        params = {"target": target} if target else None
 666        return self._op_request(cmd, params=params)
 667
 668    def get_ha_info(self, state_only=False, target=None) -> types.HAInfo:
 669        """
 670        Get the current state of a HA pair as a python object.
 671        """
 672        res = self._raw_get_ha_info(state_only=state_only, target=target)
 673        hainfo_xml = res.xpath(".//result")[0]
 674        # pprint(hainfo_xml)
 675        return types.HAInfo.from_xml(hainfo_xml)
 676
 677    def get_ha_pairs(
 678        self, connected=True
 679    ) -> Tuple[List[Tuple[types.Device, Optional[types.Device]]], List[types.Device]]:
 680        """
 681        Retrieve a tuple containing 2 values:
 682        1. The list of HA pairs and their members
 683        2. A list of devices that are not part of a HA pair
 684        """
 685        # Get all devices and index them using their serial number
 686        devices: List[types.Device] = self.get_devices(connected=connected)
 687        device_map = {d.serial: d for d in devices}
 688
 689        # Create the 2 lists by iterating over the devices
 690        done = set()
 691        ha_pairs = []
 692        without_ha = []
 693        for d in devices:
 694            # Do not manage twice the same device
 695            if d.serial in done:
 696                continue
 697            # The device does not have an HA peer
 698            if not d.ha_peer_serial:
 699                without_ha.append(d)
 700                done.add(d.serial)
 701                continue
 702            # Get the current device's HA peer
 703            # push them in the ha_pairs list
 704            # and mark both of them as done
 705            peer = device_map.get(d.ha_peer_serial)
 706            ha_pairs.append((d, peer))
 707            done.update((d.serial, d.ha_peer_serial))
 708        return ha_pairs, without_ha
 709
 710    def get_ha_pairs_map(self, connected=True):
 711        """
 712        Same as `get_ha_pairs`, but the ha_pairs are return as a map.
 713        This provides an easier and more readable lookup to find a pair:
 714
 715        mapping, _ = client.get_ha_pairs_map()
 716        serial = "12345"
 717        pair_of_serial = mapping[serial]
 718        """
 719        ha_pairs, without_ha = self.get_ha_pairs(connected=connected)
 720        map = {}
 721        for pair in ha_pairs:
 722            for device in pair:
 723                map[device.serial] = pair
 724        return map, without_ha
 725
 726    def get_panorama_status(self):
 727        """
 728        Get the current status of Panorama server.
 729        """
 730        cmd = "<show><panorama-status></panorama-status></show>"
 731        return self.operation(cmd).xpath(".//result")
 732
 733    def raw_get_local_panorama(self):
 734        return self.configuration(
 735            "/config/devices/entry/deviceconfig/system/panorama/local-panorama/panorama-server"
 736        )
 737
 738    def get_local_panorama_ip(self) -> Optional[str]:
 739        res = self.raw_get_local_panorama()
 740        return first(res.xpath("//panorama-server/text()"))
 741
 742    def _raw_get_devices(self, connected=False):
 743        """
 744        Return the list of device known from Panorama as a XML object.
 745        NOTE: This only works if the client is a Panorama server.
 746
 747        connected: only returns the devices that are connected
 748        """
 749        # This only works on Panorama, not the FW
 750        filter = "<connected></connected>" if connected else "<all></all>"
 751        cmd = f"<show><devices>{filter}</devices></show>"
 752        return self.operation(cmd)
 753
 754    def get_devices(self, connected=False) -> List[types.Device]:
 755        """
 756        Return the list of device known from Panorama as a python structure.
 757        NOTE: This only works if the client is a Panorama server.
 758
 759        connected: only returns the devices that are connected
 760        """
 761        res = self._raw_get_devices(connected=connected)
 762        entries = res.xpath(".//devices/entry")
 763        devices = (types.Device.from_xml(e) for e in entries)
 764        return [d for d in devices if d]
 765
 766    def _raw_get_dg_hierarchy(self):
 767        """
 768        Return the hierarchy of device groups as a XML object.
 769        """
 770        cmd = "<show><dg-hierarchy></dg-hierarchy></show>"
 771        return self.operation(cmd)
 772
 773    def get_plan_dg_hierarchy(self, recursive=False):
 774        """
 775        Return the hierarchy of device groups as a dict.
 776        The keys are the names of the device groups.
 777
 778        The values are the children device groups and depends on the recursive parameter.
 779        recursive: if False, the values are only the direct children of the device group.
 780            Otherwise, the values are all the descendant device groups.
 781        """
 782        devicegroups = {}  # name: children
 783        hierarchy = self._raw_get_dg_hierarchy().xpath(".//dg-hierarchy")[0]
 784        xpath = ".//dg" if recursive else "./dg"
 785        for dg in hierarchy.xpath(".//dg"):
 786            devicegroups[dg.attrib["name"]] = [
 787                x.attrib["name"] for x in dg.xpath(xpath)
 788            ]
 789        return devicegroups
 790
 791    def _raw_get_devicegroups(self):
 792        """
 793        Return the list of device groups as a XML object.
 794        """
 795        cmd = "<show><devicegroups></devicegroups></show>"
 796        return self.operation(cmd)
 797
 798    def get_devicegroups_name(
 799        self,
 800        parents=None,
 801        with_connected_devices=None,
 802    ):
 803        """
 804        This returns the names of the devicegroups:
 805        - parents: the returned list will only contain children of the provided parents (parents included)
 806        - with_devices: the returned list will only contain devicegroups that have direct devices under them
 807        """
 808        devicegroups = self._raw_get_devicegroups().xpath(".//devicegroups/entry")
 809        if with_connected_devices:
 810            names = [
 811                dg.attrib["name"]
 812                for dg in devicegroups
 813                if dg.xpath("./devices/entry/connected[text() = 'yes']")
 814            ]
 815        else:
 816            names = [dg.attrib["name"] for dg in devicegroups]
 817        if parents:
 818            hierarchy = self.get_plan_dg_hierarchy(recursive=True)
 819            tokeep = set(chain(*(hierarchy.get(p, []) for p in parents))) | set(parents)
 820            names = list(set(names) & tokeep)
 821        return names
 822
 823    def _raw_get_addresses(self):
 824        """
 825        Return the list of addresses known from Panorama as a XML object.
 826        NOTE: This only works if the client is a Firewall.
 827        """
 828        if self.ispanorama:
 829            return self.configuration(
 830                "/config/devices/entry/device-group/entry/address"
 831            )
 832        return self.configuration("/config/panorama/vsys//address")
 833
 834    def get_addresses(self) -> List[types.Address]:
 835        """
 836        Return the list of addresses known from Panorama as a python structure.
 837        NOTE: This only works if the client is a Firewall.
 838        """
 839        res = self._raw_get_addresses()
 840        addresses = res.xpath(".//address/entry")
 841        return [types.Address.from_xml(i) for i in addresses]
 842
 843    def _raw_get_routing_tables(self) -> Element:
 844        """
 845        Return the list of interfaces known from Panorama as a XML object.
 846        NOTE: This only works if the client is a Firewall.
 847        """
 848        return self.configuration(
 849            "/config/devices/entry/network/virtual-router/entry/routing-table"
 850        )
 851
 852    def get_routing_tables(self) -> List[types.RoutingTable]:
 853        """
 854        Return the list of interface known from Panorama as a python structure.
 855        NOTE: This only works if the client is a Firewall.
 856        """
 857        res = self._raw_get_routing_tables()
 858        routing_tables = res.xpath(".//routing-table")
 859        # print(len(routing_tables))
 860        # from pa_api.xmlapi.utils import pprint
 861        # for r in routing_tables:
 862        #     pprint(r)
 863        return [types.RoutingTable.from_xml(i) for i in routing_tables]
 864
 865    def _raw_get_interfaces(self) -> Element:
 866        """
 867        Return the list of interfaces known from Panorama as a XML object.
 868        NOTE: This only works if the client is a Firewall.
 869        """
 870        return self.configuration("/config/devices/entry/network/interface")
 871
 872    def get_interfaces(self) -> List[types.Interface]:
 873        """
 874        Return the list of interface known from Panorama as a python structure.
 875        NOTE: This only works if the client is a Firewall.
 876        """
 877        res = self._raw_get_interfaces()
 878        interfaces = res.xpath(".//interface")
 879        return [types.Interface.from_xml(i) for i in interfaces]
 880
 881    def _raw_get_zones(self) -> Element:
 882        """
 883        Return the list of zones known from Panorama as a XML object.
 884        NOTE: This only works if the client is a Firewall.
 885        """
 886        if self.ispanorama:
 887            return self.configuration(
 888                "/config/devices/entry/*/entry/config/devices/entry/vsys/entry/zone"
 889            )
 890        return self.configuration("/config/devices/entry/vsys/entry/zone")
 891
 892    def get_zones(self) -> Element:
 893        """
 894        Return the list of zones known from Panorama as a python structure.
 895        NOTE: This only works if the client is a Firewall.
 896        """
 897        res = self._raw_get_zones()
 898        zones = res.xpath(".//zone/entry")
 899        return [types.Zone.from_xml(i) for i in zones]
 900
 901    def _raw_get_templates(self, name=None) -> Element:
 902        """
 903        Return the synchronization status the templates per devices as a XML object.
 904        A device is in sync if it is up-to-date with the current version on Panorama.
 905        NOTE: This only works on Panorama.
 906        """
 907        # This only works on Panorama, not the FW
 908        filter = f"<name>{name}</name>" if name else ""
 909        cmd = f"<show><templates>{filter}</templates></show>"
 910        return self.operation(cmd)
 911
 912    def get_templates_sync_status(self):
 913        """
 914        Return the synchronization status the templates per devices
 915        A device is in sync if it is up-to-date with the current version on Panorama.
 916        NOTE: This only works on Panorama.
 917
 918        The result is a list of tuple of 3 values:
 919        1. the template's name
 920        2. the device's name
 921        3. the sync status
 922        """
 923        res = self._raw_get_templates()
 924        statuses = []
 925        for entry in res.xpath("./result/templates/entry"):
 926            template_name = entry.attrib["name"]
 927            for device in entry.xpath("./devices/entry"):
 928                device_name = device.attrib["name"]
 929                template_status = next(device.xpath("./template-status/text()"), None)
 930                status = (template_name, device_name, template_status)
 931                statuses.append(status)
 932        return statuses
 933
 934    def _raw_get_vpn_flows(self, name=None) -> List[Element]:
 935        """
 936        Returns the VPN flow information as a XML object.
 937        NOTE: This only works on Panorama server, not firewalls
 938        """
 939        # This only works on Panorama, not the FW"
 940        filter = f"<name>{name}</name>" if name else "<all></all>"
 941        cmd = f"<show><vpn><flow>{filter}</flow></vpn></show>"
 942        return self.operation(cmd)
 943
 944    def get_vpn_flows(self, name=None):
 945        """
 946        Returns the VPN flow information as a python structure.
 947        NOTE: This only works on Panorama server, not firewalls
 948        """
 949        entries = self._raw_get_vpn_flows(name=name).xpath(".//IPSec/entry")
 950        return [types.VPNFlow.from_xml(e) for e in entries]
 951
 952    def _raw_system_info(self):
 953        """
 954        Returns informations about the system as a XML object.
 955        """
 956        cmd = "<show><system><info></info></system></show>"
 957        return self.operation(cmd)
 958
 959    def system_info(self) -> types.operations.SystemInfo:
 960        """
 961        Returns informations about the system as a XML object.
 962        """
 963        xml = self._raw_system_info()
 964        info = xml.xpath("./result/system")[0]
 965        return types.operations.SystemInfo.from_xml(info)
 966
 967    def _raw_system_resources(self):
 968        """
 969        Get the system resouces as a XML object.
 970        NOTE: The string is the raw output of a `ps` command.
 971        """
 972        cmd = "<show><system><resources></resources></system></show>"
 973        return self.operation(cmd)
 974
 975    def system_resources(self):
 976        """
 977        Get the system resouces as a string.
 978        The string is the raw output of a `ps` command.
 979        """
 980        res = self._raw_system_resources()
 981        text = res.xpath(".//result/text()")[0]
 982        return text.split("\n\n")[0]
 983
 984    def _raw_download_software(self, version):
 985        """
 986        Download the software version on the device.
 987        version: the software version to download
 988        """
 989        cmd = f"<request><system><software><download><version>{version}</version></download></software></system></request>"
 990        return self.operation(cmd)
 991
 992    def download_software(self, version) -> Optional[str]:
 993        """
 994        Download the software version on the device.
 995        version: the software version to download
 996
 997        Returns the download job's ID in case of successful launch,
 998        None is returned otherwise.
 999        """
1000        res = self._raw_download_software(version)
1001        try:
1002            return res.xpath(".//job/text()")[0]
1003        except Exception:
1004            self.logger.debug("Download has not started")
1005        return None
1006
1007    def _raw_install_software(self, version):
1008        """
1009        Install the software version on the device.
1010        version: the software version to install
1011        """
1012        cmd = f"<request><system><software><install><version>{version}</version></install></software></system></request>"
1013        return self.operation(cmd)
1014
1015    def install_software(
1016        self, version: Union[None, str, types.SoftwareVersion]
1017    ) -> Optional[str]:
1018        """
1019        Install the software version on the device.
1020        version: the software version to install
1021
1022        Returns the download job's ID in case of successful launch,
1023        None is returned otherwise.
1024        """
1025        if isinstance(version, types.SoftwareVersion):
1026            version = version.version
1027        res = self._raw_install_software(version)
1028        try:
1029            return res.xpath(".//job/text()")[0]
1030        except Exception:
1031            self.logger.debug("Download has not started")
1032        return None
1033
1034    def _raw_restart(self):
1035        """
1036        Restart the device
1037        """
1038        cmd = "<request><restart><system></system></restart></request>"
1039        return self.operation(cmd)
1040
1041    def restart(self):
1042        """
1043        Restart the device
1044        """
1045        return "".join(self._raw_restart().xpath(".//result/text()"))
1046
1047    def automatic_download_software(
1048        self, version: Optional[str] = None
1049    ) -> types.SoftwareVersion:
1050        """
1051        Automatically download the requested software version.
1052        if the version is not provided, it defaults to the latest one.
1053
1054        NOTE: This does not do the installation.
1055        This is usefull to download in anticipation of the upgrade.
1056        For automatic install, see `automatic_software_upgrade`
1057        """
1058        version_str = version
1059        try:
1060            versions = self.get_versions()
1061        except ServerError:
1062            raise Exception(
1063                "An error occured on the device while retrieving the device's versions. Be sure that the device can contact PaloAlto's servers."
1064            )
1065        sw_version = None
1066        if not version_str:
1067            sw_version = next((v for v in versions if v.latest), None)
1068        else:
1069            sw_version = next((v for v in versions if v.version == version_str), None)
1070        if not sw_version:
1071            self.logger.error(f"Version {version_str} not found")
1072            return exit(1)
1073
1074        # Already downloaded: Nothing to do
1075        if sw_version.downloaded:
1076            self.logger.info(f"Version {sw_version.version} already downloaded")
1077            return sw_version
1078
1079        # Download minor version first (required)
1080        base_version = next(
1081            v for v in versions if v.version == sw_version.base_minor_version
1082        )
1083        if not base_version.downloaded:
1084            self.logger.info(
1085                f"Launching download of minor version {base_version.version}"
1086            )
1087            job_id = self.download_software(base_version.version)
1088            if not job_id:
1089                raise Exception("Download has not started")
1090            job = self.wait_job_completion(job_id)
1091            if job.result != "OK":
1092                self.logger.debug(job)
1093                raise Exception(job.details)
1094            print(job.details)
1095
1096        # Actually download the wanted version
1097        self.logger.info(f"Launching download of version {sw_version.version}")
1098        job_id = self.download_software(sw_version.version)
1099        if not job_id:
1100            raise Exception("Download has not started")
1101        job = self.wait_job_completion(job_id)
1102        if job.result != "OK":
1103            self.logger.debug(job)
1104            raise Exception(job.details)
1105        self.logger.info(job.details)
1106        return sw_version
1107
1108    def automatic_software_upgrade(
1109        self,
1110        version: Optional[str] = None,
1111        install: bool = True,
1112        restart: bool = True,
1113        suspend: bool = False,
1114    ):
1115        """
1116        Automatically download and install the requested software version.
1117        if the version is not provided, it defaults to the latest one.
1118
1119        NOTE: This does the software install and restart by default.
1120        If you only want to download, prefer to use `automatic_download_software` method,
1121        or set install=False. See the parameters for more information.
1122
1123        install: install the software after the download
1124        restart: restart the device after the installation. This option is ignored if install=False
1125
1126        """
1127        sw_version = self.automatic_download_software(version)
1128        if sw_version.current:
1129            self.logger.info(f"Version {sw_version.version} is already installed")
1130            return sw_version
1131        if not install:
1132            return sw_version
1133        # We may get the following error:
1134        # "Error: Upgrading from 10.2.4-h10 to 11.1.2 requires a content version of 8761 or greater and found 8638-7689."
1135        # This should never happen, we decided to report the error and handle this manually
1136        self.logger.info(f"Launching install of version {sw_version.version}")
1137
1138        with self.suspended(suspend):
1139            job_id = self.install_software(sw_version.version)
1140            if not job_id:
1141                self.logger.error("Install has not started")
1142                raise Exception("Install has not started")
1143            job = self.wait_job_completion(job_id)
1144            self.logger.info(job.details)
1145
1146            # Do not restart if install failed
1147            if job.result != "OK":
1148                self.logger.error("Failed to install software version")
1149                return sw_version
1150
1151            if restart:
1152                self.logger.info("Restarting the device")
1153                restart_response = self.restart()
1154                self.logger.info(restart_response)
1155            return sw_version
1156
1157    @contextmanager
1158    def suspended(self, suspend=True):
1159        if not suspend:
1160            try:
1161                yield self
1162            finally:
1163                return
1164        self.logger.info("Suspending device")
1165        self.set_ha_status(active=False)
1166        try:
1167            yield self
1168        finally:
1169            self.check_availability()
1170            self.logger.info("Unsuspending device...")
1171            for _ in range(3):
1172                try:
1173                    self.set_ha_status(active=True)
1174                    break
1175                except Exception as e:
1176                    self.logger.error(e)
1177                    # time.sleep()
1178            else:
1179                raise UnsuspendError("Failed to unsuspend device")
1180            self.logger.info("Device successfully unsuspended")
1181
1182    def wait_availability(self):
1183        logger = self.logger
1184        logger.info("Checking availability. Waiting for response...")
1185        max_duration = 1800
1186        for duration in wait(duration=max_duration):
1187            try:
1188                versions = self.get_versions()
1189                current = next(v for v in versions if v.current)
1190                if current is None:
1191                    logger.warning("Device is not not answering")
1192                else:
1193                    logger.info(f"Device responded after {duration} seconds")
1194                    return current
1195            except (
1196                urllib3.exceptions.MaxRetryError,
1197                requests.exceptions.ConnectionError,
1198            ):
1199                logger.warning("Firewall still not responding")
1200            except ServerError:
1201                raise Exception(
1202                    "An error occured on the device while retrieving the device's versions. Be sure that the device can contact PaloAlto's servers."
1203                )
1204            except Exception as e:
1205                logger.debug(f"Unexpected error of type {type(e)} occured on firewall")
1206                logger.error(f"Firewall is still not responding: {e}")
1207        raise Exception(
1208            f"Timeout while waiting for availability of firewall. Waited for {max_duration} seconds"
1209        )
1210
1211    def check_availability(self):
1212        logger = self.logger
1213        ## Wait for the FW to respond
1214        version = self.wait_availability()
1215        if not version:
1216            logger.error("Device never responded")
1217            return False
1218        logger.info(f"Firewall is available on version {version.version}")
1219        return True
XMLApi( host=None, api_key=None, ispanorama=None, target=None, verify=False, timeout=None, logger=None)
26    def __init__(
27        self,
28        host=None,
29        api_key=None,
30        ispanorama=None,
31        target=None,
32        verify=False,
33        timeout=None,
34        logger=None,
35    ):
36        env_host, env_apikey = get_credentials_from_env()
37        host = host or env_host
38        api_key = api_key or env_apikey
39        if not host:
40            raise Exception("Missing Host")
41        if not api_key:
42            raise Exception("Missing API Key")
43        host, _, _ = clean_url_host(host)
44
45        default_params = {}
46        if target:
47            default_params["target"] = target
48
49        self._host = host
50        self._api_key = api_key
51        self._url = f"{host}/api"
52        self._verify = verify
53        self._timeout = timeout
54        self._ispanorama = ispanorama
55        self._default_params = default_params
56        self.logger = logger or logging
logger
def export_configuration( self, verify=None, timeout=None) -> <cyfunction Element at 0x7f4bd79982b0>:
115    def export_configuration(
116        self,
117        verify=None,
118        timeout=None,
119    ) -> Element:
120        return self._export_request(
121            category="configuration",
122            verify=verify,
123            timeout=timeout,
124        )
def export_device_state( self, verify=None, timeout=None) -> <cyfunction Element at 0x7f4bd79982b0>:
126    def export_device_state(
127        self,
128        verify=None,
129        timeout=None,
130    ) -> Element:
131        return self._export_request(
132            category="device-state",
133            verify=verify,
134            timeout=timeout,
135        )
def generate_apikey(self, username, password: str) -> str:
206    def generate_apikey(self, username, password: str) -> str:
207        """
208        Generate a new API-Key for the user connected.
209        """
210        params = {"user": username, "password": password}
211        return self._request(
212            "keygen",
213            method="POST",
214            params=params,
215        ).xpath(".//key/text()")[0]

Generate a new API-Key for the user connected.

def api_version(self):
218    def api_version(self):
219        return el2dict(
220            self._request(
221                "version",
222                method="POST",
223            ).xpath(".//result")[0]
224        )["result"]
def configuration( self, xpath, action='get', method='GET', params=None, remove_blank_text=True):
226    def configuration(
227        self,
228        xpath,
229        action="get",
230        method="GET",
231        params=None,
232        remove_blank_text=True,
233    ):
234        return self._conf_request(
235            xpath,
236            action=action,
237            method=method,
238            params=params,
239            remove_blank_text=remove_blank_text,
240        )
def operation(self, cmd, method='POST', params=None, remove_blank_text=True):
242    def operation(
243        self,
244        cmd,
245        method="POST",
246        params=None,
247        remove_blank_text=True,
248    ):
249        return self._op_request(
250            cmd,
251            method=method,
252            params=params,
253            remove_blank_text=remove_blank_text,
254        )
ispanorama
263    @property
264    def ispanorama(self):
265        if self._ispanorama is None:
266            self._ispanorama = self._check_is_panorama()
267        return self._ispanorama
def get_tree(self, extended=False) -> <cyfunction Element at 0x7f4bd79982b0>:
269    def get_tree(self, extended=False) -> Element:
270        """
271        Return the running configuration
272        The differences with `running_config` are not known
273        """
274        tree = get_tree(
275            self._host, self._api_key, verify=self._verify, logger=self.logger
276        )
277        if extended:
278            self._extend_tree_information(tree)
279        return tree

Return the running configuration The differences with running_config are not known

def get_rule_use(self, tree=None, max_threads: Optional[int] = None):
298    def get_rule_use(self, tree=None, max_threads: Optional[int] = None):
299        if tree is None:
300            tree = self.get_tree()
301        device_groups = tree.xpath("devices/*/device-group/*/@name")
302        positions = ("pre", "post")
303        # rule_types = tuple({x.tag for x in tree.xpath(
304        # "devices/*/device-group/*"
305        # "/*[self::post-rulebase or self::pre-rulebase]/*")})
306        rule_types = ("security", "pbf", "nat", "application-override")
307        args_list = list(product(device_groups, positions, rule_types))
308
309        def func(args):
310            return self._get_rule_use(*args)
311
312        threads = len(args_list)
313        threads = min(max_threads or threads, threads)
314        with Pool(len(args_list)) as pool:
315            data = pool.map(func, args_list)
316        return [entry for entry_list in data for entry in entry_list]
def get_rule_hit_count(self, tree=None, max_threads=None):
330    def get_rule_hit_count(self, tree=None, max_threads=None):
331        if tree is None:
332            tree = self.get_tree()
333        device_groups = tree.xpath("devices/*/device-group/*/@name")
334        rulebases = ("pre-rulebase", "post-rulebase")
335        rule_types = ("security", "pbf", "nat", "application-override")
336        args_list = list(product(device_groups, rulebases, rule_types))
337
338        def func(args):
339            return self._get_rule_hit_count(*args)
340
341        threads = len(args_list)
342        threads = min(max_threads or threads, threads)
343        with Pool(len(args_list)) as pool:
344            data = pool.map(func, args_list)
345        return [entry for entry_list in data for entry in entry_list]
def get(self, xpath: str):
372    def get(self, xpath: str):
373        """
374        This will retrieve the xml definition based on the xpath
375        The xpath doesn't need to be exact
376        and can select multiple values at once.
377        Still, it must at least speciy /config at is begining
378        """
379        return self._conf_request(xpath, action="show", method="GET")

This will retrieve the xml definition based on the xpath The xpath doesn't need to be exact and can select multiple values at once. Still, it must at least speciy /config at is begining

def delete(self, xpath: str):
381    def delete(self, xpath: str):
382        """
383        This will REMOVE the xml definition at the provided xpath.
384        The xpath must be exact.
385        """
386        return self._conf_request(
387            xpath,
388            action="delete",
389            method="DELETE",
390        )

This will REMOVE the xml definition at the provided xpath. The xpath must be exact.

def create(self, xpath: str, xml_definition):
392    def create(self, xpath: str, xml_definition):
393        """
394        This will ADD the xml definition
395        INSIDE the element at the provided xpath.
396        The xpath must be exact.
397        """
398        # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/configuration-api/set-configuration
399        params = {"element": xml_definition}
400        return self._conf_request(
401            xpath,
402            action="set",
403            method="POST",
404            params=params,
405        )

This will ADD the xml definition INSIDE the element at the provided xpath. The xpath must be exact.

def update(self, xpath: str, xml_definition):
407    def update(self, xpath: str, xml_definition):
408        """
409        This will REPLACE the xml definition
410        INSTEAD of the element at the provided xpath
411        The xpath must be exact.
412        Nb: We can pull the whole config, update it locally,
413        and push the final result
414        """
415        # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/configuration-api/set-configuration
416        params = {"element": xml_definition}
417        return self._conf_request(
418            xpath,
419            action="edit",
420            method="POST",
421            params=params,
422        )

This will REPLACE the xml definition INSTEAD of the element at the provided xpath The xpath must be exact. Nb: We can pull the whole config, update it locally, and push the final result

def revert_changes(self, skip_validated: bool = False):
424    def revert_changes(self, skip_validated: bool = False):
425        """
426        Revert all the changes made on Panorama.
427        NOTE:
428        - This only applies on non-commited changes.
429        - This revert everything (not scoped by users)
430
431        skip_validated: Do not revert changes that were validated
432        """
433        skip = "<skip-validate>yes</skip-validate>" if skip_validated else ""
434        cmd = f"<revert><config>{skip}</config></revert>"
435        return self._op_request(cmd)

Revert all the changes made on Panorama. NOTE:

  • This only applies on non-commited changes.
  • This revert everything (not scoped by users)

skip_validated: Do not revert changes that were validated

def validate_changes(self):
437    def validate_changes(self):
438        """
439        Validated all the changes currently made
440        """
441        cmd = "<validate><full></full></validate>"
442        return self._op_request(cmd)

Validated all the changes currently made

def get_push_scope_devicegroups(self, admin=None):
453    def get_push_scope_devicegroups(self, admin=None):
454        """
455        Gives detailed information about pending changes
456        (e.g. xpath, owner, action, ...)
457        """
458        scope = self._raw_get_push_scope(admin=admin)
459        return list(set(scope.xpath(".//objects/entry[@loc-type='device-group']/@loc")))

Gives detailed information about pending changes (e.g. xpath, owner, action, ...)

def uncommited_changes(self):
461    def uncommited_changes(self):
462        """
463        Gives detailed information about pending changes
464        (e.g. xpath, owner, action, ...)
465        """
466        cmd = "<show><config><list><changes></changes></list></config></show>"
467        return self._op_request(cmd)

Gives detailed information about pending changes (e.g. xpath, owner, action, ...)

def uncommited_changes_summary(self, admin=None):
469    def uncommited_changes_summary(self, admin=None):
470        """
471        Only gives the concern device groups
472        """
473        admin = (
474            f"<partial><admin><member>{admin}</member></admin></partial>"
475            if admin
476            else ""
477        )
478        cmd = f"<show><config><list><change-summary>{admin}</change-summary></list></config></show>"
479        return self._op_request(cmd)

Only gives the concern device groups

def pending_changes(self):
481    def pending_changes(self):
482        """
483        Result content is either 'yes' or 'no'
484        """
485        cmd = "<check><pending-changes></pending-changes></check>"
486        return self._op_request(cmd)

Result content is either 'yes' or 'no'

def save_config(self, name):
488    def save_config(self, name):
489        """
490        Create a named snapshot of the current configuration
491        """
492        cmd = f"<save><config><to>{name}</to></config></save>"
493        return "\n".join(self._op_request(cmd).xpath(".//result/text()"))

Create a named snapshot of the current configuration

def save_device_state(self):
495    def save_device_state(self):
496        """
497        Create a snapshot of the current device state
498        """
499        cmd = "<save><device-state></device-state></save>"
500        return "\n".join(self._op_request(cmd).xpath(".//result/text()"))

Create a snapshot of the current device state

def get_named_configuration(self, name):
502    def get_named_configuration(self, name):
503        """
504        Get the configuration from a named snapshot as an XML object
505        """
506        cmd = f"<show><config><saved>{name}</saved></config></show>"
507        return self._op_request(cmd, remove_blank_text=False).xpath("./result/config")[
508            0
509        ]

Get the configuration from a named snapshot as an XML object

def candidate_config(self) -> <cyfunction Element at 0x7f4bd79982b0>:
511    def candidate_config(self) -> Element:
512        """
513        Get the configuration to be commited as an XML object
514        """
515        cmd = "<show><config><candidate></candidate></config></show>"
516        return self._op_request(cmd, remove_blank_text=False)

Get the configuration to be commited as an XML object

def running_config(self) -> <cyfunction Element at 0x7f4bd79982b0>:
518    def running_config(self) -> Element:
519        """
520        Get the current running configuration as an XML object
521        """
522        cmd = "<show><config><running></running></config></show>"
523        return self._op_request(cmd, remove_blank_text=False)

Get the current running configuration as an XML object

def get_jobs( self, job_ids: Union[NoneType, str, List[str]] = None) -> List[pa_api.xmlapi.types.operations.job.Job]:
540    def get_jobs(self, job_ids: Union[None, str, List[str]] = None) -> List[types.Job]:
541        """
542        Get information of job(s)
543        Retrieve all jobs by default.
544
545        If job_id is provided, then only retrieve the job requested.
546        """
547        job_xmls = self._raw_get_jobs(job_ids).xpath(".//job")
548        transformed = (types.Job.from_xml(x) for x in job_xmls)
549        return [j for j in transformed if j]

Get information of job(s) Retrieve all jobs by default.

If job_id is provided, then only retrieve the job requested.

def get_job(self, job_id) -> pa_api.xmlapi.types.operations.job.Job:
551    def get_job(self, job_id) -> types.Job:
552        """
553        Get information of job(s)
554        Retrieve all jobs by default.
555
556        If job_id is provided, then only retrieve the job requested.
557        """
558        return self.get_jobs(job_id)[0]

Get information of job(s) Retrieve all jobs by default.

If job_id is provided, then only retrieve the job requested.

def get_versions(self) -> List[pa_api.xmlapi.types.operations.software.SoftwareVersion]:
567    def get_versions(self) -> List[types.SoftwareVersion]:
568        """
569        Get the versions informations
570        """
571        res = self._raw_get_versions()
572        return [
573            types.SoftwareVersion.from_xml(entry)
574            for entry in res.xpath(".//sw-updates/versions/entry")
575        ]

Get the versions informations

def wait_job_completion(self, job_id: str, waiter=None) -> pa_api.xmlapi.types.operations.job.Job:
577    def wait_job_completion(self, job_id: str, waiter=None) -> types.Job:
578        """
579        Block until the job complete.
580
581        job_id: the job to wait upon
582        waiter: a generator that yield when a new query must be done.
583                see `wait` function (the default waiter) for an example
584        """
585        if not waiter:
586            waiter = wait()
587        for _ in waiter:
588            job = self.get_job(job_id)
589            if job.progress >= 100:
590                return job
591            self.logger.info(f"Job {job_id} progress: {job.progress}")
592        raise Exception("Timeout while waiting for job completion")

Block until the job complete.

job_id: the job to wait upon waiter: a generator that yield when a new query must be done. see wait function (the default waiter) for an example

def raw_get_pending_jobs(self):
594    def raw_get_pending_jobs(self):
595        """
596        Get all the jobs that are pending as a XML object
597        """
598        cmd = "<show><jobs><pending></pending></jobs></show>"
599        return self._op_request(cmd)

Get all the jobs that are pending as a XML object

def commit_changes(self, force: bool = False):
601    def commit_changes(self, force: bool = False):
602        """
603        Commit all changes
604        """
605        cmd = "<commit>{}</commit>".format("<force></force>" if force else "")
606        return self._commit_request(cmd)

Commit all changes

def add_config_lock(self, comment=None, vsys='shared', no_exception=False) -> bool:
624    def add_config_lock(self, comment=None, vsys="shared", no_exception=False) -> bool:
625        comment = f"<comment>{comment}</comment>" if comment else ""
626        cmd = f"<request><config-lock><add>{comment}</add></config-lock></request>"
627        return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
def remove_config_lock(self, vsys='shared', no_exception=False) -> bool:
629    def remove_config_lock(self, vsys="shared", no_exception=False) -> bool:
630        cmd = "<request><config-lock><remove></remove></config-lock></request>"
631        return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
def add_commit_lock(self, comment=None, vsys='shared', no_exception=False) -> bool:
633    def add_commit_lock(self, comment=None, vsys="shared", no_exception=False) -> bool:
634        comment = f"<comment>{comment}</comment>" if comment else ""
635        cmd = f"<request><commit-lock><add>{comment}</add></commit-lock></request>"
636        return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
def remove_commit_lock(self, vsys='shared', no_exception=False) -> bool:
638    def remove_commit_lock(self, vsys="shared", no_exception=False) -> bool:
639        cmd = "<request><commit-lock><remove></remove></commit-lock></request>"
640        return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
def set_ha_status(self, active: bool = True, target: Optional[str] = None):
642    def set_ha_status(self, active: bool = True, target: Optional[str] = None):
643        """
644        Activate or Deactivate (suspend) the HA pair.
645
646        """
647        status = "<functional></functional>" if active else "<suspend></suspend>"
648        cmd = f"<request><high-availability><state>{status}</state></high-availability></request>"
649        params = {"target": target} if target else None
650        return self._op_request(cmd, params=params).xpath(".//result/text()")[0]

Activate or Deactivate (suspend) the HA pair.

def set_ha_preemption(self, active=True, target=None):
652    def set_ha_preemption(self, active=True, target=None):
653        """
654        NOT WORKING:
655        There is currently no way to deactivate the preemption using the API.
656        """
657        raise Exception("set_ha_preemption not implementend")

NOT WORKING: There is currently no way to deactivate the preemption using the API.

def get_ha_info( self, state_only=False, target=None) -> pa_api.xmlapi.types.operations.ha.HAInfo:
668    def get_ha_info(self, state_only=False, target=None) -> types.HAInfo:
669        """
670        Get the current state of a HA pair as a python object.
671        """
672        res = self._raw_get_ha_info(state_only=state_only, target=target)
673        hainfo_xml = res.xpath(".//result")[0]
674        # pprint(hainfo_xml)
675        return types.HAInfo.from_xml(hainfo_xml)

Get the current state of a HA pair as a python object.

def get_ha_pairs( self, connected=True) -> Tuple[List[Tuple[pa_api.xmlapi.types.operations.device.Device, Optional[pa_api.xmlapi.types.operations.device.Device]]], List[pa_api.xmlapi.types.operations.device.Device]]:
677    def get_ha_pairs(
678        self, connected=True
679    ) -> Tuple[List[Tuple[types.Device, Optional[types.Device]]], List[types.Device]]:
680        """
681        Retrieve a tuple containing 2 values:
682        1. The list of HA pairs and their members
683        2. A list of devices that are not part of a HA pair
684        """
685        # Get all devices and index them using their serial number
686        devices: List[types.Device] = self.get_devices(connected=connected)
687        device_map = {d.serial: d for d in devices}
688
689        # Create the 2 lists by iterating over the devices
690        done = set()
691        ha_pairs = []
692        without_ha = []
693        for d in devices:
694            # Do not manage twice the same device
695            if d.serial in done:
696                continue
697            # The device does not have an HA peer
698            if not d.ha_peer_serial:
699                without_ha.append(d)
700                done.add(d.serial)
701                continue
702            # Get the current device's HA peer
703            # push them in the ha_pairs list
704            # and mark both of them as done
705            peer = device_map.get(d.ha_peer_serial)
706            ha_pairs.append((d, peer))
707            done.update((d.serial, d.ha_peer_serial))
708        return ha_pairs, without_ha

Retrieve a tuple containing 2 values:

  1. The list of HA pairs and their members
  2. A list of devices that are not part of a HA pair
def get_ha_pairs_map(self, connected=True):
710    def get_ha_pairs_map(self, connected=True):
711        """
712        Same as `get_ha_pairs`, but the ha_pairs are return as a map.
713        This provides an easier and more readable lookup to find a pair:
714
715        mapping, _ = client.get_ha_pairs_map()
716        serial = "12345"
717        pair_of_serial = mapping[serial]
718        """
719        ha_pairs, without_ha = self.get_ha_pairs(connected=connected)
720        map = {}
721        for pair in ha_pairs:
722            for device in pair:
723                map[device.serial] = pair
724        return map, without_ha

Same as get_ha_pairs, but the ha_pairs are return as a map. This provides an easier and more readable lookup to find a pair:

mapping, _ = client.get_ha_pairs_map() serial = "12345" pair_of_serial = mapping[serial]

def get_panorama_status(self):
726    def get_panorama_status(self):
727        """
728        Get the current status of Panorama server.
729        """
730        cmd = "<show><panorama-status></panorama-status></show>"
731        return self.operation(cmd).xpath(".//result")

Get the current status of Panorama server.

def raw_get_local_panorama(self):
733    def raw_get_local_panorama(self):
734        return self.configuration(
735            "/config/devices/entry/deviceconfig/system/panorama/local-panorama/panorama-server"
736        )
def get_local_panorama_ip(self) -> Optional[str]:
738    def get_local_panorama_ip(self) -> Optional[str]:
739        res = self.raw_get_local_panorama()
740        return first(res.xpath("//panorama-server/text()"))
def get_devices( self, connected=False) -> List[pa_api.xmlapi.types.operations.device.Device]:
754    def get_devices(self, connected=False) -> List[types.Device]:
755        """
756        Return the list of device known from Panorama as a python structure.
757        NOTE: This only works if the client is a Panorama server.
758
759        connected: only returns the devices that are connected
760        """
761        res = self._raw_get_devices(connected=connected)
762        entries = res.xpath(".//devices/entry")
763        devices = (types.Device.from_xml(e) for e in entries)
764        return [d for d in devices if d]

Return the list of device known from Panorama as a python structure. NOTE: This only works if the client is a Panorama server.

connected: only returns the devices that are connected

def get_plan_dg_hierarchy(self, recursive=False):
773    def get_plan_dg_hierarchy(self, recursive=False):
774        """
775        Return the hierarchy of device groups as a dict.
776        The keys are the names of the device groups.
777
778        The values are the children device groups and depends on the recursive parameter.
779        recursive: if False, the values are only the direct children of the device group.
780            Otherwise, the values are all the descendant device groups.
781        """
782        devicegroups = {}  # name: children
783        hierarchy = self._raw_get_dg_hierarchy().xpath(".//dg-hierarchy")[0]
784        xpath = ".//dg" if recursive else "./dg"
785        for dg in hierarchy.xpath(".//dg"):
786            devicegroups[dg.attrib["name"]] = [
787                x.attrib["name"] for x in dg.xpath(xpath)
788            ]
789        return devicegroups

Return the hierarchy of device groups as a dict. The keys are the names of the device groups.

The values are the children device groups and depends on the recursive parameter. recursive: if False, the values are only the direct children of the device group. Otherwise, the values are all the descendant device groups.

def get_devicegroups_name(self, parents=None, with_connected_devices=None):
798    def get_devicegroups_name(
799        self,
800        parents=None,
801        with_connected_devices=None,
802    ):
803        """
804        This returns the names of the devicegroups:
805        - parents: the returned list will only contain children of the provided parents (parents included)
806        - with_devices: the returned list will only contain devicegroups that have direct devices under them
807        """
808        devicegroups = self._raw_get_devicegroups().xpath(".//devicegroups/entry")
809        if with_connected_devices:
810            names = [
811                dg.attrib["name"]
812                for dg in devicegroups
813                if dg.xpath("./devices/entry/connected[text() = 'yes']")
814            ]
815        else:
816            names = [dg.attrib["name"] for dg in devicegroups]
817        if parents:
818            hierarchy = self.get_plan_dg_hierarchy(recursive=True)
819            tokeep = set(chain(*(hierarchy.get(p, []) for p in parents))) | set(parents)
820            names = list(set(names) & tokeep)
821        return names

This returns the names of the devicegroups:

  • parents: the returned list will only contain children of the provided parents (parents included)
  • with_devices: the returned list will only contain devicegroups that have direct devices under them
def get_addresses(self) -> List[pa_api.xmlapi.types.config.address.Address]:
834    def get_addresses(self) -> List[types.Address]:
835        """
836        Return the list of addresses known from Panorama as a python structure.
837        NOTE: This only works if the client is a Firewall.
838        """
839        res = self._raw_get_addresses()
840        addresses = res.xpath(".//address/entry")
841        return [types.Address.from_xml(i) for i in addresses]

Return the list of addresses known from Panorama as a python structure. NOTE: This only works if the client is a Firewall.

def get_routing_tables( self) -> List[pa_api.xmlapi.types.config.routing.routing_table.RoutingTable]:
852    def get_routing_tables(self) -> List[types.RoutingTable]:
853        """
854        Return the list of interface known from Panorama as a python structure.
855        NOTE: This only works if the client is a Firewall.
856        """
857        res = self._raw_get_routing_tables()
858        routing_tables = res.xpath(".//routing-table")
859        # print(len(routing_tables))
860        # from pa_api.xmlapi.utils import pprint
861        # for r in routing_tables:
862        #     pprint(r)
863        return [types.RoutingTable.from_xml(i) for i in routing_tables]

Return the list of interface known from Panorama as a python structure. NOTE: This only works if the client is a Firewall.

def get_interfaces(self) -> List[pa_api.xmlapi.types.config.interface.Interface]:
872    def get_interfaces(self) -> List[types.Interface]:
873        """
874        Return the list of interface known from Panorama as a python structure.
875        NOTE: This only works if the client is a Firewall.
876        """
877        res = self._raw_get_interfaces()
878        interfaces = res.xpath(".//interface")
879        return [types.Interface.from_xml(i) for i in interfaces]

Return the list of interface known from Panorama as a python structure. NOTE: This only works if the client is a Firewall.

def get_zones(self) -> <cyfunction Element at 0x7f4bd79982b0>:
892    def get_zones(self) -> Element:
893        """
894        Return the list of zones known from Panorama as a python structure.
895        NOTE: This only works if the client is a Firewall.
896        """
897        res = self._raw_get_zones()
898        zones = res.xpath(".//zone/entry")
899        return [types.Zone.from_xml(i) for i in zones]

Return the list of zones known from Panorama as a python structure. NOTE: This only works if the client is a Firewall.

def get_templates_sync_status(self):
912    def get_templates_sync_status(self):
913        """
914        Return the synchronization status the templates per devices
915        A device is in sync if it is up-to-date with the current version on Panorama.
916        NOTE: This only works on Panorama.
917
918        The result is a list of tuple of 3 values:
919        1. the template's name
920        2. the device's name
921        3. the sync status
922        """
923        res = self._raw_get_templates()
924        statuses = []
925        for entry in res.xpath("./result/templates/entry"):
926            template_name = entry.attrib["name"]
927            for device in entry.xpath("./devices/entry"):
928                device_name = device.attrib["name"]
929                template_status = next(device.xpath("./template-status/text()"), None)
930                status = (template_name, device_name, template_status)
931                statuses.append(status)
932        return statuses

Return the synchronization status the templates per devices A device is in sync if it is up-to-date with the current version on Panorama. NOTE: This only works on Panorama.

The result is a list of tuple of 3 values:

  1. the template's name
  2. the device's name
  3. the sync status
def get_vpn_flows(self, name=None):
944    def get_vpn_flows(self, name=None):
945        """
946        Returns the VPN flow information as a python structure.
947        NOTE: This only works on Panorama server, not firewalls
948        """
949        entries = self._raw_get_vpn_flows(name=name).xpath(".//IPSec/entry")
950        return [types.VPNFlow.from_xml(e) for e in entries]

Returns the VPN flow information as a python structure. NOTE: This only works on Panorama server, not firewalls

def system_info(self) -> pa_api.xmlapi.types.operations.system.SystemInfo:
959    def system_info(self) -> types.operations.SystemInfo:
960        """
961        Returns informations about the system as a XML object.
962        """
963        xml = self._raw_system_info()
964        info = xml.xpath("./result/system")[0]
965        return types.operations.SystemInfo.from_xml(info)

Returns informations about the system as a XML object.

def system_resources(self):
975    def system_resources(self):
976        """
977        Get the system resouces as a string.
978        The string is the raw output of a `ps` command.
979        """
980        res = self._raw_system_resources()
981        text = res.xpath(".//result/text()")[0]
982        return text.split("\n\n")[0]

Get the system resouces as a string. The string is the raw output of a ps command.

def download_software(self, version) -> Optional[str]:
 992    def download_software(self, version) -> Optional[str]:
 993        """
 994        Download the software version on the device.
 995        version: the software version to download
 996
 997        Returns the download job's ID in case of successful launch,
 998        None is returned otherwise.
 999        """
1000        res = self._raw_download_software(version)
1001        try:
1002            return res.xpath(".//job/text()")[0]
1003        except Exception:
1004            self.logger.debug("Download has not started")
1005        return None

Download the software version on the device. version: the software version to download

Returns the download job's ID in case of successful launch, None is returned otherwise.

def install_software( self, version: Union[NoneType, str, pa_api.xmlapi.types.operations.software.SoftwareVersion]) -> Optional[str]:
1015    def install_software(
1016        self, version: Union[None, str, types.SoftwareVersion]
1017    ) -> Optional[str]:
1018        """
1019        Install the software version on the device.
1020        version: the software version to install
1021
1022        Returns the download job's ID in case of successful launch,
1023        None is returned otherwise.
1024        """
1025        if isinstance(version, types.SoftwareVersion):
1026            version = version.version
1027        res = self._raw_install_software(version)
1028        try:
1029            return res.xpath(".//job/text()")[0]
1030        except Exception:
1031            self.logger.debug("Download has not started")
1032        return None

Install the software version on the device. version: the software version to install

Returns the download job's ID in case of successful launch, None is returned otherwise.

def restart(self):
1041    def restart(self):
1042        """
1043        Restart the device
1044        """
1045        return "".join(self._raw_restart().xpath(".//result/text()"))

Restart the device

def automatic_download_software( self, version: Optional[str] = None) -> pa_api.xmlapi.types.operations.software.SoftwareVersion:
1047    def automatic_download_software(
1048        self, version: Optional[str] = None
1049    ) -> types.SoftwareVersion:
1050        """
1051        Automatically download the requested software version.
1052        if the version is not provided, it defaults to the latest one.
1053
1054        NOTE: This does not do the installation.
1055        This is usefull to download in anticipation of the upgrade.
1056        For automatic install, see `automatic_software_upgrade`
1057        """
1058        version_str = version
1059        try:
1060            versions = self.get_versions()
1061        except ServerError:
1062            raise Exception(
1063                "An error occured on the device while retrieving the device's versions. Be sure that the device can contact PaloAlto's servers."
1064            )
1065        sw_version = None
1066        if not version_str:
1067            sw_version = next((v for v in versions if v.latest), None)
1068        else:
1069            sw_version = next((v for v in versions if v.version == version_str), None)
1070        if not sw_version:
1071            self.logger.error(f"Version {version_str} not found")
1072            return exit(1)
1073
1074        # Already downloaded: Nothing to do
1075        if sw_version.downloaded:
1076            self.logger.info(f"Version {sw_version.version} already downloaded")
1077            return sw_version
1078
1079        # Download minor version first (required)
1080        base_version = next(
1081            v for v in versions if v.version == sw_version.base_minor_version
1082        )
1083        if not base_version.downloaded:
1084            self.logger.info(
1085                f"Launching download of minor version {base_version.version}"
1086            )
1087            job_id = self.download_software(base_version.version)
1088            if not job_id:
1089                raise Exception("Download has not started")
1090            job = self.wait_job_completion(job_id)
1091            if job.result != "OK":
1092                self.logger.debug(job)
1093                raise Exception(job.details)
1094            print(job.details)
1095
1096        # Actually download the wanted version
1097        self.logger.info(f"Launching download of version {sw_version.version}")
1098        job_id = self.download_software(sw_version.version)
1099        if not job_id:
1100            raise Exception("Download has not started")
1101        job = self.wait_job_completion(job_id)
1102        if job.result != "OK":
1103            self.logger.debug(job)
1104            raise Exception(job.details)
1105        self.logger.info(job.details)
1106        return sw_version

Automatically download the requested software version. if the version is not provided, it defaults to the latest one.

NOTE: This does not do the installation. This is usefull to download in anticipation of the upgrade. For automatic install, see automatic_software_upgrade

def automatic_software_upgrade( self, version: Optional[str] = None, install: bool = True, restart: bool = True, suspend: bool = False):
1108    def automatic_software_upgrade(
1109        self,
1110        version: Optional[str] = None,
1111        install: bool = True,
1112        restart: bool = True,
1113        suspend: bool = False,
1114    ):
1115        """
1116        Automatically download and install the requested software version.
1117        if the version is not provided, it defaults to the latest one.
1118
1119        NOTE: This does the software install and restart by default.
1120        If you only want to download, prefer to use `automatic_download_software` method,
1121        or set install=False. See the parameters for more information.
1122
1123        install: install the software after the download
1124        restart: restart the device after the installation. This option is ignored if install=False
1125
1126        """
1127        sw_version = self.automatic_download_software(version)
1128        if sw_version.current:
1129            self.logger.info(f"Version {sw_version.version} is already installed")
1130            return sw_version
1131        if not install:
1132            return sw_version
1133        # We may get the following error:
1134        # "Error: Upgrading from 10.2.4-h10 to 11.1.2 requires a content version of 8761 or greater and found 8638-7689."
1135        # This should never happen, we decided to report the error and handle this manually
1136        self.logger.info(f"Launching install of version {sw_version.version}")
1137
1138        with self.suspended(suspend):
1139            job_id = self.install_software(sw_version.version)
1140            if not job_id:
1141                self.logger.error("Install has not started")
1142                raise Exception("Install has not started")
1143            job = self.wait_job_completion(job_id)
1144            self.logger.info(job.details)
1145
1146            # Do not restart if install failed
1147            if job.result != "OK":
1148                self.logger.error("Failed to install software version")
1149                return sw_version
1150
1151            if restart:
1152                self.logger.info("Restarting the device")
1153                restart_response = self.restart()
1154                self.logger.info(restart_response)
1155            return sw_version

Automatically download and install the requested software version. if the version is not provided, it defaults to the latest one.

NOTE: This does the software install and restart by default. If you only want to download, prefer to use automatic_download_software method, or set install=False. See the parameters for more information.

install: install the software after the download restart: restart the device after the installation. This option is ignored if install=False

@contextmanager
def suspended(self, suspend=True):
1157    @contextmanager
1158    def suspended(self, suspend=True):
1159        if not suspend:
1160            try:
1161                yield self
1162            finally:
1163                return
1164        self.logger.info("Suspending device")
1165        self.set_ha_status(active=False)
1166        try:
1167            yield self
1168        finally:
1169            self.check_availability()
1170            self.logger.info("Unsuspending device...")
1171            for _ in range(3):
1172                try:
1173                    self.set_ha_status(active=True)
1174                    break
1175                except Exception as e:
1176                    self.logger.error(e)
1177                    # time.sleep()
1178            else:
1179                raise UnsuspendError("Failed to unsuspend device")
1180            self.logger.info("Device successfully unsuspended")
def wait_availability(self):
1182    def wait_availability(self):
1183        logger = self.logger
1184        logger.info("Checking availability. Waiting for response...")
1185        max_duration = 1800
1186        for duration in wait(duration=max_duration):
1187            try:
1188                versions = self.get_versions()
1189                current = next(v for v in versions if v.current)
1190                if current is None:
1191                    logger.warning("Device is not not answering")
1192                else:
1193                    logger.info(f"Device responded after {duration} seconds")
1194                    return current
1195            except (
1196                urllib3.exceptions.MaxRetryError,
1197                requests.exceptions.ConnectionError,
1198            ):
1199                logger.warning("Firewall still not responding")
1200            except ServerError:
1201                raise Exception(
1202                    "An error occured on the device while retrieving the device's versions. Be sure that the device can contact PaloAlto's servers."
1203                )
1204            except Exception as e:
1205                logger.debug(f"Unexpected error of type {type(e)} occured on firewall")
1206                logger.error(f"Firewall is still not responding: {e}")
1207        raise Exception(
1208            f"Timeout while waiting for availability of firewall. Waited for {max_duration} seconds"
1209        )
def check_availability(self):
1211    def check_availability(self):
1212        logger = self.logger
1213        ## Wait for the FW to respond
1214        version = self.wait_availability()
1215        if not version:
1216            logger.error("Device never responded")
1217            return False
1218        logger.info(f"Firewall is available on version {version.version}")
1219        return True