Source code for useridresolver.PasswdIdResolver

# -*- coding: utf-8 -*-

#
#    LinOTP - the open source solution for two factor authentication
#    Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
#
#    This file is part of LinOTP userid resolvers.
#
#    This program is free software: you can redistribute it and/or
#    modify it under the terms of the GNU Affero General Public
#    License, version 3, as published by the Free Software Foundation.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the
#               GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#
#    E-mail: linotp@lsexperts.de
#    Contact: www.linotp.org
#    Support: www.lsexperts.de
#
"""This module implements the communication interface
   for resolvin user info to the /etc/passwd user base

PasswdIdResolver.IdResolver class
    implements the UserIdResolver for local /etc/passwd lookup

Remarks:
    Don't use this as an enterprise solution!

"""

import re
import logging


from UserIdResolver import UserIdResolver
from UserIdResolver import getResolverClass


log = logging.getLogger(__name__)
ENCODING = "utf-8"


[docs]def tokenise(r): def _(s): ret = None st = s.strip() m = re.match("^" + r, st) if m: ret = (st[:m.end()].strip(), st[m.end():].strip()) #ret[0].strip() ## remove ws #ret[1].strip() return ret return _
[docs]class IdResolver (UserIdResolver): fields = {"username": 1, "userid": 1, "description": 0, "phone": 0, "mobile": 0, "email": 0, "givenname": 0, "surname": 0, "gender": 0 } searchFields = { "username": "text", "userid": "numeric", "description": "text", "email": "text" } sF = { "username": 0, "cryptpass": 1, "userid": 2, "description": 4, "email": 4, } @classmethod
[docs] def setup(cls, config=None, cache_dir=None): ''' this setup hook is triggered, when the server starts to serve the first request :param config: the linotp config :type config: the linotp config dict ''' log.info("Setting up the PasswdResolver") return
def __init__(self): """ simple constructor """ self.name = "etc-passwd" self.fileName = "" self.name = "P" self.nameDict = {} self.descDict = {} self.reversDict = {} self.passDict = {} self.officePhoneDict = {} self.homePhoneDict = {} self.surnameDict = {} self.givennameDict = {} self.emailDict = {}
[docs] def close(self): """ request hook - to close down resolver object """ return
[docs] def loadFile(self): """ init loads the /etc/passwd user and uid as a dict for / user loginname lookup """ if (self.fileName == ""): self.fileName = "/etc/passwd" log.info('[loadFile] loading users from file %s' % (self.fileName)) fileHandle = open(self.fileName, "r") line = fileHandle.readline() ID = self.sF["userid"] NAME = self.sF["username"] PASS = self.sF["cryptpass"] DESCRIPTION = self.sF["description"] while line: line = line.strip() if len(line) == 0: continue fields = line.split(":", 7) self.nameDict["%s" % fields[NAME]] = fields[ID] ## for speed reason - build a revers lookup self.reversDict[fields[ID]] = "%s" % fields[NAME] ## for full info store the line self.descDict[fields[ID]] = fields ## store the crypted password self.passDict[fields[ID]] = fields[PASS] ## store surname, givenname and phones descriptions = fields[DESCRIPTION].split(",") name = descriptions[0] names = name.split(' ', 1) self.givennameDict[fields[ID]] = names[0] self.surnameDict[fields[ID]] = "" self.officePhoneDict[fields[ID]] = "" self.homePhoneDict[fields[ID]] = "" self.emailDict[fields[ID]] = "" if len(names) >= 2: self.surnameDict[fields[ID]] = names[1] if len(descriptions) >= 4: self.officePhoneDict[fields[ID]] = descriptions[2] self.homePhoneDict[fields[ID]] = descriptions[3] if len(descriptions) >= 5: for field in descriptions[4:]: # very basic e-mail regex email_match = re.search('.+@.+\..+', field) if email_match: self.emailDict[fields[ID]] = email_match.group(0) """ print ">>" + key[0] + "<< " + key[2] """ line = fileHandle.readline()
[docs] def checkPass(self, uid, password): """ This function checks the password for a given uid. - returns true in case of success - false if password does not match We do not support shadow passwords at the moment. so the seconds column of the passwd file needs to contain the crypted password """ import crypt log.info("[checkPass] checking password for user uid %s" % uid) cryptedpasswd = self.passDict[uid] log.debug("[checkPass] We found the crypted pass %s for uid %s" % (cryptedpasswd, uid)) if cryptedpasswd: if cryptedpasswd == 'x' or cryptedpasswd == '*': err = "Sorry, currently no support for shadow passwords" log.error("[checkPass] %s " % err) raise NotImplementedError(err) cp = crypt.crypt(password, cryptedpasswd) log.debug("[checkPass] crypted pass is %s" % cp) if crypt.crypt(password, cryptedpasswd) == cryptedpasswd: log.info("[checkPass] successfully authenticated user uid %s" % uid) return True else: log.warning("[checkPass] user uid %s failed to authenticate" % uid) return False else: log.warning("[checkPass] Failed to verify password. " "No crypted password found in file") return False
[docs] def getUserInfo(self, userId, no_passwd=False): """ get some info about the user as we only have the loginId, we have to traverse the dict for the value :param userId: the to be searched user :param no_passwd: retrun no password :return: dict of user info """ ret = {} if userId in self.reversDict: fields = self.descDict.get(userId) for key in self.sF: if no_passwd and key == "cryptpass": continue index = self.sF[key] ret[key] = fields[index] ret['givenname'] = self.givennameDict.get(userId) ret['surname'] = self.surnameDict.get(userId) ret['phone'] = self.homePhoneDict.get(userId) ret['mobile'] = self.officePhoneDict.get(userId) ret['email'] = self.emailDict.get(userId) return ret
[docs] def getUsername(self, userId): ''' ## TODO: why does this return bool :param userId: the user to be searched :return: true, if a user id exists ''' return userId in self.reversDict
[docs] def getUserId(self, LoginName): """ search the user id from the login name :param LoginName: the login of the user :return: the userId """ # We need the encoding, to be also able to read usernames # with Umlauts from files. if type(LoginName) == unicode: LoginName = LoginName.encode(ENCODING) if LoginName in self.nameDict.keys(): #log.debug("YES YES YES YES") return self.nameDict[LoginName] else: return ""
[docs] def getSearchFields(self, searchDict=None): """ show, which search fields this userIdResolver supports TODO: implementation is not completed :param searchDict: fields, which should be queried :return: dict of all searchFields """ if searchDict != None: for search in searchDict: pattern = searchDict[search] log.debug("[getSearchFields] searching for %s:%s", search, pattern) return self.searchFields
[docs] def getUserList(self, searchDict): """ get a list of all users matching the search criteria of the searchdict :param searchDict: dict of search expressions """ ret = [] ## first check if the searches are in the searchDict for l in self.descDict: line = self.descDict[l] ok = True for search in searchDict: if not search in self.searchFields: ok = False break pattern = searchDict[search] log.debug("[getUserList] searching for %s:%s", search, pattern) if search == "username": ok = self.checkUserName(line, pattern) elif search == "userid": ok = self.checkUserId(line, pattern) elif search == "description": ok = self.checkDescription(line, pattern) elif search == "email": ok = self.checkEmail(line, pattern) if ok != True: break if ok == True: uid = line[self.sF["userid"]] info = self.getUserInfo(uid, no_passwd=True) ret.append(info) return ret
[docs] def checkUserName(self, line, pattern): """ check for user name """ username = line[self.sF["username"]] ret = self.stringMatch(username, pattern) return ret
[docs] def checkDescription(self, line, pattern): description = line[self.sF["description"]] ret = self.stringMatch(description, pattern) return ret
[docs] def checkEmail(self, line, pattern): email = line[self.sF["email"]] ret = self.stringMatch(email, pattern) return ret
[docs] def stringMatch(self, cString, cPattern): ret = False e = s = "" if type(cString) == unicode: cString = cString.encode(ENCODING) if type(cPattern) == unicode: cPattern = cPattern.encode(ENCODING) string = cString.lower() pattern = cPattern.lower() if pattern.startswith("*"): e = "e" pattern = pattern[1:] if pattern.endswith("*"): s = "s" pattern = pattern[:-1] if (e == "e" and s == "s"): if string.find(pattern) != -1: return True elif (e == "e"): if string.endswith(pattern): return True elif (s == "s"): if string.startswith(pattern): return True else: if string == pattern: return True return ret
[docs] def checkUserId(self, line, pattern): """ check for the userId """ ret = False try: cUserId = int(line[self.sF["userid"]]) except: return ret (op, val) = tokenise(">=|<=|>|<|=|between")(pattern) if op == "between": (lVal, hVal) = val.split(",", 2) try: ilVal = int(lVal.strip()) ihVal = int(hVal.strip()) if ihVal < ilVal: v = ihVal ihVal = ilVal ilVal = v except: return ret if (cUserId <= ihVal and cUserId >= ilVal): ret = True else: try: ival = int(val) except: return ret if op == "=": if (cUserId == ival): ret = True elif op == ">": if (cUserId > ival): ret = True elif op == ">=": if (cUserId >= ival): ret = True elif op == "<": if (cUserId < ival): ret = True elif op == "<=": if (cUserId < ival): ret = True return ret ############################################################# # server info methods #############################################################
[docs] def getResolverId(self): """ getResolverId(LoginName) - returns the resolver identifier string - empty string if not exist """ return self.fileName
@classmethod
[docs] def getResolverClassType(cls): return 'passwdresolver'
[docs] def getResolverType(self): return IdResolver.getResolverClassType()
@classmethod
[docs] def getResolverClassDescriptor(cls): ''' return the descriptor of the resolver, which is - the class name and - the config description :return: resolver description dict :rtype: dict ''' descriptor = {} typ = cls.getResolverClassType() descriptor['clazz'] = "useridresolver.PasswdIdResolver.IdResolver" descriptor['config'] = {'fileName': 'string'} return {typ: descriptor}
[docs] def getResolverDescriptor(self): return IdResolver.getResolverClassDescriptor()
[docs] def getConfigEntry(self, config, key, conf, required=True): ckey = key cval = "" if conf != "" or None: ckey = ckey + "." + conf if ckey in config: cval = config[ckey] if cval == "": if key in config: cval = config[key] if cval == "" and required == True: raise Exception("missing config entry: " + key) return cval
[docs] def loadConfig(self, config, conf): """ loadConfig(configDict) The UserIdResolver could be configured from the pylon app config - here this could be the passwd file , whether it is /etc/passwd or /etc/shadow """ self.fileName = self.getConfigEntry(config, 'linotp.passwdresolver.fileName', conf) self.loadFile() return self
if __name__ == "__main__": print " PasswdIdResolver - IdResolver class test " y = getResolverClass("PasswdIdResolver", "IdResolver")() y.loadConfig({'linotp.passwdresolver.fileName': '/etc/passwd'}, "") x = getResolverClass("PasswdIdResolver", "IdResolver")() x.loadConfig({'linotp.passwdresolver.fileName': '/etc/meinpass'}, "") print "======/etc/meinpass==========" print x.getUserList({'username': '*', "userid": ">= 1000"}) print "======/etc/passwd==========" print y.getUserList({'username': '*', "userid": ">= 1000"}) print "================" user = "koelbel" loginId = y.getUserId(user) print " %s - %s" % (user, loginId) print " reId - " + y.getResolverId() ret = y.getUserInfo(loginId) print "result %r" % ret ret = y.getSearchFields() #ret["username"]="^bea*" search = { "userid": " between 1000, 1005", # "username":"^bea*", #"description":"*Audio*", # "descriptio":"*Winkler*", # "userid":" <=1003", } # ret = y.getUserList(search) print ret