cloudfloordns.client.sync.client
1import json 2import logging 3import os 4from typing import Dict, Iterable, List, Tuple 5 6import requests 7import requests.auth 8from ratelimit import RateLimitException, limits, sleep_and_retry 9 10from .domain import Domains 11from .groups import Groups 12from .record import Record, Records 13from .zone import Zone, Zones 14 15DEFAULT_HEADERS = { 16 "Accept": "application/json", 17 "Content-Type": "application/json", 18} 19 20DEFAULT_BASE_URL = "https://apiv3.mtgsy.net/api/v1" 21 22 23class BaseClient: 24 def __init__( 25 self, username=None, apikey=None, url=DEFAULT_BASE_URL, throttling=True 26 ) -> None: 27 if not username: 28 username = os.environ.get("CLOUDFLOOR_USERNAME", "").strip() 29 if not username: 30 raise Exception("username required") 31 32 if not apikey: 33 apikey = os.environ.get("CLOUDFLOOR_APIKEY", "").strip() 34 if not apikey: 35 raise Exception("apikey required") 36 self._username = username 37 self._apikey = apikey 38 self._url = url.rstrip("/") 39 self._throttling = throttling 40 41 def _request(self, method, url, data=None, params=None, timeout=None): 42 if not url.startswith("/"): 43 raise Exception( 44 f"url '{url}' is invalid: must be a path with a leading '/' " 45 ) 46 if not data: 47 data = {} 48 url = f"{self._url}{url}" 49 error_message = "Unknown error" 50 51 response = requests.request( 52 method, 53 url, 54 params=params, 55 auth=requests.auth.HTTPBasicAuth(self._username, self._apikey), 56 headers=DEFAULT_HEADERS, 57 data=json.dumps(data), 58 allow_redirects=True, 59 timeout=timeout, 60 ) 61 res = response.json() 62 error = res.get("error") 63 if not error: 64 error = res.get("message") 65 if error: 66 logging.debug(error) 67 error_message = error 68 if isinstance(error, dict): 69 error_message = error.get("description", "Unknown error") 70 if not isinstance(error_message, str): 71 error_message = str(error_message) 72 if "Too Many Requests" in error_message: 73 raise RateLimitException(error_message, 10) 74 raise Exception(error_message) 75 return res.get("data") 76 77 def _get(self, url, data=None, params=None, timeout=None, throttling=None): 78 return self._request("GET", url, data=data, params=params, timeout=timeout) 79 80 def _post(self, url, data=None, params=None, timeout=None, throttling=None): 81 return self._request("POST", url, data=data, params=params, timeout=timeout) 82 83 def _patch(self, url, data=None, params=None, timeout=None, throttling=None): 84 return self._request("PATCH", url, data=data, params=params, timeout=timeout) 85 86 def _delete(self, url, data=None, params=None, timeout=None, throttling=None): 87 return self._request("DELETE", url, data=data, params=params, timeout=timeout) 88 89 # https://stackoverflow.com/questions/401215/how-to-limit-rate-of-requests-to-web-services-in-python 90 # @sleep_and_retry 91 # @limits(calls=200, period=60) 92 # def _limited_request(self, method, url, data=None, timeout=None): 93 # return self._request(method, url, data=data, timeout=timeout) 94 95 @sleep_and_retry 96 @limits(calls=120, period=60) 97 def _limited_get(self, url, data=None, params=None, timeout=None): 98 return self._get(url, data=data, params=params, timeout=timeout) 99 100 @sleep_and_retry 101 @limits(calls=30, period=60) 102 def _limited_post(self, url, data=None, params=None, timeout=None): 103 return self._post(url, data=data, params=params, timeout=timeout) 104 105 @sleep_and_retry 106 @limits(calls=60, period=60) 107 def _limited_patch(self, url, data=None, params=None, timeout=None): 108 return self._patch(url, data=data, params=params, timeout=timeout) 109 110 @sleep_and_retry 111 @limits(calls=30, period=60) 112 def _limited_delete(self, url, data=None, params=None, timeout=None): 113 return self._delete(url, data=data, params=params, timeout=timeout) 114 115 def get(self, url, data=None, params=None, timeout=None, throttling=None): 116 if throttling is None: 117 throttling = self._throttling 118 if throttling: 119 return self._limited_get(url, data=data, params=params, timeout=timeout) 120 return self._get(url, data=data, params=params, timeout=timeout) 121 122 def post(self, url, data=None, params=None, timeout=None, throttling=None): 123 if throttling is None: 124 throttling = self._throttling 125 if throttling: 126 return self._limited_post(url, data=data, params=params, timeout=timeout) 127 return self._post(url, data=data, params=params, timeout=timeout) 128 129 def patch(self, url, data=None, params=None, timeout=None, throttling=None): 130 if throttling is None: 131 throttling = self._throttling 132 if throttling: 133 return self._limited_patch(url, data=data, params=params, timeout=timeout) 134 return self._patch(url, data=data, params=params, timeout=timeout) 135 136 def delete(self, url, data=None, params=None, timeout=None, throttling=None): 137 if throttling is None: 138 throttling = self._throttling 139 if throttling: 140 return self._limited_delete(url, data=data, params=params, timeout=timeout) 141 return self._delete(url, data=data, params=params, timeout=timeout) 142 143 144class Client(BaseClient): 145 records: Records 146 zones: Zones 147 domains: Domains 148 groups: Groups 149 150 def __init__( 151 self, username=None, apikey=None, url=DEFAULT_BASE_URL, throttling=True 152 ) -> None: 153 super().__init__( 154 username=username, apikey=apikey, url=url, throttling=throttling 155 ) 156 self.records = Records(self) 157 self.zones = Zones(self) 158 self.domains = Domains(self) 159 self.groups = Groups(self) 160 161 # @property 162 # def domains(self) -> Zones: 163 # logging.warning(f"Attribute 'domains' in class '{Client}' is deprecated. Use 'zones' instead") 164 # return self.zones 165 166 def yield_all_domains_records(self) -> Iterable[Tuple[Zone, List[Record]]]: 167 domains = self.zones.list() 168 for d in domains: 169 records = self.records.list(d.domainname) 170 yield d, records 171 172 def all_domains_records(self) -> Dict[Zone, List[Record]]: 173 return dict(self.yield_all_domains_records()) 174 175 176__all__ = [ 177 "Client", 178]
145class Client(BaseClient): 146 records: Records 147 zones: Zones 148 domains: Domains 149 groups: Groups 150 151 def __init__( 152 self, username=None, apikey=None, url=DEFAULT_BASE_URL, throttling=True 153 ) -> None: 154 super().__init__( 155 username=username, apikey=apikey, url=url, throttling=throttling 156 ) 157 self.records = Records(self) 158 self.zones = Zones(self) 159 self.domains = Domains(self) 160 self.groups = Groups(self) 161 162 # @property 163 # def domains(self) -> Zones: 164 # logging.warning(f"Attribute 'domains' in class '{Client}' is deprecated. Use 'zones' instead") 165 # return self.zones 166 167 def yield_all_domains_records(self) -> Iterable[Tuple[Zone, List[Record]]]: 168 domains = self.zones.list() 169 for d in domains: 170 records = self.records.list(d.domainname) 171 yield d, records 172 173 def all_domains_records(self) -> Dict[Zone, List[Record]]: 174 return dict(self.yield_all_domains_records())
Client( username=None, apikey=None, url='https://apiv3.mtgsy.net/api/v1', throttling=True)
151 def __init__( 152 self, username=None, apikey=None, url=DEFAULT_BASE_URL, throttling=True 153 ) -> None: 154 super().__init__( 155 username=username, apikey=apikey, url=url, throttling=throttling 156 ) 157 self.records = Records(self) 158 self.zones = Zones(self) 159 self.domains = Domains(self) 160 self.groups = Groups(self)
def
yield_all_domains_records( self) -> Iterable[Tuple[cloudfloordns.models.zone.Zone, List[cloudfloordns.models.record.Record]]]:
def
all_domains_records( self) -> Dict[cloudfloordns.models.zone.Zone, List[cloudfloordns.models.record.Record]]: