Skip to content

Commit

Permalink
Add support for Name.com
Browse files Browse the repository at this point in the history
  • Loading branch information
Jamim committed Jun 3, 2020
1 parent 57a90f2 commit 766fb98
Show file tree
Hide file tree
Showing 41 changed files with 6,211 additions and 1 deletion.
1 change: 1 addition & 0 deletions CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ lexicon/providers/localzone.py @ags-slc
lexicon/providers/luadns.py @analogj
lexicon/providers/memset.py @tnwhitwell
lexicon/providers/namecheap.py @pschmitt @rbelnap
lexicon/providers/namecom.py @Jamim
lexicon/providers/namesilo.py @analogj
lexicon/providers/netcup.py @coldfix
lexicon/providers/nfsn.py @tersers
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ The current supported providers are:
- Linode v4 ([docs](https://developers.linode.com/api/docs/v4#tag/Domains))
- LuaDNS ([docs](http://www.luadns.com/api.html))
- Memset ([docs](https://www.memset.com/apidocs/methods_dns.html))
- Name.com ([docs](https://www.name.com/api-docs/DNS))
- Namecheap ([docs](https://www.namecheap.com/support/api/methods.aspx))
- Namesilo ([docs](https://www.namesilo.com/api_reference.php))
- Netcup ([docs](https://ccp.netcup.net/run/webservice/servers/endpoint.php))
Expand Down
2 changes: 1 addition & 1 deletion lexicon/providers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def authenticate(self):
Make any requests required to get the domain's id for this provider,
so it can be used in subsequent calls.
Should throw an error if authentication fails for any reason,
of if the domain does not exist.
or if the domain does not exist.
"""
return self._authenticate()

Expand Down
212 changes: 212 additions & 0 deletions lexicon/providers/namecom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
"""Module provider for Name.com"""
from __future__ import absolute_import

import logging

from requests import HTTPError, Session
from requests.auth import HTTPBasicAuth

from lexicon.providers.base import Provider as BaseProvider

LOGGER = logging.getLogger(__name__)

NAMESERVER_DOMAINS = ['name.com']

DUPLICATE_ERROR = {
'message': 'Invalid Argument',
'details': 'Parameter Value Error - Duplicate Record'
}


def provider_parser(subparser):
"""Configure a subparser for Name.com."""

subparser.add_argument('--auth-username', help='specify a username')
subparser.add_argument('--auth-token', help='specify an API token')


class NamecomLoader(object): # pylint: disable=useless-object-inheritance,too-few-public-methods
"""Loader that handles pagination for the Name.com provider."""

def __init__(self, get, url, data_key, next_page=1):
self.get = get
self.url = url
self.data_key = data_key
self.next_page = next_page

def __iter__(self):
while self.next_page:
response = self.get(self.url, {'page': self.next_page})
for data in response[self.data_key]:
yield data
self.next_page = response.get('next_page')


class NamecomProvider(BaseProvider):
"""Provider implementation for Name.com."""

def __init__(self, config):
super(Provider, self).__init__(config)
self.api_endpoint = 'https://api.name.com/v4'
self.session = Session()

def _authenticate(self):
self.session.auth = HTTPBasicAuth(
username=self._get_provider_option('auth_username'),
password=self._get_provider_option('auth_token')
)

# checking domain existence
domain_name = self.domain
for domain in NamecomLoader(self._get, '/domains', 'domains'):
if domain['domainName'] == domain_name:
self.domain_id = domain_name
return

raise Exception('{} domain does not exist'.format(domain_name))

def _create_record(self, rtype, name, content):
data = {
'type': rtype,
'host': self._relative_name(name),
'answer': content,
'ttl': self._get_lexicon_option('ttl')
}

if rtype in ('MX', 'SRV'):
# despite the documentation says a priority is
# required for MX and SRV, it's actually optional
priority = self._get_lexicon_option('priority')
if priority:
data['priority'] = priority

url = '/domains/{}/records'.format(self.domain)
try:
record_id = self._post(url, data)['id']
except HTTPError as error:
response = error.response
if response.status_code == 400 and \
response.json() == DUPLICATE_ERROR:
LOGGER.warning(
'create_record: duplicate record has been skipped'
)
return True
raise

LOGGER.debug('create_record: record %s has been created', record_id)

return record_id

def _list_records(self, rtype=None, name=None, content=None):
url = '/domains/{}/records'.format(self.domain)
records = []

for raw in NamecomLoader(self._get, url, 'records'):
record = {
'id': raw['id'],
'type': raw['type'],
'name': raw['fqdn'][:-1],
'ttl': raw['ttl'],
'content': raw['answer'],
}
records.append(record)

LOGGER.debug('list_records: retrieved %s records', len(records))

if rtype:
records = (record for record in records if record['type'] == rtype)
if name:
name = self._full_name(name)
records = (record for record in records if record['name'] == name)
if content:
records = (record for record in records
if record['content'] == content)

if not isinstance(records, list):
records = list(records)
LOGGER.debug('list_records: filtered %s records', len(records))

return records

def _update_record(self, identifier, rtype=None, name=None, content=None):
if not identifier:
if not (rtype and name):
raise ValueError(
'Record identifier or rtype+name must be specified'
)
records = self._list_records(rtype, name)
if not records:
raise Exception('There is no record to update')

if len(records) > 1:
filtered_records = [record for record in records
if record['content'] == content]
if filtered_records:
records = filtered_records

if len(records) > 1:
raise Exception(
'There are multiple records to update: {}'.format(
', '.join(record['id'] for record in records)
)
)

record_id = records[0]['id']
else:
record_id = identifier

data = {'ttl': self._get_lexicon_option('ttl')}

# even though the documentation says a type and an answer
# are required, they are not required actually
if rtype:
data['type'] = rtype
if name:
data['host'] = self._relative_name(name)
if content:
data['answer'] = content

url = '/domains/{}/records/{}'.format(self.domain, record_id)
record_id = self._put(url, data)['id']
logging.debug('update_record: record %s has been updated', record_id)

return record_id

def _delete_record(self, identifier=None,
rtype=None, name=None, content=None):
if not identifier:
if not (rtype and name):
raise ValueError(
'Record identifier or rtype+name must be specified'
)
records = self._list_records(rtype, name, content)
if not records:
LOGGER.warning('delete_record: there is no record to delete')
return None
record_ids = tuple(record['id'] for record in records)
else:
record_ids = (identifier,)

for record_id in record_ids:
url = '/domains/{}/records/{}'.format(self.domain, record_id)
self._delete(url)
LOGGER.debug(
'delete_record: record %s has been deleted', record_id
)

return record_ids if len(record_ids) > 1 else record_ids[0]

def _get_raw_record(self, record_id):
url = '/domains/{}/records/{}'.format(self.domain, record_id)
return self._get(url)

def _request(self, action='GET', url='/', data=None, query_params=None):
response = self.session.request(method=action,
url=self.api_endpoint + url,
json=data,
params=query_params)
response.raise_for_status()
return response.json()


Provider = NamecomProvider
148 changes: 148 additions & 0 deletions lexicon/tests/providers/test_namecom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""Integration tests for Name.com"""
import json
from unittest import TestCase

import pytest
from mock import ANY, Mock, patch
from requests import HTTPError

from lexicon.config import DictConfigSource
from lexicon.providers.namecom import provider_parser
from lexicon.tests.providers.integration_tests import (
IntegrationTests, _vcr_integration_test
)


# Hook into testing framework by inheriting unittest.TestCase and reuse
# the tests which *each and every* implementation of the interface must
# pass, by inheritance from integration_tests.IntegrationTests
class NamecomProviderTests(TestCase, IntegrationTests):
"""TestCase for Name.com"""

# I don't think we really need some docstrings here.
# pylint: disable=missing-function-docstring

provider_name = 'namecom'
domain = 'mim.pw'

def _filter_headers(self):
return ['Authorization', 'Cookie']

def _filter_response(self, response):
headers = response['headers']
headers.pop('Set-Cookie', None)
headers.pop('content-length', None)

if response['status']['code'] == 200:
try:
data = json.loads(response['body']['string'].decode())
except ValueError:
pass
else:
if 'records' in data:
min_id = 10 ** 8
data['records'] = [
record for record in data['records']
if record['id'] > min_id
]
response['body']['string'] = json.dumps(data).encode()

return response

###########################
# Provider.authenticate() #
###########################
@_vcr_integration_test
def test_provider_authenticate(self):
provider = self._construct_authenticated_provider()
assert provider.session.auth

############################
# Provider.create_record() #
############################
@_vcr_integration_test
def test_provider_when_calling_create_record_for_MX_with_priority(self): # pylint: disable=invalid-name
priority = 42
config = self._test_config()
config.add_config_source(DictConfigSource({'priority': priority}), 0)
provider = self.provider_module.Provider(config)
provider.authenticate()

record_id = provider.create_record('MX', 'mx.test1', self.domain)
assert provider._get_raw_record(record_id)['priority'] == priority # pylint: disable=protected-access

@_vcr_integration_test
def test_provider_when_calling_create_record_for_MX_with_no_priority(self): # pylint: disable=invalid-name
provider = self._construct_authenticated_provider()
record_id = provider.create_record('MX', 'mx.test2', self.domain)
assert 'priority' not in provider._get_raw_record(record_id) # pylint: disable=protected-access

@_vcr_integration_test
def test_provider_when_calling_create_record_should_fail_on_http_error(self):
provider = self._construct_authenticated_provider()
error = HTTPError(response=Mock())
with patch.object(provider, '_request', side_effect=error):
with pytest.raises(HTTPError):
provider.create_record('TXT', 'httperror', 'HTTPError')

############################
# Provider.update_record() #
############################
@_vcr_integration_test
def test_provider_when_calling_update_record_with_no_identifier_or_rtype_and_name_should_fail(self): # pylint: disable=line-too-long
provider = self._construct_authenticated_provider()
with pytest.raises(ValueError):
provider.update_record(None)

@_vcr_integration_test
def test_provider_when_calling_update_record_should_fail_if_no_record_to_update(self):
provider = self._construct_authenticated_provider()
with pytest.raises(Exception):
provider.update_record(None, 'TXT', 'missingrecord')

@_vcr_integration_test
def test_provider_when_calling_update_record_should_fail_if_multiple_records_to_update(self):
provider = self._construct_authenticated_provider()
provider.create_record('TXT', 'multiple.test', 'foo')
provider.create_record('TXT', 'multiple.test', 'bar')
with pytest.raises(Exception):
provider.update_record(None, 'TXT', 'multiple.test', 'updated')

@_vcr_integration_test
def test_provider_when_calling_update_record_filter_by_content_should_pass(self):
provider = self._construct_authenticated_provider()
provider.create_record('TXT', 'multiple.test', 'foo')
provider.create_record('TXT', 'multiple.test', 'bar')
assert provider.update_record(None, 'TXT', 'multiple.test', 'foo')

@_vcr_integration_test
def test_provider_when_calling_update_record_by_identifier_with_no_other_args_should_pass(self):
provider = self._construct_authenticated_provider()
record_id = provider.create_record('TXT', 'update.test', 'foo')
assert provider.update_record(record_id)

############################
# Provider.delete_record() #
############################
@_vcr_integration_test
def test_provider_when_calling_delete_record_with_no_identifier_or_rtype_and_name_should_fail(self): # pylint: disable=line-too-long
provider = self._construct_authenticated_provider()
with pytest.raises(ValueError):
provider.delete_record()

@_vcr_integration_test
@patch('lexicon.providers.namecom.LOGGER.warning')
def test_provider_when_calling_delete_record_should_pass_if_no_record_to_delete(self, warning):
provider = self._construct_authenticated_provider()
provider.delete_record(None, 'TXT', 'missingrecord')
warning.assert_called_once()
assert 'no record' in warning.call_args.args[0]


def test_subparser_configuration():
"""Tests the provider_parser method."""

subparser = Mock()
provider_parser(subparser)
subparser.add_argument.assert_any_call('--auth-username', help=ANY)
subparser.add_argument.assert_any_call('--auth-token', help=ANY)
Loading

0 comments on commit 766fb98

Please sign in to comment.