Skip to content

Commit

Permalink
#1 finished remaining tests/implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Isaiah committed Jun 15, 2021
1 parent 309cf17 commit cb1224b
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 33 deletions.
52 changes: 46 additions & 6 deletions lowball_arangodb_authdb/authdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ def _init_collection(self):
except KeyError:
self.collection = self.database.createCollection(self.collection_name, waitForSync=True)

def get_now(self):
return datetime.utcnow()

@property
def url(self):
return self._url
Expand Down Expand Up @@ -211,8 +214,7 @@ def revoke_token(self, token_id):

def revoke_all(self):

for doc in self.collection.fetchAll():
doc.delete()
self.collection.truncate()

def list_tokens(self):

Expand All @@ -229,12 +231,50 @@ def list_tokens(self):

def list_tokens_by_client_id(self, client_id):

pass
QUERY = f"""
FOR token in {self.collection_name}
FILTER token.cid == @client_id
RETURN token
"""
bind_vars = {
"client_id": client_id
}
tokens = self.collection.database.AQLQuery(QUERY, bind_vars=bind_vars)

results = []
for token in tokens:
try:
t = Token(**token.getStore())
results.append(t)
except:
token.delete()
return results

def list_tokens_by_role(self, role):

pass
QUERY = f"""
FOR token in {self.collection_name}
FILTER @role in token.r
return token
"""
bind_vars = {
"role": role
}
tokens = self.collection.database.AQLQuery(QUERY, bind_vars=bind_vars)

results = []
for token in tokens:
try:
t = Token(**token.getStore())
results.append(t)
except:
token.delete()
return results

def cleanup_tokens(self):

pass
QUERY = f"""
FOR token in {self.collection_name}
FILTER token.ets < "{str(self.get_now()).split(".")[0]}"
REMOVE token
"""
self.collection.database.AQLQuery(QUERY)
78 changes: 75 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,7 @@ def __init__(self, *args, **kwargs):
class TestMockCollection(Collection):

def __init__(self, *args, **kwargs):

pass
self.database = TestMockDatabase()

class TestMockDocument(Document):

Expand Down Expand Up @@ -333,6 +332,13 @@ def mock_get_item(value):
def mock_auth_db(monkeypatch):

monkeypatch.setattr(AuthDB, "__init__", Mock(return_value=None))
AuthDB.url = "http://127.0.0.1"
AuthDB.port = 8529
AuthDB.user = "root"
AuthDB.password = None
AuthDB.verify = True
AuthDB.database_name = "lowball_authdb"
AuthDB.collection_name = "authentication_tokens"
AuthDB.collection = TestMockCollection()


Expand Down Expand Up @@ -466,7 +472,7 @@ def mock_collection_fetch_all():
monkeypatch.setattr(TestMockCollection, "fetchDocument", Mock(wraps=mock_collection_fetch_document))
monkeypatch.setattr(TestMockCollection, "__getitem__", Mock(wraps=mock_collection_getitem))
monkeypatch.setattr(TestMockCollection, "fetchAll", Mock(wraps=mock_collection_fetch_all))

monkeypatch.setattr(TestMockCollection, "truncate", Mock())

@pytest.fixture
def mock_filled_token_collection_bad_values(token_dict_map, monkeypatch, mocked_document):
Expand Down Expand Up @@ -725,3 +731,69 @@ def admin_user2_test_token2(test_token_id8, admin_user_id2, admin_role, test_rol
tid=test_token_id8
)

@pytest.fixture
def list_tokens_by_client_id_request_response(
basic_user1_test_token1,
basic_user1_test_token2,
basic_user_id1,
monkeypatch
):

response = [basic_user1_test_token1, basic_user1_test_token2]
response_documents = []
for token in response:
doc = TestMockDocument()
doc.token_json = token.to_dict()
doc.test_key = token.token_id
response_documents.append(doc)
bad_doc = TestMockDocument()
bad_doc.token_json = {
"bad_value": "not token"
}
bad_doc.test_key = "bigbaddoc"
response_documents.append(bad_doc)
monkeypatch.setattr(TestMockDatabase, "AQLQuery", Mock(return_value=response_documents))
return basic_user_id1, response

@pytest.fixture
def list_tokens_by_role_request_response(
basic_user1_test_token1,
basic_user1_test_token2,
basic_user2_test_token1,
basic_user2_test_token2,
admin_user1_test_token1,
admin_user2_test_token1,
admin_role,
test_role1,
test_role2,
monkeypatch
):

response = [admin_user1_test_token1, admin_user2_test_token1]
response_documents = []
for token in response:
doc = TestMockDocument()
doc.token_json = token.to_dict()
doc.test_key = token.token_id
response_documents.append(doc)

bad_doc = TestMockDocument()
bad_doc.token_json = {
"bad_value": "not token"
}
bad_doc.test_key = "bigbaddoc"
response_documents.append(bad_doc)
monkeypatch.setattr(TestMockDatabase, "AQLQuery", Mock(return_value=response_documents))
return admin_role, response


@pytest.fixture
def fake_utcnow(monkeypatch):
from datetime import datetime
now = datetime.utcnow()
monkeypatch.setattr(AuthDB, "get_now", Mock(return_value=now))
return now

@pytest.fixture
def simple_mock_aql_query(monkeypatch):
monkeypatch.setattr(TestMockDatabase, "AQLQuery", Mock(return_value=[]))
92 changes: 68 additions & 24 deletions tests/test_authdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ def test_calls_delete_on_token_document_when_found(self,

class TestRevokeAll:

def test_calls_delete_on_each_document_in_the_database(self,
def test_calls_revoke_all_on_the_collection(self,
mock_pyarango,
mock_auth_db,
mock_filled_token_collection,
Expand All @@ -401,9 +401,7 @@ def test_calls_delete_on_each_document_in_the_database(self,

authdb = AuthDB()
assert authdb.revoke_all() is None
# 6 because that's how many tokens are in our mocked collection map
assert lowball_arangodb_authdb.authdb.Document.delete.call_count == 6

authdb.collection.truncate.assert_called_once()

class TestListTokens:

Expand Down Expand Up @@ -468,45 +466,91 @@ class TestListTokensByClientID:
"""

def test_returns_list_of_token_objects(self):
pass
QUERY = """
FOR token in {}
FILTER token.cid == @client_id
RETURN token
"""

def test_all_tokens_in_list_are_owned_by_the_specified_client_id(self):
"""Not sure we can test this properly, as the actual filtering is done on the arango side
def test_calls_query_as_expected_and_cleans_up_bad_tokens(self,
mock_pyarango,
mock_auth_db,
mock_filled_token_collection,
mock_document_delete,
list_tokens_by_client_id_request_response
):

all we can test is calling the query correctly
authdb = AuthDB()
expected_query = self.QUERY.format(authdb.collection_name)

client_id, expected_response = list_tokens_by_client_id_request_response

"""
pass
expected_bind_vars = {
"client_id": client_id
}

def test_aql_query_called_with_correct_inputs(self):
results = authdb.list_tokens_by_client_id(client_id)

pass
assert all(token in results for token in expected_response) and all(token in expected_response for token in results)
authdb.collection.database.AQLQuery.assert_called_once_with(expected_query, bind_vars=expected_bind_vars)
lowball_arangodb_authdb.authdb.Document.delete.assert_called_once()


class TestListTokensByRole:
"""This may be an aql queryable option as well, will hae to investigate
"""
def test_returns_list_of_token_objects(self):
pass

def test_all_tokens_in_list_possess_the_requested_role(self):
QUERY = """
FOR token in {}
FILTER @role in token.r
return token
"""

def test_calls_query_as_expected_and_cleans_up_bad_tokens(self,
mock_pyarango,
mock_auth_db,
mock_filled_token_collection,
mock_document_delete,
list_tokens_by_role_request_response
):
authdb = AuthDB()
expected_query = self.QUERY.format(authdb.collection_name)

pass
role, expected_response = list_tokens_by_role_request_response

def test_aql_query_called_with_correct_inputs(self):
pass
expected_bind_vars = {
"role": role
}

results = authdb.list_tokens_by_role(role)

assert all(token in results for token in expected_response) and all(
token in expected_response for token in results)
authdb.collection.database.AQLQuery.assert_called_once_with(expected_query, bind_vars=expected_bind_vars)
lowball_arangodb_authdb.authdb.Document.delete.assert_called_once()


class TestCleanupTokens:
"""i believe this is again an aql query we can do
"""
def test_calls_delete_on_all_tokens_which_are_expired(self):

pass
QUERY = """
FOR token in {}
FILTER token.ets < "{}"
REMOVE token
"""
def test_aql_query_called_with_correct_inputs(self,
fake_utcnow,
simple_mock_aql_query,
mock_pyarango,
mock_auth_db
):
authdb = AuthDB()
now = fake_utcnow
search_date = str(now).split(".")[0]
expected_query = self.QUERY.format(authdb.collection_name, search_date)

def test_aql_query_called_with_correct_inputs(self):
authdb.cleanup_tokens()

pass
authdb.collection.database.AQLQuery.assert_called_once_with(expected_query)

0 comments on commit cb1224b

Please sign in to comment.