pa_api.xmlapi
25class XMLApi: 26 def __init__( 27 self, 28 host=None, 29 api_key=None, 30 ispanorama=None, 31 verify=False, 32 timeout=None, 33 logger=None, 34 ): 35 env_host, env_apikey = get_credentials_from_env() 36 host = host or env_host 37 api_key = api_key or env_apikey 38 if not host: 39 raise Exception("Missing Host") 40 if not api_key: 41 raise Exception("Missing API Key") 42 host, _, _ = clean_url_host(host) 43 44 self._host = host 45 self._api_key = api_key 46 self._url = f"{host}/api" 47 self._verify = verify 48 self._timeout = timeout 49 self._ispanorama = ispanorama 50 self.logger = logger or logging 51 52 def _request( 53 self, 54 type, 55 method="GET", 56 vsys=None, 57 params=None, 58 remove_blank_text=True, 59 verify=None, 60 parse=True, 61 stream=None, 62 timeout=None, 63 ): 64 if verify is None: 65 verify = self._verify 66 if timeout is None: 67 timeout = self._timeout 68 headers = {"X-PAN-KEY": self._api_key} 69 return raw_request( 70 self._url, 71 type, 72 method, 73 vsys=vsys, 74 params=params, 75 headers=headers, 76 remove_blank_text=remove_blank_text, 77 verify=verify, 78 logger=self.logger, 79 parse=parse, 80 stream=stream, 81 timeout=timeout, 82 ) 83 84 # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/export-files-api 85 # https://knowledgebase.paloaltonetworks.com/KCSArticleDetail?id=kA10g000000ClaOCAS#:~:text=From%20the%20GUI%2C%20go%20to%20Device%20%3E%20Setup,%3E%20scp%20export%20configuration%20%5Btab%20for%20command%20help%5D 86 def _export_request( 87 self, 88 category, 89 method="GET", 90 params=None, 91 verify=None, 92 stream=None, 93 timeout=None, 94 ): 95 if params is None: 96 params = {} 97 params = {"category": category, **params} 98 return self._request( 99 "export", 100 method=method, 101 params=params, 102 verify=verify, 103 parse=False, 104 stream=stream, 105 timeout=timeout, 106 ).content 107 108 def export_configuration( 109 self, 110 verify=None, 111 timeout=None, 112 ) -> Element: 113 return self._export_request( 114 category="configuration", 115 verify=verify, 116 timeout=timeout, 117 ) 118 119 def export_device_state( 120 self, 121 verify=None, 122 timeout=None, 123 ) -> Element: 124 return self._export_request( 125 category="device-state", 126 verify=verify, 127 timeout=timeout, 128 ) 129 130 def _conf_request( 131 self, 132 xpath, 133 action="get", 134 method="GET", 135 vsys=None, 136 params=None, 137 remove_blank_text=True, 138 verify=None, 139 timeout=None, 140 ) -> Element: 141 if params is None: 142 params = {} 143 params = {"action": action, "xpath": xpath, **params} 144 return self._request( 145 "config", 146 method=method, 147 vsys=vsys, 148 params=params, 149 remove_blank_text=remove_blank_text, 150 verify=verify, 151 timeout=timeout, 152 ) 153 154 def _op_request( 155 self, 156 cmd, 157 method="POST", 158 vsys=None, 159 params=None, 160 remove_blank_text=True, 161 verify=None, 162 timeout=None, 163 ) -> Element: 164 if params is None: 165 params = {} 166 params = {"cmd": cmd, **params} 167 return self._request( 168 "op", 169 method=method, 170 vsys=vsys, 171 params=params, 172 remove_blank_text=remove_blank_text, 173 verify=verify, 174 timeout=timeout, 175 ) 176 177 def _commit_request( 178 self, 179 cmd, 180 method="POST", 181 params=None, 182 remove_blank_text=True, 183 verify=None, 184 timeout=None, 185 ): 186 if params is None: 187 params = {} 188 params = {"cmd": cmd, **params} 189 return self._request( 190 "commit", 191 method=method, 192 params=params, 193 remove_blank_text=remove_blank_text, 194 verify=verify, 195 timeout=timeout, 196 ) 197 198 # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/get-started-with-the-pan-os-xml-api/get-your-api-key 199 def generate_apikey(self, username, password: str) -> str: 200 """ 201 Generate a new API-Key for the user connected. 202 """ 203 params = {"user": username, "password": password} 204 return self._request( 205 "keygen", 206 method="POST", 207 params=params, 208 ).xpath(".//key/text()")[0] 209 210 # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/get-version-info-api 211 def api_version(self): 212 return el2dict( 213 self._request( 214 "version", 215 method="POST", 216 ).xpath(".//result")[0] 217 )["result"] 218 219 def configuration( 220 self, 221 xpath, 222 action="get", 223 method="GET", 224 params=None, 225 remove_blank_text=True, 226 ): 227 return self._conf_request( 228 xpath, 229 action=action, 230 method=method, 231 params=params, 232 remove_blank_text=remove_blank_text, 233 ) 234 235 def operation( 236 self, 237 cmd, 238 method="POST", 239 params=None, 240 remove_blank_text=True, 241 ): 242 return self._op_request( 243 cmd, 244 method=method, 245 params=params, 246 remove_blank_text=remove_blank_text, 247 ) 248 249 def _check_is_panorama(self) -> bool: 250 try: 251 self.configuration("/config/panorama/vsys") 252 return False 253 except Exception: 254 return True 255 256 @property 257 def ispanorama(self): 258 if self._ispanorama is None: 259 self._ispanorama = self._check_is_panorama() 260 return self._ispanorama 261 262 def get_tree(self, extended=False) -> Element: 263 """ 264 Return the running configuration 265 The differences with `running_config` are not known 266 """ 267 tree = get_tree( 268 self._host, self._api_key, verify=self._verify, logger=self.logger 269 ) 270 if extended: 271 self._extend_tree_information(tree) 272 return tree 273 274 def _get_rule_use(self, device_group, position, rule_type, number: int = 200): 275 results = [] 276 for i in range(100): 277 cmd = _get_rule_use_cmd( 278 device_group, 279 position, 280 rule_type, 281 i * number, 282 number, 283 ) 284 res = self._op_request(cmd).xpath("result")[0] 285 total_count = int(res.attrib["total-count"]) 286 results.extend(res.xpath("entry")) 287 if len(results) >= total_count: 288 break 289 return results 290 291 def get_rule_use(self, tree=None, max_threads: Optional[int] = None): 292 if tree is None: 293 tree = self.get_tree() 294 device_groups = tree.xpath("devices/*/device-group/*/@name") 295 positions = ("pre", "post") 296 # rule_types = tuple({x.tag for x in tree.xpath( 297 # "devices/*/device-group/*" 298 # "/*[self::post-rulebase or self::pre-rulebase]/*")}) 299 rule_types = ("security", "pbf", "nat", "application-override") 300 args_list = list(product(device_groups, positions, rule_types)) 301 302 def func(args): 303 return self._get_rule_use(*args) 304 305 threads = len(args_list) 306 threads = min(max_threads or threads, threads) 307 with Pool(len(args_list)) as pool: 308 data = pool.map(func, args_list) 309 return [entry for entry_list in data for entry in entry_list] 310 311 def _get_rule_hit_count(self, device_group, rulebase, rule_type): 312 cmd = ( 313 "<show><rule-hit-count><device-group>" 314 f"<entry name='{device_group}'><{rulebase}><entry name='{rule_type}'>" 315 f"<rules><all/></rules></entry></{rulebase}></entry>" 316 "</device-group></rule-hit-count></show>" 317 ) 318 res = self._op_request(cmd) 319 entries = res.xpath(".//rules/entry") or [] 320 # return entries 321 return [(device_group, rulebase, rule_type, e) for e in entries] 322 323 def get_rule_hit_count(self, tree=None, max_threads=None): 324 if tree is None: 325 tree = self.get_tree() 326 device_groups = tree.xpath("devices/*/device-group/*/@name") 327 rulebases = ("pre-rulebase", "post-rulebase") 328 rule_types = ("security", "pbf", "nat", "application-override") 329 args_list = list(product(device_groups, rulebases, rule_types)) 330 331 def func(args): 332 return self._get_rule_hit_count(*args) 333 334 threads = len(args_list) 335 threads = min(max_threads or threads, threads) 336 with Pool(len(args_list)) as pool: 337 data = pool.map(func, args_list) 338 return [entry for entry_list in data for entry in entry_list] 339 340 def _extend_tree_information( 341 self, 342 tree, 343 extended=None, 344 max_threads=None, 345 ): 346 """ 347 Incorporate usage statistics into the configuration. 348 tree: the configuration as a XML object 349 extended: rule-use data (if not provided, the function will retrieve them automatically) 350 """ 351 if extended is None: 352 extended = self.get_rule_use(tree, max_threads=max_threads) 353 rules = tree.xpath( 354 ".//device-group/entry/" 355 "*[self::pre-rulebase or self::post-rulebase]/*/rules/entry[@uuid]", 356 ) 357 ext_dict = {x.attrib.get("uuid"): x for x in extended} 358 rules_dict = {x.attrib["uuid"]: x for x in rules} 359 for ext, rule in map_dicts(ext_dict, rules_dict): 360 extend_element(rule, ext) 361 # NOTE: Do not use rule.extend(ext) 362 # => This is causing duplicates entries 363 return tree, extended 364 365 def get(self, xpath: str): 366 """ 367 This will retrieve the xml definition based on the xpath 368 The xpath doesn't need to be exact 369 and can select multiple values at once. 370 Still, it must at least speciy /config at is begining 371 """ 372 return self._conf_request(xpath, action="show", method="GET") 373 374 def delete(self, xpath: str): 375 """ 376 This will REMOVE the xml definition at the provided xpath. 377 The xpath must be exact. 378 """ 379 return self._conf_request( 380 xpath, 381 action="delete", 382 method="DELETE", 383 ) 384 385 def create(self, xpath: str, xml_definition): 386 """ 387 This will ADD the xml definition 388 INSIDE the element at the provided xpath. 389 The xpath must be exact. 390 """ 391 # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/configuration-api/set-configuration 392 params = {"element": xml_definition} 393 return self._conf_request( 394 xpath, 395 action="set", 396 method="POST", 397 params=params, 398 ) 399 400 def update(self, xpath: str, xml_definition): 401 """ 402 This will REPLACE the xml definition 403 INSTEAD of the element at the provided xpath 404 The xpath must be exact. 405 Nb: We can pull the whole config, update it locally, 406 and push the final result 407 """ 408 # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/configuration-api/set-configuration 409 params = {"element": xml_definition} 410 return self._conf_request( 411 xpath, 412 action="edit", 413 method="POST", 414 params=params, 415 ) 416 417 def revert_changes(self, skip_validated: bool = False): 418 """ 419 Revert all the changes made on Panorama. 420 NOTE: 421 - This only applies on non-commited changes. 422 - This revert everything (not scoped by users) 423 424 skip_validated: Do not revert changes that were validated 425 """ 426 skip = "<skip-validate>yes</skip-validate>" if skip_validated else "" 427 cmd = f"<revert><config>{skip}</config></revert>" 428 return self._op_request(cmd) 429 430 def validate_changes(self): 431 """ 432 Validated all the changes currently made 433 """ 434 cmd = "<validate><full></full></validate>" 435 return self._op_request(cmd) 436 437 def _raw_get_push_scope(self, admin=None): 438 """ 439 Gives detailed information about pending changes 440 (e.g. xpath, owner, action, ...) 441 """ 442 filter = f"<admin><member>{admin}</member></admin>" if admin else "" 443 cmd = f"<show><config><push-scope>{filter}</push-scope></config></show>" 444 return self._op_request(cmd) 445 446 def get_push_scope_devicegroups(self, admin=None): 447 """ 448 Gives detailed information about pending changes 449 (e.g. xpath, owner, action, ...) 450 """ 451 scope = self._raw_get_push_scope(admin=admin) 452 return list(set(scope.xpath(".//objects/entry[@loc-type='device-group']/@loc"))) 453 454 def uncommited_changes(self): 455 """ 456 Gives detailed information about pending changes 457 (e.g. xpath, owner, action, ...) 458 """ 459 cmd = "<show><config><list><changes></changes></list></config></show>" 460 return self._op_request(cmd) 461 462 def uncommited_changes_summary(self, admin=None): 463 """ 464 Only gives the concern device groups 465 """ 466 admin = ( 467 f"<partial><admin><member>{admin}</member></admin></partial>" 468 if admin 469 else "" 470 ) 471 cmd = f"<show><config><list><change-summary>{admin}</change-summary></list></config></show>" 472 return self._op_request(cmd) 473 474 def pending_changes(self): 475 """ 476 Result content is either 'yes' or 'no' 477 """ 478 cmd = "<check><pending-changes></pending-changes></check>" 479 return self._op_request(cmd) 480 481 def save_config(self, name): 482 """ 483 Create a named snapshot of the current configuration 484 """ 485 cmd = f"<save><config><to>{name}</to></config></save>" 486 return "\n".join(self._op_request(cmd).xpath(".//result/text()")) 487 488 def save_device_state(self): 489 """ 490 Create a snapshot of the current device state 491 """ 492 cmd = "<save><device-state></device-state></save>" 493 return "\n".join(self._op_request(cmd).xpath(".//result/text()")) 494 495 def get_named_configuration(self, name): 496 """ 497 Get the configuration from a named snapshot as an XML object 498 """ 499 cmd = f"<show><config><saved>{name}</saved></config></show>" 500 return self._op_request(cmd, remove_blank_text=False).xpath("./result/config")[ 501 0 502 ] 503 504 def candidate_config(self) -> Element: 505 """ 506 Get the configuration to be commited as an XML object 507 """ 508 cmd = "<show><config><candidate></candidate></config></show>" 509 return self._op_request(cmd, remove_blank_text=False) 510 511 def running_config(self) -> Element: 512 """ 513 Get the current running configuration as an XML object 514 """ 515 cmd = "<show><config><running></running></config></show>" 516 return self._op_request(cmd, remove_blank_text=False) 517 518 def _raw_get_jobs(self, job_ids: Union[None, str, List[str]] = None) -> Element: 519 """ 520 Get information of job(s) as an XML object. 521 Retrieve all jobs by default. 522 523 If job_id is provided, then only retrieve the job requested. 524 """ 525 filter = "<all></all>" 526 if job_ids: 527 if isinstance(job_ids, str): 528 job_ids = [job_ids] 529 filter = "".join(f"<id>{j}</id>" for j in job_ids) 530 cmd = f"<show><jobs>{filter}</jobs></show>" 531 return self._op_request(cmd) 532 533 def get_jobs(self, job_ids: Union[None, str, List[str]] = None) -> List[types.Job]: 534 """ 535 Get information of job(s) 536 Retrieve all jobs by default. 537 538 If job_id is provided, then only retrieve the job requested. 539 """ 540 job_xmls = self._raw_get_jobs(job_ids).xpath(".//job") 541 transformed = (types.Job.from_xml(x) for x in job_xmls) 542 return [j for j in transformed if j] 543 544 def get_job(self, job_id) -> types.Job: 545 """ 546 Get information of job(s) 547 Retrieve all jobs by default. 548 549 If job_id is provided, then only retrieve the job requested. 550 """ 551 return self.get_jobs(job_id)[0] 552 553 def _raw_get_versions(self) -> Element: 554 """ 555 Get the versions informations as a XML object. 556 """ 557 cmd = "<request><system><software><check></check></software></system></request>" 558 return self.operation(cmd) 559 560 def get_versions(self) -> List[types.SoftwareVersion]: 561 """ 562 Get the versions informations 563 """ 564 res = self._raw_get_versions() 565 return [ 566 types.SoftwareVersion.from_xml(entry) 567 for entry in res.xpath(".//sw-updates/versions/entry") 568 ] 569 570 def wait_job_completion(self, job_id: str, waiter=None) -> types.Job: 571 """ 572 Block until the job complete. 573 574 job_id: the job to wait upon 575 waiter: a generator that yield when a new query must be done. 576 see `wait` function (the default waiter) for an example 577 """ 578 if not waiter: 579 waiter = wait() 580 for _ in waiter: 581 job = self.get_job(job_id) 582 if job.progress >= 100: 583 return job 584 self.logger.info(f"Job {job_id} progress: {job.progress}") 585 raise Exception("Timeout while waiting for job completion") 586 587 def raw_get_pending_jobs(self): 588 """ 589 Get all the jobs that are pending as a XML object 590 """ 591 cmd = "<show><jobs><pending></pending></jobs></show>" 592 return self._op_request(cmd) 593 594 def commit_changes(self, force: bool = False): 595 """ 596 Commit all changes 597 """ 598 cmd = "<commit>{}</commit>".format("<force></force>" if force else "") 599 return self._commit_request(cmd) 600 601 def _lock_cmd(self, cmd, vsys, no_exception=False) -> bool: 602 """ 603 Utility function for commands that tries to manipulate the lock 604 on Panorama. 605 """ 606 try: 607 result = "".join(self._op_request(cmd, vsys=vsys).itertext()) 608 self.logger.debug(result) 609 except Exception as e: 610 if no_exception: 611 self.logger.error(e) 612 return False 613 raise 614 return True 615 616 # https://github.com/PaloAltoNetworks/pan-os-python/blob/a6b018e3864ff313fed36c3804394e2c92ca87b3/panos/base.py#L4459 617 def add_config_lock(self, comment=None, vsys="shared", no_exception=False) -> bool: 618 comment = f"<comment>{comment}</comment>" if comment else "" 619 cmd = f"<request><config-lock><add>{comment}</add></config-lock></request>" 620 return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception) 621 622 def remove_config_lock(self, vsys="shared", no_exception=False) -> bool: 623 cmd = "<request><config-lock><remove></remove></config-lock></request>" 624 return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception) 625 626 def add_commit_lock(self, comment=None, vsys="shared", no_exception=False) -> bool: 627 comment = f"<comment>{comment}</comment>" if comment else "" 628 cmd = f"<request><commit-lock><add>{comment}</add></commit-lock></request>" 629 return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception) 630 631 def remove_commit_lock(self, vsys="shared", no_exception=False) -> bool: 632 cmd = "<request><commit-lock><remove></remove></commit-lock></request>" 633 return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception) 634 635 def set_ha_status(self, active: bool = True, target: Optional[str] = None): 636 """ 637 Activate or Deactivate (suspend) the HA pair. 638 639 """ 640 status = "<functional></functional>" if active else "<suspend></suspend>" 641 cmd = f"<request><high-availability><state>{status}</state></high-availability></request>" 642 params = {"target": target} if target else None 643 return self._op_request(cmd, params=params).xpath(".//result/text()")[0] 644 645 def set_ha_preemption(self, active=True, target=None): 646 """ 647 NOT WORKING: 648 There is currently no way to deactivate the preemption using the API. 649 """ 650 raise Exception("set_ha_preemption not implementend") 651 652 def _raw_get_ha_info(self, state_only=False, target=None) -> Element: 653 """ 654 Get the current state of a HA pair as a XML object. 655 """ 656 filter = "<state></state>" if state_only else "<all></all>" 657 cmd = f"<show><high-availability>{filter}</high-availability></show>" 658 params = {"target": target} if target else None 659 return self._op_request(cmd, params=params) 660 661 def get_ha_info(self, state_only=False, target=None) -> types.HAInfo: 662 """ 663 Get the current state of a HA pair as a python object. 664 """ 665 res = self._raw_get_ha_info(state_only=state_only, target=target) 666 hainfo_xml = res.xpath(".//result")[0] 667 # pprint(hainfo_xml) 668 return types.HAInfo.from_xml(hainfo_xml) 669 670 def get_ha_pairs( 671 self, connected=True 672 ) -> Tuple[List[Tuple[types.Device, Optional[types.Device]]], List[types.Device]]: 673 """ 674 Retrieve a tuple containing 2 values: 675 1. The list of HA pairs and their members 676 2. A list of devices that are not part of a HA pair 677 """ 678 # Get all devices and index them using their serial number 679 devices: List[types.Device] = self.get_devices(connected=connected) 680 device_map = {d.serial: d for d in devices} 681 682 # Create the 2 lists by iterating over the devices 683 done = set() 684 ha_pairs = [] 685 without_ha = [] 686 for d in devices: 687 # Do not manage twice the same device 688 if d.serial in done: 689 continue 690 # The device does not have an HA peer 691 if not d.ha_peer_serial: 692 without_ha.append(d) 693 done.add(d.serial) 694 continue 695 # Get the current device's HA peer 696 # push them in the ha_pairs list 697 # and mark both of them as done 698 peer = device_map.get(d.ha_peer_serial) 699 ha_pairs.append((d, peer)) 700 done.update((d.serial, d.ha_peer_serial)) 701 return ha_pairs, without_ha 702 703 def get_ha_pairs_map(self, connected=True): 704 """ 705 Same as `get_ha_pairs`, but the ha_pairs are return as a map. 706 This provides an easier and more readable lookup to find a pair: 707 708 mapping, _ = client.get_ha_pairs_map() 709 serial = "12345" 710 pair_of_serial = mapping[serial] 711 """ 712 ha_pairs, without_ha = self.get_ha_pairs(connected=connected) 713 map = {} 714 for pair in ha_pairs: 715 for device in pair: 716 map[device.serial] = pair 717 return map, without_ha 718 719 def get_panorama_status(self): 720 """ 721 Get the current status of Panorama server. 722 """ 723 cmd = "<show><panorama-status></panorama-status></show>" 724 return self.operation(cmd).xpath(".//result") 725 726 def raw_get_local_panorama(self): 727 return self.configuration( 728 "/config/devices/entry/deviceconfig/system/panorama/local-panorama/panorama-server" 729 ) 730 731 def get_local_panorama_ip(self) -> Optional[str]: 732 res = self.raw_get_local_panorama() 733 return first(res.xpath("//panorama-server/text()")) 734 735 def _raw_get_devices(self, connected=False): 736 """ 737 Return the list of device known from Panorama as a XML object. 738 NOTE: This only works if the client is a Panorama server. 739 740 connected: only returns the devices that are connected 741 """ 742 # This only works on Panorama, not the FW 743 filter = "<connected></connected>" if connected else "<all></all>" 744 cmd = f"<show><devices>{filter}</devices></show>" 745 return self.operation(cmd) 746 747 def get_devices(self, connected=False) -> List[types.Device]: 748 """ 749 Return the list of device known from Panorama as a python structure. 750 NOTE: This only works if the client is a Panorama server. 751 752 connected: only returns the devices that are connected 753 """ 754 res = self._raw_get_devices(connected=connected) 755 entries = res.xpath(".//devices/entry") 756 devices = (types.Device.from_xml(e) for e in entries) 757 return [d for d in devices if d] 758 759 def _raw_get_dg_hierarchy(self): 760 """ 761 Return the hierarchy of device groups as a XML object. 762 """ 763 cmd = "<show><dg-hierarchy></dg-hierarchy></show>" 764 return self.operation(cmd) 765 766 def get_plan_dg_hierarchy(self, recursive=False): 767 """ 768 Return the hierarchy of device groups as a dict. 769 The keys are the names of the device groups. 770 771 The values are the children device groups and depends on the recursive parameter. 772 recursive: if False, the values are only the direct children of the device group. 773 Otherwise, the values are all the descendant device groups. 774 """ 775 devicegroups = {} # name: children 776 hierarchy = self._raw_get_dg_hierarchy().xpath(".//dg-hierarchy")[0] 777 xpath = ".//dg" if recursive else "./dg" 778 for dg in hierarchy.xpath(".//dg"): 779 devicegroups[dg.attrib["name"]] = [ 780 x.attrib["name"] for x in dg.xpath(xpath) 781 ] 782 return devicegroups 783 784 def _raw_get_devicegroups(self): 785 """ 786 Return the list of device groups as a XML object. 787 """ 788 cmd = "<show><devicegroups></devicegroups></show>" 789 return self.operation(cmd) 790 791 def get_devicegroups_name( 792 self, 793 parents=None, 794 with_connected_devices=None, 795 ): 796 """ 797 This returns the names of the devicegroups: 798 - parents: the returned list will only contain children of the provided parents (parents included) 799 - with_devices: the returned list will only contain devicegroups that have direct devices under them 800 """ 801 devicegroups = self._raw_get_devicegroups().xpath(".//devicegroups/entry") 802 if with_connected_devices: 803 names = [ 804 dg.attrib["name"] 805 for dg in devicegroups 806 if dg.xpath("./devices/entry/connected[text() = 'yes']") 807 ] 808 else: 809 names = [dg.attrib["name"] for dg in devicegroups] 810 if parents: 811 hierarchy = self.get_plan_dg_hierarchy(recursive=True) 812 tokeep = set(chain(*(hierarchy.get(p, []) for p in parents))) | set(parents) 813 names = list(set(names) & tokeep) 814 return names 815 816 def _raw_get_addresses(self): 817 """ 818 Return the list of addresses known from Panorama as a XML object. 819 NOTE: This only works if the client is a Firewall. 820 """ 821 if self.ispanorama: 822 return self.configuration( 823 "/config/devices/entry/device-group/entry/address" 824 ) 825 return self.configuration("/config/panorama/vsys//address") 826 827 def get_addresses(self) -> List[types.Address]: 828 """ 829 Return the list of addresses known from Panorama as a python structure. 830 NOTE: This only works if the client is a Firewall. 831 """ 832 res = self._raw_get_addresses() 833 addresses = res.xpath(".//address/entry") 834 return [types.Address.from_xml(i) for i in addresses] 835 836 def _raw_get_routing_tables(self) -> Element: 837 """ 838 Return the list of interfaces known from Panorama as a XML object. 839 NOTE: This only works if the client is a Firewall. 840 """ 841 return self.configuration( 842 "/config/devices/entry/network/virtual-router/entry/routing-table" 843 ) 844 845 def get_routing_tables(self) -> List[types.RoutingTable]: 846 """ 847 Return the list of interface known from Panorama as a python structure. 848 NOTE: This only works if the client is a Firewall. 849 """ 850 res = self._raw_get_routing_tables() 851 routing_tables = res.xpath(".//routing-table") 852 # print(len(routing_tables)) 853 # from pa_api.xmlapi.utils import pprint 854 # for r in routing_tables: 855 # pprint(r) 856 return [types.RoutingTable.from_xml(i) for i in routing_tables] 857 858 def _raw_get_interfaces(self) -> Element: 859 """ 860 Return the list of interfaces known from Panorama as a XML object. 861 NOTE: This only works if the client is a Firewall. 862 """ 863 return self.configuration("/config/devices/entry/network/interface") 864 865 def get_interfaces(self) -> List[types.Interface]: 866 """ 867 Return the list of interface known from Panorama as a python structure. 868 NOTE: This only works if the client is a Firewall. 869 """ 870 res = self._raw_get_interfaces() 871 interfaces = res.xpath(".//interface") 872 return [types.Interface.from_xml(i) for i in interfaces] 873 874 def _raw_get_zones(self) -> Element: 875 """ 876 Return the list of zones known from Panorama as a XML object. 877 NOTE: This only works if the client is a Firewall. 878 """ 879 if self.ispanorama: 880 return self.configuration( 881 "/config/devices/entry/*/entry/config/devices/entry/vsys/entry/zone" 882 ) 883 return self.configuration("/config/devices/entry/vsys/entry/zone") 884 885 def get_zones(self) -> Element: 886 """ 887 Return the list of zones known from Panorama as a python structure. 888 NOTE: This only works if the client is a Firewall. 889 """ 890 res = self._raw_get_zones() 891 zones = res.xpath(".//zone/entry") 892 return [types.Zone.from_xml(i) for i in zones] 893 894 def _raw_get_templates(self, name=None) -> Element: 895 """ 896 Return the synchronization status the templates per devices as a XML object. 897 A device is in sync if it is up-to-date with the current version on Panorama. 898 NOTE: This only works on Panorama. 899 """ 900 # This only works on Panorama, not the FW 901 filter = f"<name>{name}</name>" if name else "" 902 cmd = f"<show><templates>{filter}</templates></show>" 903 return self.operation(cmd) 904 905 def get_templates_sync_status(self): 906 """ 907 Return the synchronization status the templates per devices 908 A device is in sync if it is up-to-date with the current version on Panorama. 909 NOTE: This only works on Panorama. 910 911 The result is a list of tuple of 3 values: 912 1. the template's name 913 2. the device's name 914 3. the sync status 915 """ 916 res = self._raw_get_templates() 917 statuses = [] 918 for entry in res.xpath("./result/templates/entry"): 919 template_name = entry.attrib["name"] 920 for device in entry.xpath("./devices/entry"): 921 device_name = device.attrib["name"] 922 template_status = next(device.xpath("./template-status/text()"), None) 923 status = (template_name, device_name, template_status) 924 statuses.append(status) 925 return statuses 926 927 def _raw_get_vpn_flows(self, name=None) -> List[Element]: 928 """ 929 Returns the VPN flow information as a XML object. 930 NOTE: This only works on Panorama server, not firewalls 931 """ 932 # This only works on Panorama, not the FW" 933 filter = f"<name>{name}</name>" if name else "<all></all>" 934 cmd = f"<show><vpn><flow>{filter}</flow></vpn></show>" 935 return self.operation(cmd) 936 937 def get_vpn_flows(self, name=None): 938 """ 939 Returns the VPN flow information as a python structure. 940 NOTE: This only works on Panorama server, not firewalls 941 """ 942 entries = self._raw_get_vpn_flows(name=name).xpath(".//IPSec/entry") 943 return [types.VPNFlow.from_xml(e) for e in entries] 944 945 def _raw_system_info(self): 946 """ 947 Returns informations about the system as a XML object. 948 """ 949 cmd = "<show><system><info></info></system></show>" 950 return self.operation(cmd) 951 952 def system_info(self) -> types.operations.SystemInfo: 953 """ 954 Returns informations about the system as a XML object. 955 """ 956 xml = self._raw_system_info() 957 info = xml.xpath("./result/system")[0] 958 return types.operations.SystemInfo.from_xml(info) 959 960 def _raw_system_resources(self): 961 """ 962 Get the system resouces as a XML object. 963 NOTE: The string is the raw output of a `ps` command. 964 """ 965 cmd = "<show><system><resources></resources></system></show>" 966 return self.operation(cmd) 967 968 def system_resources(self): 969 """ 970 Get the system resouces as a string. 971 The string is the raw output of a `ps` command. 972 """ 973 res = self._raw_system_resources() 974 text = res.xpath(".//result/text()")[0] 975 return text.split("\n\n")[0] 976 977 def _raw_download_software(self, version): 978 """ 979 Download the software version on the device. 980 version: the software version to download 981 """ 982 cmd = f"<request><system><software><download><version>{version}</version></download></software></system></request>" 983 return self.operation(cmd) 984 985 def download_software(self, version) -> Optional[str]: 986 """ 987 Download the software version on the device. 988 version: the software version to download 989 990 Returns the download job's ID in case of successful launch, 991 None is returned otherwise. 992 """ 993 res = self._raw_download_software(version) 994 try: 995 return res.xpath(".//job/text()")[0] 996 except Exception: 997 self.logger.debug("Download has not started") 998 return None 999 1000 def _raw_install_software(self, version): 1001 """ 1002 Install the software version on the device. 1003 version: the software version to install 1004 """ 1005 cmd = f"<request><system><software><install><version>{version}</version></install></software></system></request>" 1006 return self.operation(cmd) 1007 1008 def install_software( 1009 self, version: Union[None, str, types.SoftwareVersion] 1010 ) -> Optional[str]: 1011 """ 1012 Install the software version on the device. 1013 version: the software version to install 1014 1015 Returns the download job's ID in case of successful launch, 1016 None is returned otherwise. 1017 """ 1018 if isinstance(version, types.SoftwareVersion): 1019 version = version.version 1020 res = self._raw_install_software(version) 1021 try: 1022 return res.xpath(".//job/text()")[0] 1023 except Exception: 1024 self.logger.debug("Download has not started") 1025 return None 1026 1027 def _raw_restart(self): 1028 """ 1029 Restart the device 1030 """ 1031 cmd = "<request><restart><system></system></restart></request>" 1032 return self.operation(cmd) 1033 1034 def restart(self): 1035 """ 1036 Restart the device 1037 """ 1038 return "".join(self._raw_restart().xpath(".//result/text()")) 1039 1040 def automatic_download_software( 1041 self, version: Optional[str] = None 1042 ) -> types.SoftwareVersion: 1043 """ 1044 Automatically download the requested software version. 1045 if the version is not provided, it defaults to the latest one. 1046 1047 NOTE: This does not do the installation. 1048 This is usefull to download in anticipation of the upgrade. 1049 For automatic install, see `automatic_software_upgrade` 1050 """ 1051 version_str = version 1052 try: 1053 versions = self.get_versions() 1054 except ServerError: 1055 raise Exception( 1056 "An error occured on the device while retrieving the device's versions. Be sure that the device can contact PaloAlto's servers." 1057 ) 1058 sw_version = None 1059 if not version_str: 1060 sw_version = next((v for v in versions if v.latest), None) 1061 else: 1062 sw_version = next((v for v in versions if v.version == version_str), None) 1063 if not sw_version: 1064 self.logger.error(f"Version {version_str} not found") 1065 return exit(1) 1066 1067 # Already downloaded: Nothing to do 1068 if sw_version.downloaded: 1069 self.logger.info(f"Version {sw_version.version} already downloaded") 1070 return sw_version 1071 1072 # Download minor version first (required) 1073 base_version = next( 1074 v for v in versions if v.version == sw_version.base_minor_version 1075 ) 1076 if not base_version.downloaded: 1077 self.logger.info( 1078 f"Launching download of minor version {base_version.version}" 1079 ) 1080 job_id = self.download_software(base_version.version) 1081 if not job_id: 1082 raise Exception("Download has not started") 1083 job = self.wait_job_completion(job_id) 1084 if job.result != "OK": 1085 self.logger.debug(job) 1086 raise Exception(job.details) 1087 print(job.details) 1088 1089 # Actually download the wanted version 1090 self.logger.info(f"Launching download of version {sw_version.version}") 1091 job_id = self.download_software(sw_version.version) 1092 if not job_id: 1093 raise Exception("Download has not started") 1094 job = self.wait_job_completion(job_id) 1095 if job.result != "OK": 1096 self.logger.debug(job) 1097 raise Exception(job.details) 1098 self.logger.info(job.details) 1099 return sw_version 1100 1101 def automatic_software_upgrade( 1102 self, 1103 version: Optional[str] = None, 1104 install: bool = True, 1105 restart: bool = True, 1106 suspend: bool = False, 1107 ): 1108 """ 1109 Automatically download and install the requested software version. 1110 if the version is not provided, it defaults to the latest one. 1111 1112 NOTE: This does the software install and restart by default. 1113 If you only want to download, prefer to use `automatic_download_software` method, 1114 or set install=False. See the parameters for more information. 1115 1116 install: install the software after the download 1117 restart: restart the device after the installation. This option is ignored if install=False 1118 1119 """ 1120 sw_version = self.automatic_download_software(version) 1121 if sw_version.current: 1122 self.logger.info(f"Version {sw_version.version} is already installed") 1123 return sw_version 1124 if not install: 1125 return sw_version 1126 # We may get the following error: 1127 # "Error: Upgrading from 10.2.4-h10 to 11.1.2 requires a content version of 8761 or greater and found 8638-7689." 1128 # This should never happen, we decided to report the error and handle this manually 1129 self.logger.info(f"Launching install of version {sw_version.version}") 1130 1131 with self.suspended(suspend): 1132 job_id = self.install_software(sw_version.version) 1133 if not job_id: 1134 self.logger.error("Install has not started") 1135 raise Exception("Install has not started") 1136 job = self.wait_job_completion(job_id) 1137 self.logger.info(job.details) 1138 1139 # Do not restart if install failed 1140 if job.result != "OK": 1141 self.logger.error("Failed to install software version") 1142 return sw_version 1143 1144 if restart: 1145 self.logger.info("Restarting the device") 1146 restart_response = self.restart() 1147 self.logger.info(restart_response) 1148 return sw_version 1149 1150 @contextmanager 1151 def suspended(self, suspend=True): 1152 if not suspend: 1153 try: 1154 yield self 1155 finally: 1156 return 1157 self.logger.info("Suspending device") 1158 self.set_ha_status(active=False) 1159 try: 1160 yield self 1161 finally: 1162 self.check_availability() 1163 self.logger.info("Unsuspending device...") 1164 for _ in range(3): 1165 try: 1166 self.set_ha_status(active=True) 1167 break 1168 except Exception as e: 1169 self.logger.error(e) 1170 # time.sleep() 1171 else: 1172 raise UnsuspendError("Failed to unsuspend device") 1173 self.logger.info("Device successfully unsuspended") 1174 1175 def wait_availability(self): 1176 logger = self.logger 1177 logger.info("Checking availability. Waiting for response...") 1178 max_duration = 1800 1179 for duration in wait(duration=max_duration): 1180 try: 1181 versions = self.get_versions() 1182 current = next(v for v in versions if v.current) 1183 if current is None: 1184 logger.warning("Device is not not answering") 1185 else: 1186 logger.info(f"Device responded after {duration} seconds") 1187 return current 1188 except ( 1189 urllib3.exceptions.MaxRetryError, 1190 requests.exceptions.ConnectionError, 1191 ): 1192 logger.warning("Firewall still not responding") 1193 except ServerError: 1194 raise Exception( 1195 "An error occured on the device while retrieving the device's versions. Be sure that the device can contact PaloAlto's servers." 1196 ) 1197 except Exception as e: 1198 logger.debug(f"Unexpected error of type {type(e)} occured on firewall") 1199 logger.error(f"Firewall is still not responding: {e}") 1200 raise Exception( 1201 f"Timeout while waiting for availability of firewall. Waited for {max_duration} seconds" 1202 ) 1203 1204 def check_availability(self): 1205 logger = self.logger 1206 ## Wait for the FW to respond 1207 version = self.wait_availability() 1208 if not version: 1209 logger.error("Device never responded") 1210 return False 1211 logger.info(f"Firewall is available on version {version.version}") 1212 return True
26 def __init__( 27 self, 28 host=None, 29 api_key=None, 30 ispanorama=None, 31 verify=False, 32 timeout=None, 33 logger=None, 34 ): 35 env_host, env_apikey = get_credentials_from_env() 36 host = host or env_host 37 api_key = api_key or env_apikey 38 if not host: 39 raise Exception("Missing Host") 40 if not api_key: 41 raise Exception("Missing API Key") 42 host, _, _ = clean_url_host(host) 43 44 self._host = host 45 self._api_key = api_key 46 self._url = f"{host}/api" 47 self._verify = verify 48 self._timeout = timeout 49 self._ispanorama = ispanorama 50 self.logger = logger or logging
199 def generate_apikey(self, username, password: str) -> str: 200 """ 201 Generate a new API-Key for the user connected. 202 """ 203 params = {"user": username, "password": password} 204 return self._request( 205 "keygen", 206 method="POST", 207 params=params, 208 ).xpath(".//key/text()")[0]
Generate a new API-Key for the user connected.
262 def get_tree(self, extended=False) -> Element: 263 """ 264 Return the running configuration 265 The differences with `running_config` are not known 266 """ 267 tree = get_tree( 268 self._host, self._api_key, verify=self._verify, logger=self.logger 269 ) 270 if extended: 271 self._extend_tree_information(tree) 272 return tree
Return the running configuration
The differences with running_config
are not known
291 def get_rule_use(self, tree=None, max_threads: Optional[int] = None): 292 if tree is None: 293 tree = self.get_tree() 294 device_groups = tree.xpath("devices/*/device-group/*/@name") 295 positions = ("pre", "post") 296 # rule_types = tuple({x.tag for x in tree.xpath( 297 # "devices/*/device-group/*" 298 # "/*[self::post-rulebase or self::pre-rulebase]/*")}) 299 rule_types = ("security", "pbf", "nat", "application-override") 300 args_list = list(product(device_groups, positions, rule_types)) 301 302 def func(args): 303 return self._get_rule_use(*args) 304 305 threads = len(args_list) 306 threads = min(max_threads or threads, threads) 307 with Pool(len(args_list)) as pool: 308 data = pool.map(func, args_list) 309 return [entry for entry_list in data for entry in entry_list]
323 def get_rule_hit_count(self, tree=None, max_threads=None): 324 if tree is None: 325 tree = self.get_tree() 326 device_groups = tree.xpath("devices/*/device-group/*/@name") 327 rulebases = ("pre-rulebase", "post-rulebase") 328 rule_types = ("security", "pbf", "nat", "application-override") 329 args_list = list(product(device_groups, rulebases, rule_types)) 330 331 def func(args): 332 return self._get_rule_hit_count(*args) 333 334 threads = len(args_list) 335 threads = min(max_threads or threads, threads) 336 with Pool(len(args_list)) as pool: 337 data = pool.map(func, args_list) 338 return [entry for entry_list in data for entry in entry_list]
365 def get(self, xpath: str): 366 """ 367 This will retrieve the xml definition based on the xpath 368 The xpath doesn't need to be exact 369 and can select multiple values at once. 370 Still, it must at least speciy /config at is begining 371 """ 372 return self._conf_request(xpath, action="show", method="GET")
This will retrieve the xml definition based on the xpath The xpath doesn't need to be exact and can select multiple values at once. Still, it must at least speciy /config at is begining
374 def delete(self, xpath: str): 375 """ 376 This will REMOVE the xml definition at the provided xpath. 377 The xpath must be exact. 378 """ 379 return self._conf_request( 380 xpath, 381 action="delete", 382 method="DELETE", 383 )
This will REMOVE the xml definition at the provided xpath. The xpath must be exact.
385 def create(self, xpath: str, xml_definition): 386 """ 387 This will ADD the xml definition 388 INSIDE the element at the provided xpath. 389 The xpath must be exact. 390 """ 391 # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/configuration-api/set-configuration 392 params = {"element": xml_definition} 393 return self._conf_request( 394 xpath, 395 action="set", 396 method="POST", 397 params=params, 398 )
This will ADD the xml definition INSIDE the element at the provided xpath. The xpath must be exact.
400 def update(self, xpath: str, xml_definition): 401 """ 402 This will REPLACE the xml definition 403 INSTEAD of the element at the provided xpath 404 The xpath must be exact. 405 Nb: We can pull the whole config, update it locally, 406 and push the final result 407 """ 408 # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/configuration-api/set-configuration 409 params = {"element": xml_definition} 410 return self._conf_request( 411 xpath, 412 action="edit", 413 method="POST", 414 params=params, 415 )
This will REPLACE the xml definition INSTEAD of the element at the provided xpath The xpath must be exact. Nb: We can pull the whole config, update it locally, and push the final result
417 def revert_changes(self, skip_validated: bool = False): 418 """ 419 Revert all the changes made on Panorama. 420 NOTE: 421 - This only applies on non-commited changes. 422 - This revert everything (not scoped by users) 423 424 skip_validated: Do not revert changes that were validated 425 """ 426 skip = "<skip-validate>yes</skip-validate>" if skip_validated else "" 427 cmd = f"<revert><config>{skip}</config></revert>" 428 return self._op_request(cmd)
Revert all the changes made on Panorama. NOTE:
- This only applies on non-commited changes.
- This revert everything (not scoped by users)
skip_validated: Do not revert changes that were validated
430 def validate_changes(self): 431 """ 432 Validated all the changes currently made 433 """ 434 cmd = "<validate><full></full></validate>" 435 return self._op_request(cmd)
Validated all the changes currently made
446 def get_push_scope_devicegroups(self, admin=None): 447 """ 448 Gives detailed information about pending changes 449 (e.g. xpath, owner, action, ...) 450 """ 451 scope = self._raw_get_push_scope(admin=admin) 452 return list(set(scope.xpath(".//objects/entry[@loc-type='device-group']/@loc")))
Gives detailed information about pending changes (e.g. xpath, owner, action, ...)
454 def uncommited_changes(self): 455 """ 456 Gives detailed information about pending changes 457 (e.g. xpath, owner, action, ...) 458 """ 459 cmd = "<show><config><list><changes></changes></list></config></show>" 460 return self._op_request(cmd)
Gives detailed information about pending changes (e.g. xpath, owner, action, ...)
462 def uncommited_changes_summary(self, admin=None): 463 """ 464 Only gives the concern device groups 465 """ 466 admin = ( 467 f"<partial><admin><member>{admin}</member></admin></partial>" 468 if admin 469 else "" 470 ) 471 cmd = f"<show><config><list><change-summary>{admin}</change-summary></list></config></show>" 472 return self._op_request(cmd)
Only gives the concern device groups
474 def pending_changes(self): 475 """ 476 Result content is either 'yes' or 'no' 477 """ 478 cmd = "<check><pending-changes></pending-changes></check>" 479 return self._op_request(cmd)
Result content is either 'yes' or 'no'
481 def save_config(self, name): 482 """ 483 Create a named snapshot of the current configuration 484 """ 485 cmd = f"<save><config><to>{name}</to></config></save>" 486 return "\n".join(self._op_request(cmd).xpath(".//result/text()"))
Create a named snapshot of the current configuration
488 def save_device_state(self): 489 """ 490 Create a snapshot of the current device state 491 """ 492 cmd = "<save><device-state></device-state></save>" 493 return "\n".join(self._op_request(cmd).xpath(".//result/text()"))
Create a snapshot of the current device state
495 def get_named_configuration(self, name): 496 """ 497 Get the configuration from a named snapshot as an XML object 498 """ 499 cmd = f"<show><config><saved>{name}</saved></config></show>" 500 return self._op_request(cmd, remove_blank_text=False).xpath("./result/config")[ 501 0 502 ]
Get the configuration from a named snapshot as an XML object
504 def candidate_config(self) -> Element: 505 """ 506 Get the configuration to be commited as an XML object 507 """ 508 cmd = "<show><config><candidate></candidate></config></show>" 509 return self._op_request(cmd, remove_blank_text=False)
Get the configuration to be commited as an XML object
511 def running_config(self) -> Element: 512 """ 513 Get the current running configuration as an XML object 514 """ 515 cmd = "<show><config><running></running></config></show>" 516 return self._op_request(cmd, remove_blank_text=False)
Get the current running configuration as an XML object
533 def get_jobs(self, job_ids: Union[None, str, List[str]] = None) -> List[types.Job]: 534 """ 535 Get information of job(s) 536 Retrieve all jobs by default. 537 538 If job_id is provided, then only retrieve the job requested. 539 """ 540 job_xmls = self._raw_get_jobs(job_ids).xpath(".//job") 541 transformed = (types.Job.from_xml(x) for x in job_xmls) 542 return [j for j in transformed if j]
Get information of job(s) Retrieve all jobs by default.
If job_id is provided, then only retrieve the job requested.
544 def get_job(self, job_id) -> types.Job: 545 """ 546 Get information of job(s) 547 Retrieve all jobs by default. 548 549 If job_id is provided, then only retrieve the job requested. 550 """ 551 return self.get_jobs(job_id)[0]
Get information of job(s) Retrieve all jobs by default.
If job_id is provided, then only retrieve the job requested.
560 def get_versions(self) -> List[types.SoftwareVersion]: 561 """ 562 Get the versions informations 563 """ 564 res = self._raw_get_versions() 565 return [ 566 types.SoftwareVersion.from_xml(entry) 567 for entry in res.xpath(".//sw-updates/versions/entry") 568 ]
Get the versions informations
570 def wait_job_completion(self, job_id: str, waiter=None) -> types.Job: 571 """ 572 Block until the job complete. 573 574 job_id: the job to wait upon 575 waiter: a generator that yield when a new query must be done. 576 see `wait` function (the default waiter) for an example 577 """ 578 if not waiter: 579 waiter = wait() 580 for _ in waiter: 581 job = self.get_job(job_id) 582 if job.progress >= 100: 583 return job 584 self.logger.info(f"Job {job_id} progress: {job.progress}") 585 raise Exception("Timeout while waiting for job completion")
Block until the job complete.
job_id: the job to wait upon
waiter: a generator that yield when a new query must be done.
see wait
function (the default waiter) for an example
587 def raw_get_pending_jobs(self): 588 """ 589 Get all the jobs that are pending as a XML object 590 """ 591 cmd = "<show><jobs><pending></pending></jobs></show>" 592 return self._op_request(cmd)
Get all the jobs that are pending as a XML object
594 def commit_changes(self, force: bool = False): 595 """ 596 Commit all changes 597 """ 598 cmd = "<commit>{}</commit>".format("<force></force>" if force else "") 599 return self._commit_request(cmd)
Commit all changes
617 def add_config_lock(self, comment=None, vsys="shared", no_exception=False) -> bool: 618 comment = f"<comment>{comment}</comment>" if comment else "" 619 cmd = f"<request><config-lock><add>{comment}</add></config-lock></request>" 620 return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
626 def add_commit_lock(self, comment=None, vsys="shared", no_exception=False) -> bool: 627 comment = f"<comment>{comment}</comment>" if comment else "" 628 cmd = f"<request><commit-lock><add>{comment}</add></commit-lock></request>" 629 return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
635 def set_ha_status(self, active: bool = True, target: Optional[str] = None): 636 """ 637 Activate or Deactivate (suspend) the HA pair. 638 639 """ 640 status = "<functional></functional>" if active else "<suspend></suspend>" 641 cmd = f"<request><high-availability><state>{status}</state></high-availability></request>" 642 params = {"target": target} if target else None 643 return self._op_request(cmd, params=params).xpath(".//result/text()")[0]
Activate or Deactivate (suspend) the HA pair.
645 def set_ha_preemption(self, active=True, target=None): 646 """ 647 NOT WORKING: 648 There is currently no way to deactivate the preemption using the API. 649 """ 650 raise Exception("set_ha_preemption not implementend")
NOT WORKING: There is currently no way to deactivate the preemption using the API.
661 def get_ha_info(self, state_only=False, target=None) -> types.HAInfo: 662 """ 663 Get the current state of a HA pair as a python object. 664 """ 665 res = self._raw_get_ha_info(state_only=state_only, target=target) 666 hainfo_xml = res.xpath(".//result")[0] 667 # pprint(hainfo_xml) 668 return types.HAInfo.from_xml(hainfo_xml)
Get the current state of a HA pair as a python object.
670 def get_ha_pairs( 671 self, connected=True 672 ) -> Tuple[List[Tuple[types.Device, Optional[types.Device]]], List[types.Device]]: 673 """ 674 Retrieve a tuple containing 2 values: 675 1. The list of HA pairs and their members 676 2. A list of devices that are not part of a HA pair 677 """ 678 # Get all devices and index them using their serial number 679 devices: List[types.Device] = self.get_devices(connected=connected) 680 device_map = {d.serial: d for d in devices} 681 682 # Create the 2 lists by iterating over the devices 683 done = set() 684 ha_pairs = [] 685 without_ha = [] 686 for d in devices: 687 # Do not manage twice the same device 688 if d.serial in done: 689 continue 690 # The device does not have an HA peer 691 if not d.ha_peer_serial: 692 without_ha.append(d) 693 done.add(d.serial) 694 continue 695 # Get the current device's HA peer 696 # push them in the ha_pairs list 697 # and mark both of them as done 698 peer = device_map.get(d.ha_peer_serial) 699 ha_pairs.append((d, peer)) 700 done.update((d.serial, d.ha_peer_serial)) 701 return ha_pairs, without_ha
Retrieve a tuple containing 2 values:
- The list of HA pairs and their members
- A list of devices that are not part of a HA pair
703 def get_ha_pairs_map(self, connected=True): 704 """ 705 Same as `get_ha_pairs`, but the ha_pairs are return as a map. 706 This provides an easier and more readable lookup to find a pair: 707 708 mapping, _ = client.get_ha_pairs_map() 709 serial = "12345" 710 pair_of_serial = mapping[serial] 711 """ 712 ha_pairs, without_ha = self.get_ha_pairs(connected=connected) 713 map = {} 714 for pair in ha_pairs: 715 for device in pair: 716 map[device.serial] = pair 717 return map, without_ha
Same as get_ha_pairs
, but the ha_pairs are return as a map.
This provides an easier and more readable lookup to find a pair:
mapping, _ = client.get_ha_pairs_map() serial = "12345" pair_of_serial = mapping[serial]
719 def get_panorama_status(self): 720 """ 721 Get the current status of Panorama server. 722 """ 723 cmd = "<show><panorama-status></panorama-status></show>" 724 return self.operation(cmd).xpath(".//result")
Get the current status of Panorama server.
747 def get_devices(self, connected=False) -> List[types.Device]: 748 """ 749 Return the list of device known from Panorama as a python structure. 750 NOTE: This only works if the client is a Panorama server. 751 752 connected: only returns the devices that are connected 753 """ 754 res = self._raw_get_devices(connected=connected) 755 entries = res.xpath(".//devices/entry") 756 devices = (types.Device.from_xml(e) for e in entries) 757 return [d for d in devices if d]
Return the list of device known from Panorama as a python structure. NOTE: This only works if the client is a Panorama server.
connected: only returns the devices that are connected
766 def get_plan_dg_hierarchy(self, recursive=False): 767 """ 768 Return the hierarchy of device groups as a dict. 769 The keys are the names of the device groups. 770 771 The values are the children device groups and depends on the recursive parameter. 772 recursive: if False, the values are only the direct children of the device group. 773 Otherwise, the values are all the descendant device groups. 774 """ 775 devicegroups = {} # name: children 776 hierarchy = self._raw_get_dg_hierarchy().xpath(".//dg-hierarchy")[0] 777 xpath = ".//dg" if recursive else "./dg" 778 for dg in hierarchy.xpath(".//dg"): 779 devicegroups[dg.attrib["name"]] = [ 780 x.attrib["name"] for x in dg.xpath(xpath) 781 ] 782 return devicegroups
Return the hierarchy of device groups as a dict. The keys are the names of the device groups.
The values are the children device groups and depends on the recursive parameter. recursive: if False, the values are only the direct children of the device group. Otherwise, the values are all the descendant device groups.
791 def get_devicegroups_name( 792 self, 793 parents=None, 794 with_connected_devices=None, 795 ): 796 """ 797 This returns the names of the devicegroups: 798 - parents: the returned list will only contain children of the provided parents (parents included) 799 - with_devices: the returned list will only contain devicegroups that have direct devices under them 800 """ 801 devicegroups = self._raw_get_devicegroups().xpath(".//devicegroups/entry") 802 if with_connected_devices: 803 names = [ 804 dg.attrib["name"] 805 for dg in devicegroups 806 if dg.xpath("./devices/entry/connected[text() = 'yes']") 807 ] 808 else: 809 names = [dg.attrib["name"] for dg in devicegroups] 810 if parents: 811 hierarchy = self.get_plan_dg_hierarchy(recursive=True) 812 tokeep = set(chain(*(hierarchy.get(p, []) for p in parents))) | set(parents) 813 names = list(set(names) & tokeep) 814 return names
This returns the names of the devicegroups:
- parents: the returned list will only contain children of the provided parents (parents included)
- with_devices: the returned list will only contain devicegroups that have direct devices under them
827 def get_addresses(self) -> List[types.Address]: 828 """ 829 Return the list of addresses known from Panorama as a python structure. 830 NOTE: This only works if the client is a Firewall. 831 """ 832 res = self._raw_get_addresses() 833 addresses = res.xpath(".//address/entry") 834 return [types.Address.from_xml(i) for i in addresses]
Return the list of addresses known from Panorama as a python structure. NOTE: This only works if the client is a Firewall.
845 def get_routing_tables(self) -> List[types.RoutingTable]: 846 """ 847 Return the list of interface known from Panorama as a python structure. 848 NOTE: This only works if the client is a Firewall. 849 """ 850 res = self._raw_get_routing_tables() 851 routing_tables = res.xpath(".//routing-table") 852 # print(len(routing_tables)) 853 # from pa_api.xmlapi.utils import pprint 854 # for r in routing_tables: 855 # pprint(r) 856 return [types.RoutingTable.from_xml(i) for i in routing_tables]
Return the list of interface known from Panorama as a python structure. NOTE: This only works if the client is a Firewall.
865 def get_interfaces(self) -> List[types.Interface]: 866 """ 867 Return the list of interface known from Panorama as a python structure. 868 NOTE: This only works if the client is a Firewall. 869 """ 870 res = self._raw_get_interfaces() 871 interfaces = res.xpath(".//interface") 872 return [types.Interface.from_xml(i) for i in interfaces]
Return the list of interface known from Panorama as a python structure. NOTE: This only works if the client is a Firewall.
885 def get_zones(self) -> Element: 886 """ 887 Return the list of zones known from Panorama as a python structure. 888 NOTE: This only works if the client is a Firewall. 889 """ 890 res = self._raw_get_zones() 891 zones = res.xpath(".//zone/entry") 892 return [types.Zone.from_xml(i) for i in zones]
Return the list of zones known from Panorama as a python structure. NOTE: This only works if the client is a Firewall.
905 def get_templates_sync_status(self): 906 """ 907 Return the synchronization status the templates per devices 908 A device is in sync if it is up-to-date with the current version on Panorama. 909 NOTE: This only works on Panorama. 910 911 The result is a list of tuple of 3 values: 912 1. the template's name 913 2. the device's name 914 3. the sync status 915 """ 916 res = self._raw_get_templates() 917 statuses = [] 918 for entry in res.xpath("./result/templates/entry"): 919 template_name = entry.attrib["name"] 920 for device in entry.xpath("./devices/entry"): 921 device_name = device.attrib["name"] 922 template_status = next(device.xpath("./template-status/text()"), None) 923 status = (template_name, device_name, template_status) 924 statuses.append(status) 925 return statuses
Return the synchronization status the templates per devices A device is in sync if it is up-to-date with the current version on Panorama. NOTE: This only works on Panorama.
The result is a list of tuple of 3 values:
- the template's name
- the device's name
- the sync status
937 def get_vpn_flows(self, name=None): 938 """ 939 Returns the VPN flow information as a python structure. 940 NOTE: This only works on Panorama server, not firewalls 941 """ 942 entries = self._raw_get_vpn_flows(name=name).xpath(".//IPSec/entry") 943 return [types.VPNFlow.from_xml(e) for e in entries]
Returns the VPN flow information as a python structure. NOTE: This only works on Panorama server, not firewalls
952 def system_info(self) -> types.operations.SystemInfo: 953 """ 954 Returns informations about the system as a XML object. 955 """ 956 xml = self._raw_system_info() 957 info = xml.xpath("./result/system")[0] 958 return types.operations.SystemInfo.from_xml(info)
Returns informations about the system as a XML object.
968 def system_resources(self): 969 """ 970 Get the system resouces as a string. 971 The string is the raw output of a `ps` command. 972 """ 973 res = self._raw_system_resources() 974 text = res.xpath(".//result/text()")[0] 975 return text.split("\n\n")[0]
Get the system resouces as a string.
The string is the raw output of a ps
command.
985 def download_software(self, version) -> Optional[str]: 986 """ 987 Download the software version on the device. 988 version: the software version to download 989 990 Returns the download job's ID in case of successful launch, 991 None is returned otherwise. 992 """ 993 res = self._raw_download_software(version) 994 try: 995 return res.xpath(".//job/text()")[0] 996 except Exception: 997 self.logger.debug("Download has not started") 998 return None
Download the software version on the device. version: the software version to download
Returns the download job's ID in case of successful launch, None is returned otherwise.
1008 def install_software( 1009 self, version: Union[None, str, types.SoftwareVersion] 1010 ) -> Optional[str]: 1011 """ 1012 Install the software version on the device. 1013 version: the software version to install 1014 1015 Returns the download job's ID in case of successful launch, 1016 None is returned otherwise. 1017 """ 1018 if isinstance(version, types.SoftwareVersion): 1019 version = version.version 1020 res = self._raw_install_software(version) 1021 try: 1022 return res.xpath(".//job/text()")[0] 1023 except Exception: 1024 self.logger.debug("Download has not started") 1025 return None
Install the software version on the device. version: the software version to install
Returns the download job's ID in case of successful launch, None is returned otherwise.
1034 def restart(self): 1035 """ 1036 Restart the device 1037 """ 1038 return "".join(self._raw_restart().xpath(".//result/text()"))
Restart the device
1040 def automatic_download_software( 1041 self, version: Optional[str] = None 1042 ) -> types.SoftwareVersion: 1043 """ 1044 Automatically download the requested software version. 1045 if the version is not provided, it defaults to the latest one. 1046 1047 NOTE: This does not do the installation. 1048 This is usefull to download in anticipation of the upgrade. 1049 For automatic install, see `automatic_software_upgrade` 1050 """ 1051 version_str = version 1052 try: 1053 versions = self.get_versions() 1054 except ServerError: 1055 raise Exception( 1056 "An error occured on the device while retrieving the device's versions. Be sure that the device can contact PaloAlto's servers." 1057 ) 1058 sw_version = None 1059 if not version_str: 1060 sw_version = next((v for v in versions if v.latest), None) 1061 else: 1062 sw_version = next((v for v in versions if v.version == version_str), None) 1063 if not sw_version: 1064 self.logger.error(f"Version {version_str} not found") 1065 return exit(1) 1066 1067 # Already downloaded: Nothing to do 1068 if sw_version.downloaded: 1069 self.logger.info(f"Version {sw_version.version} already downloaded") 1070 return sw_version 1071 1072 # Download minor version first (required) 1073 base_version = next( 1074 v for v in versions if v.version == sw_version.base_minor_version 1075 ) 1076 if not base_version.downloaded: 1077 self.logger.info( 1078 f"Launching download of minor version {base_version.version}" 1079 ) 1080 job_id = self.download_software(base_version.version) 1081 if not job_id: 1082 raise Exception("Download has not started") 1083 job = self.wait_job_completion(job_id) 1084 if job.result != "OK": 1085 self.logger.debug(job) 1086 raise Exception(job.details) 1087 print(job.details) 1088 1089 # Actually download the wanted version 1090 self.logger.info(f"Launching download of version {sw_version.version}") 1091 job_id = self.download_software(sw_version.version) 1092 if not job_id: 1093 raise Exception("Download has not started") 1094 job = self.wait_job_completion(job_id) 1095 if job.result != "OK": 1096 self.logger.debug(job) 1097 raise Exception(job.details) 1098 self.logger.info(job.details) 1099 return sw_version
Automatically download the requested software version. if the version is not provided, it defaults to the latest one.
NOTE: This does not do the installation.
This is usefull to download in anticipation of the upgrade.
For automatic install, see automatic_software_upgrade
1101 def automatic_software_upgrade( 1102 self, 1103 version: Optional[str] = None, 1104 install: bool = True, 1105 restart: bool = True, 1106 suspend: bool = False, 1107 ): 1108 """ 1109 Automatically download and install the requested software version. 1110 if the version is not provided, it defaults to the latest one. 1111 1112 NOTE: This does the software install and restart by default. 1113 If you only want to download, prefer to use `automatic_download_software` method, 1114 or set install=False. See the parameters for more information. 1115 1116 install: install the software after the download 1117 restart: restart the device after the installation. This option is ignored if install=False 1118 1119 """ 1120 sw_version = self.automatic_download_software(version) 1121 if sw_version.current: 1122 self.logger.info(f"Version {sw_version.version} is already installed") 1123 return sw_version 1124 if not install: 1125 return sw_version 1126 # We may get the following error: 1127 # "Error: Upgrading from 10.2.4-h10 to 11.1.2 requires a content version of 8761 or greater and found 8638-7689." 1128 # This should never happen, we decided to report the error and handle this manually 1129 self.logger.info(f"Launching install of version {sw_version.version}") 1130 1131 with self.suspended(suspend): 1132 job_id = self.install_software(sw_version.version) 1133 if not job_id: 1134 self.logger.error("Install has not started") 1135 raise Exception("Install has not started") 1136 job = self.wait_job_completion(job_id) 1137 self.logger.info(job.details) 1138 1139 # Do not restart if install failed 1140 if job.result != "OK": 1141 self.logger.error("Failed to install software version") 1142 return sw_version 1143 1144 if restart: 1145 self.logger.info("Restarting the device") 1146 restart_response = self.restart() 1147 self.logger.info(restart_response) 1148 return sw_version
Automatically download and install the requested software version. if the version is not provided, it defaults to the latest one.
NOTE: This does the software install and restart by default.
If you only want to download, prefer to use automatic_download_software
method,
or set install=False. See the parameters for more information.
install: install the software after the download restart: restart the device after the installation. This option is ignored if install=False
1150 @contextmanager 1151 def suspended(self, suspend=True): 1152 if not suspend: 1153 try: 1154 yield self 1155 finally: 1156 return 1157 self.logger.info("Suspending device") 1158 self.set_ha_status(active=False) 1159 try: 1160 yield self 1161 finally: 1162 self.check_availability() 1163 self.logger.info("Unsuspending device...") 1164 for _ in range(3): 1165 try: 1166 self.set_ha_status(active=True) 1167 break 1168 except Exception as e: 1169 self.logger.error(e) 1170 # time.sleep() 1171 else: 1172 raise UnsuspendError("Failed to unsuspend device") 1173 self.logger.info("Device successfully unsuspended")
1175 def wait_availability(self): 1176 logger = self.logger 1177 logger.info("Checking availability. Waiting for response...") 1178 max_duration = 1800 1179 for duration in wait(duration=max_duration): 1180 try: 1181 versions = self.get_versions() 1182 current = next(v for v in versions if v.current) 1183 if current is None: 1184 logger.warning("Device is not not answering") 1185 else: 1186 logger.info(f"Device responded after {duration} seconds") 1187 return current 1188 except ( 1189 urllib3.exceptions.MaxRetryError, 1190 requests.exceptions.ConnectionError, 1191 ): 1192 logger.warning("Firewall still not responding") 1193 except ServerError: 1194 raise Exception( 1195 "An error occured on the device while retrieving the device's versions. Be sure that the device can contact PaloAlto's servers." 1196 ) 1197 except Exception as e: 1198 logger.debug(f"Unexpected error of type {type(e)} occured on firewall") 1199 logger.error(f"Firewall is still not responding: {e}") 1200 raise Exception( 1201 f"Timeout while waiting for availability of firewall. Waited for {max_duration} seconds" 1202 )
1204 def check_availability(self): 1205 logger = self.logger 1206 ## Wait for the FW to respond 1207 version = self.wait_availability() 1208 if not version: 1209 logger.error("Device never responded") 1210 return False 1211 logger.info(f"Firewall is available on version {version.version}") 1212 return True