pa_api.xmlapi
15class Client(BaseXMLApiClient): 16 operation: Operation 17 configuration: Config 18 commit: Commit 19 logs: Log 20 misc: Misc 21 22 def _post_init(self): 23 self.configuration = Config(self) 24 self.operation = Operation(self) 25 self.commit = Commit(self) 26 self.logs = Log(self) 27 self.misc = Misc(self) 28 29 # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/export-files-api 30 # https://knowledgebase.paloaltonetworks.com/KCSArticleDetail?id=kA10g000000ClaOCAS#:~:text=From%20the%20GUI%2C%20go%20to%20Device%20%3E%20Setup,%3E%20scp%20export%20configuration%20%5Btab%20for%20command%20help%5D 31 def _export_request( 32 self, 33 category, 34 method="GET", 35 params=None, 36 verify=None, 37 stream=None, 38 timeout=None, 39 ): 40 if params is None: 41 params = {} 42 params = {"category": category, **params} 43 return self._request( 44 "export", 45 method=method, 46 params=params, 47 verify=verify, 48 parse=False, 49 stream=stream, 50 timeout=timeout, 51 ).content 52 53 def export_configuration( 54 self, 55 verify=None, 56 timeout=None, 57 ) -> Element: 58 return self._export_request( 59 category="configuration", 60 verify=verify, 61 timeout=timeout, 62 ) 63 64 def export_device_state( 65 self, 66 verify=None, 67 timeout=None, 68 ) -> Element: 69 return self._export_request( 70 category="device-state", 71 verify=verify, 72 timeout=timeout, 73 ) 74 75 # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/get-version-info-api 76 def api_version(self): 77 return el2dict( 78 self._request( 79 "version", 80 method="POST", 81 ).xpath(".//result")[0] 82 )["result"] 83 84 def _check_is_panorama(self) -> bool: 85 return self.configuration.check_is_panorama() 86 87 @property 88 def ispanorama(self): 89 if self._ispanorama is None: 90 self._ispanorama = self._check_is_panorama() 91 return self._ispanorama
25class XMLApi: 26 def __init__( 27 self, 28 host=None, 29 api_key=None, 30 ispanorama=None, 31 target=None, 32 verify=False, 33 timeout=None, 34 logger=None, 35 ): 36 env_host, env_apikey = get_credentials_from_env() 37 host = host or env_host 38 api_key = api_key or env_apikey 39 if not host: 40 raise Exception("Missing Host") 41 if not api_key: 42 raise Exception("Missing API Key") 43 host, _, _ = clean_url_host(host) 44 45 default_params = {} 46 if target: 47 default_params["target"] = target 48 49 self._host = host 50 self._api_key = api_key 51 self._url = f"{host}/api" 52 self._verify = verify 53 self._timeout = timeout 54 self._ispanorama = ispanorama 55 self._default_params = default_params 56 self.logger = logger or logging 57 58 def _request( 59 self, 60 type, 61 method="GET", 62 vsys=None, 63 params=None, 64 remove_blank_text=True, 65 verify=None, 66 parse=True, 67 stream=None, 68 timeout=None, 69 ): 70 if verify is None: 71 verify = self._verify 72 if timeout is None: 73 timeout = self._timeout 74 headers = {"X-PAN-KEY": self._api_key} 75 params = {**self._default_params, **(params or {})} 76 return raw_request( 77 self._url, 78 type, 79 method, 80 vsys=vsys, 81 params=params, 82 headers=headers, 83 remove_blank_text=remove_blank_text, 84 verify=verify, 85 logger=self.logger, 86 parse=parse, 87 stream=stream, 88 timeout=timeout, 89 ) 90 91 # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/export-files-api 92 # https://knowledgebase.paloaltonetworks.com/KCSArticleDetail?id=kA10g000000ClaOCAS#:~:text=From%20the%20GUI%2C%20go%20to%20Device%20%3E%20Setup,%3E%20scp%20export%20configuration%20%5Btab%20for%20command%20help%5D 93 def _export_request( 94 self, 95 category, 96 method="GET", 97 params=None, 98 verify=None, 99 stream=None, 100 timeout=None, 101 ): 102 if params is None: 103 params = {} 104 params = {"category": category, **params} 105 return self._request( 106 "export", 107 method=method, 108 params=params, 109 verify=verify, 110 parse=False, 111 stream=stream, 112 timeout=timeout, 113 ).content 114 115 def export_configuration( 116 self, 117 verify=None, 118 timeout=None, 119 ) -> Element: 120 return self._export_request( 121 category="configuration", 122 verify=verify, 123 timeout=timeout, 124 ) 125 126 def export_device_state( 127 self, 128 verify=None, 129 timeout=None, 130 ) -> Element: 131 return self._export_request( 132 category="device-state", 133 verify=verify, 134 timeout=timeout, 135 ) 136 137 def _conf_request( 138 self, 139 xpath, 140 action="get", 141 method="GET", 142 vsys=None, 143 params=None, 144 remove_blank_text=True, 145 verify=None, 146 timeout=None, 147 ) -> Element: 148 if params is None: 149 params = {} 150 params = {"action": action, "xpath": xpath, **params} 151 return self._request( 152 "config", 153 method=method, 154 vsys=vsys, 155 params=params, 156 remove_blank_text=remove_blank_text, 157 verify=verify, 158 timeout=timeout, 159 ) 160 161 def _op_request( 162 self, 163 cmd, 164 method="POST", 165 vsys=None, 166 params=None, 167 remove_blank_text=True, 168 verify=None, 169 timeout=None, 170 ) -> Element: 171 if params is None: 172 params = {} 173 params = {"cmd": cmd, **params} 174 return self._request( 175 "op", 176 method=method, 177 vsys=vsys, 178 params=params, 179 remove_blank_text=remove_blank_text, 180 verify=verify, 181 timeout=timeout, 182 ) 183 184 def _commit_request( 185 self, 186 cmd, 187 method="POST", 188 params=None, 189 remove_blank_text=True, 190 verify=None, 191 timeout=None, 192 ): 193 if params is None: 194 params = {} 195 params = {"cmd": cmd, **params} 196 return self._request( 197 "commit", 198 method=method, 199 params=params, 200 remove_blank_text=remove_blank_text, 201 verify=verify, 202 timeout=timeout, 203 ) 204 205 # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/get-started-with-the-pan-os-xml-api/get-your-api-key 206 def generate_apikey(self, username, password: str) -> str: 207 """ 208 Generate a new API-Key for the user connected. 209 """ 210 params = {"user": username, "password": password} 211 return self._request( 212 "keygen", 213 method="POST", 214 params=params, 215 ).xpath(".//key/text()")[0] 216 217 # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/get-version-info-api 218 def api_version(self): 219 return el2dict( 220 self._request( 221 "version", 222 method="POST", 223 ).xpath(".//result")[0] 224 )["result"] 225 226 def configuration( 227 self, 228 xpath, 229 action="get", 230 method="GET", 231 params=None, 232 remove_blank_text=True, 233 ): 234 return self._conf_request( 235 xpath, 236 action=action, 237 method=method, 238 params=params, 239 remove_blank_text=remove_blank_text, 240 ) 241 242 def operation( 243 self, 244 cmd, 245 method="POST", 246 params=None, 247 remove_blank_text=True, 248 ): 249 return self._op_request( 250 cmd, 251 method=method, 252 params=params, 253 remove_blank_text=remove_blank_text, 254 ) 255 256 def _check_is_panorama(self) -> bool: 257 try: 258 self.configuration("/config/panorama/vsys") 259 return False 260 except Exception: 261 return True 262 263 @property 264 def ispanorama(self): 265 if self._ispanorama is None: 266 self._ispanorama = self._check_is_panorama() 267 return self._ispanorama 268 269 def get_tree(self, extended=False) -> Element: 270 """ 271 Return the running configuration 272 The differences with `running_config` are not known 273 """ 274 tree = get_tree( 275 self._host, self._api_key, verify=self._verify, logger=self.logger 276 ) 277 if extended: 278 self._extend_tree_information(tree) 279 return tree 280 281 def _get_rule_use(self, device_group, position, rule_type, number: int = 200): 282 results = [] 283 for i in range(100): 284 cmd = _get_rule_use_cmd( 285 device_group, 286 position, 287 rule_type, 288 i * number, 289 number, 290 ) 291 res = self._op_request(cmd).xpath("result")[0] 292 total_count = int(res.attrib["total-count"]) 293 results.extend(res.xpath("entry")) 294 if len(results) >= total_count: 295 break 296 return results 297 298 def get_rule_use(self, tree=None, max_threads: Optional[int] = None): 299 if tree is None: 300 tree = self.get_tree() 301 device_groups = tree.xpath("devices/*/device-group/*/@name") 302 positions = ("pre", "post") 303 # rule_types = tuple({x.tag for x in tree.xpath( 304 # "devices/*/device-group/*" 305 # "/*[self::post-rulebase or self::pre-rulebase]/*")}) 306 rule_types = ("security", "pbf", "nat", "application-override") 307 args_list = list(product(device_groups, positions, rule_types)) 308 309 def func(args): 310 return self._get_rule_use(*args) 311 312 threads = len(args_list) 313 threads = min(max_threads or threads, threads) 314 with Pool(len(args_list)) as pool: 315 data = pool.map(func, args_list) 316 return [entry for entry_list in data for entry in entry_list] 317 318 def _get_rule_hit_count(self, device_group, rulebase, rule_type): 319 cmd = ( 320 "<show><rule-hit-count><device-group>" 321 f"<entry name='{device_group}'><{rulebase}><entry name='{rule_type}'>" 322 f"<rules><all/></rules></entry></{rulebase}></entry>" 323 "</device-group></rule-hit-count></show>" 324 ) 325 res = self._op_request(cmd) 326 entries = res.xpath(".//rules/entry") or [] 327 # return entries 328 return [(device_group, rulebase, rule_type, e) for e in entries] 329 330 def get_rule_hit_count(self, tree=None, max_threads=None): 331 if tree is None: 332 tree = self.get_tree() 333 device_groups = tree.xpath("devices/*/device-group/*/@name") 334 rulebases = ("pre-rulebase", "post-rulebase") 335 rule_types = ("security", "pbf", "nat", "application-override") 336 args_list = list(product(device_groups, rulebases, rule_types)) 337 338 def func(args): 339 return self._get_rule_hit_count(*args) 340 341 threads = len(args_list) 342 threads = min(max_threads or threads, threads) 343 with Pool(len(args_list)) as pool: 344 data = pool.map(func, args_list) 345 return [entry for entry_list in data for entry in entry_list] 346 347 def _extend_tree_information( 348 self, 349 tree, 350 extended=None, 351 max_threads=None, 352 ): 353 """ 354 Incorporate usage statistics into the configuration. 355 tree: the configuration as a XML object 356 extended: rule-use data (if not provided, the function will retrieve them automatically) 357 """ 358 if extended is None: 359 extended = self.get_rule_use(tree, max_threads=max_threads) 360 rules = tree.xpath( 361 ".//device-group/entry/" 362 "*[self::pre-rulebase or self::post-rulebase]/*/rules/entry[@uuid]", 363 ) 364 ext_dict = {x.attrib.get("uuid"): x for x in extended} 365 rules_dict = {x.attrib["uuid"]: x for x in rules} 366 for ext, rule in map_dicts(ext_dict, rules_dict): 367 extend_element(rule, ext) 368 # NOTE: Do not use rule.extend(ext) 369 # => This is causing duplicates entries 370 return tree, extended 371 372 def get(self, xpath: str): 373 """ 374 This will retrieve the xml definition based on the xpath 375 The xpath doesn't need to be exact 376 and can select multiple values at once. 377 Still, it must at least speciy /config at is begining 378 """ 379 return self._conf_request(xpath, action="show", method="GET") 380 381 def delete(self, xpath: str): 382 """ 383 This will REMOVE the xml definition at the provided xpath. 384 The xpath must be exact. 385 """ 386 return self._conf_request( 387 xpath, 388 action="delete", 389 method="DELETE", 390 ) 391 392 def create(self, xpath: str, xml_definition): 393 """ 394 This will ADD the xml definition 395 INSIDE the element at the provided xpath. 396 The xpath must be exact. 397 """ 398 # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/configuration-api/set-configuration 399 params = {"element": xml_definition} 400 return self._conf_request( 401 xpath, 402 action="set", 403 method="POST", 404 params=params, 405 ) 406 407 def update(self, xpath: str, xml_definition): 408 """ 409 This will REPLACE the xml definition 410 INSTEAD of the element at the provided xpath 411 The xpath must be exact. 412 Nb: We can pull the whole config, update it locally, 413 and push the final result 414 """ 415 # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/configuration-api/set-configuration 416 params = {"element": xml_definition} 417 return self._conf_request( 418 xpath, 419 action="edit", 420 method="POST", 421 params=params, 422 ) 423 424 def revert_changes(self, skip_validated: bool = False): 425 """ 426 Revert all the changes made on Panorama. 427 NOTE: 428 - This only applies on non-commited changes. 429 - This revert everything (not scoped by users) 430 431 skip_validated: Do not revert changes that were validated 432 """ 433 skip = "<skip-validate>yes</skip-validate>" if skip_validated else "" 434 cmd = f"<revert><config>{skip}</config></revert>" 435 return self._op_request(cmd) 436 437 def validate_changes(self): 438 """ 439 Validated all the changes currently made 440 """ 441 cmd = "<validate><full></full></validate>" 442 return self._op_request(cmd) 443 444 def _raw_get_push_scope(self, admin=None): 445 """ 446 Gives detailed information about pending changes 447 (e.g. xpath, owner, action, ...) 448 """ 449 filter = f"<admin><member>{admin}</member></admin>" if admin else "" 450 cmd = f"<show><config><push-scope>{filter}</push-scope></config></show>" 451 return self._op_request(cmd) 452 453 def get_push_scope_devicegroups(self, admin=None): 454 """ 455 Gives detailed information about pending changes 456 (e.g. xpath, owner, action, ...) 457 """ 458 scope = self._raw_get_push_scope(admin=admin) 459 return list(set(scope.xpath(".//objects/entry[@loc-type='device-group']/@loc"))) 460 461 def uncommited_changes(self): 462 """ 463 Gives detailed information about pending changes 464 (e.g. xpath, owner, action, ...) 465 """ 466 cmd = "<show><config><list><changes></changes></list></config></show>" 467 return self._op_request(cmd) 468 469 def uncommited_changes_summary(self, admin=None): 470 """ 471 Only gives the concern device groups 472 """ 473 admin = ( 474 f"<partial><admin><member>{admin}</member></admin></partial>" 475 if admin 476 else "" 477 ) 478 cmd = f"<show><config><list><change-summary>{admin}</change-summary></list></config></show>" 479 return self._op_request(cmd) 480 481 def pending_changes(self): 482 """ 483 Result content is either 'yes' or 'no' 484 """ 485 cmd = "<check><pending-changes></pending-changes></check>" 486 return self._op_request(cmd) 487 488 def save_config(self, name): 489 """ 490 Create a named snapshot of the current configuration 491 """ 492 cmd = f"<save><config><to>{name}</to></config></save>" 493 return "\n".join(self._op_request(cmd).xpath(".//result/text()")) 494 495 def save_device_state(self): 496 """ 497 Create a snapshot of the current device state 498 """ 499 cmd = "<save><device-state></device-state></save>" 500 return "\n".join(self._op_request(cmd).xpath(".//result/text()")) 501 502 def get_named_configuration(self, name): 503 """ 504 Get the configuration from a named snapshot as an XML object 505 """ 506 cmd = f"<show><config><saved>{name}</saved></config></show>" 507 return self._op_request(cmd, remove_blank_text=False).xpath("./result/config")[ 508 0 509 ] 510 511 def candidate_config(self) -> Element: 512 """ 513 Get the configuration to be commited as an XML object 514 """ 515 cmd = "<show><config><candidate></candidate></config></show>" 516 return self._op_request(cmd, remove_blank_text=False) 517 518 def running_config(self) -> Element: 519 """ 520 Get the current running configuration as an XML object 521 """ 522 cmd = "<show><config><running></running></config></show>" 523 return self._op_request(cmd, remove_blank_text=False) 524 525 def _raw_get_jobs(self, job_ids: Union[None, str, List[str]] = None) -> Element: 526 """ 527 Get information of job(s) as an XML object. 528 Retrieve all jobs by default. 529 530 If job_id is provided, then only retrieve the job requested. 531 """ 532 filter = "<all></all>" 533 if job_ids: 534 if isinstance(job_ids, str): 535 job_ids = [job_ids] 536 filter = "".join(f"<id>{j}</id>" for j in job_ids) 537 cmd = f"<show><jobs>{filter}</jobs></show>" 538 return self._op_request(cmd) 539 540 def get_jobs(self, job_ids: Union[None, str, List[str]] = None) -> List[types.Job]: 541 """ 542 Get information of job(s) 543 Retrieve all jobs by default. 544 545 If job_id is provided, then only retrieve the job requested. 546 """ 547 job_xmls = self._raw_get_jobs(job_ids).xpath(".//job") 548 transformed = (types.Job.from_xml(x) for x in job_xmls) 549 return [j for j in transformed if j] 550 551 def get_job(self, job_id) -> types.Job: 552 """ 553 Get information of job(s) 554 Retrieve all jobs by default. 555 556 If job_id is provided, then only retrieve the job requested. 557 """ 558 return self.get_jobs(job_id)[0] 559 560 def _raw_get_versions(self) -> Element: 561 """ 562 Get the versions informations as a XML object. 563 """ 564 cmd = "<request><system><software><check></check></software></system></request>" 565 return self.operation(cmd) 566 567 def get_versions(self) -> List[types.SoftwareVersion]: 568 """ 569 Get the versions informations 570 """ 571 res = self._raw_get_versions() 572 return [ 573 types.SoftwareVersion.from_xml(entry) 574 for entry in res.xpath(".//sw-updates/versions/entry") 575 ] 576 577 def wait_job_completion(self, job_id: str, waiter=None) -> types.Job: 578 """ 579 Block until the job complete. 580 581 job_id: the job to wait upon 582 waiter: a generator that yield when a new query must be done. 583 see `wait` function (the default waiter) for an example 584 """ 585 if not waiter: 586 waiter = wait() 587 for _ in waiter: 588 job = self.get_job(job_id) 589 if job.progress >= 100: 590 return job 591 self.logger.info(f"Job {job_id} progress: {job.progress}") 592 raise Exception("Timeout while waiting for job completion") 593 594 def raw_get_pending_jobs(self): 595 """ 596 Get all the jobs that are pending as a XML object 597 """ 598 cmd = "<show><jobs><pending></pending></jobs></show>" 599 return self._op_request(cmd) 600 601 def commit_changes(self, force: bool = False): 602 """ 603 Commit all changes 604 """ 605 cmd = "<commit>{}</commit>".format("<force></force>" if force else "") 606 return self._commit_request(cmd) 607 608 def _lock_cmd(self, cmd, vsys, no_exception=False) -> bool: 609 """ 610 Utility function for commands that tries to manipulate the lock 611 on Panorama. 612 """ 613 try: 614 result = "".join(self._op_request(cmd, vsys=vsys).itertext()) 615 self.logger.debug(result) 616 except Exception as e: 617 if no_exception: 618 self.logger.error(e) 619 return False 620 raise 621 return True 622 623 # https://github.com/PaloAltoNetworks/pan-os-python/blob/a6b018e3864ff313fed36c3804394e2c92ca87b3/panos/base.py#L4459 624 def add_config_lock(self, comment=None, vsys="shared", no_exception=False) -> bool: 625 comment = f"<comment>{comment}</comment>" if comment else "" 626 cmd = f"<request><config-lock><add>{comment}</add></config-lock></request>" 627 return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception) 628 629 def remove_config_lock(self, vsys="shared", no_exception=False) -> bool: 630 cmd = "<request><config-lock><remove></remove></config-lock></request>" 631 return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception) 632 633 def add_commit_lock(self, comment=None, vsys="shared", no_exception=False) -> bool: 634 comment = f"<comment>{comment}</comment>" if comment else "" 635 cmd = f"<request><commit-lock><add>{comment}</add></commit-lock></request>" 636 return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception) 637 638 def remove_commit_lock(self, vsys="shared", no_exception=False) -> bool: 639 cmd = "<request><commit-lock><remove></remove></commit-lock></request>" 640 return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception) 641 642 def set_ha_status(self, active: bool = True, target: Optional[str] = None): 643 """ 644 Activate or Deactivate (suspend) the HA pair. 645 646 """ 647 status = "<functional></functional>" if active else "<suspend></suspend>" 648 cmd = f"<request><high-availability><state>{status}</state></high-availability></request>" 649 params = {"target": target} if target else None 650 return self._op_request(cmd, params=params).xpath(".//result/text()")[0] 651 652 def set_ha_preemption(self, active=True, target=None): 653 """ 654 NOT WORKING: 655 There is currently no way to deactivate the preemption using the API. 656 """ 657 raise Exception("set_ha_preemption not implementend") 658 659 def _raw_get_ha_info(self, state_only=False, target=None) -> Element: 660 """ 661 Get the current state of a HA pair as a XML object. 662 """ 663 filter = "<state></state>" if state_only else "<all></all>" 664 cmd = f"<show><high-availability>{filter}</high-availability></show>" 665 params = {"target": target} if target else None 666 return self._op_request(cmd, params=params) 667 668 def get_ha_info(self, state_only=False, target=None) -> types.HAInfo: 669 """ 670 Get the current state of a HA pair as a python object. 671 """ 672 res = self._raw_get_ha_info(state_only=state_only, target=target) 673 hainfo_xml = res.xpath(".//result")[0] 674 # pprint(hainfo_xml) 675 return types.HAInfo.from_xml(hainfo_xml) 676 677 def get_ha_pairs( 678 self, connected=True 679 ) -> Tuple[List[Tuple[types.Device, Optional[types.Device]]], List[types.Device]]: 680 """ 681 Retrieve a tuple containing 2 values: 682 1. The list of HA pairs and their members 683 2. A list of devices that are not part of a HA pair 684 """ 685 # Get all devices and index them using their serial number 686 devices: List[types.Device] = self.get_devices(connected=connected) 687 device_map = {d.serial: d for d in devices} 688 689 # Create the 2 lists by iterating over the devices 690 done = set() 691 ha_pairs = [] 692 without_ha = [] 693 for d in devices: 694 # Do not manage twice the same device 695 if d.serial in done: 696 continue 697 # The device does not have an HA peer 698 if not d.ha_peer_serial: 699 without_ha.append(d) 700 done.add(d.serial) 701 continue 702 # Get the current device's HA peer 703 # push them in the ha_pairs list 704 # and mark both of them as done 705 peer = device_map.get(d.ha_peer_serial) 706 ha_pairs.append((d, peer)) 707 done.update((d.serial, d.ha_peer_serial)) 708 return ha_pairs, without_ha 709 710 def get_ha_pairs_map(self, connected=True): 711 """ 712 Same as `get_ha_pairs`, but the ha_pairs are return as a map. 713 This provides an easier and more readable lookup to find a pair: 714 715 mapping, _ = client.get_ha_pairs_map() 716 serial = "12345" 717 pair_of_serial = mapping[serial] 718 """ 719 ha_pairs, without_ha = self.get_ha_pairs(connected=connected) 720 map = {} 721 for pair in ha_pairs: 722 for device in pair: 723 map[device.serial] = pair 724 return map, without_ha 725 726 def get_panorama_status(self): 727 """ 728 Get the current status of Panorama server. 729 """ 730 cmd = "<show><panorama-status></panorama-status></show>" 731 return self.operation(cmd).xpath(".//result") 732 733 def raw_get_local_panorama(self): 734 return self.configuration( 735 "/config/devices/entry/deviceconfig/system/panorama/local-panorama/panorama-server" 736 ) 737 738 def get_local_panorama_ip(self) -> Optional[str]: 739 res = self.raw_get_local_panorama() 740 return first(res.xpath("//panorama-server/text()")) 741 742 def _raw_get_devices(self, connected=False): 743 """ 744 Return the list of device known from Panorama as a XML object. 745 NOTE: This only works if the client is a Panorama server. 746 747 connected: only returns the devices that are connected 748 """ 749 # This only works on Panorama, not the FW 750 filter = "<connected></connected>" if connected else "<all></all>" 751 cmd = f"<show><devices>{filter}</devices></show>" 752 return self.operation(cmd) 753 754 def get_devices(self, connected=False) -> List[types.Device]: 755 """ 756 Return the list of device known from Panorama as a python structure. 757 NOTE: This only works if the client is a Panorama server. 758 759 connected: only returns the devices that are connected 760 """ 761 res = self._raw_get_devices(connected=connected) 762 entries = res.xpath(".//devices/entry") 763 devices = (types.Device.from_xml(e) for e in entries) 764 return [d for d in devices if d] 765 766 def _raw_get_dg_hierarchy(self): 767 """ 768 Return the hierarchy of device groups as a XML object. 769 """ 770 cmd = "<show><dg-hierarchy></dg-hierarchy></show>" 771 return self.operation(cmd) 772 773 def get_plan_dg_hierarchy(self, recursive=False): 774 """ 775 Return the hierarchy of device groups as a dict. 776 The keys are the names of the device groups. 777 778 The values are the children device groups and depends on the recursive parameter. 779 recursive: if False, the values are only the direct children of the device group. 780 Otherwise, the values are all the descendant device groups. 781 """ 782 devicegroups = {} # name: children 783 hierarchy = self._raw_get_dg_hierarchy().xpath(".//dg-hierarchy")[0] 784 xpath = ".//dg" if recursive else "./dg" 785 for dg in hierarchy.xpath(".//dg"): 786 devicegroups[dg.attrib["name"]] = [ 787 x.attrib["name"] for x in dg.xpath(xpath) 788 ] 789 return devicegroups 790 791 def _raw_get_devicegroups(self): 792 """ 793 Return the list of device groups as a XML object. 794 """ 795 cmd = "<show><devicegroups></devicegroups></show>" 796 return self.operation(cmd) 797 798 def get_devicegroups_name( 799 self, 800 parents=None, 801 with_connected_devices=None, 802 ): 803 """ 804 This returns the names of the devicegroups: 805 - parents: the returned list will only contain children of the provided parents (parents included) 806 - with_devices: the returned list will only contain devicegroups that have direct devices under them 807 """ 808 devicegroups = self._raw_get_devicegroups().xpath(".//devicegroups/entry") 809 if with_connected_devices: 810 names = [ 811 dg.attrib["name"] 812 for dg in devicegroups 813 if dg.xpath("./devices/entry/connected[text() = 'yes']") 814 ] 815 else: 816 names = [dg.attrib["name"] for dg in devicegroups] 817 if parents: 818 hierarchy = self.get_plan_dg_hierarchy(recursive=True) 819 tokeep = set(chain(*(hierarchy.get(p, []) for p in parents))) | set(parents) 820 names = list(set(names) & tokeep) 821 return names 822 823 def _raw_get_addresses(self): 824 """ 825 Return the list of addresses known from Panorama as a XML object. 826 NOTE: This only works if the client is a Firewall. 827 """ 828 if self.ispanorama: 829 return self.configuration( 830 "/config/devices/entry/device-group/entry/address" 831 ) 832 return self.configuration("/config/panorama/vsys//address") 833 834 def get_addresses(self) -> List[types.Address]: 835 """ 836 Return the list of addresses known from Panorama as a python structure. 837 NOTE: This only works if the client is a Firewall. 838 """ 839 res = self._raw_get_addresses() 840 addresses = res.xpath(".//address/entry") 841 return [types.Address.from_xml(i) for i in addresses] 842 843 def _raw_get_routing_tables(self) -> Element: 844 """ 845 Return the list of interfaces known from Panorama as a XML object. 846 NOTE: This only works if the client is a Firewall. 847 """ 848 return self.configuration( 849 "/config/devices/entry/network/virtual-router/entry/routing-table" 850 ) 851 852 def get_routing_tables(self) -> List[types.RoutingTable]: 853 """ 854 Return the list of interface known from Panorama as a python structure. 855 NOTE: This only works if the client is a Firewall. 856 """ 857 res = self._raw_get_routing_tables() 858 routing_tables = res.xpath(".//routing-table") 859 # print(len(routing_tables)) 860 # from pa_api.xmlapi.utils import pprint 861 # for r in routing_tables: 862 # pprint(r) 863 return [types.RoutingTable.from_xml(i) for i in routing_tables] 864 865 def _raw_get_interfaces(self) -> Element: 866 """ 867 Return the list of interfaces known from Panorama as a XML object. 868 NOTE: This only works if the client is a Firewall. 869 """ 870 return self.configuration("/config/devices/entry/network/interface") 871 872 def get_interfaces(self) -> List[types.Interface]: 873 """ 874 Return the list of interface known from Panorama as a python structure. 875 NOTE: This only works if the client is a Firewall. 876 """ 877 res = self._raw_get_interfaces() 878 interfaces = res.xpath(".//interface") 879 return [types.Interface.from_xml(i) for i in interfaces] 880 881 def _raw_get_zones(self) -> Element: 882 """ 883 Return the list of zones known from Panorama as a XML object. 884 NOTE: This only works if the client is a Firewall. 885 """ 886 if self.ispanorama: 887 return self.configuration( 888 "/config/devices/entry/*/entry/config/devices/entry/vsys/entry/zone" 889 ) 890 return self.configuration("/config/devices/entry/vsys/entry/zone") 891 892 def get_zones(self) -> Element: 893 """ 894 Return the list of zones known from Panorama as a python structure. 895 NOTE: This only works if the client is a Firewall. 896 """ 897 res = self._raw_get_zones() 898 zones = res.xpath(".//zone/entry") 899 return [types.Zone.from_xml(i) for i in zones] 900 901 def _raw_get_templates(self, name=None) -> Element: 902 """ 903 Return the synchronization status the templates per devices as a XML object. 904 A device is in sync if it is up-to-date with the current version on Panorama. 905 NOTE: This only works on Panorama. 906 """ 907 # This only works on Panorama, not the FW 908 filter = f"<name>{name}</name>" if name else "" 909 cmd = f"<show><templates>{filter}</templates></show>" 910 return self.operation(cmd) 911 912 def get_templates_sync_status(self): 913 """ 914 Return the synchronization status the templates per devices 915 A device is in sync if it is up-to-date with the current version on Panorama. 916 NOTE: This only works on Panorama. 917 918 The result is a list of tuple of 3 values: 919 1. the template's name 920 2. the device's name 921 3. the sync status 922 """ 923 res = self._raw_get_templates() 924 statuses = [] 925 for entry in res.xpath("./result/templates/entry"): 926 template_name = entry.attrib["name"] 927 for device in entry.xpath("./devices/entry"): 928 device_name = device.attrib["name"] 929 template_status = next(device.xpath("./template-status/text()"), None) 930 status = (template_name, device_name, template_status) 931 statuses.append(status) 932 return statuses 933 934 def _raw_get_vpn_flows(self, name=None) -> List[Element]: 935 """ 936 Returns the VPN flow information as a XML object. 937 NOTE: This only works on Panorama server, not firewalls 938 """ 939 # This only works on Panorama, not the FW" 940 filter = f"<name>{name}</name>" if name else "<all></all>" 941 cmd = f"<show><vpn><flow>{filter}</flow></vpn></show>" 942 return self.operation(cmd) 943 944 def get_vpn_flows(self, name=None): 945 """ 946 Returns the VPN flow information as a python structure. 947 NOTE: This only works on Panorama server, not firewalls 948 """ 949 entries = self._raw_get_vpn_flows(name=name).xpath(".//IPSec/entry") 950 return [types.VPNFlow.from_xml(e) for e in entries] 951 952 def _raw_system_info(self): 953 """ 954 Returns informations about the system as a XML object. 955 """ 956 cmd = "<show><system><info></info></system></show>" 957 return self.operation(cmd) 958 959 def system_info(self) -> types.operations.SystemInfo: 960 """ 961 Returns informations about the system as a XML object. 962 """ 963 xml = self._raw_system_info() 964 info = xml.xpath("./result/system")[0] 965 return types.operations.SystemInfo.from_xml(info) 966 967 def _raw_system_resources(self): 968 """ 969 Get the system resouces as a XML object. 970 NOTE: The string is the raw output of a `ps` command. 971 """ 972 cmd = "<show><system><resources></resources></system></show>" 973 return self.operation(cmd) 974 975 def system_resources(self): 976 """ 977 Get the system resouces as a string. 978 The string is the raw output of a `ps` command. 979 """ 980 res = self._raw_system_resources() 981 text = res.xpath(".//result/text()")[0] 982 return text.split("\n\n")[0] 983 984 def _raw_download_software(self, version): 985 """ 986 Download the software version on the device. 987 version: the software version to download 988 """ 989 cmd = f"<request><system><software><download><version>{version}</version></download></software></system></request>" 990 return self.operation(cmd) 991 992 def download_software(self, version) -> Optional[str]: 993 """ 994 Download the software version on the device. 995 version: the software version to download 996 997 Returns the download job's ID in case of successful launch, 998 None is returned otherwise. 999 """ 1000 res = self._raw_download_software(version) 1001 try: 1002 return res.xpath(".//job/text()")[0] 1003 except Exception: 1004 self.logger.debug("Download has not started") 1005 return None 1006 1007 def _raw_install_software(self, version): 1008 """ 1009 Install the software version on the device. 1010 version: the software version to install 1011 """ 1012 cmd = f"<request><system><software><install><version>{version}</version></install></software></system></request>" 1013 return self.operation(cmd) 1014 1015 def install_software( 1016 self, version: Union[None, str, types.SoftwareVersion] 1017 ) -> Optional[str]: 1018 """ 1019 Install the software version on the device. 1020 version: the software version to install 1021 1022 Returns the download job's ID in case of successful launch, 1023 None is returned otherwise. 1024 """ 1025 if isinstance(version, types.SoftwareVersion): 1026 version = version.version 1027 res = self._raw_install_software(version) 1028 try: 1029 return res.xpath(".//job/text()")[0] 1030 except Exception: 1031 self.logger.debug("Download has not started") 1032 return None 1033 1034 def _raw_restart(self): 1035 """ 1036 Restart the device 1037 """ 1038 cmd = "<request><restart><system></system></restart></request>" 1039 return self.operation(cmd) 1040 1041 def restart(self): 1042 """ 1043 Restart the device 1044 """ 1045 return "".join(self._raw_restart().xpath(".//result/text()")) 1046 1047 def automatic_download_software( 1048 self, version: Optional[str] = None 1049 ) -> types.SoftwareVersion: 1050 """ 1051 Automatically download the requested software version. 1052 if the version is not provided, it defaults to the latest one. 1053 1054 NOTE: This does not do the installation. 1055 This is usefull to download in anticipation of the upgrade. 1056 For automatic install, see `automatic_software_upgrade` 1057 """ 1058 version_str = version 1059 try: 1060 versions = self.get_versions() 1061 except ServerError: 1062 raise Exception( 1063 "An error occured on the device while retrieving the device's versions. Be sure that the device can contact PaloAlto's servers." 1064 ) 1065 sw_version = None 1066 if not version_str: 1067 sw_version = next((v for v in versions if v.latest), None) 1068 else: 1069 sw_version = next((v for v in versions if v.version == version_str), None) 1070 if not sw_version: 1071 self.logger.error(f"Version {version_str} not found") 1072 return exit(1) 1073 1074 # Already downloaded: Nothing to do 1075 if sw_version.downloaded: 1076 self.logger.info(f"Version {sw_version.version} already downloaded") 1077 return sw_version 1078 1079 # Download minor version first (required) 1080 base_version = next( 1081 v for v in versions if v.version == sw_version.base_minor_version 1082 ) 1083 if not base_version.downloaded: 1084 self.logger.info( 1085 f"Launching download of minor version {base_version.version}" 1086 ) 1087 job_id = self.download_software(base_version.version) 1088 if not job_id: 1089 raise Exception("Download has not started") 1090 job = self.wait_job_completion(job_id) 1091 if job.result != "OK": 1092 self.logger.debug(job) 1093 raise Exception(job.details) 1094 print(job.details) 1095 1096 # Actually download the wanted version 1097 self.logger.info(f"Launching download of version {sw_version.version}") 1098 job_id = self.download_software(sw_version.version) 1099 if not job_id: 1100 raise Exception("Download has not started") 1101 job = self.wait_job_completion(job_id) 1102 if job.result != "OK": 1103 self.logger.debug(job) 1104 raise Exception(job.details) 1105 self.logger.info(job.details) 1106 return sw_version 1107 1108 def automatic_software_upgrade( 1109 self, 1110 version: Optional[str] = None, 1111 install: bool = True, 1112 restart: bool = True, 1113 suspend: bool = False, 1114 ): 1115 """ 1116 Automatically download and install the requested software version. 1117 if the version is not provided, it defaults to the latest one. 1118 1119 NOTE: This does the software install and restart by default. 1120 If you only want to download, prefer to use `automatic_download_software` method, 1121 or set install=False. See the parameters for more information. 1122 1123 install: install the software after the download 1124 restart: restart the device after the installation. This option is ignored if install=False 1125 1126 """ 1127 sw_version = self.automatic_download_software(version) 1128 if sw_version.current: 1129 self.logger.info(f"Version {sw_version.version} is already installed") 1130 return sw_version 1131 if not install: 1132 return sw_version 1133 # We may get the following error: 1134 # "Error: Upgrading from 10.2.4-h10 to 11.1.2 requires a content version of 8761 or greater and found 8638-7689." 1135 # This should never happen, we decided to report the error and handle this manually 1136 self.logger.info(f"Launching install of version {sw_version.version}") 1137 1138 with self.suspended(suspend): 1139 job_id = self.install_software(sw_version.version) 1140 if not job_id: 1141 self.logger.error("Install has not started") 1142 raise Exception("Install has not started") 1143 job = self.wait_job_completion(job_id) 1144 self.logger.info(job.details) 1145 1146 # Do not restart if install failed 1147 if job.result != "OK": 1148 self.logger.error("Failed to install software version") 1149 return sw_version 1150 1151 if restart: 1152 self.logger.info("Restarting the device") 1153 restart_response = self.restart() 1154 self.logger.info(restart_response) 1155 return sw_version 1156 1157 @contextmanager 1158 def suspended(self, suspend=True): 1159 if not suspend: 1160 try: 1161 yield self 1162 finally: 1163 return 1164 self.logger.info("Suspending device") 1165 self.set_ha_status(active=False) 1166 try: 1167 yield self 1168 finally: 1169 self.check_availability() 1170 self.logger.info("Unsuspending device...") 1171 for _ in range(3): 1172 try: 1173 self.set_ha_status(active=True) 1174 break 1175 except Exception as e: 1176 self.logger.error(e) 1177 # time.sleep() 1178 else: 1179 raise UnsuspendError("Failed to unsuspend device") 1180 self.logger.info("Device successfully unsuspended") 1181 1182 def wait_availability(self): 1183 logger = self.logger 1184 logger.info("Checking availability. Waiting for response...") 1185 max_duration = 1800 1186 for duration in wait(duration=max_duration): 1187 try: 1188 versions = self.get_versions() 1189 current = next(v for v in versions if v.current) 1190 if current is None: 1191 logger.warning("Device is not not answering") 1192 else: 1193 logger.info(f"Device responded after {duration} seconds") 1194 return current 1195 except ( 1196 urllib3.exceptions.MaxRetryError, 1197 requests.exceptions.ConnectionError, 1198 ): 1199 logger.warning("Firewall still not responding") 1200 except ServerError: 1201 raise Exception( 1202 "An error occured on the device while retrieving the device's versions. Be sure that the device can contact PaloAlto's servers." 1203 ) 1204 except Exception as e: 1205 logger.debug(f"Unexpected error of type {type(e)} occured on firewall") 1206 logger.error(f"Firewall is still not responding: {e}") 1207 raise Exception( 1208 f"Timeout while waiting for availability of firewall. Waited for {max_duration} seconds" 1209 ) 1210 1211 def check_availability(self): 1212 logger = self.logger 1213 ## Wait for the FW to respond 1214 version = self.wait_availability() 1215 if not version: 1216 logger.error("Device never responded") 1217 return False 1218 logger.info(f"Firewall is available on version {version.version}") 1219 return True
26 def __init__( 27 self, 28 host=None, 29 api_key=None, 30 ispanorama=None, 31 target=None, 32 verify=False, 33 timeout=None, 34 logger=None, 35 ): 36 env_host, env_apikey = get_credentials_from_env() 37 host = host or env_host 38 api_key = api_key or env_apikey 39 if not host: 40 raise Exception("Missing Host") 41 if not api_key: 42 raise Exception("Missing API Key") 43 host, _, _ = clean_url_host(host) 44 45 default_params = {} 46 if target: 47 default_params["target"] = target 48 49 self._host = host 50 self._api_key = api_key 51 self._url = f"{host}/api" 52 self._verify = verify 53 self._timeout = timeout 54 self._ispanorama = ispanorama 55 self._default_params = default_params 56 self.logger = logger or logging
206 def generate_apikey(self, username, password: str) -> str: 207 """ 208 Generate a new API-Key for the user connected. 209 """ 210 params = {"user": username, "password": password} 211 return self._request( 212 "keygen", 213 method="POST", 214 params=params, 215 ).xpath(".//key/text()")[0]
Generate a new API-Key for the user connected.
269 def get_tree(self, extended=False) -> Element: 270 """ 271 Return the running configuration 272 The differences with `running_config` are not known 273 """ 274 tree = get_tree( 275 self._host, self._api_key, verify=self._verify, logger=self.logger 276 ) 277 if extended: 278 self._extend_tree_information(tree) 279 return tree
Return the running configuration
The differences with running_config
are not known
298 def get_rule_use(self, tree=None, max_threads: Optional[int] = None): 299 if tree is None: 300 tree = self.get_tree() 301 device_groups = tree.xpath("devices/*/device-group/*/@name") 302 positions = ("pre", "post") 303 # rule_types = tuple({x.tag for x in tree.xpath( 304 # "devices/*/device-group/*" 305 # "/*[self::post-rulebase or self::pre-rulebase]/*")}) 306 rule_types = ("security", "pbf", "nat", "application-override") 307 args_list = list(product(device_groups, positions, rule_types)) 308 309 def func(args): 310 return self._get_rule_use(*args) 311 312 threads = len(args_list) 313 threads = min(max_threads or threads, threads) 314 with Pool(len(args_list)) as pool: 315 data = pool.map(func, args_list) 316 return [entry for entry_list in data for entry in entry_list]
330 def get_rule_hit_count(self, tree=None, max_threads=None): 331 if tree is None: 332 tree = self.get_tree() 333 device_groups = tree.xpath("devices/*/device-group/*/@name") 334 rulebases = ("pre-rulebase", "post-rulebase") 335 rule_types = ("security", "pbf", "nat", "application-override") 336 args_list = list(product(device_groups, rulebases, rule_types)) 337 338 def func(args): 339 return self._get_rule_hit_count(*args) 340 341 threads = len(args_list) 342 threads = min(max_threads or threads, threads) 343 with Pool(len(args_list)) as pool: 344 data = pool.map(func, args_list) 345 return [entry for entry_list in data for entry in entry_list]
372 def get(self, xpath: str): 373 """ 374 This will retrieve the xml definition based on the xpath 375 The xpath doesn't need to be exact 376 and can select multiple values at once. 377 Still, it must at least speciy /config at is begining 378 """ 379 return self._conf_request(xpath, action="show", method="GET")
This will retrieve the xml definition based on the xpath The xpath doesn't need to be exact and can select multiple values at once. Still, it must at least speciy /config at is begining
381 def delete(self, xpath: str): 382 """ 383 This will REMOVE the xml definition at the provided xpath. 384 The xpath must be exact. 385 """ 386 return self._conf_request( 387 xpath, 388 action="delete", 389 method="DELETE", 390 )
This will REMOVE the xml definition at the provided xpath. The xpath must be exact.
392 def create(self, xpath: str, xml_definition): 393 """ 394 This will ADD the xml definition 395 INSIDE the element at the provided xpath. 396 The xpath must be exact. 397 """ 398 # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/configuration-api/set-configuration 399 params = {"element": xml_definition} 400 return self._conf_request( 401 xpath, 402 action="set", 403 method="POST", 404 params=params, 405 )
This will ADD the xml definition INSIDE the element at the provided xpath. The xpath must be exact.
407 def update(self, xpath: str, xml_definition): 408 """ 409 This will REPLACE the xml definition 410 INSTEAD of the element at the provided xpath 411 The xpath must be exact. 412 Nb: We can pull the whole config, update it locally, 413 and push the final result 414 """ 415 # https://docs.paloaltonetworks.com/pan-os/9-1/pan-os-panorama-api/pan-os-xml-api-request-types/configuration-api/set-configuration 416 params = {"element": xml_definition} 417 return self._conf_request( 418 xpath, 419 action="edit", 420 method="POST", 421 params=params, 422 )
This will REPLACE the xml definition INSTEAD of the element at the provided xpath The xpath must be exact. Nb: We can pull the whole config, update it locally, and push the final result
424 def revert_changes(self, skip_validated: bool = False): 425 """ 426 Revert all the changes made on Panorama. 427 NOTE: 428 - This only applies on non-commited changes. 429 - This revert everything (not scoped by users) 430 431 skip_validated: Do not revert changes that were validated 432 """ 433 skip = "<skip-validate>yes</skip-validate>" if skip_validated else "" 434 cmd = f"<revert><config>{skip}</config></revert>" 435 return self._op_request(cmd)
Revert all the changes made on Panorama. NOTE:
- This only applies on non-commited changes.
- This revert everything (not scoped by users)
skip_validated: Do not revert changes that were validated
437 def validate_changes(self): 438 """ 439 Validated all the changes currently made 440 """ 441 cmd = "<validate><full></full></validate>" 442 return self._op_request(cmd)
Validated all the changes currently made
453 def get_push_scope_devicegroups(self, admin=None): 454 """ 455 Gives detailed information about pending changes 456 (e.g. xpath, owner, action, ...) 457 """ 458 scope = self._raw_get_push_scope(admin=admin) 459 return list(set(scope.xpath(".//objects/entry[@loc-type='device-group']/@loc")))
Gives detailed information about pending changes (e.g. xpath, owner, action, ...)
461 def uncommited_changes(self): 462 """ 463 Gives detailed information about pending changes 464 (e.g. xpath, owner, action, ...) 465 """ 466 cmd = "<show><config><list><changes></changes></list></config></show>" 467 return self._op_request(cmd)
Gives detailed information about pending changes (e.g. xpath, owner, action, ...)
469 def uncommited_changes_summary(self, admin=None): 470 """ 471 Only gives the concern device groups 472 """ 473 admin = ( 474 f"<partial><admin><member>{admin}</member></admin></partial>" 475 if admin 476 else "" 477 ) 478 cmd = f"<show><config><list><change-summary>{admin}</change-summary></list></config></show>" 479 return self._op_request(cmd)
Only gives the concern device groups
481 def pending_changes(self): 482 """ 483 Result content is either 'yes' or 'no' 484 """ 485 cmd = "<check><pending-changes></pending-changes></check>" 486 return self._op_request(cmd)
Result content is either 'yes' or 'no'
488 def save_config(self, name): 489 """ 490 Create a named snapshot of the current configuration 491 """ 492 cmd = f"<save><config><to>{name}</to></config></save>" 493 return "\n".join(self._op_request(cmd).xpath(".//result/text()"))
Create a named snapshot of the current configuration
495 def save_device_state(self): 496 """ 497 Create a snapshot of the current device state 498 """ 499 cmd = "<save><device-state></device-state></save>" 500 return "\n".join(self._op_request(cmd).xpath(".//result/text()"))
Create a snapshot of the current device state
502 def get_named_configuration(self, name): 503 """ 504 Get the configuration from a named snapshot as an XML object 505 """ 506 cmd = f"<show><config><saved>{name}</saved></config></show>" 507 return self._op_request(cmd, remove_blank_text=False).xpath("./result/config")[ 508 0 509 ]
Get the configuration from a named snapshot as an XML object
511 def candidate_config(self) -> Element: 512 """ 513 Get the configuration to be commited as an XML object 514 """ 515 cmd = "<show><config><candidate></candidate></config></show>" 516 return self._op_request(cmd, remove_blank_text=False)
Get the configuration to be commited as an XML object
518 def running_config(self) -> Element: 519 """ 520 Get the current running configuration as an XML object 521 """ 522 cmd = "<show><config><running></running></config></show>" 523 return self._op_request(cmd, remove_blank_text=False)
Get the current running configuration as an XML object
540 def get_jobs(self, job_ids: Union[None, str, List[str]] = None) -> List[types.Job]: 541 """ 542 Get information of job(s) 543 Retrieve all jobs by default. 544 545 If job_id is provided, then only retrieve the job requested. 546 """ 547 job_xmls = self._raw_get_jobs(job_ids).xpath(".//job") 548 transformed = (types.Job.from_xml(x) for x in job_xmls) 549 return [j for j in transformed if j]
Get information of job(s) Retrieve all jobs by default.
If job_id is provided, then only retrieve the job requested.
551 def get_job(self, job_id) -> types.Job: 552 """ 553 Get information of job(s) 554 Retrieve all jobs by default. 555 556 If job_id is provided, then only retrieve the job requested. 557 """ 558 return self.get_jobs(job_id)[0]
Get information of job(s) Retrieve all jobs by default.
If job_id is provided, then only retrieve the job requested.
567 def get_versions(self) -> List[types.SoftwareVersion]: 568 """ 569 Get the versions informations 570 """ 571 res = self._raw_get_versions() 572 return [ 573 types.SoftwareVersion.from_xml(entry) 574 for entry in res.xpath(".//sw-updates/versions/entry") 575 ]
Get the versions informations
577 def wait_job_completion(self, job_id: str, waiter=None) -> types.Job: 578 """ 579 Block until the job complete. 580 581 job_id: the job to wait upon 582 waiter: a generator that yield when a new query must be done. 583 see `wait` function (the default waiter) for an example 584 """ 585 if not waiter: 586 waiter = wait() 587 for _ in waiter: 588 job = self.get_job(job_id) 589 if job.progress >= 100: 590 return job 591 self.logger.info(f"Job {job_id} progress: {job.progress}") 592 raise Exception("Timeout while waiting for job completion")
Block until the job complete.
job_id: the job to wait upon
waiter: a generator that yield when a new query must be done.
see wait
function (the default waiter) for an example
594 def raw_get_pending_jobs(self): 595 """ 596 Get all the jobs that are pending as a XML object 597 """ 598 cmd = "<show><jobs><pending></pending></jobs></show>" 599 return self._op_request(cmd)
Get all the jobs that are pending as a XML object
601 def commit_changes(self, force: bool = False): 602 """ 603 Commit all changes 604 """ 605 cmd = "<commit>{}</commit>".format("<force></force>" if force else "") 606 return self._commit_request(cmd)
Commit all changes
624 def add_config_lock(self, comment=None, vsys="shared", no_exception=False) -> bool: 625 comment = f"<comment>{comment}</comment>" if comment else "" 626 cmd = f"<request><config-lock><add>{comment}</add></config-lock></request>" 627 return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
633 def add_commit_lock(self, comment=None, vsys="shared", no_exception=False) -> bool: 634 comment = f"<comment>{comment}</comment>" if comment else "" 635 cmd = f"<request><commit-lock><add>{comment}</add></commit-lock></request>" 636 return self._lock_cmd(cmd, vsys=vsys, no_exception=no_exception)
642 def set_ha_status(self, active: bool = True, target: Optional[str] = None): 643 """ 644 Activate or Deactivate (suspend) the HA pair. 645 646 """ 647 status = "<functional></functional>" if active else "<suspend></suspend>" 648 cmd = f"<request><high-availability><state>{status}</state></high-availability></request>" 649 params = {"target": target} if target else None 650 return self._op_request(cmd, params=params).xpath(".//result/text()")[0]
Activate or Deactivate (suspend) the HA pair.
652 def set_ha_preemption(self, active=True, target=None): 653 """ 654 NOT WORKING: 655 There is currently no way to deactivate the preemption using the API. 656 """ 657 raise Exception("set_ha_preemption not implementend")
NOT WORKING: There is currently no way to deactivate the preemption using the API.
668 def get_ha_info(self, state_only=False, target=None) -> types.HAInfo: 669 """ 670 Get the current state of a HA pair as a python object. 671 """ 672 res = self._raw_get_ha_info(state_only=state_only, target=target) 673 hainfo_xml = res.xpath(".//result")[0] 674 # pprint(hainfo_xml) 675 return types.HAInfo.from_xml(hainfo_xml)
Get the current state of a HA pair as a python object.
677 def get_ha_pairs( 678 self, connected=True 679 ) -> Tuple[List[Tuple[types.Device, Optional[types.Device]]], List[types.Device]]: 680 """ 681 Retrieve a tuple containing 2 values: 682 1. The list of HA pairs and their members 683 2. A list of devices that are not part of a HA pair 684 """ 685 # Get all devices and index them using their serial number 686 devices: List[types.Device] = self.get_devices(connected=connected) 687 device_map = {d.serial: d for d in devices} 688 689 # Create the 2 lists by iterating over the devices 690 done = set() 691 ha_pairs = [] 692 without_ha = [] 693 for d in devices: 694 # Do not manage twice the same device 695 if d.serial in done: 696 continue 697 # The device does not have an HA peer 698 if not d.ha_peer_serial: 699 without_ha.append(d) 700 done.add(d.serial) 701 continue 702 # Get the current device's HA peer 703 # push them in the ha_pairs list 704 # and mark both of them as done 705 peer = device_map.get(d.ha_peer_serial) 706 ha_pairs.append((d, peer)) 707 done.update((d.serial, d.ha_peer_serial)) 708 return ha_pairs, without_ha
Retrieve a tuple containing 2 values:
- The list of HA pairs and their members
- A list of devices that are not part of a HA pair
710 def get_ha_pairs_map(self, connected=True): 711 """ 712 Same as `get_ha_pairs`, but the ha_pairs are return as a map. 713 This provides an easier and more readable lookup to find a pair: 714 715 mapping, _ = client.get_ha_pairs_map() 716 serial = "12345" 717 pair_of_serial = mapping[serial] 718 """ 719 ha_pairs, without_ha = self.get_ha_pairs(connected=connected) 720 map = {} 721 for pair in ha_pairs: 722 for device in pair: 723 map[device.serial] = pair 724 return map, without_ha
Same as get_ha_pairs
, but the ha_pairs are return as a map.
This provides an easier and more readable lookup to find a pair:
mapping, _ = client.get_ha_pairs_map() serial = "12345" pair_of_serial = mapping[serial]
726 def get_panorama_status(self): 727 """ 728 Get the current status of Panorama server. 729 """ 730 cmd = "<show><panorama-status></panorama-status></show>" 731 return self.operation(cmd).xpath(".//result")
Get the current status of Panorama server.
754 def get_devices(self, connected=False) -> List[types.Device]: 755 """ 756 Return the list of device known from Panorama as a python structure. 757 NOTE: This only works if the client is a Panorama server. 758 759 connected: only returns the devices that are connected 760 """ 761 res = self._raw_get_devices(connected=connected) 762 entries = res.xpath(".//devices/entry") 763 devices = (types.Device.from_xml(e) for e in entries) 764 return [d for d in devices if d]
Return the list of device known from Panorama as a python structure. NOTE: This only works if the client is a Panorama server.
connected: only returns the devices that are connected
773 def get_plan_dg_hierarchy(self, recursive=False): 774 """ 775 Return the hierarchy of device groups as a dict. 776 The keys are the names of the device groups. 777 778 The values are the children device groups and depends on the recursive parameter. 779 recursive: if False, the values are only the direct children of the device group. 780 Otherwise, the values are all the descendant device groups. 781 """ 782 devicegroups = {} # name: children 783 hierarchy = self._raw_get_dg_hierarchy().xpath(".//dg-hierarchy")[0] 784 xpath = ".//dg" if recursive else "./dg" 785 for dg in hierarchy.xpath(".//dg"): 786 devicegroups[dg.attrib["name"]] = [ 787 x.attrib["name"] for x in dg.xpath(xpath) 788 ] 789 return devicegroups
Return the hierarchy of device groups as a dict. The keys are the names of the device groups.
The values are the children device groups and depends on the recursive parameter. recursive: if False, the values are only the direct children of the device group. Otherwise, the values are all the descendant device groups.
798 def get_devicegroups_name( 799 self, 800 parents=None, 801 with_connected_devices=None, 802 ): 803 """ 804 This returns the names of the devicegroups: 805 - parents: the returned list will only contain children of the provided parents (parents included) 806 - with_devices: the returned list will only contain devicegroups that have direct devices under them 807 """ 808 devicegroups = self._raw_get_devicegroups().xpath(".//devicegroups/entry") 809 if with_connected_devices: 810 names = [ 811 dg.attrib["name"] 812 for dg in devicegroups 813 if dg.xpath("./devices/entry/connected[text() = 'yes']") 814 ] 815 else: 816 names = [dg.attrib["name"] for dg in devicegroups] 817 if parents: 818 hierarchy = self.get_plan_dg_hierarchy(recursive=True) 819 tokeep = set(chain(*(hierarchy.get(p, []) for p in parents))) | set(parents) 820 names = list(set(names) & tokeep) 821 return names
This returns the names of the devicegroups:
- parents: the returned list will only contain children of the provided parents (parents included)
- with_devices: the returned list will only contain devicegroups that have direct devices under them
834 def get_addresses(self) -> List[types.Address]: 835 """ 836 Return the list of addresses known from Panorama as a python structure. 837 NOTE: This only works if the client is a Firewall. 838 """ 839 res = self._raw_get_addresses() 840 addresses = res.xpath(".//address/entry") 841 return [types.Address.from_xml(i) for i in addresses]
Return the list of addresses known from Panorama as a python structure. NOTE: This only works if the client is a Firewall.
852 def get_routing_tables(self) -> List[types.RoutingTable]: 853 """ 854 Return the list of interface known from Panorama as a python structure. 855 NOTE: This only works if the client is a Firewall. 856 """ 857 res = self._raw_get_routing_tables() 858 routing_tables = res.xpath(".//routing-table") 859 # print(len(routing_tables)) 860 # from pa_api.xmlapi.utils import pprint 861 # for r in routing_tables: 862 # pprint(r) 863 return [types.RoutingTable.from_xml(i) for i in routing_tables]
Return the list of interface known from Panorama as a python structure. NOTE: This only works if the client is a Firewall.
872 def get_interfaces(self) -> List[types.Interface]: 873 """ 874 Return the list of interface known from Panorama as a python structure. 875 NOTE: This only works if the client is a Firewall. 876 """ 877 res = self._raw_get_interfaces() 878 interfaces = res.xpath(".//interface") 879 return [types.Interface.from_xml(i) for i in interfaces]
Return the list of interface known from Panorama as a python structure. NOTE: This only works if the client is a Firewall.
892 def get_zones(self) -> Element: 893 """ 894 Return the list of zones known from Panorama as a python structure. 895 NOTE: This only works if the client is a Firewall. 896 """ 897 res = self._raw_get_zones() 898 zones = res.xpath(".//zone/entry") 899 return [types.Zone.from_xml(i) for i in zones]
Return the list of zones known from Panorama as a python structure. NOTE: This only works if the client is a Firewall.
912 def get_templates_sync_status(self): 913 """ 914 Return the synchronization status the templates per devices 915 A device is in sync if it is up-to-date with the current version on Panorama. 916 NOTE: This only works on Panorama. 917 918 The result is a list of tuple of 3 values: 919 1. the template's name 920 2. the device's name 921 3. the sync status 922 """ 923 res = self._raw_get_templates() 924 statuses = [] 925 for entry in res.xpath("./result/templates/entry"): 926 template_name = entry.attrib["name"] 927 for device in entry.xpath("./devices/entry"): 928 device_name = device.attrib["name"] 929 template_status = next(device.xpath("./template-status/text()"), None) 930 status = (template_name, device_name, template_status) 931 statuses.append(status) 932 return statuses
Return the synchronization status the templates per devices A device is in sync if it is up-to-date with the current version on Panorama. NOTE: This only works on Panorama.
The result is a list of tuple of 3 values:
- the template's name
- the device's name
- the sync status
944 def get_vpn_flows(self, name=None): 945 """ 946 Returns the VPN flow information as a python structure. 947 NOTE: This only works on Panorama server, not firewalls 948 """ 949 entries = self._raw_get_vpn_flows(name=name).xpath(".//IPSec/entry") 950 return [types.VPNFlow.from_xml(e) for e in entries]
Returns the VPN flow information as a python structure. NOTE: This only works on Panorama server, not firewalls
959 def system_info(self) -> types.operations.SystemInfo: 960 """ 961 Returns informations about the system as a XML object. 962 """ 963 xml = self._raw_system_info() 964 info = xml.xpath("./result/system")[0] 965 return types.operations.SystemInfo.from_xml(info)
Returns informations about the system as a XML object.
975 def system_resources(self): 976 """ 977 Get the system resouces as a string. 978 The string is the raw output of a `ps` command. 979 """ 980 res = self._raw_system_resources() 981 text = res.xpath(".//result/text()")[0] 982 return text.split("\n\n")[0]
Get the system resouces as a string.
The string is the raw output of a ps
command.
992 def download_software(self, version) -> Optional[str]: 993 """ 994 Download the software version on the device. 995 version: the software version to download 996 997 Returns the download job's ID in case of successful launch, 998 None is returned otherwise. 999 """ 1000 res = self._raw_download_software(version) 1001 try: 1002 return res.xpath(".//job/text()")[0] 1003 except Exception: 1004 self.logger.debug("Download has not started") 1005 return None
Download the software version on the device. version: the software version to download
Returns the download job's ID in case of successful launch, None is returned otherwise.
1015 def install_software( 1016 self, version: Union[None, str, types.SoftwareVersion] 1017 ) -> Optional[str]: 1018 """ 1019 Install the software version on the device. 1020 version: the software version to install 1021 1022 Returns the download job's ID in case of successful launch, 1023 None is returned otherwise. 1024 """ 1025 if isinstance(version, types.SoftwareVersion): 1026 version = version.version 1027 res = self._raw_install_software(version) 1028 try: 1029 return res.xpath(".//job/text()")[0] 1030 except Exception: 1031 self.logger.debug("Download has not started") 1032 return None
Install the software version on the device. version: the software version to install
Returns the download job's ID in case of successful launch, None is returned otherwise.
1041 def restart(self): 1042 """ 1043 Restart the device 1044 """ 1045 return "".join(self._raw_restart().xpath(".//result/text()"))
Restart the device
1047 def automatic_download_software( 1048 self, version: Optional[str] = None 1049 ) -> types.SoftwareVersion: 1050 """ 1051 Automatically download the requested software version. 1052 if the version is not provided, it defaults to the latest one. 1053 1054 NOTE: This does not do the installation. 1055 This is usefull to download in anticipation of the upgrade. 1056 For automatic install, see `automatic_software_upgrade` 1057 """ 1058 version_str = version 1059 try: 1060 versions = self.get_versions() 1061 except ServerError: 1062 raise Exception( 1063 "An error occured on the device while retrieving the device's versions. Be sure that the device can contact PaloAlto's servers." 1064 ) 1065 sw_version = None 1066 if not version_str: 1067 sw_version = next((v for v in versions if v.latest), None) 1068 else: 1069 sw_version = next((v for v in versions if v.version == version_str), None) 1070 if not sw_version: 1071 self.logger.error(f"Version {version_str} not found") 1072 return exit(1) 1073 1074 # Already downloaded: Nothing to do 1075 if sw_version.downloaded: 1076 self.logger.info(f"Version {sw_version.version} already downloaded") 1077 return sw_version 1078 1079 # Download minor version first (required) 1080 base_version = next( 1081 v for v in versions if v.version == sw_version.base_minor_version 1082 ) 1083 if not base_version.downloaded: 1084 self.logger.info( 1085 f"Launching download of minor version {base_version.version}" 1086 ) 1087 job_id = self.download_software(base_version.version) 1088 if not job_id: 1089 raise Exception("Download has not started") 1090 job = self.wait_job_completion(job_id) 1091 if job.result != "OK": 1092 self.logger.debug(job) 1093 raise Exception(job.details) 1094 print(job.details) 1095 1096 # Actually download the wanted version 1097 self.logger.info(f"Launching download of version {sw_version.version}") 1098 job_id = self.download_software(sw_version.version) 1099 if not job_id: 1100 raise Exception("Download has not started") 1101 job = self.wait_job_completion(job_id) 1102 if job.result != "OK": 1103 self.logger.debug(job) 1104 raise Exception(job.details) 1105 self.logger.info(job.details) 1106 return sw_version
Automatically download the requested software version. if the version is not provided, it defaults to the latest one.
NOTE: This does not do the installation.
This is usefull to download in anticipation of the upgrade.
For automatic install, see automatic_software_upgrade
1108 def automatic_software_upgrade( 1109 self, 1110 version: Optional[str] = None, 1111 install: bool = True, 1112 restart: bool = True, 1113 suspend: bool = False, 1114 ): 1115 """ 1116 Automatically download and install the requested software version. 1117 if the version is not provided, it defaults to the latest one. 1118 1119 NOTE: This does the software install and restart by default. 1120 If you only want to download, prefer to use `automatic_download_software` method, 1121 or set install=False. See the parameters for more information. 1122 1123 install: install the software after the download 1124 restart: restart the device after the installation. This option is ignored if install=False 1125 1126 """ 1127 sw_version = self.automatic_download_software(version) 1128 if sw_version.current: 1129 self.logger.info(f"Version {sw_version.version} is already installed") 1130 return sw_version 1131 if not install: 1132 return sw_version 1133 # We may get the following error: 1134 # "Error: Upgrading from 10.2.4-h10 to 11.1.2 requires a content version of 8761 or greater and found 8638-7689." 1135 # This should never happen, we decided to report the error and handle this manually 1136 self.logger.info(f"Launching install of version {sw_version.version}") 1137 1138 with self.suspended(suspend): 1139 job_id = self.install_software(sw_version.version) 1140 if not job_id: 1141 self.logger.error("Install has not started") 1142 raise Exception("Install has not started") 1143 job = self.wait_job_completion(job_id) 1144 self.logger.info(job.details) 1145 1146 # Do not restart if install failed 1147 if job.result != "OK": 1148 self.logger.error("Failed to install software version") 1149 return sw_version 1150 1151 if restart: 1152 self.logger.info("Restarting the device") 1153 restart_response = self.restart() 1154 self.logger.info(restart_response) 1155 return sw_version
Automatically download and install the requested software version. if the version is not provided, it defaults to the latest one.
NOTE: This does the software install and restart by default.
If you only want to download, prefer to use automatic_download_software
method,
or set install=False. See the parameters for more information.
install: install the software after the download restart: restart the device after the installation. This option is ignored if install=False
1157 @contextmanager 1158 def suspended(self, suspend=True): 1159 if not suspend: 1160 try: 1161 yield self 1162 finally: 1163 return 1164 self.logger.info("Suspending device") 1165 self.set_ha_status(active=False) 1166 try: 1167 yield self 1168 finally: 1169 self.check_availability() 1170 self.logger.info("Unsuspending device...") 1171 for _ in range(3): 1172 try: 1173 self.set_ha_status(active=True) 1174 break 1175 except Exception as e: 1176 self.logger.error(e) 1177 # time.sleep() 1178 else: 1179 raise UnsuspendError("Failed to unsuspend device") 1180 self.logger.info("Device successfully unsuspended")
1182 def wait_availability(self): 1183 logger = self.logger 1184 logger.info("Checking availability. Waiting for response...") 1185 max_duration = 1800 1186 for duration in wait(duration=max_duration): 1187 try: 1188 versions = self.get_versions() 1189 current = next(v for v in versions if v.current) 1190 if current is None: 1191 logger.warning("Device is not not answering") 1192 else: 1193 logger.info(f"Device responded after {duration} seconds") 1194 return current 1195 except ( 1196 urllib3.exceptions.MaxRetryError, 1197 requests.exceptions.ConnectionError, 1198 ): 1199 logger.warning("Firewall still not responding") 1200 except ServerError: 1201 raise Exception( 1202 "An error occured on the device while retrieving the device's versions. Be sure that the device can contact PaloAlto's servers." 1203 ) 1204 except Exception as e: 1205 logger.debug(f"Unexpected error of type {type(e)} occured on firewall") 1206 logger.error(f"Firewall is still not responding: {e}") 1207 raise Exception( 1208 f"Timeout while waiting for availability of firewall. Waited for {max_duration} seconds" 1209 )
1211 def check_availability(self): 1212 logger = self.logger 1213 ## Wait for the FW to respond 1214 version = self.wait_availability() 1215 if not version: 1216 logger.error("Device never responded") 1217 return False 1218 logger.info(f"Firewall is available on version {version.version}") 1219 return True