Skip to content

Commit

Permalink
Add support for Name.com
Browse files Browse the repository at this point in the history
  • Loading branch information
giuseongit committed Apr 24, 2022
1 parent bd347b5 commit b94a5d6
Show file tree
Hide file tree
Showing 41 changed files with 9,030 additions and 0 deletions.
1 change: 1 addition & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ lexicon/providers/memset.py @tnwhitwell
lexicon/providers/misaka.py @misakaio @ym
lexicon/providers/mythicbeasts.py @lexitus
lexicon/providers/namecheap.py @pschmitt @rbelnap
lexicon/providers/namecom.py @Jamim
lexicon/providers/namesilo.py @analogj
lexicon/providers/netcup.py @coldfix
lexicon/providers/nfsn.py @tersers
Expand Down
210 changes: 210 additions & 0 deletions lexicon/providers/namecom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
"""Module provider for Name.com"""
from __future__ import absolute_import

import logging

from requests import HTTPError, Session
from requests.auth import HTTPBasicAuth

from lexicon.providers.base import Provider as BaseProvider

LOGGER = logging.getLogger(__name__)

NAMESERVER_DOMAINS = ['name.com']

DUPLICATE_ERROR = {
'message': 'Invalid Argument',
'details': 'Parameter Value Error - Duplicate Record'
}


def provider_parser(subparser):
"""Configure a subparser for Name.com."""

subparser.add_argument('--auth-username', help='specify a username')
subparser.add_argument('--auth-token', help='specify an API token')


class NamecomLoader(object): # pylint: disable=useless-object-inheritance,too-few-public-methods
"""Loader that handles pagination for the Name.com provider."""

def __init__(self, get, url, data_key, next_page=1):
self.get = get
self.url = url
self.data_key = data_key
self.next_page = next_page

def __iter__(self):
while self.next_page:
response = self.get(self.url, {'page': self.next_page})
for data in response[self.data_key]:
yield data
self.next_page = response.get('next_page')


class NamecomProvider(BaseProvider):
"""Provider implementation for Name.com."""

def __init__(self, config):
super(Provider, self).__init__(config)
self.api_endpoint = 'https://api.name.com/v4'
self.session = Session()

def _authenticate(self):
self.session.auth = HTTPBasicAuth(
username=self._get_provider_option('auth_username'),
password=self._get_provider_option('auth_token')
)

# checking domain existence
domain_name = self.domain
for domain in NamecomLoader(self._get, '/domains', 'domains'):
if domain['domainName'] == domain_name:
self.domain_id = domain_name
return

raise Exception('{} domain does not exist'.format(domain_name))

def _create_record(self, rtype, name, content):
data = {
'type': rtype,
'host': self._relative_name(name),
'answer': content,
'ttl': self._get_lexicon_option('ttl')
}

if rtype in ('MX', 'SRV'):
# despite the documentation says a priority is
# required for MX and SRV, it's actually optional
priority = self._get_lexicon_option('priority')
if priority:
data['priority'] = priority

url = '/domains/{}/records'.format(self.domain)
try:
record_id = self._post(url, data)['id']
except HTTPError as error:
response = error.response
if response.status_code == 400 and \
response.json() == DUPLICATE_ERROR:
LOGGER.warning(
'create_record: duplicate record has been skipped'
)
return True
raise

LOGGER.debug('create_record: record %s has been created', record_id)

return record_id

def _list_records(self, rtype=None, name=None, content=None):
url = '/domains/{}/records'.format(self.domain)
records = []

for raw in NamecomLoader(self._get, url, 'records'):
record = {
'id': raw['id'],
'type': raw['type'],
'name': raw['fqdn'][:-1],
'ttl': raw['ttl'],
'content': raw['answer'],
}
records.append(record)

LOGGER.debug('list_records: retrieved %s records', len(records))

if rtype:
records = [record for record in records if record['type'] == rtype]
if name:
name = self._full_name(name)
records = [record for record in records if record['name'] == name]
if content:
records = [record for record in records
if record['content'] == content]

LOGGER.debug('list_records: filtered %s records', len(records))

return records

def _update_record(self, identifier, rtype=None, name=None, content=None):
if not identifier:
if not (rtype and name):
raise ValueError(
'Record identifier or rtype+name must be specified'
)
records = self._list_records(rtype, name)
if not records:
raise Exception('There is no record to update')

if len(records) > 1:
filtered_records = [record for record in records
if record['content'] == content]
if filtered_records:
records = filtered_records

if len(records) > 1:
raise Exception(
'There are multiple records to update: {}'.format(
', '.join(record['id'] for record in records)
)
)

record_id = records[0]['id']
else:
record_id = identifier

data = {'ttl': self._get_lexicon_option('ttl')}

# even though the documentation says a type and an answer
# are required, they are not required actually
if rtype:
data['type'] = rtype
if name:
data['host'] = self._relative_name(name)
if content:
data['answer'] = content

url = '/domains/{}/records/{}'.format(self.domain, record_id)
record_id = self._put(url, data)['id']
logging.debug('update_record: record %s has been updated', record_id)

return record_id

def _delete_record(self, identifier=None,
rtype=None, name=None, content=None):
if not identifier:
if not (rtype and name):
raise ValueError(
'Record identifier or rtype+name must be specified'
)
records = self._list_records(rtype, name, content)
if not records:
LOGGER.warning('delete_record: there is no record to delete')
return False
record_ids = [record['id'] for record in records]
else:
record_ids = [identifier, ]

for record_id in record_ids:
url = '/domains/{}/records/{}'.format(self.domain, record_id)
self._delete(url)
LOGGER.debug(
'delete_record: record %s has been deleted', record_id
)

return True

def _get_raw_record(self, record_id):
url = '/domains/{}/records/{}'.format(self.domain, record_id)
return self._get(url)

def _request(self, action='GET', url='/', data=None, query_params=None):
response = self.session.request(method=action,
url=self.api_endpoint + url,
json=data,
params=query_params)
response.raise_for_status()
return response.json()


Provider = NamecomProvider
148 changes: 148 additions & 0 deletions lexicon/tests/providers/test_namecom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""Integration tests for Name.com"""
import json
from unittest import TestCase

import pytest
from unittest.mock import ANY, Mock, patch
from requests import HTTPError

from lexicon.config import DictConfigSource
from lexicon.providers.namecom import provider_parser
from lexicon.tests.providers.integration_tests import (
IntegrationTestsV2, vcr_integration_test
)


# Hook into testing framework by inheriting unittest.TestCase and reuse
# the tests which *each and every* implementation of the interface must
# pass, by inheritance from integration_tests.IntegrationTests
class NamecomProviderTests(TestCase, IntegrationTestsV2):
"""TestCase for Name.com"""

# I don't think we really need some docstrings here.
# pylint: disable=missing-function-docstring

provider_name = 'namecom'
domain = 'mim.pw'

def _filter_headers(self):
return ['Authorization', 'Cookie']

def _filter_response(self, response):
headers = response['headers']
headers.pop('Set-Cookie', None)
headers.pop('content-length', None)

if response['status']['code'] == 200:
try:
data = json.loads(response['body']['string'].decode())
except ValueError:
pass
else:
if 'records' in data:
min_id = 10 ** 8
data['records'] = [
record for record in data['records']
if record['id'] > min_id
]
response['body']['string'] = json.dumps(data).encode()

return response

###########################
# Provider.authenticate() #
###########################
@vcr_integration_test
def test_provider_authentication_method(self):
provider = self._construct_authenticated_provider()
assert provider.session.auth

############################
# Provider.create_record() #
############################
@vcr_integration_test
def test_provider_when_calling_create_record_for_MX_with_priority(self): # pylint: disable=invalid-name
priority = 42
config = self._test_config()
config.add_config_source(DictConfigSource({'priority': priority}), 0)
provider = self.provider_module.Provider(config)
provider.authenticate()

record_id = provider.create_record('MX', 'mx.test1', self.domain)
assert provider._get_raw_record(record_id)['priority'] == priority # pylint: disable=protected-access

@vcr_integration_test
def test_provider_when_calling_create_record_for_MX_with_no_priority(self): # pylint: disable=invalid-name
provider = self._construct_authenticated_provider()
record_id = provider.create_record('MX', 'mx.test2', self.domain)
assert 'priority' not in provider._get_raw_record(record_id) # pylint: disable=protected-access

@vcr_integration_test
def test_provider_when_calling_create_record_should_fail_on_http_error(self):
provider = self._construct_authenticated_provider()
error = HTTPError(response=Mock())
with patch.object(provider, '_request', side_effect=error):
with pytest.raises(HTTPError):
provider.create_record('TXT', 'httperror', 'HTTPError')

############################
# Provider.update_record() #
############################
@vcr_integration_test
def test_provider_when_calling_update_record_with_no_identifier_or_rtype_and_name_should_fail(self): # pylint: disable=line-too-long
provider = self._construct_authenticated_provider()
with pytest.raises(ValueError):
provider.update_record(None)

@vcr_integration_test
def test_provider_when_calling_update_record_should_fail_if_no_record_to_update(self):
provider = self._construct_authenticated_provider()
with pytest.raises(Exception):
provider.update_record(None, 'TXT', 'missingrecord')

@vcr_integration_test
def test_provider_when_calling_update_record_should_fail_if_multiple_records_to_update(self):
provider = self._construct_authenticated_provider()
provider.create_record('TXT', 'multiple.test', 'foo')
provider.create_record('TXT', 'multiple.test', 'bar')
with pytest.raises(Exception):
provider.update_record(None, 'TXT', 'multiple.test', 'updated')

@vcr_integration_test
def test_provider_when_calling_update_record_filter_by_content_should_pass(self):
provider = self._construct_authenticated_provider()
provider.create_record('TXT', 'multiple.test', 'foo')
provider.create_record('TXT', 'multiple.test', 'bar')
assert provider.update_record(None, 'TXT', 'multiple.test', 'foo')

@vcr_integration_test
def test_provider_when_calling_update_record_by_identifier_with_no_other_args_should_pass(self):
provider = self._construct_authenticated_provider()
record_id = provider.create_record('TXT', 'update.test', 'foo')
assert provider.update_record(record_id)

############################
# Provider.delete_record() #
############################
@vcr_integration_test
def test_provider_when_calling_delete_record_with_no_identifier_or_rtype_and_name_should_fail(self): # pylint: disable=line-too-long
provider = self._construct_authenticated_provider()
with pytest.raises(ValueError):
provider.delete_record()

@vcr_integration_test
@patch('lexicon.providers.namecom.LOGGER.warning')
def test_provider_when_calling_delete_record_should_pass_if_no_record_to_delete(self, warning):
provider = self._construct_authenticated_provider()
provider.delete_record(None, 'TXT', 'missingrecord')
warning.assert_called_once()
assert 'no record' in warning.call_args.args[0]


def test_subparser_configuration():
"""Tests the provider_parser method."""

subparser = Mock()
provider_parser(subparser)
subparser.add_argument.assert_any_call('--auth-username', help=ANY)
subparser.add_argument.assert_any_call('--auth-token', help=ANY)
Loading

0 comments on commit b94a5d6

Please sign in to comment.