pa_api.restapi.restapi

  1import json
  2import logging
  3from itertools import chain
  4
  5import requests
  6
  7# Remove warning for unverified certificate
  8# https://stackoverflow.com/questions/27981545/suppress-insecurerequestwarning-unverified-https-request-is-being-made-in-pytho
  9from requests.packages.urllib3.exceptions import InsecureRequestWarning
 10
 11from pa_api.constants import PANORAMA_ERRORS, SUCCESS_CODE
 12from pa_api.utils import clean_url_host
 13
 14from .rest_resources import (
 15    PanoramaDevicesResourceType,
 16    PanoramaNetworkResourceType,
 17    PanoramaObjectsResourceType,
 18    PanoramaPanoramaResourceType,
 19    PanoramaPoliciesResourceType,
 20)
 21
 22requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
 23
# Names of the resources that belong to the "objects" group of the REST API
# (addresses, services, tags, security profiles, ...).
# NOTE(review): this list is not referenced anywhere in the visible part of
# this module; presumably it is kept as a reference of the supported
# resource names -- confirm against the resource-type classes.
OBJECT_RESOURCES = [
    "Addresses",
    "AddressGroups",
    "Regions",
    "Applications",
    "ApplicationGroups",
    "ApplicationFilters",
    "Services",
    "ServiceGroups",
    "Tags",
    "GlobalProtectHIPObjects",
    "GlobalProtectHIPProfiles",
    "ExternalDynamicLists",
    "CustomDataPatterns",
    "CustomSpywareSignatures",
    "CustomVulnerabilitySignatures",
    "CustomURLCategories",
    "AntivirusSecurityProfiles",
    "AntiSpywareSecurityProfiles",
    "VulnerabilityProtectionSecurityProfiles",
    "URLFilteringSecurityProfiles",
    "FileBlockingSecurityProfiles",
    "WildFireAnalysisSecurityProfiles",
    "DataFilteringSecurityProfiles",
    "DoSProtectionSecurityProfiles",
    "SecurityProfileGroups",
    "LogForwardingProfiles",
    "AuthenticationEnforcements",
    "DecryptionProfiles",
    "DecryptionForwardingProfiles",
    "Schedules",
    "SDWANPathQualityProfiles",
    "SDWANTrafficDistributionProfiles",
]
 58
# Names of the resources that belong to the "policies" group of the REST API
# (one entry per rulebase kind: security, NAT, QoS, ...).
# NOTE(review): not referenced in the visible part of this module; presumably
# a reference list of supported resource names -- confirm.
POLICY_RESOURCES = [
    "SecurityRules",
    "NATRules",
    "QoSRules",
    "PolicyBasedForwardingRules",
    "DecryptionRules",
    "TunnelInspectionRules",
    "ApplicationOverrideRules",
    "AuthenticationRules",
    "DoSRules",
    "SDWANRules",
]
 71
 72NETWORK_RESOURCES = [
 73    "EthernetInterfaces",
 74    "AggregateEthernetInterfaces",
 75    "VLANInterfaces",
 76    "LoopbackInterfaces",
 77    "TunnelIntefaces",
 78    "SDWANInterfaces",
 79    "Zones",
 80    "VLANs",
 81    "VirtualWires",
 82    "VirtualRouters",
 83    "IPSecTunnels",
 84    "GRETunnels",
 85    "DHCPServers",
 86    "DHCPRelays",
 87    "DNSProxies",
 88    "GlobalProtectPortals",
 89    "GlobalProtectGateways",
 90    "GlobalProtectGatewayAgentTunnels",
 91    "GlobalProtectGatewaySatelliteTunnels",
 92    "GlobalProtectGatewayMDMServers",
 93    "GlobalProtectClientlessApps",
 94    "GlobalProtectClientlessAppGroups",
 95    "QoSInterfaces",
 96    "LLDP",
 97    "GlobalProtectIPSecCryptoNetworkProfiles",
 98    "IKEGatewayNetworkProfiles",
 99    "IKECryptoNetworkProfiles",
100    "MonitorNetworkProfiles",
101    "InterfaceManagementNetworkProfiles",
102    "ZoneProtectionNetworkProfiles",
103    "QoSNetworkProfiles",
104    "LLDPNetworkProfiles",
105    "SDWANInterfaceProfiles",
106]
107
# Names of the resources that belong to the "device" group of the REST API.
# NOTE(review): not referenced in the visible part of this module -- confirm
# where (if anywhere) it is consumed.
DEVICE_RESOURCES = [
    "VirtualSystems",
]

# Query-string parameters merged into every request issued by
# PanoramaAPI._inner_request; parameters supplied by the caller take
# precedence over these defaults.
DEFAULT_PARAMS = {
    "output-format": "json",
}
115
116
class PanoramaAPI:
    """Low-level HTTP helper used to talk to the REST API.

    Every request carries the API key in the ``X-PAN-KEY`` header and the
    ``DEFAULT_PARAMS`` query parameters. Responses are parsed as JSON and
    turned into a ``(data, error)`` pair by ``_inner_request``.

    Args:
        api_key: Value sent in the ``X-PAN-KEY`` header.
        verbose: When true, error messages also include the descriptions of
            the causes listed in the response's ``details``.
        verify: Default value of the ``verify`` argument given to
            ``requests`` (TLS certificate verification).
        logger: Object exposing an ``error`` method; defaults to the
            ``logging`` module itself.
    """

    def __init__(self, api_key=None, verbose=False, verify=False, logger=None):
        self._verbose = verbose
        self._verify = verify
        self._api_key = api_key
        self.logger = logger or logging

    def _inner_request(
        self,
        method,
        url,
        params=None,
        headers=None,
        data=None,
        verify=None,
    ):
        """Send one HTTP request and interpret the JSON response.

        Args:
            method: HTTP verb, passed to ``requests.request``.
            url: Full URL of the resource.
            params: Extra query parameters; they override ``DEFAULT_PARAMS``.
            headers: Extra headers; they override the default ``X-PAN-KEY``.
            data: Request body, forwarded unchanged to ``requests.request``
                (the caller is responsible for any serialization).
            verify: Overrides the instance-wide ``verify`` setting when not
                ``None``.

        Returns:
            ``(payload, None)`` on success, ``(payload, message)`` when the
            API reports an error, and ``(None, message)`` when the response
            could not be interpreted at all.

        Exceptions raised by ``requests.request`` itself are not caught here.
        """
        if verify is None:
            verify = self._verify
        headers = {"X-PAN-KEY": self._api_key, **(headers or {})}
        params = {**DEFAULT_PARAMS, **(params or {})}
        res = requests.request(
            method,
            url,
            params=params,
            headers=headers,
            data=data,  # previously accepted but never sent
            verify=verify,
        )
        try:
            return self._parse_response(res)
        except Exception as e:
            # Deliberately broad: any failure to interpret the response
            # (invalid JSON, unexpected payload shape, non-numeric code) is
            # reported through the error slot instead of being raised.
            # Fall back to repr() so the error string is never empty, since
            # callers treat a falsy error as "no error".
            return None, str(e) or repr(e)

    def _parse_response(self, res):
        """Turn a ``requests`` response into a ``(payload, error)`` pair."""
        payload = res.json()
        # The code may be under "@code" or "code", as a string or an int.
        raw_code = payload.get("@code") or payload.get("code")
        code = int(raw_code or SUCCESS_CODE)
        succeeded = payload.get("@status", "") == "success"
        # A non-"success" status only counts as a failure when the code is
        # below SUCCESS_CODE (original note: "In case of success, the value
        # 19 is used").
        failed = not res.ok or (not succeeded and code < SUCCESS_CODE)
        if not failed:
            return payload, None

        message = (
            payload.get("message")
            # Looked up with the raw value so a missing "@code" key no longer
            # raises and hides the real error.
            or PANORAMA_ERRORS.get(raw_code)
            or "Something happened: " + json.dumps(payload)
        )
        error = f"(CODE: {code}) {message}"
        if self._verbose:
            causes = chain.from_iterable(
                (detail.get("causes") or [])
                for detail in (payload.get("details") or [])
            )
            # Causes without a description are skipped rather than breaking
            # the join.
            descriptions = "".join(
                cause.get("description") or "" for cause in causes
            )
            error = f"{error} {descriptions}"
        return payload, error

    def _request(
        self,
        method,
        url,
        params=None,
        headers=None,
        data=None,
        verify=None,
        no_exception=False,
    ):
        """Perform a request and extract the ``result.entry`` list.

        Returns:
            ``(entries, None)`` on success, where ``entries`` is the value of
            ``result.entry`` in the response, or ``[]`` when it is missing or
            empty. With ``no_exception=True`` and a failed request, the
            error is logged and ``(raw_payload, error)`` is returned.

        Raises:
            Exception: when the request failed and ``no_exception`` is false.
        """
        payload, error = self._inner_request(
            method,
            url,
            params=params,
            headers=headers,
            data=data,
            verify=verify,
        )
        if error:
            if no_exception:
                self.logger.error(
                    "Could not %s %s: %s", method.lower(), url, error
                )
                return payload, error
            raise Exception(error)
        # "result" may be absent or null; both are treated as "no entries".
        entries = (payload.get("result") or {}).get("entry") or []
        return entries, error

    def request(self, method, url, params=None, headers=None, data=None, verify=None):
        """Perform a request and return the entries; raise on error."""
        entries, _ = self._request(
            method,
            url,
            params=params,
            headers=headers,
            data=data,
            verify=verify,
        )
        return entries

    def get(self, url, params=None, headers=None, data=None, verify=None):
        """Shortcut for ``request("GET", ...)``."""
        return self.request(
            "GET",
            url,
            params=params,
            headers=headers,
            data=data,
            verify=verify,
        )

    def delete(self, url, params=None, headers=None, data=None, verify=None):
        """Shortcut for ``request("DELETE", ...)``."""
        return self.request(
            "DELETE",
            url,
            params=params,
            headers=headers,
            data=data,
            verify=verify,
        )
263
264
class PanoramaClient:
    """
    High-level entry point to the PaloAlto REST API.

    The individual resources (e.g. Addresses, Tags, ...) are reached through
    one attribute per resource type.
    See https://docs.paloaltonetworks.com/pan-os/10-1/pan-os-panorama-api/get-started-with-the-pan-os-rest-api/access-the-rest-api#id0e536ca4-6154-4188-b70f-227c2c113ec4

    Attributes:

        - objects: groups all the objects (Address, Tag, Service, ...)
        - policies: groups all the policies (Security, NAT, ...)
        - network: groups all the network resources (e.g. EthernetInterfaces, VLANInterfaces, ...)
        - device: groups all device-related resources (only VirtualSystems)
        - panorama: groups all panorama-management-related resources (only DeviceGroups)
    """

    objects: PanoramaObjectsResourceType
    policies: PanoramaPoliciesResourceType
    network: PanoramaNetworkResourceType
    device: PanoramaDevicesResourceType
    panorama: PanoramaPanoramaResourceType

    def __init__(
        self,
        domain,
        api_key=None,
        version="v10.1",
        verify=False,
        verbose=False,
    ):
        # Only the first element returned by clean_url_host is used.
        host, _, _ = clean_url_host(domain)
        # A single PanoramaAPI instance is shared by every resource group.
        api = PanoramaAPI(verify=verify, verbose=verbose, api_key=api_key)
        self.client = api
        resource_groups = (
            ("objects", PanoramaObjectsResourceType),
            ("policies", PanoramaPoliciesResourceType),
            ("network", PanoramaNetworkResourceType),
            ("device", PanoramaDevicesResourceType),
            ("panorama", PanoramaPanoramaResourceType),
        )
        for attribute, group_cls in resource_groups:
            setattr(self, attribute, group_cls(api, host, version=version))
302
303
# Public API of this module: only PanoramaClient is exported by a
# star-import; the other module-level names are not part of `__all__`.
__all__ = [
    "PanoramaClient",
]
class PanoramaClient:
266class PanoramaClient:
267    """
268    Wrapper for the PaloAlto REST API
269    Resources (e.g. Addresses, Tags, ..) are grouped under their resource types.
270    See https://docs.paloaltonetworks.com/pan-os/10-1/pan-os-panorama-api/get-started-with-the-pan-os-rest-api/access-the-rest-api#id0e536ca4-6154-4188-b70f-227c2c113ec4
271
272    Attributes:
273
274        - objects: groups all the objects (Address, Tag, Service, ...)
275        - policies: groups all the policies (Security, NAT, ...)
276        - network: groups all the network resources (e.g. EthernetInterfaces, VLANInterfaces, ...)
277        - device: groups all device-related resources (only VirtualSystems)
278        - panorama: groups all panorama-management-related resources (only DeviceGroups)
279    """
280
281    objects: PanoramaObjectsResourceType
282    policies: PanoramaPoliciesResourceType
283    network: PanoramaNetworkResourceType
284    device: PanoramaDevicesResourceType
285    panorama: PanoramaPanoramaResourceType
286
287    def __init__(
288        self,
289        domain,
290        api_key=None,
291        version="v10.1",
292        verify=False,
293        verbose=False,
294    ):
295        domain, _, _ = clean_url_host(domain)
296        client = PanoramaAPI(api_key=api_key, verbose=verbose, verify=verify)
297        self.client = client
298        self.objects = PanoramaObjectsResourceType(client, domain, version=version)
299        self.policies = PanoramaPoliciesResourceType(client, domain, version=version)
300        self.network = PanoramaNetworkResourceType(client, domain, version=version)
301        self.device = PanoramaDevicesResourceType(client, domain, version=version)
302        self.panorama = PanoramaPanoramaResourceType(client, domain, version=version)

Wrapper for the Palo Alto Networks REST API. Resources (e.g. Addresses, Tags, ...) are grouped under their resource types. See https://docs.paloaltonetworks.com/pan-os/10-1/pan-os-panorama-api/get-started-with-the-pan-os-rest-api/access-the-rest-api#id0e536ca4-6154-4188-b70f-227c2c113ec4

Attributes:

- objects: groups all the objects (Address, Tag, Service, ...)
- policies: groups all the policies (Security, NAT, ...)
- network: groups all the network resources (e.g. EthernetInterfaces, VLANInterfaces, ...)
- device: groups all device-related resources (only VirtualSystems)
- panorama: groups all panorama-management-related resources (only DeviceGroups)
PanoramaClient(domain, api_key=None, version='v10.1', verify=False, verbose=False)
287    def __init__(
288        self,
289        domain,
290        api_key=None,
291        version="v10.1",
292        verify=False,
293        verbose=False,
294    ):
295        domain, _, _ = clean_url_host(domain)
296        client = PanoramaAPI(api_key=api_key, verbose=verbose, verify=verify)
297        self.client = client
298        self.objects = PanoramaObjectsResourceType(client, domain, version=version)
299        self.policies = PanoramaPoliciesResourceType(client, domain, version=version)
300        self.network = PanoramaNetworkResourceType(client, domain, version=version)
301        self.device = PanoramaDevicesResourceType(client, domain, version=version)
302        self.panorama = PanoramaPanoramaResourceType(client, domain, version=version)
client