pa_api.restapi.restapi
1import json 2import logging 3from itertools import chain 4 5import requests 6 7# Remove warning for unverified certificate 8# https://stackoverflow.com/questions/27981545/suppress-insecurerequestwarning-unverified-https-request-is-being-made-in-pytho 9from requests.packages.urllib3.exceptions import InsecureRequestWarning 10 11from pa_api.constants import PANORAMA_ERRORS, SUCCESS_CODE 12from pa_api.utils import clean_url_host 13 14from .rest_resources import ( 15 PanoramaDevicesResourceType, 16 PanoramaNetworkResourceType, 17 PanoramaObjectsResourceType, 18 PanoramaPanoramaResourceType, 19 PanoramaPoliciesResourceType, 20) 21 22requests.packages.urllib3.disable_warnings(InsecureRequestWarning) 23 24OBJECT_RESOURCES = [ 25 "Addresses", 26 "AddressGroups", 27 "Regions", 28 "Applications", 29 "ApplicationGroups", 30 "ApplicationFilters", 31 "Services", 32 "ServiceGroups", 33 "Tags", 34 "GlobalProtectHIPObjects", 35 "GlobalProtectHIPProfiles", 36 "ExternalDynamicLists", 37 "CustomDataPatterns", 38 "CustomSpywareSignatures", 39 "CustomVulnerabilitySignatures", 40 "CustomURLCategories", 41 "AntivirusSecurityProfiles", 42 "AntiSpywareSecurityProfiles", 43 "VulnerabilityProtectionSecurityProfiles", 44 "URLFilteringSecurityProfiles", 45 "FileBlockingSecurityProfiles", 46 "WildFireAnalysisSecurityProfiles", 47 "DataFilteringSecurityProfiles", 48 "DoSProtectionSecurityProfiles", 49 "SecurityProfileGroups", 50 "LogForwardingProfiles", 51 "AuthenticationEnforcements", 52 "DecryptionProfiles", 53 "DecryptionForwardingProfiles", 54 "Schedules", 55 "SDWANPathQualityProfiles", 56 "SDWANTrafficDistributionProfiles", 57] 58 59POLICY_RESOURCES = [ 60 "SecurityRules", 61 "NATRules", 62 "QoSRules", 63 "PolicyBasedForwardingRules", 64 "DecryptionRules", 65 "TunnelInspectionRules", 66 "ApplicationOverrideRules", 67 "AuthenticationRules", 68 "DoSRules", 69 "SDWANRules", 70] 71 72NETWORK_RESOURCES = [ 73 "EthernetInterfaces", 74 "AggregateEthernetInterfaces", 75 "VLANInterfaces", 76 "LoopbackInterfaces", 77 "TunnelIntefaces", 78 "SDWANInterfaces", 79 "Zones", 80 "VLANs", 81 "VirtualWires", 82 "VirtualRouters", 83 "IPSecTunnels", 84 "GRETunnels", 85 "DHCPServers", 86 "DHCPRelays", 87 "DNSProxies", 88 "GlobalProtectPortals", 89 "GlobalProtectGateways", 90 "GlobalProtectGatewayAgentTunnels", 91 "GlobalProtectGatewaySatelliteTunnels", 92 "GlobalProtectGatewayMDMServers", 93 "GlobalProtectClientlessApps", 94 "GlobalProtectClientlessAppGroups", 95 "QoSInterfaces", 96 "LLDP", 97 "GlobalProtectIPSecCryptoNetworkProfiles", 98 "IKEGatewayNetworkProfiles", 99 "IKECryptoNetworkProfiles", 100 "MonitorNetworkProfiles", 101 "InterfaceManagementNetworkProfiles", 102 "ZoneProtectionNetworkProfiles", 103 "QoSNetworkProfiles", 104 "LLDPNetworkProfiles", 105 "SDWANInterfaceProfiles", 106] 107 108DEVICE_RESOURCES = [ 109 "VirtualSystems", 110] 111 112DEFAULT_PARAMS = { 113 "output-format": "json", 114} 115 116 117class PanoramaAPI: 118 def __init__(self, api_key=None, verbose=False, verify=False, logger=None): 119 self._verbose = verbose 120 self._verify = verify 121 self._api_key = api_key 122 self.logger = logger or logging 123 124 def _inner_request( 125 self, 126 method, 127 url, 128 params=None, 129 headers=None, 130 data=None, 131 verify=None, 132 ): 133 if params is None: 134 params = {} 135 if headers is None: 136 headers = {} 137 if verify is None: 138 verify = self._verify 139 default_headers = { 140 "X-PAN-KEY": self._api_key, 141 # 'Accept': 'application/json, application/xml', 142 # 'Content-Type': 'application/json' 143 } 144 headers = {**default_headers, **headers} 145 params = {**DEFAULT_PARAMS, **params} 146 res = requests.request( 147 method, 148 url, 149 params=params, 150 headers=headers, 151 verify=verify, 152 ) 153 # The API always returns a json, no matter what 154 # if not res.ok: 155 # return None 156 try: 157 data = res.json() 158 code = int( 159 data.get("@code") or data.get("code") or SUCCESS_CODE, 160 ) # Sometimes, the code is a string, some other times it is a int 161 status = data.get("@status", "") 162 success = status == "success" 163 error_occured = ( 164 not res.ok 165 or ( 166 not success and code < SUCCESS_CODE 167 ) # In case of success, the value 19 is used 168 ) 169 if not error_occured: 170 return data, None 171 message = ( 172 data.get("message") 173 or PANORAMA_ERRORS.get(data["@code"]) 174 or "Something happened: " + json.dumps(data) 175 ) 176 error = f"(CODE: {code}) {message}" 177 if self._verbose: 178 causes = list( 179 chain( 180 *( 181 details.get("causes", {}) 182 for details in data.get("details", []) 183 ), 184 ), 185 ) 186 details = "".join(c.get("description") for c in causes) 187 error = f"{error} {details}" 188 return data, error 189 except Exception as e: 190 return None, str(e) 191 192 def _request( 193 self, 194 method, 195 url, 196 params=None, 197 headers=None, 198 data=None, 199 verify=None, 200 no_exception=False, 201 ): 202 data, error = ( 203 self._inner_request( 204 method, 205 url, 206 params=params, 207 headers=headers, 208 data=data, 209 verify=verify, 210 ) 211 or {} 212 ) 213 if error: 214 if no_exception: 215 self.logger.error(f"Could not {method.lower()} {url}: {error}") 216 return data, error 217 raise Exception(error) 218 data = data.get("result", {}).get("entry") or [] 219 return data, error 220 221 def request(self, method, url, params=None, headers=None, data=None, verify=None): 222 data, _ = ( 223 self._request( 224 method, 225 url, 226 params=params, 227 headers=headers, 228 data=data, 229 verify=verify, 230 ) 231 or {} 232 ) 233 return data 234 235 def get(self, url, params=None, headers=None, data=None, verify=None): 236 data, _ = ( 237 self._request( 238 "GET", 239 url, 240 params=params, 241 headers=headers, 242 data=data, 243 verify=verify, 244 ) 245 or {} 246 ) 247 return data 248 # return data.get("result", {}).get("entry") or [] 249 250 def delete(self, url, params=None, headers=None, data=None, verify=None): 251 data, _ = ( 252 self._request( 253 "DELETE", 254 url, 255 params=params, 256 headers=headers, 257 data=data, 258 verify=verify, 259 ) 260 or {} 261 ) 262 return data 263 264 265class PanoramaClient: 266 """ 267 Wrapper for the PaloAlto REST API 268 Resources (e.g. Addresses, Tags, ..) are grouped under their resource types. 269 See https://docs.paloaltonetworks.com/pan-os/10-1/pan-os-panorama-api/get-started-with-the-pan-os-rest-api/access-the-rest-api#id0e536ca4-6154-4188-b70f-227c2c113ec4 270 271 Attributes: 272 273 - objects: groups all the objects (Address, Tag, Service, ...) 274 - policies: groups all the policies (Security, NAT, ...) 275 - network: groups all the network resources (e.g. EthernetInterfaces, VLANInterfaces, ...) 276 - device: groups all device-related resources (only VirtualSystems) 277 - panorama: groups all panorama-management-related resources (only DeviceGroups) 278 """ 279 280 objects: PanoramaObjectsResourceType 281 policies: PanoramaPoliciesResourceType 282 network: PanoramaNetworkResourceType 283 device: PanoramaDevicesResourceType 284 panorama: PanoramaPanoramaResourceType 285 286 def __init__( 287 self, 288 domain, 289 api_key=None, 290 version="v10.1", 291 verify=False, 292 verbose=False, 293 ): 294 domain, _, _ = clean_url_host(domain) 295 client = PanoramaAPI(api_key=api_key, verbose=verbose, verify=verify) 296 self.client = client 297 self.objects = PanoramaObjectsResourceType(client, domain, version=version) 298 self.policies = PanoramaPoliciesResourceType(client, domain, version=version) 299 self.network = PanoramaNetworkResourceType(client, domain, version=version) 300 self.device = PanoramaDevicesResourceType(client, domain, version=version) 301 self.panorama = PanoramaPanoramaResourceType(client, domain, version=version) 302 303 304__all__ = [ 305 "PanoramaClient", 306]
class
PanoramaClient:
266class PanoramaClient: 267 """ 268 Wrapper for the PaloAlto REST API 269 Resources (e.g. Addresses, Tags, ..) are grouped under their resource types. 270 See https://docs.paloaltonetworks.com/pan-os/10-1/pan-os-panorama-api/get-started-with-the-pan-os-rest-api/access-the-rest-api#id0e536ca4-6154-4188-b70f-227c2c113ec4 271 272 Attributes: 273 274 - objects: groups all the objects (Address, Tag, Service, ...) 275 - policies: groups all the policies (Security, NAT, ...) 276 - network: groups all the network resources (e.g. EthernetInterfaces, VLANInterfaces, ...) 277 - device: groups all device-related resources (only VirtualSystems) 278 - panorama: groups all panorama-management-related resources (only DeviceGroups) 279 """ 280 281 objects: PanoramaObjectsResourceType 282 policies: PanoramaPoliciesResourceType 283 network: PanoramaNetworkResourceType 284 device: PanoramaDevicesResourceType 285 panorama: PanoramaPanoramaResourceType 286 287 def __init__( 288 self, 289 domain, 290 api_key=None, 291 version="v10.1", 292 verify=False, 293 verbose=False, 294 ): 295 domain, _, _ = clean_url_host(domain) 296 client = PanoramaAPI(api_key=api_key, verbose=verbose, verify=verify) 297 self.client = client 298 self.objects = PanoramaObjectsResourceType(client, domain, version=version) 299 self.policies = PanoramaPoliciesResourceType(client, domain, version=version) 300 self.network = PanoramaNetworkResourceType(client, domain, version=version) 301 self.device = PanoramaDevicesResourceType(client, domain, version=version) 302 self.panorama = PanoramaPanoramaResourceType(client, domain, version=version)
Wrapper for the PaloAlto REST API Resources (e.g. Addresses, Tags, ..) are grouped under their resource types. See https://docs.paloaltonetworks.com/pan-os/10-1/pan-os-panorama-api/get-started-with-the-pan-os-rest-api/access-the-rest-api#id0e536ca4-6154-4188-b70f-227c2c113ec4
Attributes:
- objects: groups all the objects (Address, Tag, Service, ...)
- policies: groups all the policies (Security, NAT, ...)
- network: groups all the network resources (e.g. EthernetInterfaces, VLANInterfaces, ...)
- device: groups all device-related resources (only VirtualSystems)
- panorama: groups all panorama-management-related resources (only DeviceGroups)
PanoramaClient(domain, api_key=None, version='v10.1', verify=False, verbose=False)
287 def __init__( 288 self, 289 domain, 290 api_key=None, 291 version="v10.1", 292 verify=False, 293 verbose=False, 294 ): 295 domain, _, _ = clean_url_host(domain) 296 client = PanoramaAPI(api_key=api_key, verbose=verbose, verify=verify) 297 self.client = client 298 self.objects = PanoramaObjectsResourceType(client, domain, version=version) 299 self.policies = PanoramaPoliciesResourceType(client, domain, version=version) 300 self.network = PanoramaNetworkResourceType(client, domain, version=version) 301 self.device = PanoramaDevicesResourceType(client, domain, version=version) 302 self.panorama = PanoramaPanoramaResourceType(client, domain, version=version)