diff --git a/modules/miscutil/lib/Makefile.am b/modules/miscutil/lib/Makefile.am index 2d094d2711..3e162418bf 100644 --- a/modules/miscutil/lib/Makefile.am +++ b/modules/miscutil/lib/Makefile.am @@ -33,6 +33,8 @@ pylib_DATA = __init__.py \ dbquery_regression_tests.py \ dataciteutils.py \ dataciteutils_tester.py \ + ldap_cern.py \ + ldap_cern_unit_tests.py \ logicutils.py \ logicutils_unit_tests.py \ mailutils.py \ diff --git a/modules/miscutil/lib/ldap_cern.py b/modules/miscutil/lib/ldap_cern.py new file mode 100644 index 0000000000..7a8299df7a --- /dev/null +++ b/modules/miscutil/lib/ldap_cern.py @@ -0,0 +1,132 @@ +# This file is part of Invenio. +# Copyright (C) 2009, 2010, 2011, 2014, 2015, 2016 CERN. +# +# Invenio is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# Invenio is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Invenio; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. + +"""Invenio LDAP interface for CERN.""" + +from thread import get_ident + +import ldap + +from ldap.controls import SimplePagedResultsControl + + +CFG_CERN_LDAP_URI = "ldap://xldap.cern.ch:389" +CFG_CERN_LDAP_BASE = "OU=Users,OU=Organic Units,DC=cern,DC=ch" +CFG_CERN_LDAP_PAGESIZE = 250 + +_ldap_connection_pool = {} + + +class LDAPError(Exception): + + """Base class for exceptions in this module.""" + + pass + + +def _cern_ldap_login(): + """Get a connection from _ldap_connection_pool or create a new one.""" + try: + connection = _ldap_connection_pool[get_ident()] + except KeyError: + connection = _ldap_connection_pool[get_ident()] + _ldap_connection_pool[get_ident()] = ldap.initialize(CFG_CERN_LDAP_URI) + + connection.set_option(ldap.OPT_PROTOCOL_VERSION, 3) + return connection + + +def _msgid(connection, req_ctrl, query_filter, attr_list=None): + """Run the search request using search_ext. + + :param string query_filter: filter to apply in the LDAP search + :param list attr_list: retrieved LDAP attributes. If None, all attributes + are returned + :return: msgid + """ + try: + return connection.search_ext( + CFG_CERN_LDAP_BASE, + ldap.SCOPE_SUBTREE, + query_filter, + attr_list, + attrsonly=0, + serverctrls=[req_ctrl]) + except ldap.SERVER_DOWN as e: + raise LDAPError("Error: Connection to CERN LDAP failed. ({0})" + .format(e)) + + +def _paged_search(connection, query_filter, attr_list=None): + """Search the CERN LDAP server using pagination. + + :param string query_filter: filter to apply in the LDAP search + :param list attr_list: retrieved LDAP attributes. If None, all attributes + are returned + :return: list of tuples (result-type, result-data) or empty list, + where result-data contains the user dictionary + """ + req_ctrl = SimplePagedResultsControl(True, CFG_CERN_LDAP_PAGESIZE, "") + msgid = _msgid(connection, req_ctrl, query_filter, attr_list) + result_pages = 0 + results = [] + + while True: + rtype, rdata, rmsgid, rctrls = connection.result3(msgid) + results.extend(rdata) + result_pages += 1 + + pctrls = [ + c + for c in rctrls + if c.controlType == SimplePagedResultsControl.controlType + ] + if pctrls: + if pctrls[0].cookie: + req_ctrl.cookie = pctrls[0].cookie + msgid = _msgid(connection, req_ctrl, + query_filter, attr_list) + else: + break + + return results + + +def get_users_records_data(query_filter, attr_list=None, decode_encoding=None): + """Get result-data of records. + + :param string query_filter: filter to apply in the LDAP search + :param list attr_list: retrieved LDAP attributes. If None, all attributes + are returned + :param string decode_encoding: decode the values of the LDAP records + :return: list of LDAP records, but result-data only + """ + connection = _cern_ldap_login() + records = _paged_search(connection, query_filter, attr_list) + + records_data = [] + + if decode_encoding: + records_data = [ + dict( + (k, [v[0].decode(decode_encoding)]) for (k, v) in x.iteritems() + ) + for (dummy, x) in records] + else: + records_data = [x for (dummy, x) in records] + + return records_data diff --git a/modules/miscutil/lib/ldap_cern_unit_tests.py b/modules/miscutil/lib/ldap_cern_unit_tests.py new file mode 100644 index 0000000000..5c7e5b4512 --- /dev/null +++ b/modules/miscutil/lib/ldap_cern_unit_tests.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Invenio. +# Copyright (C) 2008, 2009, 2010, 2011, 2013, 2014, 2015 CERN. +# +# Invenio is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# Invenio is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Invenio; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. + +"""Unit tests for the solrutils library.""" + +from invenio.ldap_cern import ( + get_users_info_by_displayName, get_users_info_by_displayName_or_email, + get_users_records_data) +from invenio.testutils import InvenioTestCase +from invenio.testutils import make_test_suite, run_test_suite + + +class TestLDAPGetUserInfo(InvenioTestCase): + + """Test for retrieving users information from LDAP at CERN.""" + + def test_no_user(self): + """Try to get user that doesn't exists.""" + username = "John Nonexisting" + expected_info = [] + self.assertEqual( + get_users_info_by_displayName(username), expected_info) + self.assertEqual( + get_users_info_by_displayName_or_email(username), expected_info) + + def test_single_user(self): + """Try to get a specific user (requires a user from CERN).""" + username = "Tibor Simko" + expected_results = 1 + expected_displayName = "Tibor Simko" + expected_email = "Tibor.Simko@cern.ch" + expected_affiliation = "CERN" + ldap_info = get_users_info_by_displayName(username) + ldap_info2 = get_users_info_by_displayName_or_email(username) + self.assertEqual(ldap_info, ldap_info2) + self.assertEqual(len(ldap_info), expected_results) + self.assertEqual( + ldap_info[0][1].get('displayName', [])[0], expected_displayName) + self.assertEqual( + ldap_info[0][1].get('mail', [])[0], expected_email) + self.assertEqual( + ldap_info[0][1].get( + 'cernInstituteName', [])[0], expected_affiliation) + + def test_users_records_data(self): + """Try to get a specific user data (requires a user from CERN).""" + searchfilter = (r"(&(objectClass=*)(employeeType=Primary)" + "(mail=Tibor.Simko@cern.ch))") + attrlist = ["mail", "displayName", "cernInstituteName"] + expected_results = 1 + expected_displayName = "Tibor Simko" + expected_email = "Tibor.Simko@cern.ch" + expected_affiliation = "CERN" + records = get_users_records_data(searchfilter, attrlist) + self.assertEqual(len(records), expected_results) + self.assertEqual( + records[0].get("displayName", [])[0], expected_displayName) + self.assertEqual( + records[0].get("mail", [])[0], expected_email) + self.assertEqual( + records[0].get("cernInstituteName", [])[0], expected_affiliation) + +TEST_SUITE = make_test_suite(TestLDAPGetUserInfo) + +if __name__ == "__main__": + run_test_suite(TEST_SUITE)