cloudfloordns.client.sync.client

  1import json
  2import logging
  3import os
  4from typing import Dict, Iterable, List, Tuple
  5
  6import requests
  7import requests.auth
  8from ratelimit import RateLimitException, limits, sleep_and_retry
  9
 10from .domain import Domains
 11from .groups import Groups
 12from .record import Record, Records
 13from .zone import Zone, Zones
 14
 15DEFAULT_HEADERS = {
 16    "Accept": "application/json",
 17    "Content-Type": "application/json",
 18}
 19
 20DEFAULT_BASE_URL = "https://apiv3.mtgsy.net/api/v1"
 21
 22
 23class BaseClient:
 24    def __init__(
 25        self, username=None, apikey=None, url=DEFAULT_BASE_URL, throttling=True
 26    ) -> None:
 27        if not username:
 28            username = os.environ.get("CLOUDFLOOR_USERNAME", "").strip()
 29        if not username:
 30            raise Exception("username required")
 31
 32        if not apikey:
 33            apikey = os.environ.get("CLOUDFLOOR_APIKEY", "").strip()
 34        if not apikey:
 35            raise Exception("apikey required")
 36        self._username = username
 37        self._apikey = apikey
 38        self._url = url.rstrip("/")
 39        self._throttling = throttling
 40
 41    def _request(self, method, url, data=None, params=None, timeout=None):
 42        if not url.startswith("/"):
 43            raise Exception(
 44                f"url '{url}' is invalid: must be a path with a leading '/' "
 45            )
 46        if not data:
 47            data = {}
 48        url = f"{self._url}{url}"
 49        error_message = "Unknown error"
 50
 51        response = requests.request(
 52            method,
 53            url,
 54            params=params,
 55            auth=requests.auth.HTTPBasicAuth(self._username, self._apikey),
 56            headers=DEFAULT_HEADERS,
 57            data=json.dumps(data),
 58            allow_redirects=True,
 59            timeout=timeout,
 60        )
 61        res = response.json()
 62        error = res.get("error")
 63        if not error:
 64            error = res.get("message")
 65        if error:
 66            logging.debug(error)
 67            error_message = error
 68            if isinstance(error, dict):
 69                error_message = error.get("description", "Unknown error")
 70            if not isinstance(error_message, str):
 71                error_message = str(error_message)
 72            if "Too Many Requests" in error_message:
 73                raise RateLimitException(error_message, 10)
 74            raise Exception(error_message)
 75        return res.get("data")
 76
 77    def _get(self, url, data=None, params=None, timeout=None, throttling=None):
 78        return self._request("GET", url, data=data, params=params, timeout=timeout)
 79
 80    def _post(self, url, data=None, params=None, timeout=None, throttling=None):
 81        return self._request("POST", url, data=data, params=params, timeout=timeout)
 82
 83    def _patch(self, url, data=None, params=None, timeout=None, throttling=None):
 84        return self._request("PATCH", url, data=data, params=params, timeout=timeout)
 85
 86    def _delete(self, url, data=None, params=None, timeout=None, throttling=None):
 87        return self._request("DELETE", url, data=data, params=params, timeout=timeout)
 88
 89    # https://stackoverflow.com/questions/401215/how-to-limit-rate-of-requests-to-web-services-in-python
 90    # @sleep_and_retry
 91    # @limits(calls=200, period=60)
 92    # def _limited_request(self, method, url, data=None, timeout=None):
 93    #     return self._request(method, url, data=data, timeout=timeout)
 94
 95    @sleep_and_retry
 96    @limits(calls=120, period=60)
 97    def _limited_get(self, url, data=None, params=None, timeout=None):
 98        return self._get(url, data=data, params=params, timeout=timeout)
 99
100    @sleep_and_retry
101    @limits(calls=30, period=60)
102    def _limited_post(self, url, data=None, params=None, timeout=None):
103        return self._post(url, data=data, params=params, timeout=timeout)
104
105    @sleep_and_retry
106    @limits(calls=60, period=60)
107    def _limited_patch(self, url, data=None, params=None, timeout=None):
108        return self._patch(url, data=data, params=params, timeout=timeout)
109
110    @sleep_and_retry
111    @limits(calls=30, period=60)
112    def _limited_delete(self, url, data=None, params=None, timeout=None):
113        return self._delete(url, data=data, params=params, timeout=timeout)
114
115    def get(self, url, data=None, params=None, timeout=None, throttling=None):
116        if throttling is None:
117            throttling = self._throttling
118        if throttling:
119            return self._limited_get(url, data=data, params=params, timeout=timeout)
120        return self._get(url, data=data, params=params, timeout=timeout)
121
122    def post(self, url, data=None, params=None, timeout=None, throttling=None):
123        if throttling is None:
124            throttling = self._throttling
125        if throttling:
126            return self._limited_post(url, data=data, params=params, timeout=timeout)
127        return self._post(url, data=data, params=params, timeout=timeout)
128
129    def patch(self, url, data=None, params=None, timeout=None, throttling=None):
130        if throttling is None:
131            throttling = self._throttling
132        if throttling:
133            return self._limited_patch(url, data=data, params=params, timeout=timeout)
134        return self._patch(url, data=data, params=params, timeout=timeout)
135
136    def delete(self, url, data=None, params=None, timeout=None, throttling=None):
137        if throttling is None:
138            throttling = self._throttling
139        if throttling:
140            return self._limited_delete(url, data=data, params=params, timeout=timeout)
141        return self._delete(url, data=data, params=params, timeout=timeout)
142
143
class Client(BaseClient):
    """API client exposing one accessor object per resource family.

    Each accessor (`records`, `zones`, `domains`, `groups`) is constructed
    with this client instance, so it can issue requests through it.
    """

    records: Records
    zones: Zones
    domains: Domains
    groups: Groups

    def __init__(
        self, username=None, apikey=None, url=DEFAULT_BASE_URL, throttling=True
    ) -> None:
        """Initialise the base client, then build the resource accessors.

        Arguments are forwarded unchanged to ``BaseClient.__init__``.
        """
        super().__init__(
            username=username,
            apikey=apikey,
            url=url,
            throttling=throttling,
        )
        self.records = Records(self)
        self.zones = Zones(self)
        self.domains = Domains(self)
        self.groups = Groups(self)

    # NOTE: a deprecated `domains` property that logged a warning and returned
    # `self.zones` exists only as a disabled draft; `domains` is currently the
    # separate `Domains` accessor assigned in `__init__`.

    def yield_all_domains_records(self) -> Iterable[Tuple[Zone, List[Record]]]:
        """Lazily yield ``(zone, records)`` for every zone from ``zones.list()``.

        Records are looked up by the zone's ``domainname``, one zone at a
        time, as the generator is consumed.
        """
        for zone in self.zones.list():
            zone_records = self.records.list(zone.domainname)
            yield zone, zone_records

    def all_domains_records(self) -> Dict[Zone, List[Record]]:
        """Return a dict mapping each zone to its list of records."""
        return {
            zone: zone_records
            for zone, zone_records in self.yield_all_domains_records()
        }
174
175
# Public API of this module: only the high-level client is exported.
__all__ = ["Client"]
class Client(BaseClient):
145class Client(BaseClient):
146    records: Records
147    zones: Zones
148    domains: Domains
149    groups: Groups
150
151    def __init__(
152        self, username=None, apikey=None, url=DEFAULT_BASE_URL, throttling=True
153    ) -> None:
154        super().__init__(
155            username=username, apikey=apikey, url=url, throttling=throttling
156        )
157        self.records = Records(self)
158        self.zones = Zones(self)
159        self.domains = Domains(self)
160        self.groups = Groups(self)
161
162    # @property
163    # def domains(self) -> Zones:
164    #     logging.warning(f"Attribute 'domains' in class '{Client}' is deprecated. Use 'zones' instead")
165    #     return self.zones
166
167    def yield_all_domains_records(self) -> Iterable[Tuple[Zone, List[Record]]]:
168        domains = self.zones.list()
169        for d in domains:
170            records = self.records.list(d.domainname)
171            yield d, records
172
173    def all_domains_records(self) -> Dict[Zone, List[Record]]:
174        return dict(self.yield_all_domains_records())
Client( username=None, apikey=None, url='https://apiv3.mtgsy.net/api/v1', throttling=True)
151    def __init__(
152        self, username=None, apikey=None, url=DEFAULT_BASE_URL, throttling=True
153    ) -> None:
154        super().__init__(
155            username=username, apikey=apikey, url=url, throttling=throttling
156        )
157        self.records = Records(self)
158        self.zones = Zones(self)
159        self.domains = Domains(self)
160        self.groups = Groups(self)
def yield_all_domains_records( self) -> Iterable[Tuple[cloudfloordns.models.zone.Zone, List[cloudfloordns.models.record.Record]]]:
167    def yield_all_domains_records(self) -> Iterable[Tuple[Zone, List[Record]]]:
168        domains = self.zones.list()
169        for d in domains:
170            records = self.records.list(d.domainname)
171            yield d, records
def all_domains_records( self) -> Dict[cloudfloordns.models.zone.Zone, List[cloudfloordns.models.record.Record]]:
173    def all_domains_records(self) -> Dict[Zone, List[Record]]:
174        return dict(self.yield_all_domains_records())
Inherited Members
BaseClient
get
post
patch
delete