Skip to content

Commit db12489

Browse files
committed
Add support for Name.com
Name.com API docs: https://www.name.com/api-docs/DNS
1 parent 964c040 commit db12489

File tree

41 files changed

+6211
-1
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+6211
-1
lines changed

CODEOWNERS

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ lexicon/providers/localzone.py @ags-slc
5050
lexicon/providers/luadns.py @analogj
5151
lexicon/providers/memset.py @tnwhitwell
5252
lexicon/providers/namecheap.py @pschmitt @rbelnap
53+
lexicon/providers/namecom.py @Jamim
5354
lexicon/providers/namesilo.py @analogj
5455
lexicon/providers/netcup.py @coldfix
5556
lexicon/providers/nfsn.py @tersers

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ The current supported providers are:
6666
- Linode v4 ([docs](https://developers.linode.com/api/docs/v4#tag/Domains))
6767
- LuaDNS ([docs](http://www.luadns.com/api.html))
6868
- Memset ([docs](https://www.memset.com/apidocs/methods_dns.html))
69+
- Name.com ([docs](https://www.name.com/api-docs/DNS))
6970
- Namecheap ([docs](https://www.namecheap.com/support/api/methods.aspx))
7071
- Namesilo ([docs](https://www.namesilo.com/api_reference.php))
7172
- Netcup ([docs](https://ccp.netcup.net/run/webservice/servers/endpoint.php))

lexicon/providers/base.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def authenticate(self):
6464
Make any requests required to get the domain's id for this provider,
6565
so it can be used in subsequent calls.
6666
Should throw an error if authentication fails for any reason,
67-
of if the domain does not exist.
67+
or if the domain does not exist.
6868
"""
6969
return self._authenticate()
7070

lexicon/providers/namecom.py

+212
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
"""Module provider for Name.com"""
2+
from __future__ import absolute_import
3+
4+
import logging
5+
6+
from requests import HTTPError, Session
7+
from requests.auth import HTTPBasicAuth
8+
9+
from lexicon.providers.base import Provider as BaseProvider
10+
11+
LOGGER = logging.getLogger(__name__)
12+
13+
NAMESERVER_DOMAINS = ['name.com']
14+
15+
DUPLICATE_ERROR = {
16+
'message': 'Invalid Argument',
17+
'details': 'Parameter Value Error - Duplicate Record'
18+
}
19+
20+
21+
def provider_parser(subparser):
22+
"""Configure a subparser for Name.com."""
23+
24+
subparser.add_argument('--auth-username', help='specify a username')
25+
subparser.add_argument('--auth-token', help='specify an API token')
26+
27+
28+
class NamecomLoader(object): # pylint: disable=useless-object-inheritance,too-few-public-methods
29+
"""Loader that handles pagination for the Name.com provider."""
30+
31+
def __init__(self, get, url, data_key, next_page=1):
32+
self.get = get
33+
self.url = url
34+
self.data_key = data_key
35+
self.next_page = next_page
36+
37+
def __iter__(self):
38+
while self.next_page:
39+
response = self.get(self.url, {'page': self.next_page})
40+
for data in response[self.data_key]:
41+
yield data
42+
self.next_page = response.get('next_page')
43+
44+
45+
class NamecomProvider(BaseProvider):
46+
"""Provider implementation for Name.com."""
47+
48+
def __init__(self, config):
49+
super(Provider, self).__init__(config)
50+
self.api_endpoint = 'https://api.name.com/v4'
51+
self.session = Session()
52+
53+
def _authenticate(self):
54+
self.session.auth = HTTPBasicAuth(
55+
username=self._get_provider_option('auth_username'),
56+
password=self._get_provider_option('auth_token')
57+
)
58+
59+
# checking domain existence
60+
domain_name = self.domain
61+
for domain in NamecomLoader(self._get, '/domains', 'domains'):
62+
if domain['domainName'] == domain_name:
63+
self.domain_id = domain_name
64+
return
65+
66+
raise Exception('{} domain does not exist'.format(domain_name))
67+
68+
def _create_record(self, rtype, name, content):
69+
data = {
70+
'type': rtype,
71+
'host': self._relative_name(name),
72+
'answer': content,
73+
'ttl': self._get_lexicon_option('ttl')
74+
}
75+
76+
if rtype in ('MX', 'SRV'):
77+
# despite the documentation says a priority is
78+
# required for MX and SRV, it's actually optional
79+
priority = self._get_lexicon_option('priority')
80+
if priority:
81+
data['priority'] = priority
82+
83+
url = '/domains/{}/records'.format(self.domain)
84+
try:
85+
record_id = self._post(url, data)['id']
86+
except HTTPError as error:
87+
response = error.response
88+
if response.status_code == 400 and \
89+
response.json() == DUPLICATE_ERROR:
90+
LOGGER.warning(
91+
'create_record: duplicate record has been skipped'
92+
)
93+
return True
94+
raise
95+
96+
LOGGER.debug('create_record: record %s has been created', record_id)
97+
98+
return record_id
99+
100+
def _list_records(self, rtype=None, name=None, content=None):
101+
url = '/domains/{}/records'.format(self.domain)
102+
records = []
103+
104+
for raw in NamecomLoader(self._get, url, 'records'):
105+
record = {
106+
'id': raw['id'],
107+
'type': raw['type'],
108+
'name': raw['fqdn'][:-1],
109+
'ttl': raw['ttl'],
110+
'content': raw['answer'],
111+
}
112+
records.append(record)
113+
114+
LOGGER.debug('list_records: retrieved %s records', len(records))
115+
116+
if rtype:
117+
records = (record for record in records if record['type'] == rtype)
118+
if name:
119+
name = self._full_name(name)
120+
records = (record for record in records if record['name'] == name)
121+
if content:
122+
records = (record for record in records
123+
if record['content'] == content)
124+
125+
if not isinstance(records, list):
126+
records = list(records)
127+
LOGGER.debug('list_records: filtered %s records', len(records))
128+
129+
return records
130+
131+
def _update_record(self, identifier, rtype=None, name=None, content=None):
132+
if not identifier:
133+
if not (rtype and name):
134+
raise ValueError(
135+
'Record identifier or rtype+name must be specified'
136+
)
137+
records = self._list_records(rtype, name)
138+
if not records:
139+
raise Exception('There is no record to update')
140+
141+
if len(records) > 1:
142+
filtered_records = [record for record in records
143+
if record['content'] == content]
144+
if filtered_records:
145+
records = filtered_records
146+
147+
if len(records) > 1:
148+
raise Exception(
149+
'There are multiple records to update: {}'.format(
150+
', '.join(record['id'] for record in records)
151+
)
152+
)
153+
154+
record_id = records[0]['id']
155+
else:
156+
record_id = identifier
157+
158+
data = {'ttl': self._get_lexicon_option('ttl')}
159+
160+
# even though the documentation says a type and an answer
161+
# are required, they are not required actually
162+
if rtype:
163+
data['type'] = rtype
164+
if name:
165+
data['host'] = self._relative_name(name)
166+
if content:
167+
data['answer'] = content
168+
169+
url = '/domains/{}/records/{}'.format(self.domain, record_id)
170+
record_id = self._put(url, data)['id']
171+
logging.debug('update_record: record %s has been updated', record_id)
172+
173+
return record_id
174+
175+
def _delete_record(self, identifier=None,
176+
rtype=None, name=None, content=None):
177+
if not identifier:
178+
if not (rtype and name):
179+
raise ValueError(
180+
'Record identifier or rtype+name must be specified'
181+
)
182+
records = self._list_records(rtype, name, content)
183+
if not records:
184+
LOGGER.warning('delete_record: there is no record to delete')
185+
return None
186+
record_ids = tuple(record['id'] for record in records)
187+
else:
188+
record_ids = (identifier,)
189+
190+
for record_id in record_ids:
191+
url = '/domains/{}/records/{}'.format(self.domain, record_id)
192+
self._delete(url)
193+
LOGGER.debug(
194+
'delete_record: record %s has been deleted', record_id
195+
)
196+
197+
return record_ids if len(record_ids) > 1 else record_ids[0]
198+
199+
def _get_raw_record(self, record_id):
200+
url = '/domains/{}/records/{}'.format(self.domain, record_id)
201+
return self._get(url)
202+
203+
def _request(self, action='GET', url='/', data=None, query_params=None):
204+
response = self.session.request(method=action,
205+
url=self.api_endpoint + url,
206+
json=data,
207+
params=query_params)
208+
response.raise_for_status()
209+
return response.json()
210+
211+
212+
Provider = NamecomProvider
+148
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
"""Integration tests for Name.com"""
2+
import json
3+
from unittest import TestCase
4+
5+
import pytest
6+
from mock import ANY, Mock, patch
7+
from requests import HTTPError
8+
9+
from lexicon.config import DictConfigSource
10+
from lexicon.providers.namecom import provider_parser
11+
from lexicon.tests.providers.integration_tests import (
12+
IntegrationTests, _vcr_integration_test
13+
)
14+
15+
16+
# Hook into testing framework by inheriting unittest.TestCase and reuse
17+
# the tests which *each and every* implementation of the interface must
18+
# pass, by inheritance from integration_tests.IntegrationTests
19+
class NamecomProviderTests(TestCase, IntegrationTests):
20+
"""TestCase for Name.com"""
21+
22+
# I don't think we really need some docstrings here.
23+
# pylint: disable=missing-function-docstring
24+
25+
provider_name = 'namecom'
26+
domain = 'mim.pw'
27+
28+
def _filter_headers(self):
29+
return ['Authorization', 'Cookie']
30+
31+
def _filter_response(self, response):
32+
headers = response['headers']
33+
headers.pop('Set-Cookie', None)
34+
headers.pop('content-length', None)
35+
36+
if response['status']['code'] == 200:
37+
try:
38+
data = json.loads(response['body']['string'].decode())
39+
except ValueError:
40+
pass
41+
else:
42+
if 'records' in data:
43+
min_id = 10 ** 8
44+
data['records'] = [
45+
record for record in data['records']
46+
if record['id'] > min_id
47+
]
48+
response['body']['string'] = json.dumps(data).encode()
49+
50+
return response
51+
52+
###########################
53+
# Provider.authenticate() #
54+
###########################
55+
@_vcr_integration_test
56+
def test_provider_authenticate(self):
57+
provider = self._construct_authenticated_provider()
58+
assert provider.session.auth
59+
60+
############################
61+
# Provider.create_record() #
62+
############################
63+
@_vcr_integration_test
64+
def test_provider_when_calling_create_record_for_MX_with_priority(self): # pylint: disable=invalid-name
65+
priority = 42
66+
config = self._test_config()
67+
config.add_config_source(DictConfigSource({'priority': priority}), 0)
68+
provider = self.provider_module.Provider(config)
69+
provider.authenticate()
70+
71+
record_id = provider.create_record('MX', 'mx.test1', self.domain)
72+
assert provider._get_raw_record(record_id)['priority'] == priority # pylint: disable=protected-access
73+
74+
@_vcr_integration_test
75+
def test_provider_when_calling_create_record_for_MX_with_no_priority(self): # pylint: disable=invalid-name
76+
provider = self._construct_authenticated_provider()
77+
record_id = provider.create_record('MX', 'mx.test2', self.domain)
78+
assert 'priority' not in provider._get_raw_record(record_id) # pylint: disable=protected-access
79+
80+
@_vcr_integration_test
81+
def test_provider_when_calling_create_record_should_fail_on_http_error(self):
82+
provider = self._construct_authenticated_provider()
83+
error = HTTPError(response=Mock())
84+
with patch.object(provider, '_request', side_effect=error):
85+
with pytest.raises(HTTPError):
86+
provider.create_record('TXT', 'httperror', 'HTTPError')
87+
88+
############################
89+
# Provider.update_record() #
90+
############################
91+
@_vcr_integration_test
92+
def test_provider_when_calling_update_record_with_no_identifier_or_rtype_and_name_should_fail(self): # pylint: disable=line-too-long
93+
provider = self._construct_authenticated_provider()
94+
with pytest.raises(ValueError):
95+
provider.update_record(None)
96+
97+
@_vcr_integration_test
98+
def test_provider_when_calling_update_record_should_fail_if_no_record_to_update(self):
99+
provider = self._construct_authenticated_provider()
100+
with pytest.raises(Exception):
101+
provider.update_record(None, 'TXT', 'missingrecord')
102+
103+
@_vcr_integration_test
104+
def test_provider_when_calling_update_record_should_fail_if_multiple_records_to_update(self):
105+
provider = self._construct_authenticated_provider()
106+
provider.create_record('TXT', 'multiple.test', 'foo')
107+
provider.create_record('TXT', 'multiple.test', 'bar')
108+
with pytest.raises(Exception):
109+
provider.update_record(None, 'TXT', 'multiple.test', 'updated')
110+
111+
@_vcr_integration_test
112+
def test_provider_when_calling_update_record_filter_by_content_should_pass(self):
113+
provider = self._construct_authenticated_provider()
114+
provider.create_record('TXT', 'multiple.test', 'foo')
115+
provider.create_record('TXT', 'multiple.test', 'bar')
116+
assert provider.update_record(None, 'TXT', 'multiple.test', 'foo')
117+
118+
@_vcr_integration_test
119+
def test_provider_when_calling_update_record_by_identifier_with_no_other_args_should_pass(self):
120+
provider = self._construct_authenticated_provider()
121+
record_id = provider.create_record('TXT', 'update.test', 'foo')
122+
assert provider.update_record(record_id)
123+
124+
############################
125+
# Provider.delete_record() #
126+
############################
127+
@_vcr_integration_test
128+
def test_provider_when_calling_delete_record_with_no_identifier_or_rtype_and_name_should_fail(self): # pylint: disable=line-too-long
129+
provider = self._construct_authenticated_provider()
130+
with pytest.raises(ValueError):
131+
provider.delete_record()
132+
133+
@_vcr_integration_test
134+
@patch('lexicon.providers.namecom.LOGGER.warning')
135+
def test_provider_when_calling_delete_record_should_pass_if_no_record_to_delete(self, warning):
136+
provider = self._construct_authenticated_provider()
137+
provider.delete_record(None, 'TXT', 'missingrecord')
138+
warning.assert_called_once()
139+
assert 'no record' in warning.call_args.args[0]
140+
141+
142+
def test_subparser_configuration():
143+
"""Tests the provider_parser method."""
144+
145+
subparser = Mock()
146+
provider_parser(subparser)
147+
subparser.add_argument.assert_any_call('--auth-username', help=ANY)
148+
subparser.add_argument.assert_any_call('--auth-token', help=ANY)

0 commit comments

Comments
 (0)