pa_api.xmlapi

1from . import types
2from .client import XMLApi
3from .exceptions import ServerError
4
5__all__ = [
6    "XMLApi",
7    "types",
8]
class XMLApi:
  25class XMLApi:
  26    def __init__(
  27        self,
  28        host=None,
  29        api_key=None,
  30        ispanorama=None,
  31        verify=False,
  32        timeout=None,
  33        logger=None,
  34    ):
  35        env_host, env_apikey = get_credentials_from_env()
  36        host = host or env_host
  37        api_key = api_key or env_apikey
  38        if not host:
  39            raise Exception("Missing Host")
  40        if not api_key:
  41            raise Exception("Missing API Key")
  42        host, _, _ = clean_url_host(host)
  43
  44        self._host = host
  45        self._api_key = api_key
  46        self._url = f"{host}/api"
  47        self._verify = verify
  48        self._timeout = timeout
  49        self._ispanorama = ispanorama
  50        self.logger = logger or logging
  51
  52    def _request(
  53        self,
  54        type,
  55        method="GET",
  56        vsys=None,
  57        params=None,
  58        remove_blank_text=True,
  59        verify=None,
  60        parse=True,
  61        stream=None,
  62        timeout=None,
  63    ):
  64        if verify is None:
  65            verify = self._verify
  66        if timeout is None:
  67            timeout = self._timeout
  68        headers = {"X-PAN-KEY": self._api_key}
  69        return raw_request(
  70            self._url,
  71            type,
  72            method,
  73            vsys=vsys,
  74            params=params,
  75            headers=headers,
  76            remove_blank_text=remove_blank_text,
  77            verify=verify,
  78            logger=self.logger,
  79            parse=parse,
  80            stream=stream,
  81            timeout=timeout,
  82        )
  83
  84    # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/export-files-api
  85    # https://knowledgebase.paloaltonetworks.com/KCSArticleDetail?id=kA10g000000ClaOCAS#:~:text=From%20the%20GUI%2C%20go%20to%20Device%20%3E%20Setup,%3E%20scp%20export%20configuration%20%5Btab%20for%20command%20help%5D
  86    def _export_request(
  87        self,
  88        category,
  89        method="GET",
  90        params=None,
  91        verify=None,
  92        stream=None,
  93        timeout=None,
  94    ):
  95        if params is None:
  96            params = {}
  97        params = {"category": category, **params}
  98        return self._request(
  99            "export",
 100            method=method,
 101            params=params,
 102            verify=verify,
 103            parse=False,
 104            stream=stream,
 105            timeout=timeout,
 106        ).content
 107
 108    def export_configuration(
 109        self,
 110        verify=None,
 111        timeout=None,
 112    ) -> Element:
 113        return self._export_request(
 114            category="configuration",
 115            verify=verify,
 116            timeout=timeout,
 117        )
 118
 119    def export_device_state(
 120        self,
 121        verify=None,
 122        timeout=None,
 123    ) -> Element:
 124        return self._export_request(
 125            category="device-state",
 126            verify=verify,
 127            timeout=timeout,
 128        )
 129
 130    def _conf_request(
 131        self,
 132        xpath,
 133        action="get",
 134        method="GET",
 135        vsys=None,
 136        params=None,
 137        remove_blank_text=True,
 138        verify=None,
 139        timeout=None,
 140    ) -> Element:
 141        if params is None:
 142            params = {}
 143        params = {"action": action, "xpath": xpath, **params}
 144        return self._request(
 145            "config",
 146            method=method,
 147            vsys=vsys,
 148            params=params,
 149            remove_blank_text=remove_blank_text,
 150            verify=verify,
 151            timeout=timeout,
 152        )
 153
 154    def _op_request(
 155        self,
 156        cmd,
 157        method="POST",
 158        vsys=None,
 159        params=None,
 160        remove_blank_text=True,
 161        verify=None,
 162        timeout=None,
 163    ) -> Element:
 164        if params is None:
 165            params = {}
 166        params = {"cmd": cmd, **params}
 167        return self._request(
 168            "op",
 169            method=method,
 170            vsys=vsys,
 171            params=params,
 172            remove_blank_text=remove_blank_text,
 173            verify=verify,
 174            timeout=timeout,
 175        )
 176
 177    def _commit_request(
 178        self,
 179        cmd,
 180        method="POST",
 181        params=None,
 182        remove_blank_text=True,
 183        verify=None,
 184        timeout=None,
 185    ):
 186        if params is None:
 187            params = {}
 188        params = {"cmd": cmd, **params}
 189        return self._request(
 190            "commit",
 191            method=method,
 192            params=params,
 193            remove_blank_text=remove_blank_text,
 194            verify=verify,
 195            timeout=timeout,
 196        )
 197
 198    # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/get-started-with-the-pan-os-xml-api/get-your-api-key
 199    def generate_apikey(self, username, password: str) -> str:
 200        """
 201        Generate a new API-Key for the user connected.
 202        """
 203        params = {"user": username, "password": password}
 204        return self._request(
 205            "keygen",
 206            method="POST",
 207            params=params,
 208        ).xpath(".//key/text()")[0]
 209
 210    # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/get-version-info-api
 211    def api_version(self):
 212        return el2dict(
 213            self._request(
 214                "version",
 215                method="POST",
 216            ).xpath(".//result")[0]
 217        )["result"]
 218
 219    def configuration(
 220        self,
 221        xpath,
 222        action="get",
 223        method="GET",
 224        params=None,
 225        remove_blank_text=True,
 226    ):
 227        return self._conf_request(
 228            xpath,
 229            action=action,
 230            method=method,
 231            params=params,
 232            remove_blank_text=remove_blank_text,
 233        )
 234
 235    def operation(
 236        self,
 237        cmd,
 238        method="POST",
 239        params=None,
 240        remove_blank_text=True,
 241    ):
 242        return self._op_request(
 243            cmd,
 244            method=method,
 245            params=params,
 246            remove_blank_text=remove_blank_text,
 247        )
 248
 249    def _check_is_panorama(self) -> bool:
 250        try:
 251            self.configuration("/config/panorama/vsys")
 252            return False
 253        except Exception:
 254            return True
 255
 256    @property
 257    def ispanorama(self):
 258        if self._ispanorama is None:
 259            self._ispanorama = self._check_is_panorama()
 260        return self._ispanorama
 261
 262    def get_tree(self, extended=False) -> Element:
 263        """
 264        Return the running configuration
 265        The differences with `running_config` are not known
 266        """
 267        tree = get_tree(
 268            self._host, self._api_key, verify=self._verify, logger=self.logger
 269        )
 270        if extended:
 271            self._extend_tree_information(tree)
 272        return tree
 273
 274    def _get_rule_use(self, device_group, position, rule_type, number: int = 200):
 275        results = []
 276        for i in range(100):
 277            cmd = _get_rule_use_cmd(
 278                device_group,
 279                position,
 280                rule_type,
 281                i * number,
 282                number,
 283            )
 284            res = self._op_request(cmd).xpath("result")[0]
 285            total_count = int(res.attrib["total-count"])
 286            results.extend(res.xpath("entry"))
 287            if len(results) >= total_count:
 288                break
 289        return results
 290
 291    def get_rule_use(self, tree=None, max_threads: Optional[int] = None):
 292        if tree is None:
 293            tree = self.get_tree()
 294        device_groups = tree.xpath("devices/*/device-group/*/@name")
 295        positions = ("pre", "post")
 296        # rule_types = tuple({x.tag for x in tree.xpath(
 297        # "devices/*/device-group/*"
 298        # "/*[self::post-rulebase or self::pre-rulebase]/*")})
 299        rule_types = ("security", "pbf", "nat", "application-override")
 300        args_list = list(product(device_groups, positions, rule_types))
 301
 302        def func(args):
 303            return self._get_rule_use(*args)
 304
 305        threads = len(args_list)
 306        threads = min(max_threads or threads, threads)
 307        with Pool(len(args_list)) as pool:
 308            data = pool.map(func, args_list)
 309        return [entry for entry_list in data for entry in entry_list]
 310
 311    def _get_rule_hit_count(self, device_group, rulebase, rule_type):
 312        cmd = (
 313            "<show><rule-hit-count><device-group>"
 314            f"<entry name='{device_group}'><{rulebase}><entry name='{rule_type}'>"
 315            f"<rules><all/></rules></entry></{rulebase}></entry>"
 316            "</device-group></rule-hit-count></show>"
 317        )
 318        res = self._op_request(cmd)
 319        entries = res.xpath(".//rules/entry") or []
 320        # return entries
 321        return [(device_group, rulebase, rule_type, e) for e in entries]
 322
 323    def get_rule_hit_count(self, tree=None, max_threads=None):
 324        if tree is None:
 325            tree = self.get_tree()
 326        device_groups = tree.xpath("devices/*/device-group/*/@name")
 327        rulebases = ("pre-rulebase", "post-rulebase")
 328        rule_types = ("security", "pbf", "nat", "application-override")
 329        args_list = list(product(device_groups, rulebases, rule_types))
 330
 331        def func(args):
 332            return self._get_rule_hit_count(*args)
 333
 334        threads = len(args_list)
 335        threads = min(max_threads or threads, threads)
 336        with Pool(len(args_list)) as pool:
 337            data = pool.map(func, args_list)
 338        return [entry for entry_list in data for entry in entry_list]
 339
 340    def _extend_tree_information(
 341        self,
 342        tree,
 343        extended=None,
 344        max_threads=None,
 345    ):
 346        """
 347        Incorporate usage statistics into the configuration.
 348        tree: the configuration as a XML object
 349        extended: rule-use data (if not provided, the function will retrieve them automatically)
 350        """
 351        if extended is None:
 352            extended = self.get_rule_use(tree, max_threads=max_threads)
 353        rules = tree.xpath(
 354            ".//device-group/entry/"
 355            "*[self::pre-rulebase or self::post-rulebase]/*/rules/entry[@uuid]",
 356        )
 357        ext_dict = {x.attrib.get("uuid"): x for x in extended}
 358        rules_dict = {x.attrib["uuid"]: x for x in rules}
 359        for ext, rule in map_dicts(ext_dict, rules_dict):
 360            extend_element(rule, ext)
 361            # NOTE: Do not use rule.extend(ext)
 362            # => This is causing duplicates entries
 363        return tree, extended
 364
 365    def get(self, xpath: str):
 366        """
 367        This will retrieve the xml definition based on the xpath
 368        The xpath doesn't need to be exact
 369        and can select multiple values at once.
 370        Still, it must at least speciy /config at is begining
 371        """
 372        return self._conf_request(xpath, action="show", method="GET")
 373
 374    def delete(self, xpath: str):
 375        """
 376        This will REMOVE the xml definition at the provided xpath.
 377        The xpath must be exact.
 378        """
 379        return self._conf_request(
 380            xpath,
 381            action="delete",
 382            method="DELETE",
 383        )
 384
 385    def create(self, xpath: str, xml_definition):
 386        """
 387        This will ADD the xml definition
 388        INSIDE the element at the provided xpath.
 389        The xpath must be exact.
 390        """
 391        # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/configuration-api/set-configuration
 392        params = {"element": xml_definition}
 393        return self._conf_request(
 394            xpath,
 395            action="set",
 396            method="POST",
 397            params=params,
 398        )
 399
 400    def update(self, xpath: str, xml_definition):
 401        """
 402        This will REPLACE the xml definition
 403        INSTEAD of the element at the provided xpath
 404        The xpath must be exact.
 405        Nb: We can pull the whole config, update it locally,
 406        and push the final result
 407        """
 408        # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/configuration-api/set-configuration
 409        params = {"element": xml_definition}
 410        return self._conf_request(
 411            xpath,
 412            action="edit",
 413            method="POST",
 414            params=params,
 415        )
 416
 417    def revert_changes(self, skip_validated: bool = False):
 418        """
 419        Revert all the changes made on Panorama.
 420        NOTE:
 421        - This only applies on non-commited changes.
 422        - This revert everything (not scoped by users)
 423
 424        skip_validated: Do not revert changes that were validated
 425        """
 426        skip = "<skip-validate>yes</skip-validate>" if skip_validated else ""
 427        cmd = f"<revert><config>{skip}</config></revert>"
 428        return self._op_request(cmd)
 429
 430    def validate_changes(self):
 431        """
 432        Validated all the changes currently made
 433        """
 434        cmd = "<validate><full></full></validate>"
 435        return self._op_request(cmd)
 436
 437    def _raw_get_push_scope(self, admin=None):
 438        """
 439        Gives detailed information about pending changes
 440        (e.g. xpath, owner, action, ...)
 441        """
 442        filter = f"<admin><member>{admin}</member></admin>" if admin else ""
 443        cmd = f"<show><config><push-scope>{filter}</push-scope></config></show>"
 444        return self._op_request(cmd)
 445
 446    def get_push_scope_devicegroups(self, admin=None):
 447        """
 448        Gives detailed information about pending changes
 449        (e.g. xpath, owner, action, ...)
 450        """
 451        scope = self._raw_get_push_scope(admin=admin)
 452        return list(set(scope.xpath(".//objects/entry[@loc-type='device-group']/@loc")))
 453
 454    def uncommited_changes(self):
 455        """
 456        Gives detailed information about pending changes
 457        (e.g. xpath, owner, action, ...)
 458        """
 459        cmd = "<show><config><list><changes></changes></list></config></show>"
 460        return self._op_request(cmd)
 461
 462    def uncommited_changes_summary(self, admin=None):
 463        """
 464        Only gives the concern device groups
 465        """
 466        admin = (
 467            f"<partial><admin><member>{admin}</member></admin></partial>"
 468            if admin
 469            else ""
 470        )
 471        cmd = f"<show><config><list><change-summary>{admin}</change-summary></list></config></show>"
 472        return self._op_request(cmd)
 473
 474    def pending_changes(self):
 475        """
 476        Result content is either 'yes' or 'no'
 477        """
 478        cmd = "<check><pending-changes></pending-changes></check>"
 479        return self._op_request(cmd)
 480
 481    def save_config(self, name):
 482        """
 483        Create a named snapshot of the current configuration
 484        """
 485        cmd = f"<save><config><to>{name}</to></config></save>"
 486        return "\n".join(self._op_request(cmd).xpath(".//result/text()"))
 487
 488    def save_device_state(self):
 489        """
 490        Create a snapshot of the current device state
 491        """
 492        cmd = "<save><device-state></device-state></save>"
 493        return "\n".join(self._op_request(cmd).xpath(".//result/text()"))
 494
 495    def get_named_configuration(self, name):
 496        """
 497        Get the configuration from a named snapshot as an XML object
 498        """
 499        cmd = f"<show><config><saved>{name}</saved></config></show>"
 500        return self._op_request(cmd, remove_blank_text=False).xpath("./result/config")[
 501            0
 502        ]
 503
 504    def candidate_config(self) -> Element:
 505        """
 506        Get the configuration to be commited as an XML object
 507        """
 508        cmd = "<show><config><candidate></candidate></config></show>"
 509        return self._op_request(cmd, remove_blank_text=False)
 510
 511    def running_config(self) -> Element:
 512        """
 513        Get the current running configuration as an XML object
 514        """
 515        cmd = "<show><config><running></running></config></show>"
 516        return self._op_request(cmd, remove_blank_text=False)
 517
 518    def _raw_get_jobs(self, job_ids: Union[None, str, List[str]] = None) -> Element:
 519        """
 520        Get information of job(s) as an XML object.
 521        Retrieve all jobs by default.
 522
 523        If job_id is provided, then only retrieve the job requested.
 524        """
 525        filter = "<all></all>"
 526        if job_ids:
 527            if isinstance(job_ids, str):
 528                job_ids = [job_ids]
 529            filter = "".join(f"<id>{j}</id>" for j in job_ids)
 530        cmd = f"<show><jobs>{filter}</jobs></show>"
 531        return self._op_request(cmd)
 532
 533    def get_jobs(self, job_ids: Union[None, str, List[str]] = None) -> List[types.Job]:
 534        """
 535        Get information of job(s)
 536        Retrieve all jobs by default.
 537
 538        If job_id is provided, then only retrieve the job requested.
 539        """
 540        job_xmls = self._raw_get_jobs(job_ids).xpath(".//job")
 541        transformed = (types.Job.from_xml(x) for x in job_xmls)
 542        return [j for j in transformed if j]
 543
 544    def get_job(self, job_id) -> types.Job:
 545        """
 546        Get information of job(s)
 547        Retrieve all jobs by default.
 548
 549        If job_id is provided, then only retrieve the job requested.
 550        """
 551        return self.get_jobs(job_id)[0]
 552
 553    def _raw_get_versions(self) -> Element:
 554        """
 555        Get the versions informations as a XML object.
 556        """
 557        cmd = "<request><system><software><check></check></software></system></request>"
 558        return self.operation(cmd)
 559
 560    def get_versions(self) -> List[types.SoftwareVersion]:
 561        """
 562        Get the versions informations
 563        """
 564        res = self._raw_get_versions()
 565        return [
 566            types.SoftwareVersion.from_xml(entry)
 567            for entry in res.xpath(".//sw-updates/versions/entry")
 568        ]
 569
 570    def wait_job_completion(self, job_id: str, waiter=None) -> types.Job:
 571        """
 572        Block until the job complete.
 573
 574        job_id: the job to wait upon
 575        waiter: a generator that yield when a new query must be done.
 576                see `wait` function (the default waiter) for an example
 577        """
 578        if not waiter:
 579            waiter = wait()
 580        for _ in waiter:
 581            job = self.get_job(job_id)
 582            if job.progress >= 100:
 583                return job
 584            self.logger.info(f"Job {job_id} progress: {job.progress}")
 585        raise Exception("Timeout while waiting for job completion")
 586
 587    def raw_get_pending_jobs(self):
 588        """
 589        Get all the jobs that are pending as a XML object
 590        """
 591        cmd = "<show><jobs><pending></pending></jobs></show>"
 592        return self._op_request(cmd)
 593
 594    def commit_changes(self, force: bool = False):
 595        """
 596        Commit all changes
 597        """
 598        cmd = "<commit>{}</commit>".format("<force></force>" if force else "")
 599        return self._commit_request(cmd)
 600
 601    def _lock_cmd(self, cmd, vsys, no_exception=False) -> bool:
 602        """
 603        Utility function for commands that tries to manipulate the lock
 604        on Panorama.
 605        """
 606        try:
 607            result = "".join(self._op_request(cmd, vsys=vsys).itertext())
 608            self.logger.debug(result)
 609        except Exception as e:
 610            if no_exception:
 611                self.logger.error(e)
 612                return False
 613            raise
 614        return True
 615
 616    # https://github.com/PaloAltoNetworks/pan-os-python/blob/a6b018e3864ff313fed36c3804394e2c92ca87b3/panos/base.py#L4459
 617    def add_config_lock(self, comment=None, vsys="shared", no_exception=False) -> bool:
 618        comment = f"<comment>{comment}</comment>" if comment else ""
 619        cmd = f"<request><config-lock><add>{comment}</add></config-lock></request>"
 620        return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
 621
 622    def remove_config_lock(self, vsys="shared", no_exception=False) -> bool:
 623        cmd = "<request><config-lock><remove></remove></config-lock></request>"
 624        return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
 625
 626    def add_commit_lock(self, comment=None, vsys="shared", no_exception=False) -> bool:
 627        comment = f"<comment>{comment}</comment>" if comment else ""
 628        cmd = f"<request><commit-lock><add>{comment}</add></commit-lock></request>"
 629        return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
 630
 631    def remove_commit_lock(self, vsys="shared", no_exception=False) -> bool:
 632        cmd = "<request><commit-lock><remove></remove></commit-lock></request>"
 633        return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
 634
 635    def set_ha_status(self, active: bool = True, target: Optional[str] = None):
 636        """
 637        Activate or Deactivate (suspend) the HA pair.
 638
 639        """
 640        status = "<functional></functional>" if active else "<suspend></suspend>"
 641        cmd = f"<request><high-availability><state>{status}</state></high-availability></request>"
 642        params = {"target": target} if target else None
 643        return self._op_request(cmd, params=params).xpath(".//result/text()")[0]
 644
 645    def set_ha_preemption(self, active=True, target=None):
 646        """
 647        NOT WORKING:
 648        There is currently no way to deactivate the preemption using the API.
 649        """
 650        raise Exception("set_ha_preemption not implementend")
 651
 652    def _raw_get_ha_info(self, state_only=False, target=None) -> Element:
 653        """
 654        Get the current state of a HA pair as a XML object.
 655        """
 656        filter = "<state></state>" if state_only else "<all></all>"
 657        cmd = f"<show><high-availability>{filter}</high-availability></show>"
 658        params = {"target": target} if target else None
 659        return self._op_request(cmd, params=params)
 660
 661    def get_ha_info(self, state_only=False, target=None) -> types.HAInfo:
 662        """
 663        Get the current state of a HA pair as a python object.
 664        """
 665        res = self._raw_get_ha_info(state_only=state_only, target=target)
 666        hainfo_xml = res.xpath(".//result")[0]
 667        # pprint(hainfo_xml)
 668        return types.HAInfo.from_xml(hainfo_xml)
 669
 670    def get_ha_pairs(
 671        self, connected=True
 672    ) -> Tuple[List[Tuple[types.Device, Optional[types.Device]]], List[types.Device]]:
 673        """
 674        Retrieve a tuple containing 2 values:
 675        1. The list of HA pairs and their members
 676        2. A list of devices that are not part of a HA pair
 677        """
 678        # Get all devices and index them using their serial number
 679        devices: List[types.Device] = self.get_devices(connected=connected)
 680        device_map = {d.serial: d for d in devices}
 681
 682        # Create the 2 lists by iterating over the devices
 683        done = set()
 684        ha_pairs = []
 685        without_ha = []
 686        for d in devices:
 687            # Do not manage twice the same device
 688            if d.serial in done:
 689                continue
 690            # The device does not have an HA peer
 691            if not d.ha_peer_serial:
 692                without_ha.append(d)
 693                done.add(d.serial)
 694                continue
 695            # Get the current device's HA peer
 696            # push them in the ha_pairs list
 697            # and mark both of them as done
 698            peer = device_map.get(d.ha_peer_serial)
 699            ha_pairs.append((d, peer))
 700            done.update((d.serial, d.ha_peer_serial))
 701        return ha_pairs, without_ha
 702
 703    def get_ha_pairs_map(self, connected=True):
 704        """
 705        Same as `get_ha_pairs`, but the ha_pairs are return as a map.
 706        This provides an easier and more readable lookup to find a pair:
 707
 708        mapping, _ = client.get_ha_pairs_map()
 709        serial = "12345"
 710        pair_of_serial = mapping[serial]
 711        """
 712        ha_pairs, without_ha = self.get_ha_pairs(connected=connected)
 713        map = {}
 714        for pair in ha_pairs:
 715            for device in pair:
 716                map[device.serial] = pair
 717        return map, without_ha
 718
 719    def get_panorama_status(self):
 720        """
 721        Get the current status of Panorama server.
 722        """
 723        cmd = "<show><panorama-status></panorama-status></show>"
 724        return self.operation(cmd).xpath(".//result")
 725
 726    def raw_get_local_panorama(self):
 727        return self.configuration(
 728            "/config/devices/entry/deviceconfig/system/panorama/local-panorama/panorama-server"
 729        )
 730
 731    def get_local_panorama_ip(self) -> Optional[str]:
 732        res = self.raw_get_local_panorama()
 733        return first(res.xpath("//panorama-server/text()"))
 734
 735    def _raw_get_devices(self, connected=False):
 736        """
 737        Return the list of device known from Panorama as a XML object.
 738        NOTE: This only works if the client is a Panorama server.
 739
 740        connected: only returns the devices that are connected
 741        """
 742        # This only works on Panorama, not the FW
 743        filter = "<connected></connected>" if connected else "<all></all>"
 744        cmd = f"<show><devices>{filter}</devices></show>"
 745        return self.operation(cmd)
 746
 747    def get_devices(self, connected=False) -> List[types.Device]:
 748        """
 749        Return the list of device known from Panorama as a python structure.
 750        NOTE: This only works if the client is a Panorama server.
 751
 752        connected: only returns the devices that are connected
 753        """
 754        res = self._raw_get_devices(connected=connected)
 755        entries = res.xpath(".//devices/entry")
 756        devices = (types.Device.from_xml(e) for e in entries)
 757        return [d for d in devices if d]
 758
 759    def _raw_get_dg_hierarchy(self):
 760        """
 761        Return the hierarchy of device groups as a XML object.
 762        """
 763        cmd = "<show><dg-hierarchy></dg-hierarchy></show>"
 764        return self.operation(cmd)
 765
 766    def get_plan_dg_hierarchy(self, recursive=False):
 767        """
 768        Return the hierarchy of device groups as a dict.
 769        The keys are the names of the device groups.
 770
 771        The values are the children device groups and depends on the recursive parameter.
 772        recursive: if False, the values are only the direct children of the device group.
 773            Otherwise, the values are all the descendant device groups.
 774        """
 775        devicegroups = {}  # name: children
 776        hierarchy = self._raw_get_dg_hierarchy().xpath(".//dg-hierarchy")[0]
 777        xpath = ".//dg" if recursive else "./dg"
 778        for dg in hierarchy.xpath(".//dg"):
 779            devicegroups[dg.attrib["name"]] = [
 780                x.attrib["name"] for x in dg.xpath(xpath)
 781            ]
 782        return devicegroups
 783
 784    def _raw_get_devicegroups(self):
 785        """
 786        Return the list of device groups as a XML object.
 787        """
 788        cmd = "<show><devicegroups></devicegroups></show>"
 789        return self.operation(cmd)
 790
 791    def get_devicegroups_name(
 792        self,
 793        parents=None,
 794        with_connected_devices=None,
 795    ):
 796        """
 797        This returns the names of the devicegroups:
 798        - parents: the returned list will only contain children of the provided parents (parents included)
 799        - with_devices: the returned list will only contain devicegroups that have direct devices under them
 800        """
 801        devicegroups = self._raw_get_devicegroups().xpath(".//devicegroups/entry")
 802        if with_connected_devices:
 803            names = [
 804                dg.attrib["name"]
 805                for dg in devicegroups
 806                if dg.xpath("./devices/entry/connected[text() = 'yes']")
 807            ]
 808        else:
 809            names = [dg.attrib["name"] for dg in devicegroups]
 810        if parents:
 811            hierarchy = self.get_plan_dg_hierarchy(recursive=True)
 812            tokeep = set(chain(*(hierarchy.get(p, []) for p in parents))) | set(parents)
 813            names = list(set(names) & tokeep)
 814        return names
 815
 816    def _raw_get_addresses(self):
 817        """
 818        Return the list of addresses known from Panorama as a XML object.
 819        NOTE: This only works if the client is a Firewall.
 820        """
 821        if self.ispanorama:
 822            return self.configuration(
 823                "/config/devices/entry/device-group/entry/address"
 824            )
 825        return self.configuration("/config/panorama/vsys//address")
 826
 827    def get_addresses(self) -> List[types.Address]:
 828        """
 829        Return the list of addresses known from Panorama as a python structure.
 830        NOTE: This only works if the client is a Firewall.
 831        """
 832        res = self._raw_get_addresses()
 833        addresses = res.xpath(".//address/entry")
 834        return [types.Address.from_xml(i) for i in addresses]
 835
 836    def _raw_get_routing_tables(self) -> Element:
 837        """
 838        Return the list of interfaces known from Panorama as a XML object.
 839        NOTE: This only works if the client is a Firewall.
 840        """
 841        return self.configuration(
 842            "/config/devices/entry/network/virtual-router/entry/routing-table"
 843        )
 844
 845    def get_routing_tables(self) -> List[types.RoutingTable]:
 846        """
 847        Return the list of interface known from Panorama as a python structure.
 848        NOTE: This only works if the client is a Firewall.
 849        """
 850        res = self._raw_get_routing_tables()
 851        routing_tables = res.xpath(".//routing-table")
 852        # print(len(routing_tables))
 853        # from pa_api.xmlapi.utils import pprint
 854        # for r in routing_tables:
 855        #     pprint(r)
 856        return [types.RoutingTable.from_xml(i) for i in routing_tables]
 857
 858    def _raw_get_interfaces(self) -> Element:
 859        """
 860        Return the list of interfaces known from Panorama as a XML object.
 861        NOTE: This only works if the client is a Firewall.
 862        """
 863        return self.configuration("/config/devices/entry/network/interface")
 864
 865    def get_interfaces(self) -> List[types.Interface]:
 866        """
 867        Return the list of interface known from Panorama as a python structure.
 868        NOTE: This only works if the client is a Firewall.
 869        """
 870        res = self._raw_get_interfaces()
 871        interfaces = res.xpath(".//interface")
 872        return [types.Interface.from_xml(i) for i in interfaces]
 873
 874    def _raw_get_zones(self) -> Element:
 875        """
 876        Return the list of zones known from Panorama as a XML object.
 877        NOTE: This only works if the client is a Firewall.
 878        """
 879        if self.ispanorama:
 880            return self.configuration(
 881                "/config/devices/entry/*/entry/config/devices/entry/vsys/entry/zone"
 882            )
 883        return self.configuration("/config/devices/entry/vsys/entry/zone")
 884
 885    def get_zones(self) -> Element:
 886        """
 887        Return the list of zones known from Panorama as a python structure.
 888        NOTE: This only works if the client is a Firewall.
 889        """
 890        res = self._raw_get_zones()
 891        zones = res.xpath(".//zone/entry")
 892        return [types.Zone.from_xml(i) for i in zones]
 893
 894    def _raw_get_templates(self, name=None) -> Element:
 895        """
 896        Return the synchronization status the templates per devices as a XML object.
 897        A device is in sync if it is up-to-date with the current version on Panorama.
 898        NOTE: This only works on Panorama.
 899        """
 900        # This only works on Panorama, not the FW
 901        filter = f"<name>{name}</name>" if name else ""
 902        cmd = f"<show><templates>{filter}</templates></show>"
 903        return self.operation(cmd)
 904
 905    def get_templates_sync_status(self):
 906        """
 907        Return the synchronization status the templates per devices
 908        A device is in sync if it is up-to-date with the current version on Panorama.
 909        NOTE: This only works on Panorama.
 910
 911        The result is a list of tuple of 3 values:
 912        1. the template's name
 913        2. the device's name
 914        3. the sync status
 915        """
 916        res = self._raw_get_templates()
 917        statuses = []
 918        for entry in res.xpath("./result/templates/entry"):
 919            template_name = entry.attrib["name"]
 920            for device in entry.xpath("./devices/entry"):
 921                device_name = device.attrib["name"]
 922                template_status = next(device.xpath("./template-status/text()"), None)
 923                status = (template_name, device_name, template_status)
 924                statuses.append(status)
 925        return statuses
 926
 927    def _raw_get_vpn_flows(self, name=None) -> List[Element]:
 928        """
 929        Returns the VPN flow information as a XML object.
 930        NOTE: This only works on Panorama server, not firewalls
 931        """
 932        # This only works on Panorama, not the FW"
 933        filter = f"<name>{name}</name>" if name else "<all></all>"
 934        cmd = f"<show><vpn><flow>{filter}</flow></vpn></show>"
 935        return self.operation(cmd)
 936
 937    def get_vpn_flows(self, name=None):
 938        """
 939        Returns the VPN flow information as a python structure.
 940        NOTE: This only works on Panorama server, not firewalls
 941        """
 942        entries = self._raw_get_vpn_flows(name=name).xpath(".//IPSec/entry")
 943        return [types.VPNFlow.from_xml(e) for e in entries]
 944
 945    def _raw_system_info(self):
 946        """
 947        Returns informations about the system as a XML object.
 948        """
 949        cmd = "<show><system><info></info></system></show>"
 950        return self.operation(cmd)
 951
 952    def system_info(self) -> types.operations.SystemInfo:
 953        """
 954        Returns informations about the system as a XML object.
 955        """
 956        xml = self._raw_system_info()
 957        info = xml.xpath("./result/system")[0]
 958        return types.operations.SystemInfo.from_xml(info)
 959
 960    def _raw_system_resources(self):
 961        """
 962        Get the system resouces as a XML object.
 963        NOTE: The string is the raw output of a `ps` command.
 964        """
 965        cmd = "<show><system><resources></resources></system></show>"
 966        return self.operation(cmd)
 967
 968    def system_resources(self):
 969        """
 970        Get the system resouces as a string.
 971        The string is the raw output of a `ps` command.
 972        """
 973        res = self._raw_system_resources()
 974        text = res.xpath(".//result/text()")[0]
 975        return text.split("\n\n")[0]
 976
 977    def _raw_download_software(self, version):
 978        """
 979        Download the software version on the device.
 980        version: the software version to download
 981        """
 982        cmd = f"<request><system><software><download><version>{version}</version></download></software></system></request>"
 983        return self.operation(cmd)
 984
 985    def download_software(self, version) -> Optional[str]:
 986        """
 987        Download the software version on the device.
 988        version: the software version to download
 989
 990        Returns the download job's ID in case of successful launch,
 991        None is returned otherwise.
 992        """
 993        res = self._raw_download_software(version)
 994        try:
 995            return res.xpath(".//job/text()")[0]
 996        except Exception:
 997            self.logger.debug("Download has not started")
 998        return None
 999
1000    def _raw_install_software(self, version):
1001        """
1002        Install the software version on the device.
1003        version: the software version to install
1004        """
1005        cmd = f"<request><system><software><install><version>{version}</version></install></software></system></request>"
1006        return self.operation(cmd)
1007
1008    def install_software(
1009        self, version: Union[None, str, types.SoftwareVersion]
1010    ) -> Optional[str]:
1011        """
1012        Install the software version on the device.
1013        version: the software version to install
1014
1015        Returns the download job's ID in case of successful launch,
1016        None is returned otherwise.
1017        """
1018        if isinstance(version, types.SoftwareVersion):
1019            version = version.version
1020        res = self._raw_install_software(version)
1021        try:
1022            return res.xpath(".//job/text()")[0]
1023        except Exception:
1024            self.logger.debug("Download has not started")
1025        return None
1026
1027    def _raw_restart(self):
1028        """
1029        Restart the device
1030        """
1031        cmd = "<request><restart><system></system></restart></request>"
1032        return self.operation(cmd)
1033
1034    def restart(self):
1035        """
1036        Restart the device
1037        """
1038        return "".join(self._raw_restart().xpath(".//result/text()"))
1039
1040    def automatic_download_software(
1041        self, version: Optional[str] = None
1042    ) -> types.SoftwareVersion:
1043        """
1044        Automatically download the requested software version.
1045        if the version is not provided, it defaults to the latest one.
1046
1047        NOTE: This does not do the installation.
1048        This is usefull to download in anticipation of the upgrade.
1049        For automatic install, see `automatic_software_upgrade`
1050        """
1051        version_str = version
1052        try:
1053            versions = self.get_versions()
1054        except ServerError:
1055            raise Exception(
1056                "An error occured on the device while retrieving the device's versions. Be sure that the device can contact PaloAlto's servers."
1057            )
1058        sw_version = None
1059        if not version_str:
1060            sw_version = next((v for v in versions if v.latest), None)
1061        else:
1062            sw_version = next((v for v in versions if v.version == version_str), None)
1063        if not sw_version:
1064            self.logger.error(f"Version {version_str} not found")
1065            return exit(1)
1066
1067        # Already downloaded: Nothing to do
1068        if sw_version.downloaded:
1069            self.logger.info(f"Version {sw_version.version} already downloaded")
1070            return sw_version
1071
1072        # Download minor version first (required)
1073        base_version = next(
1074            v for v in versions if v.version == sw_version.base_minor_version
1075        )
1076        if not base_version.downloaded:
1077            self.logger.info(
1078                f"Launching download of minor version {base_version.version}"
1079            )
1080            job_id = self.download_software(base_version.version)
1081            if not job_id:
1082                raise Exception("Download has not started")
1083            job = self.wait_job_completion(job_id)
1084            if job.result != "OK":
1085                self.logger.debug(job)
1086                raise Exception(job.details)
1087            print(job.details)
1088
1089        # Actually download the wanted version
1090        self.logger.info(f"Launching download of version {sw_version.version}")
1091        job_id = self.download_software(sw_version.version)
1092        if not job_id:
1093            raise Exception("Download has not started")
1094        job = self.wait_job_completion(job_id)
1095        if job.result != "OK":
1096            self.logger.debug(job)
1097            raise Exception(job.details)
1098        self.logger.info(job.details)
1099        return sw_version
1100
1101    def automatic_software_upgrade(
1102        self,
1103        version: Optional[str] = None,
1104        install: bool = True,
1105        restart: bool = True,
1106        suspend: bool = False,
1107    ):
1108        """
1109        Automatically download and install the requested software version.
1110        if the version is not provided, it defaults to the latest one.
1111
1112        NOTE: This does the software install and restart by default.
1113        If you only want to download, prefer to use `automatic_download_software` method,
1114        or set install=False. See the parameters for more information.
1115
1116        install: install the software after the download
1117        restart: restart the device after the installation. This option is ignored if install=False
1118
1119        """
1120        sw_version = self.automatic_download_software(version)
1121        if sw_version.current:
1122            self.logger.info(f"Version {sw_version.version} is already installed")
1123            return sw_version
1124        if not install:
1125            return sw_version
1126        # We may get the following error:
1127        # "Error: Upgrading from 10.2.4-h10 to 11.1.2 requires a content version of 8761 or greater and found 8638-7689."
1128        # This should never happen, we decided to report the error and handle this manually
1129        self.logger.info(f"Launching install of version {sw_version.version}")
1130
1131        with self.suspended(suspend):
1132            job_id = self.install_software(sw_version.version)
1133            if not job_id:
1134                self.logger.error("Install has not started")
1135                raise Exception("Install has not started")
1136            job = self.wait_job_completion(job_id)
1137            self.logger.info(job.details)
1138
1139            # Do not restart if install failed
1140            if job.result != "OK":
1141                self.logger.error("Failed to install software version")
1142                return sw_version
1143
1144            if restart:
1145                self.logger.info("Restarting the device")
1146                restart_response = self.restart()
1147                self.logger.info(restart_response)
1148            return sw_version
1149
1150    @contextmanager
1151    def suspended(self, suspend=True):
1152        if not suspend:
1153            try:
1154                yield self
1155            finally:
1156                return
1157        self.logger.info("Suspending device")
1158        self.set_ha_status(active=False)
1159        try:
1160            yield self
1161        finally:
1162            self.check_availability()
1163            self.logger.info("Unsuspending device...")
1164            for _ in range(3):
1165                try:
1166                    self.set_ha_status(active=True)
1167                    break
1168                except Exception as e:
1169                    self.logger.error(e)
1170                    # time.sleep()
1171            else:
1172                raise UnsuspendError("Failed to unsuspend device")
1173            self.logger.info("Device successfully unsuspended")
1174
1175    def wait_availability(self):
1176        logger = self.logger
1177        logger.info("Checking availability. Waiting for response...")
1178        max_duration = 1800
1179        for duration in wait(duration=max_duration):
1180            try:
1181                versions = self.get_versions()
1182                current = next(v for v in versions if v.current)
1183                if current is None:
1184                    logger.warning("Device is not not answering")
1185                else:
1186                    logger.info(f"Device responded after {duration} seconds")
1187                    return current
1188            except (
1189                urllib3.exceptions.MaxRetryError,
1190                requests.exceptions.ConnectionError,
1191            ):
1192                logger.warning("Firewall still not responding")
1193            except ServerError:
1194                raise Exception(
1195                    "An error occured on the device while retrieving the device's versions. Be sure that the device can contact PaloAlto's servers."
1196                )
1197            except Exception as e:
1198                logger.debug(f"Unexpected error of type {type(e)} occured on firewall")
1199                logger.error(f"Firewall is still not responding: {e}")
1200        raise Exception(
1201            f"Timeout while waiting for availability of firewall. Waited for {max_duration} seconds"
1202        )
1203
1204    def check_availability(self):
1205        logger = self.logger
1206        ## Wait for the FW to respond
1207        version = self.wait_availability()
1208        if not version:
1209            logger.error("Device never responded")
1210            return False
1211        logger.info(f"Firewall is available on version {version.version}")
1212        return True
XMLApi( host=None, api_key=None, ispanorama=None, verify=False, timeout=None, logger=None)
26    def __init__(
27        self,
28        host=None,
29        api_key=None,
30        ispanorama=None,
31        verify=False,
32        timeout=None,
33        logger=None,
34    ):
35        env_host, env_apikey = get_credentials_from_env()
36        host = host or env_host
37        api_key = api_key or env_apikey
38        if not host:
39            raise Exception("Missing Host")
40        if not api_key:
41            raise Exception("Missing API Key")
42        host, _, _ = clean_url_host(host)
43
44        self._host = host
45        self._api_key = api_key
46        self._url = f"{host}/api"
47        self._verify = verify
48        self._timeout = timeout
49        self._ispanorama = ispanorama
50        self.logger = logger or logging
logger
def export_configuration( self, verify=None, timeout=None) -> Element:
108    def export_configuration(
109        self,
110        verify=None,
111        timeout=None,
112    ) -> Element:
113        return self._export_request(
114            category="configuration",
115            verify=verify,
116            timeout=timeout,
117        )
def export_device_state( self, verify=None, timeout=None) -> Element:
119    def export_device_state(
120        self,
121        verify=None,
122        timeout=None,
123    ) -> Element:
124        return self._export_request(
125            category="device-state",
126            verify=verify,
127            timeout=timeout,
128        )
def generate_apikey(self, username, password: str) -> str:
199    def generate_apikey(self, username, password: str) -> str:
200        """
201        Generate a new API-Key for the user connected.
202        """
203        params = {"user": username, "password": password}
204        return self._request(
205            "keygen",
206            method="POST",
207            params=params,
208        ).xpath(".//key/text()")[0]

Generate a new API-Key for the user connected.

def api_version(self):
211    def api_version(self):
212        return el2dict(
213            self._request(
214                "version",
215                method="POST",
216            ).xpath(".//result")[0]
217        )["result"]
def configuration( self, xpath, action='get', method='GET', params=None, remove_blank_text=True):
219    def configuration(
220        self,
221        xpath,
222        action="get",
223        method="GET",
224        params=None,
225        remove_blank_text=True,
226    ):
227        return self._conf_request(
228            xpath,
229            action=action,
230            method=method,
231            params=params,
232            remove_blank_text=remove_blank_text,
233        )
def operation(self, cmd, method='POST', params=None, remove_blank_text=True):
235    def operation(
236        self,
237        cmd,
238        method="POST",
239        params=None,
240        remove_blank_text=True,
241    ):
242        return self._op_request(
243            cmd,
244            method=method,
245            params=params,
246            remove_blank_text=remove_blank_text,
247        )
ispanorama
256    @property
257    def ispanorama(self):
258        if self._ispanorama is None:
259            self._ispanorama = self._check_is_panorama()
260        return self._ispanorama
def get_tree(self, extended=False) -> Element:
262    def get_tree(self, extended=False) -> Element:
263        """
264        Return the running configuration
265        The differences with `running_config` are not known
266        """
267        tree = get_tree(
268            self._host, self._api_key, verify=self._verify, logger=self.logger
269        )
270        if extended:
271            self._extend_tree_information(tree)
272        return tree

Return the running configuration The differences with running_config are not known

def get_rule_use(self, tree=None, max_threads: Optional[int] = None):
291    def get_rule_use(self, tree=None, max_threads: Optional[int] = None):
292        if tree is None:
293            tree = self.get_tree()
294        device_groups = tree.xpath("devices/*/device-group/*/@name")
295        positions = ("pre", "post")
296        # rule_types = tuple({x.tag for x in tree.xpath(
297        # "devices/*/device-group/*"
298        # "/*[self::post-rulebase or self::pre-rulebase]/*")})
299        rule_types = ("security", "pbf", "nat", "application-override")
300        args_list = list(product(device_groups, positions, rule_types))
301
302        def func(args):
303            return self._get_rule_use(*args)
304
305        threads = len(args_list)
306        threads = min(max_threads or threads, threads)
307        with Pool(len(args_list)) as pool:
308            data = pool.map(func, args_list)
309        return [entry for entry_list in data for entry in entry_list]
def get_rule_hit_count(self, tree=None, max_threads=None):
323    def get_rule_hit_count(self, tree=None, max_threads=None):
324        if tree is None:
325            tree = self.get_tree()
326        device_groups = tree.xpath("devices/*/device-group/*/@name")
327        rulebases = ("pre-rulebase", "post-rulebase")
328        rule_types = ("security", "pbf", "nat", "application-override")
329        args_list = list(product(device_groups, rulebases, rule_types))
330
331        def func(args):
332            return self._get_rule_hit_count(*args)
333
334        threads = len(args_list)
335        threads = min(max_threads or threads, threads)
336        with Pool(len(args_list)) as pool:
337            data = pool.map(func, args_list)
338        return [entry for entry_list in data for entry in entry_list]
def get(self, xpath: str):
365    def get(self, xpath: str):
366        """
367        This will retrieve the xml definition based on the xpath
368        The xpath doesn't need to be exact
369        and can select multiple values at once.
370        Still, it must at least speciy /config at is begining
371        """
372        return self._conf_request(xpath, action="show", method="GET")

This will retrieve the xml definition based on the xpath. The xpath doesn't need to be exact and can select multiple values at once. Still, it must at least specify /config at its beginning.

def delete(self, xpath: str):
374    def delete(self, xpath: str):
375        """
376        This will REMOVE the xml definition at the provided xpath.
377        The xpath must be exact.
378        """
379        return self._conf_request(
380            xpath,
381            action="delete",
382            method="DELETE",
383        )

This will REMOVE the xml definition at the provided xpath. The xpath must be exact.

def create(self, xpath: str, xml_definition):
385    def create(self, xpath: str, xml_definition):
386        """
387        This will ADD the xml definition
388        INSIDE the element at the provided xpath.
389        The xpath must be exact.
390        """
391        # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/configuration-api/set-configuration
392        params = {"element": xml_definition}
393        return self._conf_request(
394            xpath,
395            action="set",
396            method="POST",
397            params=params,
398        )

This will ADD the xml definition INSIDE the element at the provided xpath. The xpath must be exact.

def update(self, xpath: str, xml_definition):
400    def update(self, xpath: str, xml_definition):
401        """
402        This will REPLACE the xml definition
403        INSTEAD of the element at the provided xpath
404        The xpath must be exact.
405        Nb: We can pull the whole config, update it locally,
406        and push the final result
407        """
408        # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/configuration-api/set-configuration
409        params = {"element": xml_definition}
410        return self._conf_request(
411            xpath,
412            action="edit",
413            method="POST",
414            params=params,
415        )

This will REPLACE the xml definition INSTEAD of the element at the provided xpath The xpath must be exact. Nb: We can pull the whole config, update it locally, and push the final result

def revert_changes(self, skip_validated: bool = False):
417    def revert_changes(self, skip_validated: bool = False):
418        """
419        Revert all the changes made on Panorama.
420        NOTE:
421        - This only applies on non-commited changes.
422        - This revert everything (not scoped by users)
423
424        skip_validated: Do not revert changes that were validated
425        """
426        skip = "<skip-validate>yes</skip-validate>" if skip_validated else ""
427        cmd = f"<revert><config>{skip}</config></revert>"
428        return self._op_request(cmd)

Revert all the changes made on Panorama. NOTE:

  • This only applies to non-committed changes.
  • This reverts everything (not scoped by users)

skip_validated: Do not revert changes that were validated

def validate_changes(self):
430    def validate_changes(self):
431        """
432        Validated all the changes currently made
433        """
434        cmd = "<validate><full></full></validate>"
435        return self._op_request(cmd)

Validate all the changes currently made

def get_push_scope_devicegroups(self, admin=None):
446    def get_push_scope_devicegroups(self, admin=None):
447        """
448        Gives detailed information about pending changes
449        (e.g. xpath, owner, action, ...)
450        """
451        scope = self._raw_get_push_scope(admin=admin)
452        return list(set(scope.xpath(".//objects/entry[@loc-type='device-group']/@loc")))

Gives the device groups concerned by the push scope (the `loc` of each `device-group` entry, without duplicates)

def uncommited_changes(self):
454    def uncommited_changes(self):
455        """
456        Gives detailed information about pending changes
457        (e.g. xpath, owner, action, ...)
458        """
459        cmd = "<show><config><list><changes></changes></list></config></show>"
460        return self._op_request(cmd)

Gives detailed information about pending changes (e.g. xpath, owner, action, ...)

def uncommited_changes_summary(self, admin=None):
462    def uncommited_changes_summary(self, admin=None):
463        """
464        Only gives the concern device groups
465        """
466        admin = (
467            f"<partial><admin><member>{admin}</member></admin></partial>"
468            if admin
469            else ""
470        )
471        cmd = f"<show><config><list><change-summary>{admin}</change-summary></list></config></show>"
472        return self._op_request(cmd)

Only gives the concerned device groups

def pending_changes(self):
474    def pending_changes(self):
475        """
476        Result content is either 'yes' or 'no'
477        """
478        cmd = "<check><pending-changes></pending-changes></check>"
479        return self._op_request(cmd)

Result content is either 'yes' or 'no'

def save_config(self, name):
481    def save_config(self, name):
482        """
483        Create a named snapshot of the current configuration
484        """
485        cmd = f"<save><config><to>{name}</to></config></save>"
486        return "\n".join(self._op_request(cmd).xpath(".//result/text()"))

Create a named snapshot of the current configuration

def save_device_state(self):
488    def save_device_state(self):
489        """
490        Create a snapshot of the current device state
491        """
492        cmd = "<save><device-state></device-state></save>"
493        return "\n".join(self._op_request(cmd).xpath(".//result/text()"))

Create a snapshot of the current device state

def get_named_configuration(self, name):
495    def get_named_configuration(self, name):
496        """
497        Get the configuration from a named snapshot as an XML object
498        """
499        cmd = f"<show><config><saved>{name}</saved></config></show>"
500        return self._op_request(cmd, remove_blank_text=False).xpath("./result/config")[
501            0
502        ]

Get the configuration from a named snapshot as an XML object

def candidate_config(self) -> Element:
504    def candidate_config(self) -> Element:
505        """
506        Get the configuration to be commited as an XML object
507        """
508        cmd = "<show><config><candidate></candidate></config></show>"
509        return self._op_request(cmd, remove_blank_text=False)

Get the configuration to be commited as an XML object

def running_config(self) -> Element:
511    def running_config(self) -> Element:
512        """
513        Get the current running configuration as an XML object
514        """
515        cmd = "<show><config><running></running></config></show>"
516        return self._op_request(cmd, remove_blank_text=False)

Get the current running configuration as an XML object

def get_jobs( self, job_ids: Union[NoneType, str, List[str]] = None) -> List[pa_api.xmlapi.types.operations.job.Job]:
533    def get_jobs(self, job_ids: Union[None, str, List[str]] = None) -> List[types.Job]:
534        """
535        Get information of job(s)
536        Retrieve all jobs by default.
537
538        If job_id is provided, then only retrieve the job requested.
539        """
540        job_xmls = self._raw_get_jobs(job_ids).xpath(".//job")
541        transformed = (types.Job.from_xml(x) for x in job_xmls)
542        return [j for j in transformed if j]

Get information of job(s) Retrieve all jobs by default.

If job_id is provided, then only retrieve the job requested.

def get_job(self, job_id) -> pa_api.xmlapi.types.operations.job.Job:
544    def get_job(self, job_id) -> types.Job:
545        """
546        Get information of job(s)
547        Retrieve all jobs by default.
548
549        If job_id is provided, then only retrieve the job requested.
550        """
551        return self.get_jobs(job_id)[0]

Get information about a single job.

Only the job matching the provided job_id is retrieved.

def get_versions(self) -> List[pa_api.xmlapi.types.operations.software.SoftwareVersion]:
560    def get_versions(self) -> List[types.SoftwareVersion]:
561        """
562        Get the versions informations
563        """
564        res = self._raw_get_versions()
565        return [
566            types.SoftwareVersion.from_xml(entry)
567            for entry in res.xpath(".//sw-updates/versions/entry")
568        ]

Get information about the software versions

def wait_job_completion(self, job_id: str, waiter=None) -> pa_api.xmlapi.types.operations.job.Job:
570    def wait_job_completion(self, job_id: str, waiter=None) -> types.Job:
571        """
572        Block until the job complete.
573
574        job_id: the job to wait upon
575        waiter: a generator that yield when a new query must be done.
576                see `wait` function (the default waiter) for an example
577        """
578        if not waiter:
579            waiter = wait()
580        for _ in waiter:
581            job = self.get_job(job_id)
582            if job.progress >= 100:
583                return job
584            self.logger.info(f"Job {job_id} progress: {job.progress}")
585        raise Exception("Timeout while waiting for job completion")

Block until the job complete.

job_id: the job to wait upon waiter: a generator that yield when a new query must be done. see wait function (the default waiter) for an example

def raw_get_pending_jobs(self):
587    def raw_get_pending_jobs(self):
588        """
589        Get all the jobs that are pending as a XML object
590        """
591        cmd = "<show><jobs><pending></pending></jobs></show>"
592        return self._op_request(cmd)

Get all the jobs that are pending as a XML object

def commit_changes(self, force: bool = False):
594    def commit_changes(self, force: bool = False):
595        """
596        Commit all changes
597        """
598        cmd = "<commit>{}</commit>".format("<force></force>" if force else "")
599        return self._commit_request(cmd)

Commit all changes

def add_config_lock(self, comment=None, vsys='shared', no_exception=False) -> bool:
617    def add_config_lock(self, comment=None, vsys="shared", no_exception=False) -> bool:
618        comment = f"<comment>{comment}</comment>" if comment else ""
619        cmd = f"<request><config-lock><add>{comment}</add></config-lock></request>"
620        return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
def remove_config_lock(self, vsys='shared', no_exception=False) -> bool:
622    def remove_config_lock(self, vsys="shared", no_exception=False) -> bool:
623        cmd = "<request><config-lock><remove></remove></config-lock></request>"
624        return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
def add_commit_lock(self, comment=None, vsys='shared', no_exception=False) -> bool:
626    def add_commit_lock(self, comment=None, vsys="shared", no_exception=False) -> bool:
627        comment = f"<comment>{comment}</comment>" if comment else ""
628        cmd = f"<request><commit-lock><add>{comment}</add></commit-lock></request>"
629        return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
def remove_commit_lock(self, vsys='shared', no_exception=False) -> bool:
631    def remove_commit_lock(self, vsys="shared", no_exception=False) -> bool:
632        cmd = "<request><commit-lock><remove></remove></commit-lock></request>"
633        return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
def set_ha_status(self, active: bool = True, target: Optional[str] = None):
635    def set_ha_status(self, active: bool = True, target: Optional[str] = None):
636        """
637        Activate or Deactivate (suspend) the HA pair.
638
639        """
640        status = "<functional></functional>" if active else "<suspend></suspend>"
641        cmd = f"<request><high-availability><state>{status}</state></high-availability></request>"
642        params = {"target": target} if target else None
643        return self._op_request(cmd, params=params).xpath(".//result/text()")[0]

Activate or Deactivate (suspend) the HA pair.

def set_ha_preemption(self, active=True, target=None):
645    def set_ha_preemption(self, active=True, target=None):
646        """
647        NOT WORKING:
648        There is currently no way to deactivate the preemption using the API.
649        """
650        raise Exception("set_ha_preemption not implementend")

NOT WORKING: There is currently no way to deactivate the preemption using the API.

def get_ha_info( self, state_only=False, target=None) -> pa_api.xmlapi.types.operations.ha.HAInfo:
661    def get_ha_info(self, state_only=False, target=None) -> types.HAInfo:
662        """
663        Get the current state of a HA pair as a python object.
664        """
665        res = self._raw_get_ha_info(state_only=state_only, target=target)
666        hainfo_xml = res.xpath(".//result")[0]
667        # pprint(hainfo_xml)
668        return types.HAInfo.from_xml(hainfo_xml)

Get the current state of a HA pair as a python object.

def get_ha_pairs( self, connected=True) -> Tuple[List[Tuple[pa_api.xmlapi.types.operations.device.Device, Optional[pa_api.xmlapi.types.operations.device.Device]]], List[pa_api.xmlapi.types.operations.device.Device]]:
670    def get_ha_pairs(
671        self, connected=True
672    ) -> Tuple[List[Tuple[types.Device, Optional[types.Device]]], List[types.Device]]:
673        """
674        Retrieve a tuple containing 2 values:
675        1. The list of HA pairs and their members
676        2. A list of devices that are not part of a HA pair
677        """
678        # Get all devices and index them using their serial number
679        devices: List[types.Device] = self.get_devices(connected=connected)
680        device_map = {d.serial: d for d in devices}
681
682        # Create the 2 lists by iterating over the devices
683        done = set()
684        ha_pairs = []
685        without_ha = []
686        for d in devices:
687            # Do not manage twice the same device
688            if d.serial in done:
689                continue
690            # The device does not have an HA peer
691            if not d.ha_peer_serial:
692                without_ha.append(d)
693                done.add(d.serial)
694                continue
695            # Get the current device's HA peer
696            # push them in the ha_pairs list
697            # and mark both of them as done
698            peer = device_map.get(d.ha_peer_serial)
699            ha_pairs.append((d, peer))
700            done.update((d.serial, d.ha_peer_serial))
701        return ha_pairs, without_ha

Retrieve a tuple containing 2 values:

  1. The list of HA pairs and their members
  2. A list of devices that are not part of a HA pair
def get_ha_pairs_map(self, connected=True):
703    def get_ha_pairs_map(self, connected=True):
704        """
705        Same as `get_ha_pairs`, but the ha_pairs are return as a map.
706        This provides an easier and more readable lookup to find a pair:
707
708        mapping, _ = client.get_ha_pairs_map()
709        serial = "12345"
710        pair_of_serial = mapping[serial]
711        """
712        ha_pairs, without_ha = self.get_ha_pairs(connected=connected)
713        map = {}
714        for pair in ha_pairs:
715            for device in pair:
716                map[device.serial] = pair
717        return map, without_ha

Same as get_ha_pairs, but the HA pairs are returned as a map. This provides an easier and more readable lookup to find a pair:

mapping, _ = client.get_ha_pairs_map() serial = "12345" pair_of_serial = mapping[serial]

def get_panorama_status(self):
719    def get_panorama_status(self):
720        """
721        Get the current status of Panorama server.
722        """
723        cmd = "<show><panorama-status></panorama-status></show>"
724        return self.operation(cmd).xpath(".//result")

Get the current status of Panorama server.

def raw_get_local_panorama(self):
726    def raw_get_local_panorama(self):
727        return self.configuration(
728            "/config/devices/entry/deviceconfig/system/panorama/local-panorama/panorama-server"
729        )
def get_local_panorama_ip(self) -> Optional[str]:
731    def get_local_panorama_ip(self) -> Optional[str]:
732        res = self.raw_get_local_panorama()
733        return first(res.xpath("//panorama-server/text()"))
def get_devices( self, connected=False) -> List[pa_api.xmlapi.types.operations.device.Device]:
747    def get_devices(self, connected=False) -> List[types.Device]:
748        """
749        Return the list of device known from Panorama as a python structure.
750        NOTE: This only works if the client is a Panorama server.
751
752        connected: only returns the devices that are connected
753        """
754        res = self._raw_get_devices(connected=connected)
755        entries = res.xpath(".//devices/entry")
756        devices = (types.Device.from_xml(e) for e in entries)
757        return [d for d in devices if d]

Return the list of devices known to Panorama as a Python structure. NOTE: This only works if the client is a Panorama server.

connected: only returns the devices that are connected

def get_plan_dg_hierarchy(self, recursive=False):
766    def get_plan_dg_hierarchy(self, recursive=False):
767        """
768        Return the hierarchy of device groups as a dict.
769        The keys are the names of the device groups.
770
771        The values are the children device groups and depends on the recursive parameter.
772        recursive: if False, the values are only the direct children of the device group.
773            Otherwise, the values are all the descendant device groups.
774        """
775        devicegroups = {}  # name: children
776        hierarchy = self._raw_get_dg_hierarchy().xpath(".//dg-hierarchy")[0]
777        xpath = ".//dg" if recursive else "./dg"
778        for dg in hierarchy.xpath(".//dg"):
779            devicegroups[dg.attrib["name"]] = [
780                x.attrib["name"] for x in dg.xpath(xpath)
781            ]
782        return devicegroups

Return the hierarchy of device groups as a dict. The keys are the names of the device groups.

The values are the children device groups and depends on the recursive parameter. recursive: if False, the values are only the direct children of the device group. Otherwise, the values are all the descendant device groups.

def get_devicegroups_name(self, parents=None, with_connected_devices=None):
791    def get_devicegroups_name(
792        self,
793        parents=None,
794        with_connected_devices=None,
795    ):
796        """
797        This returns the names of the devicegroups:
798        - parents: the returned list will only contain children of the provided parents (parents included)
799        - with_devices: the returned list will only contain devicegroups that have direct devices under them
800        """
801        devicegroups = self._raw_get_devicegroups().xpath(".//devicegroups/entry")
802        if with_connected_devices:
803            names = [
804                dg.attrib["name"]
805                for dg in devicegroups
806                if dg.xpath("./devices/entry/connected[text() = 'yes']")
807            ]
808        else:
809            names = [dg.attrib["name"] for dg in devicegroups]
810        if parents:
811            hierarchy = self.get_plan_dg_hierarchy(recursive=True)
812            tokeep = set(chain(*(hierarchy.get(p, []) for p in parents))) | set(parents)
813            names = list(set(names) & tokeep)
814        return names

This returns the names of the devicegroups:

  • parents: the returned list will only contain children of the provided parents (parents included)
  • with_connected_devices: the returned list will only contain devicegroups that have at least one connected device directly under them
def get_addresses(self) -> List[pa_api.xmlapi.types.config.address.Address]:
827    def get_addresses(self) -> List[types.Address]:
828        """
829        Return the list of addresses known from Panorama as a python structure.
830        NOTE: This only works if the client is a Firewall.
831        """
832        res = self._raw_get_addresses()
833        addresses = res.xpath(".//address/entry")
834        return [types.Address.from_xml(i) for i in addresses]

Return the list of addresses known from Panorama as a python structure. NOTE: This only works if the client is a Firewall.

def get_routing_tables( self) -> List[pa_api.xmlapi.types.config.routing.routing_table.RoutingTable]:
845    def get_routing_tables(self) -> List[types.RoutingTable]:
846        """
847        Return the list of interface known from Panorama as a python structure.
848        NOTE: This only works if the client is a Firewall.
849        """
850        res = self._raw_get_routing_tables()
851        routing_tables = res.xpath(".//routing-table")
852        # print(len(routing_tables))
853        # from pa_api.xmlapi.utils import pprint
854        # for r in routing_tables:
855        #     pprint(r)
856        return [types.RoutingTable.from_xml(i) for i in routing_tables]

Return the list of routing tables as a Python structure. NOTE: This only works if the client is a Firewall.

def get_interfaces(self) -> List[pa_api.xmlapi.types.config.interface.Interface]:
865    def get_interfaces(self) -> List[types.Interface]:
866        """
867        Return the list of interface known from Panorama as a python structure.
868        NOTE: This only works if the client is a Firewall.
869        """
870        res = self._raw_get_interfaces()
871        interfaces = res.xpath(".//interface")
872        return [types.Interface.from_xml(i) for i in interfaces]

Return the list of interfaces known from Panorama as a Python structure. NOTE: This only works if the client is a Firewall.

def get_zones(self) -> List[types.Zone]:
885    def get_zones(self) -> Element:
886        """
887        Return the list of zones known from Panorama as a python structure.
888        NOTE: This only works if the client is a Firewall.
889        """
890        res = self._raw_get_zones()
891        zones = res.xpath(".//zone/entry")
892        return [types.Zone.from_xml(i) for i in zones]

Return the list of zones known from Panorama as a python structure. NOTE: This only works if the client is a Firewall.

def get_templates_sync_status(self):
905    def get_templates_sync_status(self):
906        """
907        Return the synchronization status the templates per devices
908        A device is in sync if it is up-to-date with the current version on Panorama.
909        NOTE: This only works on Panorama.
910
911        The result is a list of tuple of 3 values:
912        1. the template's name
913        2. the device's name
914        3. the sync status
915        """
916        res = self._raw_get_templates()
917        statuses = []
918        for entry in res.xpath("./result/templates/entry"):
919            template_name = entry.attrib["name"]
920            for device in entry.xpath("./devices/entry"):
921                device_name = device.attrib["name"]
922                template_status = next(device.xpath("./template-status/text()"), None)
923                status = (template_name, device_name, template_status)
924                statuses.append(status)
925        return statuses

Return the synchronization status of the templates, per device. A device is in sync if it is up-to-date with the current version on Panorama. NOTE: This only works on Panorama.

The result is a list of tuple of 3 values:

  1. the template's name
  2. the device's name
  3. the sync status
def get_vpn_flows(self, name=None):
937    def get_vpn_flows(self, name=None):
938        """
939        Returns the VPN flow information as a python structure.
940        NOTE: This only works on Panorama server, not firewalls
941        """
942        entries = self._raw_get_vpn_flows(name=name).xpath(".//IPSec/entry")
943        return [types.VPNFlow.from_xml(e) for e in entries]

Returns the VPN flow information as a python structure. NOTE: This only works on Panorama server, not firewalls

def system_info(self) -> pa_api.xmlapi.types.operations.system.SystemInfo:
952    def system_info(self) -> types.operations.SystemInfo:
953        """
954        Returns informations about the system as a XML object.
955        """
956        xml = self._raw_system_info()
957        info = xml.xpath("./result/system")[0]
958        return types.operations.SystemInfo.from_xml(info)

Returns information about the system as a Python structure.

def system_resources(self):
968    def system_resources(self):
969        """
970        Get the system resouces as a string.
971        The string is the raw output of a `ps` command.
972        """
973        res = self._raw_system_resources()
974        text = res.xpath(".//result/text()")[0]
975        return text.split("\n\n")[0]

Get the system resources as a string. The string is the raw output of a ps command.

def download_software(self, version) -> Optional[str]:
985    def download_software(self, version) -> Optional[str]:
986        """
987        Download the software version on the device.
988        version: the software version to download
989
990        Returns the download job's ID in case of successful launch,
991        None is returned otherwise.
992        """
993        res = self._raw_download_software(version)
994        try:
995            return res.xpath(".//job/text()")[0]
996        except Exception:
997            self.logger.debug("Download has not started")
998        return None

Download the software version on the device. version: the software version to download

Returns the download job's ID in case of successful launch, None is returned otherwise.

def install_software( self, version: Union[NoneType, str, pa_api.xmlapi.types.operations.software.SoftwareVersion]) -> Optional[str]:
1008    def install_software(
1009        self, version: Union[None, str, types.SoftwareVersion]
1010    ) -> Optional[str]:
1011        """
1012        Install the software version on the device.
1013        version: the software version to install
1014
1015        Returns the download job's ID in case of successful launch,
1016        None is returned otherwise.
1017        """
1018        if isinstance(version, types.SoftwareVersion):
1019            version = version.version
1020        res = self._raw_install_software(version)
1021        try:
1022            return res.xpath(".//job/text()")[0]
1023        except Exception:
1024            self.logger.debug("Download has not started")
1025        return None

Install the software version on the device. version: the software version to install

Returns the install job's ID in case of successful launch, None is returned otherwise.

def restart(self):
1034    def restart(self):
1035        """
1036        Restart the device
1037        """
1038        return "".join(self._raw_restart().xpath(".//result/text()"))

Restart the device

def automatic_download_software( self, version: Optional[str] = None) -> pa_api.xmlapi.types.operations.software.SoftwareVersion:
1040    def automatic_download_software(
1041        self, version: Optional[str] = None
1042    ) -> types.SoftwareVersion:
1043        """
1044        Automatically download the requested software version.
1045        if the version is not provided, it defaults to the latest one.
1046
1047        NOTE: This does not do the installation.
1048        This is usefull to download in anticipation of the upgrade.
1049        For automatic install, see `automatic_software_upgrade`
1050        """
1051        version_str = version
1052        try:
1053            versions = self.get_versions()
1054        except ServerError:
1055            raise Exception(
1056                "An error occured on the device while retrieving the device's versions. Be sure that the device can contact PaloAlto's servers."
1057            )
1058        sw_version = None
1059        if not version_str:
1060            sw_version = next((v for v in versions if v.latest), None)
1061        else:
1062            sw_version = next((v for v in versions if v.version == version_str), None)
1063        if not sw_version:
1064            self.logger.error(f"Version {version_str} not found")
1065            return exit(1)
1066
1067        # Already downloaded: Nothing to do
1068        if sw_version.downloaded:
1069            self.logger.info(f"Version {sw_version.version} already downloaded")
1070            return sw_version
1071
1072        # Download minor version first (required)
1073        base_version = next(
1074            v for v in versions if v.version == sw_version.base_minor_version
1075        )
1076        if not base_version.downloaded:
1077            self.logger.info(
1078                f"Launching download of minor version {base_version.version}"
1079            )
1080            job_id = self.download_software(base_version.version)
1081            if not job_id:
1082                raise Exception("Download has not started")
1083            job = self.wait_job_completion(job_id)
1084            if job.result != "OK":
1085                self.logger.debug(job)
1086                raise Exception(job.details)
1087            print(job.details)
1088
1089        # Actually download the wanted version
1090        self.logger.info(f"Launching download of version {sw_version.version}")
1091        job_id = self.download_software(sw_version.version)
1092        if not job_id:
1093            raise Exception("Download has not started")
1094        job = self.wait_job_completion(job_id)
1095        if job.result != "OK":
1096            self.logger.debug(job)
1097            raise Exception(job.details)
1098        self.logger.info(job.details)
1099        return sw_version

Automatically download the requested software version. if the version is not provided, it defaults to the latest one.

NOTE: This does not do the installation. This is useful to download in anticipation of the upgrade. For automatic install, see automatic_software_upgrade

def automatic_software_upgrade( self, version: Optional[str] = None, install: bool = True, restart: bool = True, suspend: bool = False):
1101    def automatic_software_upgrade(
1102        self,
1103        version: Optional[str] = None,
1104        install: bool = True,
1105        restart: bool = True,
1106        suspend: bool = False,
1107    ):
1108        """
1109        Automatically download and install the requested software version.
1110        if the version is not provided, it defaults to the latest one.
1111
1112        NOTE: This does the software install and restart by default.
1113        If you only want to download, prefer to use `automatic_download_software` method,
1114        or set install=False. See the parameters for more information.
1115
1116        install: install the software after the download
1117        restart: restart the device after the installation. This option is ignored if install=False
1118
1119        """
1120        sw_version = self.automatic_download_software(version)
1121        if sw_version.current:
1122            self.logger.info(f"Version {sw_version.version} is already installed")
1123            return sw_version
1124        if not install:
1125            return sw_version
1126        # We may get the following error:
1127        # "Error: Upgrading from 10.2.4-h10 to 11.1.2 requires a content version of 8761 or greater and found 8638-7689."
1128        # This should never happen, we decided to report the error and handle this manually
1129        self.logger.info(f"Launching install of version {sw_version.version}")
1130
1131        with self.suspended(suspend):
1132            job_id = self.install_software(sw_version.version)
1133            if not job_id:
1134                self.logger.error("Install has not started")
1135                raise Exception("Install has not started")
1136            job = self.wait_job_completion(job_id)
1137            self.logger.info(job.details)
1138
1139            # Do not restart if install failed
1140            if job.result != "OK":
1141                self.logger.error("Failed to install software version")
1142                return sw_version
1143
1144            if restart:
1145                self.logger.info("Restarting the device")
1146                restart_response = self.restart()
1147                self.logger.info(restart_response)
1148            return sw_version

Automatically download and install the requested software version. if the version is not provided, it defaults to the latest one.

NOTE: This does the software install and restart by default. If you only want to download, prefer to use automatic_download_software method, or set install=False. See the parameters for more information.

install: install the software after the download restart: restart the device after the installation. This option is ignored if install=False

@contextmanager
def suspended(self, suspend=True):
1150    @contextmanager
1151    def suspended(self, suspend=True):
1152        if not suspend:
1153            try:
1154                yield self
1155            finally:
1156                return
1157        self.logger.info("Suspending device")
1158        self.set_ha_status(active=False)
1159        try:
1160            yield self
1161        finally:
1162            self.check_availability()
1163            self.logger.info("Unsuspending device...")
1164            for _ in range(3):
1165                try:
1166                    self.set_ha_status(active=True)
1167                    break
1168                except Exception as e:
1169                    self.logger.error(e)
1170                    # time.sleep()
1171            else:
1172                raise UnsuspendError("Failed to unsuspend device")
1173            self.logger.info("Device successfully unsuspended")
def wait_availability(self):
1175    def wait_availability(self):
1176        logger = self.logger
1177        logger.info("Checking availability. Waiting for response...")
1178        max_duration = 1800
1179        for duration in wait(duration=max_duration):
1180            try:
1181                versions = self.get_versions()
1182                current = next(v for v in versions if v.current)
1183                if current is None:
1184                    logger.warning("Device is not not answering")
1185                else:
1186                    logger.info(f"Device responded after {duration} seconds")
1187                    return current
1188            except (
1189                urllib3.exceptions.MaxRetryError,
1190                requests.exceptions.ConnectionError,
1191            ):
1192                logger.warning("Firewall still not responding")
1193            except ServerError:
1194                raise Exception(
1195                    "An error occured on the device while retrieving the device's versions. Be sure that the device can contact PaloAlto's servers."
1196                )
1197            except Exception as e:
1198                logger.debug(f"Unexpected error of type {type(e)} occured on firewall")
1199                logger.error(f"Firewall is still not responding: {e}")
1200        raise Exception(
1201            f"Timeout while waiting for availability of firewall. Waited for {max_duration} seconds"
1202        )
def check_availability(self):
1204    def check_availability(self):
1205        logger = self.logger
1206        ## Wait for the FW to respond
1207        version = self.wait_availability()
1208        if not version:
1209            logger.error("Device never responded")
1210            return False
1211        logger.info(f"Firewall is available on version {version.version}")
1212        return True