Source code for useridresolver.SCIMIdResolver

# -*- coding: utf-8 -*-

#
#    LinOTP - the open source solution for two factor authentication
#    Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
#
#    This file is part of LinOTP userid resolvers.
#
#    This program is free software: you can redistribute it and/or
#    modify it under the terms of the GNU Affero General Public
#    License, version 3, as published by the Free Software Foundation.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the
#               GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#
#    E-mail: linotp@lsexperts.de
#    Contact: www.linotp.org
#    Support: www.lsexperts.de
#
""" This module implements the communication interface
                for resolving user information from a SCIM service

    status is experimental
"""

import logging
log = logging.getLogger(__name__)

from UserIdResolver import UserIdResolver
from UserIdResolver import getResolverClass

try:
    from osiam import connector
    from requests.auth import HTTPBasicAuth
    import requests
except Exception as ex:
    log.warn("Missing modules for SCIMResolver: %r" % str(ex))
    raise ex

import json


[docs]class IdResolver (UserIdResolver): @classmethod
[docs] def setup(cls, config=None, cache_dir=None): ''' this setup hook is triggered, when the server starts to serve the first request :param config: the linotp config :type config: the linotp config dict ''' log.info("Setting up the SCIMResolver") return
[docs] def close(self): return
[docs] def check_scim(self): if self.scim is None: raise Exception("SCIMResolver has no scim object set. You need to call the loadconfig before calling user functions.")
def __init__(self): self.name = "scim-default" self.auth_server = '' self.resource_server = '' self.auth_client = 'localhost' self.auth_secret = '' self.scim = None
[docs] def get_access_token(self, server=None, client=None, secret=None): res = requests.post('%s/oauth/token' % self.auth_server, auth=HTTPBasicAuth(client, secret), params={'grant_type': 'client_credentials', 'scope' : 'GET POST'}, verify=False) access_token = res.json().get('access_token') log.debug("Access Token: %r" % access_token) return access_token
[docs] def create_scim_object(self): access_token = self.get_access_token(self.auth_server, self.auth_client, self.auth_secret) self.scim = connector.SCIM(self.resource_server, access_token) return
[docs] def checkPass(self, uid, password): """ This function checks the password for a given uid. - returns true in case of success - false if password does not match TODO """ return False
[docs] def getUserInfo(self, userId): ''' returns the user information for a given uid. ''' ret = {} self.check_scim() res = self.scim.search_with_get_on_users('filter=%s eq %s' % (self.mapping.get("userid"), userId)) user = res.get("Resources", [{}])[0] ret['username'] = user.get(self.mapping.get("username")) ret['givenname'] = user.get(self.mapping.get("givenname"), "") ret['surname'] = user.get(self.mapping.get("surname"), "") ret['phone'] = user.get(self.mapping.get("phone"), "") ret['mobile'] = user.get(self.mapping.get("mobile"), "") ret['email'] = user.get(self.mapping.get("email"), "") return ret
[docs] def getUsername(self, userId): ''' returns the loginname for a given userId ''' user = self.getUserInfo(userId) return user.get("username")
[docs] def getUserId(self, LoginName): """ returns the uid for a given loginname/username """ self.check_scim() res = self.scim.search_with_get_on_users('filter=%s eq %s' % (self.mapping.get("username"), LoginName)) return res.get("Resources", [{}])[0].get(self.mapping.get("userid"))
[docs] def getUserList(self, searchDict): ''' Return the list of users ''' ret = [] ''' TODO: search dict ''' self.check_scim() res = self.scim.search_with_get_on_users("") for user in res.get("Resources"): ret_user = {} ret_user['username'] = user.get(self.mapping.get("username")) ret_user['userid'] = user.get(self.mapping.get("userid")) ret_user['surname'] = user.get(self.mapping.get("surname"), "") ret_user['givenname'] = user.get(self.mapping.get("givenname"), "") ret_user['email'] = user.get(self.mapping.get("email"), "") ret_user['phone'] = user.get(self.mapping.get("phone"), "") ret_user['mobile'] = user.get(self.mapping.get("mobile"), "") ret.append(ret_user) return ret ############################################################# # server inf methods #############################################################
[docs] def getResolverId(self): """ getResolverId(LoginName) - returns the resolver identifier string - empty string if not exist """ return self.name
[docs] def getResolverType(self): return IdResolver.getResolverClassType()
@classmethod
[docs] def getResolverClassType(cls): return 'scimresolver'
@classmethod
[docs] def getResolverClassDescriptor(cls): ''' return the descriptor of the resolver, which is - the class name and - the config description :return: resolver description dict :rtype: dict ''' descriptor = {} typ = cls.getResolverClassType() descriptor['clazz'] = "useridresolver.SCIMIdResolver.IdResolver" descriptor['config'] = {'authserver' : 'string', 'resourceserver' : 'string', 'authclient' : 'string', 'authsecret' : 'string', 'mapping' : 'string' } return {typ : descriptor}
[docs] def getResolverDescriptor(self): return IdResolver.getResolverClassDescriptor()
[docs] def getConfigEntry(self, config, key, conf, required=True): ckey = key cval = "" if conf != "" or None: ckey = ckey + "." + conf if ckey in config: cval = config[ckey] if cval == "": if key in config: cval = config[key] if cval == "" and required == True: raise Exception("missing config entry: " + key) return cval
[docs] def loadConfig(self, config, conf): """ loadConfig(configDict) The UserIdResolver could be configured from the pylon app config """ self.name = conf self.auth_server = self.getConfigEntry(config, 'linotp.scimresolver.authserver', conf) self.resource_server = self.getConfigEntry(config, 'linotp.scimresolver.resourceserver', conf) self.auth_client = self.getConfigEntry(config, 'linotp.scimresolver.client', conf) self.auth_secret = self.getConfigEntry(config, 'linotp.scimresolver.secret', conf) self.mapping = json.loads(self.getConfigEntry(config, 'linotp.scimresolver.mapping', conf)) self.create_scim_object() return
if __name__ == "__main__": print " SCIMIdResolver - IdResolver class test " y = getResolverClass("SCIMIdResolver", "IdResolver")() y.loadConfig({ 'linotp.scimresolver.authserver' : 'http://osiam:8080/osiam-auth-server', 'linotp.scimresolver.resourceserver' : 'http://osiam:8080/osiam-resource-server', 'linotp.scimresolver.secret' : '40e919e3-0834-447a-b39c-d14329c99941', 'linotp.scimresolver.client' : 'puckel', 'linotp.scimresolver.mapping' : '{ "username" : "userName" , "userid" : "id"}'}, "") print "==== the complete userlist =======" print y.getUserList({}) print "==================================" user = "marissa" loginId = y.getUserId(user) print " %s - %s" % (user , loginId) print " reId - " + y.getResolverId() ret = y.getUserInfo(loginId) print ret