# -*- coding: utf-8 -*-
#
# LinOTP - the open source solution for two factor authentication
# Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
#
# This file is part of LinOTP server.
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public
# License, version 3, as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the
# GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#
# E-mail: linotp@lsexperts.de
# Contact: www.linotp.org
# Support: www.lsexperts.de
#
""" contains several token api functions"""
import traceback
import string
import datetime
import sys
import re
import binascii
import os
import logging
try:
import json
except ImportError:
import simplejson as json
from sqlalchemy import or_, and_, not_
from sqlalchemy import func
from pylons import tmpl_context as c
from pylons.i18n.translation import _
from pylons import config
from linotp.lib.error import TokenAdminError
from linotp.lib.error import UserError
from linotp.lib.error import ParameterError
from linotp.lib.user import getUserId, getUserInfo
from linotp.lib.user import User, getUserRealms
from linotp.lib.user import check_user_password
from linotp.lib.util import getParam
from linotp.lib.util import generate_password
from linotp.lib.util import modhex_decode
from linotp.lib.realm import realm2Objects
from linotp.lib.realm import getRealms
import linotp
import linotp.lib.policy
from linotp import model
from linotp.model import Token, createToken, Realm, TokenRealm
from linotp.model.meta import Session
from linotp.model import Challenge
from linotp.lib.config import getFromConfig
from linotp.lib.resolver import getResolverObject
from linotp.lib.realm import createDBRealm, getRealmObject
from linotp.lib.realm import getDefaultRealm
from linotp.lib.util import get_client
# module level logger, named after this module
log = logging.getLogger(__name__)
# flag values handed to getParam() as its third argument (see initToken)
optional = True
required = False
# NOTE(review): presumably the default text encoding of this module - it is
# not referenced in the visible part of the file; confirm before relying on it
ENCODING = "utf-8"
###############################################
def createTokenClassObject(token, typ=None):
    '''
    createTokenClassObject - create a token class object from a given type

    :param token: the database referenced token
    :type  token: database token
    :param typ: type of the token class to be created. If None, the type
                stored in the database token (LinOtpTokenType) is used;
                an empty type is treated as "hmac"
    :type  typ: string

    :return: instance of the token class object. For a type which is not
             registered in config['tokenclasses'] a generic TokenClass
             instance is returned as fallback
    :rtype:  token class object
    :raises TokenAdminError: (id=1609) if the registered token class could
             not be loaded or instantiated
    '''
    # if type is not given, we take it out of the token database object
    if typ is None:
        typ = token.LinOtpTokenType

    if typ == "":
        typ = "hmac"

    typ = typ.lower()

    # search which tokenclass should be created and create it!
    tokenclasses = config['tokenclasses']
    if typ in tokenclasses:
        try:
            token_class = tokenclasses.get(typ)
            return newToken(token_class)(token)
        except Exception as e:
            log.error('[createTokenClassObject] failed: %r' % e)
            raise TokenAdminError("createTokenClassObject failed: %r" % e,
                                  id=1609)

    log.error('[createTokenClassObject] typ %r not found in tokenclasses: %r'
              % (typ, tokenclasses))

    # we try to use the parent class, which is able to handle most of the
    # administrative tasks. This will allow to unassign and disable or
    # delete this 'abandoned token'
    from linotp.lib.tokenclass import TokenClass
    tok = TokenClass(token)
    log.error("[createTokenClassObject] failed: unknown token type %r. "
              "Using fallback 'TokenClass' for %r" % (typ, token))
    return tok
def newToken(token_class):
    '''
    newTokenClass - return a token class, which could be used as a constructor

    :param token_class: string representation of the token class name,
                        given as dotted path "<module>.<class>"
    :type  token_class: string

    :return: token class
    :rtype:  token class
    :raises NameError: if the class is not found in its module or if it
                       does not provide the attributes "getType" and
                       "checkOtp"
    '''
    # prepare the lookup: everything before the last dot is the module
    package_name, _sep, class_name = token_class.rpartition('.')

    if package_name in sys.modules:
        mod = sys.modules.get(package_name)
    else:
        mod = __import__(package_name, globals(), locals(), [class_name])

    # name of the attribute which is currently looked up - used for the
    # error message; starts with the class itself
    attribute = class_name
    try:
        klass = getattr(mod, class_name)
        # every token class must provide at least these members
        for attribute in ("getType", "checkOtp"):
            getattr(klass, attribute)
    except AttributeError:
        raise NameError(
            "IdResolver AttributeError: " + package_name + "." + class_name
            + " instance has no attribute '" + attribute + "'")
    return klass
def get_token_type_list():
    '''
    get_token_type_list - provide the names of all registered token types,
    like hmac, spass, totp...

    :return: the keys of the config entry 'tokenclasses'
    :rtype:  list
    :raises TokenAdminError: (id=1611) if the token classes could not be
                             read from the config
    '''
    try:
        registered = config['tokenclasses']
    except Exception as exx:
        log.debug('get_token_type_list failed!')
        raise TokenAdminError("get_token_type_list failed: %r" % exx, id=1611)

    return registered.keys()
def getRealms4Token(user, tokenrealm=None):
    '''
    get the realm objects of a user or from the tokenrealm definition,
    which could be a list of realms or a single realm

    - helper method to enhance the code readability

    :param user: the user which defines the set of realms; ignored if None
                 or if the login is empty
    :param tokenrealm: a realm name (str or unicode) or a list / tuple of
                       realm names, which are added to the realms of the user
    :return: the list of realm objects, as provided by realm2Objects
    '''
    realms = []
    if user is not None and user.login != "":
        # the getUserRealms should return the default realm if realm was
        # empty. We work on a copy, so that adding the token realms below
        # does not modify the list object handed out by getUserRealms
        realms = list(getUserRealms(user))

        # hack: sometimes the realm of the user is not in the
        # realmDB - so check and add
        for realm_name in realms:
            if getRealmObject(name=realm_name) is None:
                createDBRealm(realm_name)

    if tokenrealm is not None:
        # tokenrealm can either be a string or a list
        log.debug("[getRealms4Token] tokenrealm given (%r). We will add the "
                  "new token to this realm" % tokenrealm)
        # unicode realm names must be accepted as well - request parameters
        # usually arrive as unicode
        if isinstance(tokenrealm, (str, unicode)):
            log.debug("[getRealms4Token] String: adding realm: %r"
                      % tokenrealm)
            realms.append(tokenrealm)
        elif isinstance(tokenrealm, (list, tuple)):
            for tr in tokenrealm:
                log.debug("[getRealms4Token] List: adding realm: %r" % tr)
                realms.append(tr)

    return realm2Objects(realms)
def get_tokenserial_of_transaction(transId):
    '''
    look up the token serial number, which belongs to a challenge
    state / transaction

    :param transId: the state / transaction id
    :return: the serial number, or None if there is not exactly one
             challenge with this transaction id
    '''
    query = Session.query(Challenge).filter(
                                    Challenge.transid == u'' + transId)
    found = query.all()
    count = len(found)

    if count == 0:
        log.info('no challenge found for tranId %r' % (transId))
        return None

    if count > 1:
        log.info('multiple challenges found for tranId %r' % (transId))
        return None

    return found[0].tokenserial
def initToken(param, user, tokenrealm=None):
    '''
    initToken - create a new token or update a token

    :param param: the dict of provided parameters. 'type' defaults to hmac;
                  if no 'serial' is given, one is generated from the type
                  and the optional 'prefix'; an optional 'realm' is added to
                  the token realms
    :param user: the token owner - could be None or a user with empty login,
                 in which case the token is not assigned
    :param tokenrealm: the realms, to which the token belongs: a realm name
                       or a list of realm names. A given list is not modified
    :return: tuple of success (always True) and token class object
    :raises TokenAdminError: id=1610 for an unknown token type,
                             id=1101 if multiple tokens match the serial,
                             id=1112 if storing the token failed and
                             without id on an attempt to change the type of
                             an existing token
    '''
    log.debug("[initToken] begin. create token with param %r for user %r and "
              "tokenrealm %r" % (param, user, tokenrealm))

    typ = getParam(param, "type", optional)
    if typ is None:
        typ = "hmac"

    serial = param.get('serial', None)
    if serial is None:
        prefix = param.get('prefix', None)
        serial = genSerial(typ, prefix)

    # if a token was initialized for a user, the param "realm" might be
    # contained. otherwise - without a user the param tokenrealm could be
    # contained. The user might be None, so the login is fetched defensively
    log.debug("[initToken] initilizing token %r for user %r "
              % (serial, getattr(user, 'login', None)))

    # create a list of the found db tokens - no token class objects
    toks = getTokens4UserOrSerial(None, serial, _class=False)
    tokenNum = len(toks)

    tokenclasses = config['tokenclasses']
    type_is_known = typ.lower() in tokenclasses

    if tokenNum == 0:
        # create a new token - but only of an available token type
        if not type_is_known:
            log.error('[initToken] typ %r not found in tokenclasses: %r' %
                      (typ, tokenclasses))
            raise TokenAdminError("[initToken] failed: unknown token type %r"
                                  % typ, id=1610)
        token = createToken(serial)

    elif tokenNum == 1:
        # update if already there
        token = toks[0]

        # prevent from changing the token type
        old_typ = token.LinOtpTokenType
        if old_typ.lower() != typ.lower():
            msg = ('token %r already exist with type %r. Can not initialize '
                   'token with new type %r' % (serial, old_typ, typ))
            log.error('[initToken] %s' % msg)
            raise TokenAdminError("initToken failed: %s" % msg)

        # prevent update of an unsupported token type
        if not type_is_known:
            log.error('[initToken] typ %r not found in tokenclasses: %r' %
                      (typ, tokenclasses))
            raise TokenAdminError("[initToken] failed: unknown token type %r"
                                  % typ, id=1610)

    else:
        # more than one token for one serial - something is wrong
        raise TokenAdminError("multiple tokens found - cannot init!", id=1101)

    # if there is a realm as parameter, we assign the token to this realm
    if 'realm' in param:
        if tokenrealm is None:
            tokenrealm = []
        elif isinstance(tokenrealm, (str, unicode)):
            # a tokenrealm given as string becomes a list
            tokenrealm = [tokenrealm]
        else:
            # work on a copy - the list of the caller must stay untouched
            tokenrealm = list(tokenrealm)
        # and append our parameter realm
        tokenrealm.append(param.get('realm'))

    # get the RealmObjects of the user and the tokenrealms
    realms = getRealms4Token(user, tokenrealm)
    token.setRealms(realms)

    # on behalf of the type, the class is created
    tokenObj = createTokenClassObject(token, typ)

    if tokenNum == 0:
        # if this token is a newly created one, we have to setup the
        # defaults, which later on might be overwritten by the
        # tokenObj.update(params)
        tokenObj.setDefaults()

    tokenObj.update(param)

    if user is not None and user.login != "":
        tokenObj.setUser(user, report=True)

    try:
        token.storeToken()
    except Exception as e:
        log.error('[initToken] token create failed!')
        log.error("[initToken] %r" % (traceback.format_exc()))
        raise TokenAdminError("token create failed %r" % e, id=1112)

    log.debug("[initToken] end. created tokenObject %r and returning "
              "status %r " % (tokenObj, True))
    return (True, tokenObj)
def getRolloutToken4User(user=None, serial=None, tok_type=u'ocra'):
    '''
    search the token of a user or the token with the given serial, which
    carries a 'rollout' entry in its token info

    :param user: the token owner, used for the lookup if not empty
    :param serial: the token serial number, used if no user is given
    :param tok_type: only tokens of this type are taken into account
    :return: the serial number of the token in rollout state. If no such
             token is found, the given serial is returned unchanged; None
             if neither a user nor a serial is given
    :raises Exception: if more than one token is in rollout state
    '''
    if (user is None or user.isEmpty()) and serial is None:
        return None

    rollout_serials = []
    candidates = []

    if user is not None and user.isEmpty() == False:
        uid_map = user.resolverUid
        resolver_key = None
        user_id = None
        # after the loop, the last entry of the mapping is the one in use
        for resolver_key in uid_map:
            user_id = uid_map.get(resolver_key)

        # in the database could be tokens of ResolverClass:
        #    useridresolver. or useridresolveree.
        # so we have to make sure
        # - there is no 'useridresolveree' in the searchterm and
        # - there is a wildcard search: second replace
        # Remark: when the token is loaded the response to the
        # resolver class is adjusted
        resolver_pattern = resolver_key.replace('useridresolveree.',
                                                'useridresolver.')
        resolver_pattern = resolver_pattern.replace('useridresolver.',
                                                    'useridresolver%.')

        # query result could be 0, 1 or more tokens
        by_type = Session.query(Token).filter(
                        Token.LinOtpTokenType == unicode(tok_type))
        by_resolver = by_type.filter(
                        Token.LinOtpIdResClass.like(unicode(resolver_pattern)))
        candidates = by_resolver.filter(
                        Token.LinOtpUserid == unicode(user_id))

    elif serial is not None:
        by_type = Session.query(Token).filter(
                        Token.LinOtpTokenType == unicode(tok_type))
        candidates = by_type.filter(
                        Token.LinOtpTokenSerialnumber == unicode(serial))

    for candidate in candidates:
        info = candidate.LinOtpTokenInfo
        if len(info) > 0:
            if json.loads(info).get('rollout', None) is not None:
                rollout_serials.append(candidate.LinOtpTokenSerialnumber)

    if len(rollout_serials) > 1:
        raise Exception('multiple tokens found in rollout state: %s'
                        % unicode(rollout_serials))

    if len(rollout_serials) == 1:
        serial = rollout_serials[0]

    return serial
def setRealms(serial, realmList):
    '''
    set the realms of the token(s) with the given serial number

    :param serial: the serial number of the token
    :param realmList: the realm names, which are converted by realm2Objects
    :return: the number of tokens, whose realms have been set
    :raises TokenAdminError: (id=1119) if no token with this serial exists
    '''
    # the database tokens, not the token class objects
    db_tokens = getTokens4UserOrSerial(None, serial, _class=False)

    if len(db_tokens) < 1:
        log.error("[setRealms] No token with serial %r found." % serial)
        raise TokenAdminError("setRealms failed. No token with serial %s found"
                              % serial, id=1119)

    realm_objects = realm2Objects(realmList)
    for db_token in db_tokens:
        db_token.setRealms(realm_objects)

    return len(db_tokens)
def getTokenRealms(serial):
    '''
    provide the realm names of the token with the given serial number

    :param serial: the serial number of the token
    :return: the result of getRealmNames() of the first matching db token
    :raises TokenAdminError: (id=1119) if no token with this serial exists
    '''
    db_tokens = getTokens4UserOrSerial(None, serial, _class=False)

    if len(db_tokens) < 1:
        log.error("[getTokenRealms] No token with serial %r found." % serial)
        raise TokenAdminError("getTokenRealms failed. No token with serial %s found" % serial, id=1119)

    return db_tokens[0].getRealmNames()
def getRealmsOfTokenOrUser(token):
    '''
    provide the realms of the token - or, if the token has no realms,
    the realms which are derived from the resolver of the token user

    :param token: the token class object
    :return: list of realms
    '''
    serial = token.getSerial()
    realms = getTokenRealms(serial)

    if len(realms) == 0:
        uid, resolver, resolverClass = token.getUser()
        log.debug("[getRealmsOfTokenOrUser] %r, %r, %r" % (uid, resolver, resolverClass))

        # No realm and no User, this is the case in /validate/check_s
        has_dot = resolver.find('.') >= 0
        if has_dot:
            # only the part behind the last dot is used for the lookup
            resoname = resolver.rsplit('.', 1)[1]
            realms = getUserRealms(User("dummy_user", "", resoname))

    log.debug("[getRealmsOfTokenOrUser] the token %r "
              "is in the following realms: %r" % (serial, realms))
    return realms
def getTokenInRealm(realm, active=True):
    '''
    count the tokens of one realm

    :param realm: the name of the realm
    :param active: if set, only active tokens are counted, otherwise all
                   token-realm relations of the realm are counted
    :return: the number of tokens
    '''
    if not active:
        all_in_realm = and_(TokenRealm.realm_id == Realm.id,
                            Realm.name == realm)
        return Session.query(TokenRealm, Realm).filter(all_in_realm).count()

    active_in_realm = and_(TokenRealm.realm_id == Realm.id,
                           Realm.name == u'' + realm,
                           Token.LinOtpIsactive == True,
                           TokenRealm.token_id == Token.LinOtpTokenId)
    return Session.query(TokenRealm, Realm, Token).filter(
                                                    active_in_realm).count()
def getTokenNumResolver(resolver=None, active=True):
    '''
    This returns the number of the (active) tokens

    :param resolver: if no resolver is passed, the overall token number is
                     returned; if a resolver is passed, the token number
                     within this resolver is returned
    :param active: if set to False, ALL tokens are counted, otherwise only
                   the active ones
    :return: the number of tokens
    '''
    query = Session.query(Token)

    if resolver is not None:
        # in the database could be tokens of ResolverClass:
        #    useridresolver. or useridresolveree.
        # so we have to make sure
        # - there is no 'useridresolveree' in the searchterm and
        # - there is a wildcard search: second replace
        # Remark: when the token is loaded the response to the
        # resolver class is adjusted
        resolver = resolver.replace('useridresolveree.', 'useridresolver.')
        resolver = resolver.replace('useridresolver.', 'useridresolver%.')
        query = query.filter(Token.LinOtpIdResClass.like(resolver))

    if active:
        # the comparison with True builds the SQL expression - no 'is' here
        query = query.filter(Token.LinOtpIsactive == True)

    return query.count()
def getAllTokenUsers():
    '''
    collect the user information of all assigned tokens

    :return: dict with the token serial number as key and the user info of
             the token owner as value. If no user info could be retrieved
             for an assigned token, a placeholder username is set
    '''
    owners = {}

    for db_token in Session.query(Token):
        log.debug("[getAllTokenUsers] user serial (serial): %r" % db_token.LinOtpTokenSerialnumber)

        serial = db_token.LinOtpTokenSerialnumber
        user_id = db_token.LinOtpUserid
        resolver = db_token.LinOtpIdResolver
        resolver_class = db_token.LinOtpIdResClass

        is_assigned = len(user_id) > 0

        info = {}
        if is_assigned and len(resolver) > 0:
            info = getUserInfo(user_id, resolver, resolver_class)

        if is_assigned and len(info) == 0:
            info['username'] = u'/:no user info:/'

        if len(info) > 0:
            owners[serial] = info

    return owners
def getTokens4UserOrSerial(user=None, serial=None, forUpdate=False, _class=True):
    '''
    retrieve the tokens with the given serial number and / or the tokens
    of the given user

    :param user: the token owner - only evaluated if not empty
    :param serial: the serial number of the token
    :param forUpdate: if True, the rows are queried with lockmode "update"
    :param _class: if True, token class objects are returned, otherwise
                   the database token objects
    :return: list of tokens - empty if neither user nor serial is given
    '''
    found = []

    if serial is None and user is None:
        log.warning("[getTokens4UserOrSerial] missing user or serial")
        return found

    if serial is not None:
        log.debug("[getTokens4UserOrSerial] getting token object "
                  "with serial: %r" % serial)

        # SAWarning of non unicode type
        serial = linotp.lib.crypt.uencode(serial)

        by_serial = Session.query(Token)
        if forUpdate == True:
            by_serial = by_serial.with_lockmode("update")
        by_serial = by_serial.filter(Token.LinOtpTokenSerialnumber == serial)

        for db_token in by_serial:
            log.debug("[getTokens4UserOrSerial] user "
                      "serial (serial): %r" % db_token.LinOtpTokenSerialnumber)
            found.append(db_token)

    if user is not None:
        log.debug("[getTokens4UserOrSerial] getting token object 4 user: %r" % user)

        if user.isEmpty() == False:
            # the upper layer will catch / at least should
            (uid, resolver, resolverClass) = getUserId(user)

            # in the database could be tokens of ResolverClass:
            #    useridresolver. or useridresolveree.
            # so we have to make sure
            # - there is no 'useridresolveree' in the searchterm and
            # - there is a wildcard search: second replace
            # Remark: when the token is loaded the response to the
            # resolver class is adjusted
            resolverClass = resolverClass.replace('useridresolveree.',
                                                  'useridresolver.')
            resolverClass = resolverClass.replace('useridresolver.',
                                                  'useridresolver%.')

            by_user = Session.query(model.Token)
            if forUpdate == True:
                by_user = by_user.with_lockmode("update")
            by_user = by_user.filter(model.Token.LinOtpUserid == uid)
            by_user = by_user.filter(
                        model.Token.LinOtpIdResClass.like(resolverClass))

            for db_token in by_user:
                log.debug("[getTokens4UserOrSerial] user serial (user): %r"
                          % db_token.LinOtpTokenSerialnumber)
                found.append(db_token)

    if _class == True:
        # wrap every database token into its token class object
        return [createTokenClassObject(db_token) for db_token in found]

    return found
def getTokensOfType(typ=None, realm=None, assigned=None):
    '''
    provide the list of token class objects, which match the given filters

    :param typ: the token type, compared case insensitive - None means
                all types
    :param realm: the name of the realm, compared case insensitive - None
                  means no restriction on the realm
    :param assigned: "1" (or 1) for assigned tokens only, "0" (or 0) for
                     unassigned tokens only, None for both. Other values
                     are ignored with a warning

    :return: list of token class objects

    TODO: rename function to "getTokens"
    '''
    log.debug("[getTokensOfType] searching tokens type=%r, realm=%r, assigned=%r" % (typ, realm, assigned))

    query = Session.query(Token)

    # filter for type
    if typ is not None:
        query = query.filter(func.lower(Token.LinOtpTokenType) == typ.lower())

    # filter if assigned or not
    if assigned is not None:
        if "0" == unicode(assigned):
            no_owner = or_(Token.LinOtpUserid == None,
                           Token.LinOtpUserid == "")
            query = query.filter(no_owner)
        elif "1" == unicode(assigned):
            query = query.filter(func.length(Token.LinOtpUserid) > 0)
        else:
            log.warning("[getTokensOfType] assigned value not in [0,1] %r" % assigned)

    # filter for the realm
    if realm is not None:
        in_realm = and_(func.lower(Realm.name) == realm.lower(),
                        TokenRealm.realm_id == Realm.id,
                        TokenRealm.token_id == Token.LinOtpTokenId)
        query = query.filter(in_realm).distinct()

    result = []
    for db_token in query:
        log.debug("[getTokensOfType] adding token with serial %r" % db_token.LinOtpTokenSerialnumber)
        # the db_token is the database object, but we want an instance
        # of the tokenclass!
        result.append(createTokenClassObject(db_token))

    return result
def setDefaults(token):
    '''
    initialize a database token with the default values from the config

    :param token: the database token, whose attributes are set
    '''
    # (token attribute, config entry, fallback if the entry is not set)
    int_defaults = (
        ("LinOtpOtpLen", "DefaultOtpLen", 6),
        ("LinOtpCountWindow", "DefaultCountWindow", 15),
        ("LinOtpMaxFail", "DefaultMaxFailCount", 15),
        ("LinOtpSyncWindow", "DefaultSyncWindow", 1000),
    )
    for attribute, config_key, fallback in int_defaults:
        setattr(token, attribute, int(getFromConfig(config_key, fallback)))

    token.LinOtpTokenType = u"HMAC"
def isTokenOwner(serial, user):
    '''
    check if the token with the given serial is assigned to the given user

    :param serial: the serial number of the token
    :param user: the user object, which should be the owner of the token
    :return: True if user id, resolver and resolver class of the token
             match those of the user, False otherwise
    :raises TokenAdminError: id=1104 if the user could not be identified,
                             id=1101 if multiple tokens match the serial,
                             id=1102 if no token matches the serial
    '''
    userid = ""
    idResolver = ""
    idResolverClass = ""

    log.debug("[isTokenOwner] entering function isTokenOwner")

    if user is not None and not user.isEmpty():
        # the upper layer will catch / at least should
        (userid, idResolver, idResolverClass) = getUserId(user)

    # user might be None: the login is only required for the messages
    login = getattr(user, 'login', None)

    if len(userid) + len(idResolver) + len(idResolverClass) == 0:
        log.info("[isTokenOwner] no user found %r", login)
        raise TokenAdminError("no user found %s" % login, id=1104)

    toks = getTokens4UserOrSerial(None, serial)

    if len(toks) > 1:
        log.info("[isTokenOwner] multiple tokens found for user %r" % login)
        raise TokenAdminError("multiple tokens found!", id=1101)
    if len(toks) == 0:
        log.info("[isTokenOwner] no tokens found for user %r", login)
        raise TokenAdminError("no token found!", id=1102)

    (uuserid, uidResolver, uidResolverClass) = toks[0].getUser()

    return (uidResolver == idResolver
            and uidResolverClass == idResolverClass
            and uuserid == userid)
def tokenExist(serial):
    '''
    check if a token with the given serial number exists

    :param serial: the serial number of the token
    :return: True if at least one token is found - False otherwise and
             always False for an empty serial
    '''
    log.debug("[tokenExist] checking if Token %r exists" % serial)

    if not serial:
        # If we have no serial we return false anyway!
        log.debug("[tokenExist] returning false anyway")
        return False

    matching = getTokens4UserOrSerial(None, serial)
    return (len(matching) > 0)
def hasOwner(serial):
    '''
    check if the token with the given serial is assigned to any user

    :param serial: the serial number of the token
    :return: True if the token carries any user id, resolver or resolver
             class information
    :raises TokenAdminError: id=1101 if multiple tokens match the serial,
                             id=1102 if no token matches the serial
    '''
    log.debug('[hasOwner] entering function hasOwner()')

    matching = getTokens4UserOrSerial(None, serial)
    count = len(matching)

    if count > 1:
        log.info("[hasOwner] multiple tokens found with serial %r" % serial)
        raise TokenAdminError("multiple tokens found!", id=1101)

    if count == 0:
        log.info("[hasOwner] no token found with serial %r" % serial)
        raise TokenAdminError("no token found!", id=1102)

    (uuserid, uidResolver, uidResolverClass) = matching[0].getUser()

    owner_info_len = len(uuserid) + len(uidResolver) + len(uidResolverClass)
    if owner_info_len > 0:
        return True
    return False
def getTokenOwner(serial):
    '''
    provide the user object, to which the token is assigned.
    The token is identified and retrieved by its serial number

    :param serial: serial number of the token
    :return: user object, as returned by get_token_owner - which is called
             with None, if no token with this serial exists
    '''
    log.debug("[getTokenOwner] getting token owner for serial: %r" % serial)

    matching = getTokens4UserOrSerial(None, serial)
    first_token = matching[0] if len(matching) > 0 else None

    return get_token_owner(first_token)
def get_token_owner(token):
    """
    provide the owner as a user object for a given tokenclass obj

    :param token: tokenclass object
    :return: user object - an empty user object if the token is None
    :raises Exception: if the realm of the owner could not be determined:
                       the user is in no realm, or user and token share
                       either none or more than one realm
    """
    user = User()

    if token is None:
        # for backward compatibility, we return here an empty user
        return user

    serial = token.getSerial()

    log.debug("[get_token_owner] token found: %r" % token)

    uid, resolver, resolverClass = token.getUser()
    userInfo = getUserInfo(uid, resolver, resolverClass)
    log.debug("[get_token_owner] got the owner %r, %r, %r"
              % (uid, resolver, resolverClass))

    realms = getUserRealms(User(uid, "", resolverClass.split(".")[-1]))
    log.debug("[get_token_owner] got this realms: %r" % realms)

    # remark: the message template is translated first and formatted
    # afterwards, otherwise the translation lookup could never match
    if len(realms) == 0:
        raise Exception(_("get_token_owner: The user %s in the resolver"
                          " %s for token %s could not be found in any "
                          "realm!") % (uid, resolverClass, serial))

    if len(realms) == 1:
        realm = realms[0]
    else:
        # if there are several realms, than we need to find out, which one!
        t_realms = getTokenRealms(serial)
        common_realms = list(set(realms).intersection(t_realms))

        if len(common_realms) > 1:
            raise Exception(_("get_token_owner: The user %s/%s and the token"
                              " %s is located in several realms: "
                              "%s!") % (uid, resolverClass, serial,
                                        common_realms))
        if len(common_realms) == 0:
            # without this check the access below fails with an IndexError
            raise Exception(_("get_token_owner: The user %s/%s and the token"
                              " %s do not share a common realm!")
                            % (uid, resolverClass, serial))
        realm = common_realms[0]

    user.realm = realm
    user.login = userInfo.get('username')
    user.conf = resolverClass

    log.debug("[get_token_owner] found the user %r and the realm %r as "
              "owner of token %r" % (user.login, user.realm, serial))
    return user
def getTokenType(serial):
    '''
    provide the token type of the token with the given serial number

    :param serial: the serial number of the to be searched token
    :return: the token type - or an empty string if no token was found
    '''
    log.debug("[getTokenType] getting token type for serial: %r" % serial)

    db_tokens = getTokens4UserOrSerial(None, serial, _class=False)

    token_type = ""
    if len(db_tokens) > 0:
        # with more than one match the last token defines the type
        token_type = db_tokens[-1].LinOtpTokenType

    log.debug("[getTokenType] the token is of type: %r" % token_type)
    return token_type
def check_serial(serial):
    '''
    check if a serial number is already in use and propose an unused one

    :param serial: the serial number to be checked
    :return: tuple of (result, new_serial):
             result is True if the serial does not exist yet, new_serial
             is then the given serial. Otherwise result is False and
             new_serial is the serial, extended by the first unused
             two digit counter suffix like "_01"
    '''
    log.debug("[check_serial] check if token %r already exists" % serial)

    attempts = 0
    candidate = serial

    # as long as we find a token, modify the serial
    while len(getTokens4UserOrSerial(None, candidate)) > 0:
        attempts += 1
        candidate = "%s_%02i" % (serial, attempts)

    # no attempt was required if the serial did not exist
    return (attempts == 0, candidate)
def auto_assignToken(passw, user, pin="", param=None):
    '''
    This function is called to auto_assign a token, when the
    user enters an OTP value of an not assigned token.

    :param passw: the provided password, which is split by every candidate
                  token into pin and otp
    :param user: the user, who should become the token owner
    :param pin: not evaluated - the pin is taken from the split of passw
    :param param: not evaluated
    :return: True if exactly one unassigned token of the user realm matched
             the otp, the pin part verified against the user store and the
             token was assigned to the user - False otherwise
    '''
    auto = False

    # Fixme: circle dependency
    try:
        auto, _otplen = linotp.lib.policy.get_autoassignment(user)
    except Exception as e:
        log.error("[auto_assignToken] %r" % e)

    # check if autoassignment is configured
    if not auto:
        log.debug("[auto_assignToken] not autoassigment configured")
        return False

    # check if user has a token
    # TODO: this may dependend on a policy definition
    tokens = getTokens4UserOrSerial(user, "")
    if len(tokens) > 0:
        log.debug("[auto_assignToken] no auto_assigment for user %r@%r. He "
                  "already has some tokens." % (user.login, user.realm))
        return False

    # get all tokens of the users realm, which are not assigned and
    # remember every token which matches the otp, together with the pin
    # that this token has split off the password
    matches = []
    for candidate in getTokensOfType(typ=None, realm=user.realm,
                                     assigned="0"):
        (res, candidate_pin, otp) = candidate.splitPinPass(passw)
        if res >= 0:
            r = candidate.check_otp_exist(otp=otp,
                                          window=candidate.getOtpCountWindow())
            if r >= 0:
                matches.append((candidate, candidate_pin))

    if len(matches) != 1:
        log.warning("[auto_assignToken] %d tokens with "
                    "the given OTP value found.", len(matches))
        return False

    # token and pin of the match - not of the last checked candidate
    (token, pin) = matches[0]

    success = check_user_password(user.login, user.realm, pin)
    if success is None:
        log.error("[auto_assignToken] User %r@%r failed to authenticate against userstore" % (user.login, user.realm))
        return False

    serial = token.getSerial()
    log.debug("[auto_assignToken] found serial number: %r" % serial)

    # should the password of the autoassignement be used as pin??
    if True == linotp.lib.policy.ignore_autoassignment_pin(user):
        pin = None

    # if found, assign the found token to the user.login
    try:
        assignToken(serial, user, pin)
        c.audit['serial'] = serial
        c.audit['info'] = "Token auto assigned"
        c.audit['token_type'] = token.getType()
    except Exception as e:
        log.error("[auto_assignToken] Failed to assign token: %r" % e)
        return False

    return True
def assignToken(serial, user, pin, param=None):
    '''
    assignToken - assign the token with the given serial to a user

    :param serial: the serial number of the token
    :param user: the new token owner - with an empty login, the user is
                 set without report
    :param pin: the new pin of the token - not changed if None
    :param param: additional parameters, which are passed to setPin
    :return: True
    :raises TokenAdminError: id=1101 if multiple tokens match the serial,
                             id=1102 if no token matches the serial,
                             id=1105 if storing the token failed
    '''
    if param is None:
        param = {}

    log.debug('[assignToken] entering function assignToken()')

    matching = getTokens4UserOrSerial(None, serial)
    count = len(matching)

    if count > 1:
        log.warning("[assignToken] multiple tokens found with serial: %r" % serial)
        raise TokenAdminError("multiple tokens found!", id=1101)

    if count == 0:
        log.warning("[assignToken] no tokens found with serial: %r" % serial)
        raise TokenAdminError("no token found!", id=1102)

    token = matching[0]

    report = not (user.login == "")
    token.setUser(user, report)

    # set the Realms of the Token
    token.setRealms(getRealms4Token(user))

    if pin is not None:
        token.setPin(pin, param)

    # reset the OtpCounter
    token.setFailCount(0)

    try:
        token.storeToken()
    except Exception as exx:
        log.error('[assign Token] update Token DB failed')
        raise TokenAdminError("Token assign failed for %s/%s : %r"
                              % (user.login, serial, exx), id=1105)

    log.debug("[assignToken] successfully assigned token with serial "
              "%r to user %r" % (serial, user.login))
    return True
def unassignToken(serial, user, pin):
    '''
    unassignToken - remove the owner of the token with the given serial

    :param serial: the serial number of the token
    :param user: the former owner - only used in the error message
    :param pin: the pin, which is set on the unassigned token
    :return: True
    :raises TokenAdminError: id=1101 if multiple tokens match the serial,
                             id=1102 if no token matches the serial,
                             id=1105 if storing the token failed
    '''
    log.debug('[unassignToken] entering function unassignToken()')

    matching = getTokens4UserOrSerial(None, serial)
    count = len(matching)

    if count > 1:
        log.warning("[unassignToken] multiple tokens found with serial: %r" % serial)
        raise TokenAdminError("multiple tokens found!", id=1101)

    if count == 0:
        log.warning("[unassignToken] no tokens found with serial: %r" % serial)
        raise TokenAdminError("no token found!", id=1102)

    token = matching[0]

    # the empty user replaces the former owner
    token.setUser(User('', '', ''), True)
    token.setPin(pin)

    # reset the OtpCounter
    token.setFailCount(0)

    try:
        token.storeToken()
    except Exception as exx:
        log.error('[unassignToken] update token DB failed')
        raise TokenAdminError("Token assign failed for %r/%r: %r"
                              % (user, serial, exx), id=1105)

    log.debug("[unassignToken] successfully unassigned token with serial %r"
              % serial)
    return True
def checkSerialPass(serial, passw, options=None, user=None):
    '''
    check the otp for the token with the given serial number - or, if no
    pass is given, try to trigger a challenge for this token

    @attention: the parameter user must be set, as the pin policy==1 will
                verify the user pin

    :param serial: the serial number of the token
    :param passw: the pass to be checked - None for a challenge request
    :param options: additional parameters, which are passed on
    :param user: the user, who is passed on to the token check
    :return: tuple of result and optional reply
    :raises ParameterError: (id=905) if no pass is given and the token does
                            not treat this as a challenge request
    :raises Exception: if no pass is given and not exactly one token matches
    '''
    log.debug("[checkSerialPass] checking for serial %r"
              % (serial))

    tokenList = getTokens4UserOrSerial(None, serial, forUpdate=True)

    if passw is not None:
        log.debug("[checkSerialPass] checking len(pass)=%r for serial %r"
                  % (len(passw), serial))
        (res, opt) = checkTokenList(tokenList, passw, user=user,
                                    options=options)
        return (res, opt)

    # other than zero or one token should not happen, as serial is unique
    if len(tokenList) != 1:
        raise Exception('No token found: unable to create challenge for %s' % serial)

    theToken = tokenList[0]
    db_token = theToken.token

    # the first realm of the token - or the default realm as fallback
    realms = db_token.getRealmNames()
    realm = realms[0] if realms else getDefaultRealm()

    userInfo = getUserInfo(db_token.LinOtpUserid,
                           db_token.LinOtpIdResolver,
                           db_token.LinOtpIdResClass)
    user = User(login=userInfo.get('username'), realm=realm)

    if not theToken.is_challenge_request(passw, user, options=options):
        raise ParameterError("Missing parameter: pass", id=905)

    (res, opt) = linotp.lib.validate.create_challenge(tokenList[0], options)
    return (res, opt)
def checkYubikeyPass(passw):
    '''
    Checks the password of a yubikey in Yubico mode (44,48), where the first
    12 or 16 characters are the tokenid

    :param passw: The password that consist of the static yubikey prefix
                  and the otp
    :type passw: string

    :return: tuple of the check result and - on success - a dict with
             the 'user' and the 'realm' of the token owner, otherwise None
             or the reply of the token check
    :rtype: tuple
    '''
    opt = None
    res = False

    # strip the yubico OTP and the PIN
    without_otp = passw[:-32]
    modhex_serial = without_otp[-16:]

    try:
        serialnum = "UBAM" + modhex_decode(modhex_serial)
    except TypeError as exx:
        log.error("Failed to convert serialnumber: %r" % exx)
        return res, opt

    # build list of possible yubikey tokens
    serials = [serialnum] + ["%s_%s" % (serialnum, i) for i in range(1, 3)]

    tokenList = []
    for candidate in serials:
        tokenList.extend(getTokens4UserOrSerial(serial=candidate))

    if len(tokenList) == 0:
        c.audit['action_detail'] = ("The serial %s could not be found!"
                                    % serialnum)
        return res, opt

    # FIXME if the Token has set a PIN and the User does not want to enter
    # the PIN for authentication, we need to do something different here...
    # and avoid PIN checking in __checkToken.
    # We could pass an "option" to __checkToken.
    (res, opt) = checkTokenList(tokenList, passw)

    # Now we need to get the user
    if res is not False and 'serial' in c.audit:
        serial = c.audit.get('serial', None)
        if serial is not None:
            user = getTokenOwner(serial)
            c.audit['user'] = user.login
            c.audit['realm'] = user.realm
            opt = {'user': user.login, 'realm': user.realm}

    return res, opt
def checkUserPass(user, passw, options=None):
    '''
    check the pass of a user against the tokens of this user

    :param user: the to be identified user
    :param passw: the identification pass
    :param options: optional parameters, which are provided
                to the token checkOTP / checkPass

    :return: tuple of True/False and optional information
    :raises ParameterError: (id=905) if the user has tokens, but no pass
                            is given
    '''
    # the user might be None, so the login is fetched defensively
    login = getattr(user, 'login', None)
    log.debug("[checkUserPass] entering function checkUserPass(%r)" % (login))

    opt = None
    serial = None
    resolverClass = None
    uid = None

    if user is not None and not user.isEmpty():
        # only real errors are treated as "user not found" - a bare except
        # would turn SystemExit or KeyboardInterrupt into an
        # authentication result as well
        try:
            (uid, resolver, resolverClass) = getUserId(user)
        except Exception as exx:
            log.info("[checkUserPass] user %r not found: %r" % (login, exx))
            passOnNoUser = "PassOnUserNotFound"
            passOn = getFromConfig(passOnNoUser, False)
            if False != passOn and "true" == passOn.lower():
                c.audit['action_detail'] = "authenticated by PassOnUserNotFound"
                return (True, opt)
            else:
                c.audit['action_detail'] = "User not found"
                return (False, opt)

    tokenList = getTokens4UserOrSerial(user, serial, forUpdate=True)

    if len(tokenList) == 0:
        c.audit['action_detail'] = "User has no tokens assigned"

        # here we check if we should to autoassign and try to do it
        log.debug("[checkUserPass] about to check auto_assigning")

        auto_assign_return = auto_assignToken(passw, user)
        if auto_assign_return == True:
            # We can not check the token, as the OTP value is already used!
            # But we will authenticate the user....
            return (True, opt)

        passOnNoToken = "PassOnUserNoToken"
        passOn = getFromConfig(passOnNoToken, False)
        if passOn != False and "true" == passOn.lower():
            c.audit['action_detail'] = "authenticated by PassOnUserNoToken"
            return (True, opt)

        # Check if there is an authentication policy passthru
        from linotp.lib.policy import get_auth_passthru
        if get_auth_passthru(user):
            log.debug("[checkUserPass] user %r has no token. Checking for "
                      "passthru in realm %r" % (user.login, user.realm))
            y = getResolverObject(resolverClass)
            c.audit['action_detail'] = "Authenticated against Resolver"
            if y.checkPass(uid, passw):
                return (True, opt)

        # Check if there is an authentication policy passOnNoToken
        from linotp.lib.policy import get_auth_passOnNoToken
        if get_auth_passOnNoToken(user):
            log.info("[checkUserPass] user %r has not token. PassOnNoToken "
                     "set - authenticated!" % login)
            c.audit['action_detail'] = "Authenticated by passOnNoToken policy"
            return (True, opt)

        return (False, opt)

    if passw is None:
        raise ParameterError(u"Missing parameter:pass", id=905)

    (res, opt) = checkTokenList(tokenList, passw, user, options=options)
    log.debug("[checkUserPass] return of __checkTokenList: %r " % (res,))

    return (res, opt)
def checkTokenList(tokenList, passw, user=User(), options=None):
    '''
    identify a matching token and test, if the token is valid, locked ..
    This function is called by checkSerialPass and checkUserPass.

    Every token of the list is checked against the given pass. The per
    token results are weighted and the entry with the highest weight is
    written into the audit information. If exactly one token triggered a
    challenge, the challenge is created and returned instead of an
    authentication result.

    :param tokenList: list of identified tokens
    :param passw: the provided passw (mostly pin+otp)
    :param user: the identified use - as class object
    :param options: additional parameters, which are passed to the token;
                    the user is added to them under the key 'user'

    :return: tuple of boolean and optional response
    :raises Exception: if more than one token triggered a challenge
    '''
    log.debug("[__checkTokenList] checking tokenlist: %r" % (tokenList))
    res = False
    reply = None

    tokenclasses = config['tokenclasses']

    ## add the user to the options, so that every token
    ## could see the user
    if options:
        options['user'] = user
    else:
        options = {'user': user}

    b = getFromConfig("FailCounterIncOnFalsePin", "False")
    b = b.lower()

    ## if there has been one token in challenge mode, we only handle challenges
    challenge_tokens = []
    pinMatchingTokenList = []
    invalidTokenlist = []
    validTokenList = []
    auditList = []

    import linotp.lib.policy
    pin_policies = linotp.lib.policy.get_pin_policies(user) or []

    for token in tokenList:
        audit = {}
        audit['serial'] = token.getSerial()
        audit['token_type'] = token.getType()
        audit['weight'] = 0

        log.debug("[__checkTokenList] Found user with loginId %r: %r:\n"
                  % (token.getUserId(), token.getSerial()))

        ## check if the token is the list of supported tokens
        ## if not skip to the next token in list
        typ = token.getType()
        if typ.lower() not in tokenclasses:
            log.error('[__checkTokenList] typ %r not found in tokenclasses: %r' %
                      (typ, tokenclasses))
            continue

        ## now check if the token is in the same realm as the user
        if user is not None:
            t_realms = token.token.getRealmNames()
            u_realm = user.getRealm()
            if (len(t_realms) > 0 and len(u_realm) > 0 and
                    u_realm.lower() not in t_realms):
                continue

        tok_va = linotp.lib.validate.ValidateToken(token, context=c)
        ## in case of a failure during checking token, we log the error and
        ## continue with the next one
        try:
            (ret, reply) = tok_va.checkToken(passw, user, options=options)
        except Exception as exx:
            log.error("checking token %r failed: %r" % (token, exx))
            ret = -1

        (cToken, pToken, iToken, vToken) = tok_va.get_verification_result()

        ## if we have a challenge, remember the token that triggered it
        if len(cToken) > 0:
            challenge_tokens.extend(cToken)
            audit['action_detail'] = 'challenge created'
            audit['weight'] = 20

        # this means, the resolver password was wrong
        if len(pToken) == 1 and 1 in pin_policies:
            audit['action_detail'] = "wrong user password %r" % (ret)
            audit['weight'] = 10

        elif len(iToken) == 1:  ## this means the pin is wrong
            ## check, if we should increment
            # do not overwrite other error details!
            audit['action_detail'] = "wrong otp pin %r" % (ret)
            audit['weight'] = 15
            if b == "true":
                # We do not have a complete list of all invalid tokens, if
                # FailCounterIncOnFalsePin is False!
                # So we need the auditList!
                invalidTokenlist.extend(iToken)

        elif len(pToken) == 1:  ## pin matches but the otp is wrong
            pinMatchingTokenList.extend(pToken)
            audit['action_detail'] = "wrong otp value"
            audit['weight'] = 25

        elif len(vToken) > 0:
            audit['weight'] = 30
            matchinCounter = ret

            #any valid otp increments, independend of the tokens state !!
            token.incOtpCounter(matchinCounter)

            # the token only counts as valid, if it passes all state
            # checks; the checks are evaluated in this fixed order
            if token.isActive() != True:
                audit['action_detail'] = "Token inactive"
            elif not token.getFailCount() < token.getMaxFailCount():
                audit['action_detail'] = "Failcounter exceeded"
            elif not token.check_auth_counter():
                audit['action_detail'] = "Authentication counter exceeded"
            elif not token.check_validity_period():
                audit['action_detail'] = "validity period mismatch"
            else:
                token.inc_count_auth()
                token.inc_count_auth_success()
                validTokenList.extend(vToken)

        # add the audit information to the auditList
        auditList.append(audit)

    # compose one audit entry from all token audit information
    if len(auditList) > 0:
        # sort the list for the value of the key "weight"; the sort is
        # stable, so of several entries with the highest weight the last
        # one is taken
        sortedAuditList = sorted(
            auditList, key=lambda audit_entry: audit_entry.get("weight", 0))
        highest_audit = sortedAuditList[-1]
        c.audit['action_detail'] = highest_audit.get('action_detail', '')

        # check how many highest_audit values entries exist!
        highest_weight = highest_audit.get("weight", 0)
        highest_list = [audit_entry for audit_entry in sortedAuditList
                        if audit_entry.get("weight", 0) == highest_weight]
        if len(highest_list) == 1:
            c.audit['serial'] = highest_audit.get('serial', '')
            c.audit['token_type'] = highest_audit.get('token_type', '')
        else:
            # multiple tokens that might contain "wrong otp value"
            # or "wrong otp pin"
            c.audit['serial'] = ''
            c.audit['token_type'] = ''

    ## handle the processing of challenge tokens
    if len(challenge_tokens) == 1:
        challenge_token = challenge_tokens[0]
        (_res, reply) = linotp.lib.validate.create_challenge(
            challenge_token, options=options)
        return (False, reply)

    elif len(challenge_tokens) > 1:
        raise Exception("processing of multiple challenges is not supported!")

    log.debug("[checkTokenList] Number of valid tokens found "
              "(validTokenNum): %d" % len(validTokenList))

    res = finish_check_TokenList(validTokenList, pinMatchingTokenList,
                                 invalidTokenlist, user)

    return (res, reply)
def finish_check_TokenList(validTokenList, pinMatchingTokenList,
                           invalidTokenlist, user):
    '''
    derive the final authentication result from the classified tokens

    :param validTokenList: tokens, where pin and otp did match
    :param pinMatchingTokenList: tokens, where only the pin did match
    :param invalidTokenlist: tokens, where the pin did not match
    :param user: the user object, only used for the log messages

    :return: True if exactly one token is valid, False if none is valid
    :raises UserError: if more than one token is valid
    '''
    matches = len(validTokenList)

    if matches > 1:
        c.audit['action_detail'] = "Multiple token found!"
        if user:
            log.error("[__checkTokenList] multiple token match error: "
                      "Several Tokens matching with the same OTP PIN and OTP "
                      "for user %r. Not sure how to authenticate", user.login)
        raise UserError("multiple token match error", id= -33)

    if matches == 1:
        if user:
            log.info("[__checkTokenList] user %r@%r successfully authenticated."
                     % (user.login, user.realm))
        else:
            log.info("[__checkTokenList] serial %r successfully authenticated."
                     % c.audit.get('serial'))
        validTokenList[0].statusValidationSuccess()
        return True

    ## from here on: no token did match
    if user:
        log.warning("[__checkTokenList] user %r@%r failed to authenticate."
                    % (user.login, user.realm))
    else:
        log.warning("[__checkTokenList] serial %r failed to authenticate."
                    % c.audit.get('serial'))

    # the tokens with a matching pin (but failed OTP) are preferred: only
    # if there is none of them, the tokens with a wrong pin are incremented
    pin_matched = False
    for pin_token in pinMatchingTokenList:
        pin_token.incOtpFailCounter()
        pin_token.statusValidationFail()
        pin_token.inc_count_auth()
        pin_matched = True

    if not pin_matched:
        for bad_token in invalidTokenlist:
            bad_token.incOtpFailCounter()
            bad_token.statusValidationFail()

    return False
def get_multi_otp(serial, count=0, epoch_start=0, epoch_end=0, curTime=None):
    '''
    This function returns a list of OTP values for the given Token.
    Please note, that this controller needs to be activated and
    that the tokentype needs to support this function.

    :param serial: the serial number of the token
    :param count: number of the <count> next otp values (to be used with
                  event or timebased tokens)
    :param epoch_start: unix time start date (used with timebased tokens)
    :param epoch_end: unix time end date (used with timebased tokens)
    :param curTime: used for selftest

    :return: dictionary of otp values with the additional entry
             "result": True - or a dictionary with "result": False and an
             "error" entry, if no otp values could be retrieved
    :raises TokenAdminError: if the serial matches more than one token
    '''
    ret = {"result": False}
    log.debug("[get_multi_otp] retrieving OTP values for token %r" % serial)
    toks = getTokens4UserOrSerial(None, serial)

    if len(toks) > 1:
        log.error("[get_multi_otp] multiple tokens with serial %r found - cannot get OTP!" % serial)
        raise TokenAdminError("multiple tokens found - cannot get OTP!", id=1201)

    if len(toks) == 0:
        log.warning("[get_multi_otp] there is no token with serial %r" % serial)
        ret["error"] = "No Token with serial %s found." % serial

    elif len(toks) == 1:
        token = toks[0]
        log.debug("[get_multi_otp] getting multiple otp values for token %r. curTime=%r" % (token, curTime))
        # if the token does not support getting the OTP value,
        # res==False is returned
        (res, error, otp_dict) = token.get_multi_otp(count=count,
                                                     epoch_start=epoch_start,
                                                     epoch_end=epoch_end,
                                                     curTime=curTime)
        log.debug("[get_multi_otp] received %r, %r, %r" % (res, error, otp_dict))

        if res == True:
            # the dictionary of the token is extended by the result flag
            ret = otp_dict
            ret["result"] = True
        else:
            ret["error"] = error

    return ret
def getOtp(serial, curTime=None):
    '''
    This function returns the current OTP value for a given Token.
    Please note, that this controller needs to be activated and
    that the tokentype needs to support this function.

    :param serial: serialnumber for token
    :param curTime: used for self test

    :return: tuple with (res, pin, otpval, passw); if there is no token
             with this serial (-1, "", "", "") is returned
    :raises TokenAdminError: if the serial matches more than one token
    '''
    log.debug("[getOtp] retrieving OTP value for token %r" % serial)
    found = getTokens4UserOrSerial(None, serial)
    amount = len(found)

    if amount > 1:
        raise TokenAdminError("multiple tokens found - cannot get OTP!", id=1101)

    if amount == 0:
        log.warning("[getOTP] there is no token with serial %r" % serial)
        return (-1, "", "", "")

    if amount == 1:
        # if the token does not support getting the OTP value,
        # a -2 is returned.
        return found[0].getOtp(curTime=curTime)
def get_token_by_otp(token_list=None, otp="", window=10, typ=u"HMAC", realm=None, assigned=None):
    '''
    from the given token list this function returns the token, that
    generates the given OTP value

    :param token_list: the list of token objects to be investigated; if it
                       is None, the tokens are looked up by typ, realm and
                       assigned
    :param otp: the otp value, that needs to be found
    :param window: the window of search
    :param typ: the token type, used for the token lookup
    :param realm: the realm, used for the token lookup
    :param assigned: or unassigned tokens (1/0), used for the token lookup

    :return: the token object or None, if no token is matching
    :raises TokenAdminError: if more than one token matches the otp value
    '''
    log.debug("[get_token_by_otp] entering function. Searching for otp=%r" % otp)

    candidates = token_list
    if candidates is None:
        candidates = getTokensOfType(typ, realm, assigned)

    matching = []
    for candidate in candidates:
        log.debug("[get_token_by_otp] checking token %r" % candidate.getSerial())
        position = candidate.check_otp_exist(otp=otp, window=window)
        log.debug("[get_token_by_otp] result = %d" % int(position))
        # a non negative result is taken as a match
        if position >= 0:
            matching.append(candidate)

    if len(matching) > 1:
        raise TokenAdminError("get_token_by_otp: multiple tokens are matching this OTP value!", id=1200)

    if len(matching) == 1:
        return matching[0]

    return None
def get_serial_by_otp(token_list=None, otp="", window=10, typ=None, realm=None, assigned=None):
    '''
    Returns the serial, the owner and the resolver class of the token,
    which generates the given OTP value

    :param token_list: the list of token objects to be investigated
    :param otp: the otp value to be searched
    :param window: how many OTPs should be calculated per token
    :param typ: The tokentype
    :param realm: The realm in which to search for the token
    :param assigned: search either in assigned (1) or
                     not assigned (0) tokens

    :return: tuple of (serial, username, resolverClass) - all of them are
             empty strings, if no token is matching
    '''
    token = get_token_by_otp(token_list, otp, window, typ, realm, assigned)

    if token is None:
        return "", "", ""

    serial = token.getSerial()
    uid, resolver, resolverClass = token.getUser()

    userInfo = getUserInfo(uid, resolver, resolverClass)
    log.debug("[get_serial_by_otp] userinfo for token: %r" % userInfo)

    return serial, userInfo.get("username", ""), resolverClass
def removeToken(user=None, serial=None):
    '''
    delete the tokens of a user or the token with the given serial
    together with their challenges and their realm relations

    :param user: the owner of the tokens
    :param serial: the serial number of the token

    :return: the number of tokens, which have been found for deletion
    :raises ParameterError: if neither a user nor a serial is given
    :raises TokenAdminError: if the database operation failed
    '''
    no_user = user is None or user.isEmpty() == True
    if no_user and serial is None:
        log.warning("[removeToken] Parameter user or serial required!")
        raise ParameterError("Parameter user or serial required!", id=1212)

    log.debug("[removeToken] for serial: %r, user: %r" % (serial, user))
    tokenList = getTokens4UserOrSerial(user, serial,
                                       forUpdate=True, _class=False)

    found_serials = []
    found_ids = []
    to_delete = []

    try:
        for entry in tokenList:
            found_serials.append(entry.getSerial())
            found_ids.append(entry.LinOtpTokenId)
            to_delete.append(entry)

        ## we cleanup the challenges
        open_challenges = set()
        for found_serial in found_serials:
            encoded = linotp.lib.crypt.uencode(found_serial)
            open_challenges.update(
                linotp.lib.validate.get_challenges(serial=encoded))

        for challenge in open_challenges:
            Session.delete(challenge)

        ## due to legacy SQLAlchemy it could happen that the
        ## foreign key relation could not be deleted
        ## so we do this manualy
        for found_id in found_ids:
            relation = Session.query(TokenRealm).filter(
                TokenRealm.token_id == found_id)
            relation.delete()

        Session.commit()

        for entry in to_delete:
            Session.delete(entry)

    except Exception as e:
        log.error('[removeToken] update token DB failed')
        raise TokenAdminError("removeToken: Token update failed: %r" % e, id=1132)

    return len(tokenList)
def setMaxFailCount(maxFail, user, serial):
    '''
    set the maximum fail counter for all tokens of the user or for the
    token with the given serial

    :param maxFail: the value, which is handed to setMaxFail of the token
    :param user: the owner of the tokens
    :param serial: the serial number of the token

    :return: the number of tokens, which have been handled
    :raises ParameterError: if neither user nor serial is given
    '''
    if user is None and serial is None:
        log.warning("[setMaxFailCount] Parameter user or serial required!")
        raise ParameterError("Parameter user or serial required!", id=1212)

    log.debug("[setMaxFailCount] for serial: %r, user: %r" % (serial, user))

    tokens = getTokens4UserOrSerial(user, serial)
    for tok in tokens:
        tok.addToSession(Session)
        tok.setMaxFail(maxFail)

    return len(tokens)
def enableToken(enable, user, serial):
    '''
    switch the tokens of the user or the token with the given serial
    on or off

    :param enable: the value, which is handed to enable of the token
    :param user: the owner of the tokens
    :param serial: the serial number of the token

    :return: the number of tokens, which have been handled
    :raises ParameterError: if neither user nor serial is given
    '''
    if user is None and serial is None:
        log.warning("[enableToken] parameter serial or user missing.")
        raise ParameterError("Parameter user or serial required!", id=1212)

    log.debug("[enableToken] enable=%r, user=%r, serial=%r" % (enable, user, serial))

    tokens = getTokens4UserOrSerial(user, serial)
    for tok in tokens:
        tok.addToSession(Session)
        tok.enable(enable)

    return len(tokens)
def copyTokenPin(serial_from, serial_to):
    '''
    This function copies the token PIN from one token to the other token.
    This can be used for workflows like lost token.

    In fact the PinHash and the PinSeed need to be transferred

    :param serial_from: the serial of the token to copy the pin from
    :param serial_to: the serial of the token to copy the pin to

    :return: 1 : success
             -1: no unique source token
             -2: no unique destination token
    '''
    log.debug("[copyTokenPin] copying PIN from token %r to token %r" % (serial_from, serial_to))

    sources = getTokens4UserOrSerial(None, serial_from)
    targets = getTokens4UserOrSerial(None, serial_to)

    if len(sources) != 1:
        log.error("[copyTokenPin] not a unique token to copy from found")
        return -1

    if len(targets) != 1:
        log.error("[copyTokenPin] not a unique token to copy to found")
        return -2

    source = sources[0]
    target = targets[0]

    pinhash, seed = source.getPinHashSeed()
    target.setPinHashSeed(pinhash, seed)

    return 1
def copyTokenUser(serial_from, serial_to):
    '''
    This function copies the user from one token to the other
    This can be used for workflows like lost token

    Besides the user, the realms of the token are copied as well.

    :param serial_from: the serial of the token to copy the user from
    :param serial_to: the serial of the token to copy the user to

    :return: 1: success
             -1: no unique source token
             -2: no unique destination token
    '''
    log.debug("[copyTokenUser] copying user from token %r to token %r" % (serial_from, serial_to))

    sources = getTokens4UserOrSerial(None, serial_from)
    targets = getTokens4UserOrSerial(None, serial_to)

    if len(sources) != 1:
        log.error("[copyTokenUser] not a unique token to copy from found")
        return -1

    if len(targets) != 1:
        log.error("[copyTokenUser] not a unique token to copy to found")
        return -2

    source = sources[0]
    target = targets[0]

    uid, ures, resclass = source.getUser()
    target.setUid(uid, ures, resclass)

    copyTokenRealms(serial_from, serial_to)

    return 1
def copyTokenRealms(serial_from, serial_to):
    '''
    set the realms of the token serial_to to the realms of the
    token serial_from

    :param serial_from: the serial of the token to read the realms from
    :param serial_to: the serial of the token, which gets the realms
    '''
    setRealms(serial_to, getTokenRealms(serial_from))
def _lost_token_character_pool(contents):
    '''
    helper - build the character pool for the lost token password

    :param contents: the lostTokenPWContents definition, where 'c' selects
                     lowercase letters, 'C' uppercase letters, 'n' digits
                     and 's' special characters
    :return: string of all characters which could be used in the password;
             if the definition is empty or selects no character class at
             all, the default pool of letters and digits is returned
    '''
    default_pool = "%s%s%s" % (string.ascii_lowercase,
                               string.ascii_uppercase, string.digits)
    if not contents:
        return default_pool

    character_pool = ""
    if "c" in contents:
        character_pool += string.ascii_lowercase
    if "C" in contents:
        character_pool += string.ascii_uppercase
    if "n" in contents:
        character_pool += string.digits
    if "s" in contents:
        character_pool += "!#$%&()*+,-./:;<=>?@[]^_"

    # never hand an empty pool to the password generator
    return character_pool or default_pool


def losttoken(serial, new_serial="", password="", default_validity=0):
    """
    This is the workflow to handle a lost token

    A temporary password token is enrolled, which gets the user and the
    pin of the lost token and a limited validity period. Afterwards the
    lost token is disabled.

    :param serial: Token serial number
    :param new_serial: new serial number, the default is 'lost<serial>'
    :param password: new password, the default is a generated password
                     according to the enrollment policies lostTokenPWLen
                     and lostTokenPWContents
    :param default_validity: set the token to be valid for this number of
                             days, overruling the lostTokenValid policy

    :return: result dictionary
    :raises Exception: if the lost token is not assigned to a user
    """
    res = {}

    if new_serial == "":
        new_serial = "lost%s" % serial

    user = getTokenOwner(serial)
    log.error("[losttoken] doing lost token for serial %r and user %r@%r"
              % (serial, user.login, user.realm))

    if user.login == "" or user.login is None:
        err = "You can only define a lost token for an assigned token."
        log.warning("[losttoken] %s" % err)
        raise Exception(err)

    pol = linotp.lib.policy.get_client_policy(get_client(),
                                              scope="enrollment",
                                              realm=user.realm,
                                              user=user.login, userObj=user)

    pw_len = linotp.lib.policy.getPolicyActionValue(pol, "lostTokenPWLen")
    validity = linotp.lib.policy.getPolicyActionValue(pol, "lostTokenValid",
                                                      max=False)
    contents = linotp.lib.policy.getPolicyActionValue(pol,
                                        "lostTokenPWContents", String=True)

    log.debug("losttoken: length: %r, "
              "validity: %r, contents: %r" % (pw_len, validity, contents))

    # -1 is treated as "no policy value" and replaced by the default of 10
    if validity == -1:
        validity = 10
    if 0 != default_validity:
        validity = default_validity

    if pw_len == -1:
        pw_len = 10

    if password == "":
        character_pool = _lost_token_character_pool(contents)
        password = generate_password(size=pw_len, characters=character_pool)

    res['serial'] = new_serial

    (ret, tokenObj) = initToken({"otpkey": password,
                                 "serial": new_serial,
                                 "type": "pw",
                                 "description": "temporary replacement for %s" % serial
                                 }, User('', '', ''))
    res['init'] = ret

    if True == ret:
        res['user'] = copyTokenUser(serial, new_serial)
        res['pin'] = copyTokenPin(serial, new_serial)

        # set validity period
        end_date = (datetime.date.today()
                    + datetime.timedelta(days=validity)).strftime("%d/%m/%y")
        end_date = "%s 23:59" % end_date

        tokens = getTokens4UserOrSerial(User('', '', ''), new_serial)
        for tok in tokens:
            tok.set_validity_period_end(end_date)

        # fill results
        res['valid_to'] = "xxxx"
        res['password'] = password
        res['end_date'] = end_date

        # disable token
        res['disable'] = enableToken(False, User('', '', ''), serial)

    return res
def setPin(pin, user, serial, param=None):
    '''
    set the PIN for all tokens of the user or for the token with the
    given serial

    :param pin: the new pin
    :param user: the owner of the tokens
    :param serial: the serial number of the token
    :param param: additional parameters, which are handed to setPin of
                  the token - the default is an empty dictionary

    :return: the number of tokens, which have been handled
    :raises ParameterError: if neither user nor serial is given
    '''
    if param is None:
        param = {}

    log.debug("[setPin] calling setPin.")

    if user is None and serial is None:
        log.warning("[setPin] Parameter user or serial required!")
        raise ParameterError("Parameter user or serial required!", id=1212)

    if user is not None:
        log.info("[setPin] setting Pin for user %r@%r" % (user.login, user.realm))

    if serial is not None:
        log.info("[setPin] setting Pin for token with serial %r" % serial)

    tokens = getTokens4UserOrSerial(user, serial)
    for tok in tokens:
        tok.addToSession(Session)
        tok.setPin(pin, param)

    return len(tokens)
def setOtpLen(otplen, user, serial):
    '''
    set the otp length for all tokens of the user or for the token with
    the given serial

    :param otplen: the value, which is handed to setOtpLen of the token
    :param user: the owner of the tokens
    :param serial: the serial number of the token

    :return: the number of tokens, which have been handled
    :raises ParameterError: if neither user nor serial is given
    '''
    if user is None and serial is None:
        log.warning("[setOtpLen] Parameter user or serial required!")
        raise ParameterError("Parameter user or serial required!", id=1212)

    if serial is not None:
        log.debug("[setOtpLen] setting OTP length for serial %r" % serial)

    tokens = getTokens4UserOrSerial(user, serial)
    for tok in tokens:
        tok.addToSession(Session)
        tok.setOtpLen(otplen)

    return len(tokens)
def setHashLib(hashlib, user, serial):
    '''
    sets the Hashlib in the tokeninfo for all tokens of the user or for
    the token with the given serial

    :param hashlib: the value, which is handed to setHashLib of the token
    :param user: the owner of the tokens
    :param serial: the serial number of the token

    :return: the number of tokens, which have been handled
    :raises ParameterError: if neither user nor serial is given
    '''
    if (user is None) and (serial is None):
        log.warning("[setHashLib] Parameter user or serial required!")
        raise ParameterError("Parameter user or serial required!", id=1212)

    if not serial is None:
        log.debug("[setHashLib] setting hashlib for serial %r" % serial)

    tokens = getTokens4UserOrSerial(user, serial)
    for tok in tokens:
        tok.addToSession(Session)
        tok.setHashLib(hashlib)

    return len(tokens)
def setCountAuth(count, user, serial, max=False, success=False):
    '''
    sets either of the counters:
        count_auth
        count_auth_max
        count_auth_success
        count_auth_success_max

    :param count: the new value of the counter
    :param user: the owner of the tokens
    :param serial: the serial number of the token
    :param max: if set, the maximum of the counter is set
    :param success: if set, the counter of the successful authentications
                    is addressed

    :return: the number of tokens, which have been handled
    :raises ParameterError: if neither user nor serial is given
    '''
    if (user is None) and (serial is None):
        log.warning("[setCountAuth] Parameter user or serial required!")
        raise ParameterError("Parameter user or serial required!", id=1212)

    if not serial is None:
        log.debug("[setCountAuth] setting authcount for serial %r" % serial)

    # the setter of the token is selected by the (max, success) flags
    setters = {
        (True, True): 'set_count_auth_success_max',
        (True, False): 'set_count_auth_max',
        (False, True): 'set_count_auth_success',
        (False, False): 'set_count_auth',
    }
    setter_name = setters[(bool(max), bool(success))]

    tokens = getTokens4UserOrSerial(user, serial)
    for tok in tokens:
        tok.addToSession(Session)
        getattr(tok, setter_name)(count)

    return len(tokens)
def addTokenInfo(info, value, user, serial):
    '''
    sets an arbitrary Tokeninfo field for all tokens of the user or for
    the token with the given serial

    :param info: the name of the tokeninfo field
    :param value: the value of the tokeninfo field
    :param user: the owner of the tokens
    :param serial: the serial number of the token

    :return: the number of tokens, which have been handled
    :raises ParameterError: if neither user nor serial is given
    '''
    if (user is None) and (serial is None):
        log.warning("[setTokenInfo] Parameter user or serial required!")
        raise ParameterError("Parameter user or serial required!", id=1212)

    if not serial is None:
        log.debug("[setTokenInfo] setting tokeninfo %r for serial %r" % (info, serial))

    tokens = getTokens4UserOrSerial(user, serial)
    for tok in tokens:
        tok.addToSession(Session)
        tok.addToTokenInfo(info, value)

    return len(tokens)
###############################################################################
## LinOtpTokenPinUser
###############################################################################
def setPinUser(userPin, serial):
    '''
    set the user pin of the token with the given serial

    :param userPin: the value, which is handed to setUserPin of the token
    :param serial: the serial number of the token

    :return: the number of tokens, which have been handled
    :raises ParameterError: if no serial is given
    '''
    user = None

    if serial is None:
        log.warning("[setPinUser] Parameter serial required!")
        raise ParameterError("Parameter 'serial' is required!", id=1212)

    log.debug("[setPinUser] setting Pin for serial %r" % serial)
    tokenList = getTokens4UserOrSerial(user, serial)

    for token in tokenList:
        token.addToSession(Session)
        token.setUserPin(userPin)

    return len(tokenList)
###############################################################################
## LinOtpTokenPinSO
###############################################################################
def setPinSo(soPin, serial):
    '''
    set the so pin of the token with the given serial

    :param soPin: the value, which is handed to setSoPin of the token
    :param serial: the serial number of the token

    :return: the number of tokens, which have been handled
    :raises ParameterError: if no serial is given
    '''
    if serial is None:
        log.warning("[setPinSo] Parameter serial required!")
        raise ParameterError("Parameter 'serial' is required!", id=1212)

    log.debug("[setPinSo] setting Pin for serial %r" % serial)

    # the lookup is done by the serial only
    tokens = getTokens4UserOrSerial(None, serial)
    for tok in tokens:
        tok.addToSession(Session)
        tok.setSoPin(soPin)

    return len(tokens)
def setSyncWindow(syncWindow, user, serial):
    '''
    set the sync window for all tokens of the user or for the token with
    the given serial

    :param syncWindow: the value, which is handed to setSyncWindow of
                       the token
    :param user: the owner of the tokens
    :param serial: the serial number of the token

    :return: the number of tokens, which have been handled
    :raises ParameterError: if neither user nor serial is given
    '''
    if (user is None) and (serial is None):
        log.warning("[setSyncWindow] Parameter serial or user required!")
        raise ParameterError("Parameter user or serial required!", id=1212)

    log.debug("[setSyncWindow] setting syncwindow for serial %r" % serial)

    tokens = getTokens4UserOrSerial(user, serial)
    for tok in tokens:
        tok.addToSession(Session)
        tok.setSyncWindow(syncWindow)

    return len(tokens)
def setCounterWindow(countWindow, user, serial):
    '''
    set the counter window for all tokens of the user or for the token
    with the given serial

    :param countWindow: the value, which is handed to setCounterWindow of
                        the token
    :param user: the owner of the tokens
    :param serial: the serial number of the token

    :return: the number of tokens, which have been handled
    :raises ParameterError: if neither user nor serial is given
    '''
    if (user is None) and (serial is None):
        log.warning("[setCounterWindow] Parameter serial or user required!")
        raise ParameterError("Parameter user or serial required!", id=1212)

    log.debug("[setCounterWindow] setting count window for serial %r" % serial)

    tokens = getTokens4UserOrSerial(user, serial)
    for tok in tokens:
        tok.addToSession(Session)
        tok.setCounterWindow(countWindow)

    return len(tokens)
def setDescription(description, user, serial):
    '''
    set the description for all tokens of the user or for the token with
    the given serial

    :param description: the value, which is handed to setDescription of
                        the token
    :param user: the owner of the tokens
    :param serial: the serial number of the token

    :return: the number of tokens, which have been handled
    :raises ParameterError: if neither user nor serial is given
    '''
    if user is None and serial is None:
        log.warning("[setDescription] Parameter serial or user required!")
        raise ParameterError("Parameter user or serial required!", id=1212)

    log.debug("[setDescription] setting description for serial %r" % serial)
    tokenList = getTokens4UserOrSerial(user, serial)

    for token in tokenList:
        token.addToSession(Session)
        token.setDescription(description)

    return len(tokenList)
def resyncToken(otp1, otp2, user, serial, options=None):
    '''
    resync all tokens of the user or the token with the given serial

    :param otp1: the first otp value, which is handed to resync
    :param otp2: the second otp value, which is handed to resync
    :param user: the owner of the tokens
    :param serial: the serial number of the token
    :param options: additional parameters, which are handed to resync

    :return: True, if the resync of at least one token returned True
    :raises ParameterError: if neither user nor serial is given
    '''
    if user is None and serial is None:
        log.warning("[resyncToken] Parameter serial or user required!")
        raise ParameterError("Parameter user or serial required!", id=1212)

    log.debug("[resyncToken] resync token with serial %r" % serial)

    synced = False
    for tok in getTokens4UserOrSerial(user, serial):
        tok.addToSession(Session)
        # every token is resynced, even if a former one already succeeded
        if tok.resync(otp1, otp2, options) == True:
            synced = True

    return synced
def resetToken(user=None, serial=None):
    '''
    call reset for all tokens of the user or for the token with the
    given serial

    :param user: the owner of the tokens
    :param serial: the serial number of the token

    :return: the number of tokens, which have been handled
    :raises ParameterError: if neither user nor serial is given
    '''
    if user is None and serial is None:
        log.warning("[resetToken] Parameter serial or user required!")
        raise ParameterError("Parameter user or serial required!", id=1212)

    log.debug("[resetToken] reset token with serial %r" % serial)

    tokens = getTokens4UserOrSerial(user, serial)
    for tok in tokens:
        tok.addToSession(Session)
        tok.reset()

    return len(tokens)
def _gen_serial(prefix, tokennum, min_len=8):
'''
helper to create a hex digit string
:param prefix: the prepended prefix like LSGO
:param tokennum: the token number counter (int)
:param min_len: int, defining the length of the hex string
:return: hex digit string
'''
h_serial = ''
num_str = '%.4d' % tokennum
h_len = min_len - len(num_str)
if h_len > 0:
h_serial = binascii.hexlify(os.urandom(h_len)).upper()[0:h_len]
return "%s%s%s" % (prefix, num_str, h_serial)
def genSerial(tokenType=None, prefix=None):
    '''
    generate a serial number similar to the one generated in the manage web gui

    :param tokenType: the token type, the default is 'LSUN'; if no prefix
                      is given, the prefix is done by a lookup of the
                      token type in the token prefixes
    :param prefix: the prefix of the serial number - optional

    :return: serial number, which is not used by any token in the database
    '''
    if tokenType is None:
        tokenType = 'LSUN'

    tokenprefixes = config['tokenprefixes']

    if prefix is None:
        prefix = tokenType.upper()
        if tokenType.lower() in tokenprefixes:
            prefix = tokenprefixes.get(tokenType.lower())

    ## now search the number of ttypes in the token database
    tokennum = Session.query(Token).filter(
                    Token.LinOtpTokenType == u'' + tokenType).count()

    serial = _gen_serial(prefix, tokennum + 1)

    ## now test if serial already exists
    while True:
        numtokens = Session.query(Token).filter(
                        Token.LinOtpTokenSerialnumber == u'' + serial).count()
        if numtokens == 0:
            ## ok, there is no such token, so we're done
            break
        ## else - rare case:
        ## we advance the token number on every collision. Retrying with
        ## the same number would never terminate, if the number is that
        ## long, that the serial contains no random part anymore
        tokennum += numtokens
        serial = _gen_serial(prefix, tokennum + 1)

    return serial
def getTokenConfig(tok, section=None):
    '''
    getTokenConfig - return the config definition
                     of a dynamic token

    :param tok: token type (shortname)
    :type  tok: string

    :param section: subsection of the token definition - optional
    :type  section: string

    :return: dict - if the token type is unknown or its class provides
             no getClassInfo, an empty dict
    :rtype:  dict
    '''
    tokenclasses = config['pylons.app_globals'].tokenclasses

    if tok not in tokenclasses.keys():
        return {}

    token_obj = newToken(tokenclasses.get(tok))

    # only token classes with a class info could provide a definition
    if not hasattr(token_obj, 'getClassInfo'):
        return {}

    return token_obj.getClassInfo(section, ret={})
### TODO: move TokenIterator to dedicated file
[docs]class TokenIterator(object):
'''
TokenIterator class - support a smooth iterating through the tokens
'''
def __init__(self, user, serial, page=None, psize=None, filter=None,
sort=None, sortdir=None, filterRealm=None, user_fields=None,
params=None):
'''
constructor of Tokeniterator, which gathers all conditions to build
a sqalchemy query - iterator
:param user: User object - user provides as well the searchfield entry
:type user: User class
:param serial: serial number of a token
:type serial: string
:param page: page number
:type page: int
:param psize: how many entries per page
:type psize: int
:param filter: additional condition
:type filter: string
:param sort: sort field definition
:type sort: string
:param sortdir: sort direction: ascending or descending
:type sortdir: string
:param filterRealm: restrict the set of token to those in the filterRealm
:type filterRealm: string or list
:param user_fields: list of additional fields from the user owner
:type user_fields: array
:param params: list of additional request parameters - currently not used
:type params: dict
:return: - nothing / None
'''
log.debug('[TokenIterator::init] begin. start creating TokenIterator \
class with parameters: user:%r, serial:%r, page=%r, psize:%r, \
filter:%r, sort:%r, sortdir:%r, filterRealm:%r' %
(user, serial, page, psize, filter, sort, sortdir, filterRealm))
if params is None:
params = {}
self.page = 1
self.pages = 1
self.tokens = 0
self.user_fields = user_fields
if self.user_fields == None:
self.user_fields = []
condition = None
ucondition = None
scondition = None
r_condition = None
if type(filterRealm) in (str, unicode):
filterRealm = filterRealm.split(',')
if type(filterRealm) in [list]:
s_realms = []
for f in filterRealm:
## support for multiple realm filtering in the ui
## as a coma separated string
for s_realm in f.split(','):
s_realms.append(s_realm.strip())
filterRealm = s_realms
## create a list of all realms, which are allowed to be searched
## based on the list of the existing ones
valid_realms = []
realms = getRealms().keys()
if '*' in filterRealm:
valid_realms.append("*")
else:
for realm in realms:
if realm in filterRealm:
realm = linotp.lib.crypt.uencode(realm)
valid_realms.append(realm)
if serial is not None:
## check if the requested serial is in the realms of the admin (filterRealm)
log.debug('[TokenIterator::init] start search for serial: >%r<' % (serial))
allowed = False
if filterRealm == ['*']:
allowed = True
else:
realms = getTokenRealms(serial)
for realm in realms:
if realm in filterRealm:
allowed = True
if allowed == True:
scondition = and_(Token.LinOtpTokenSerialnumber == serial)
if user.isEmpty() == False and user is not None:
log.debug('[TokenIterator::init] start search for username: >%r<'
% (user))
if user.login is not None and (user.login) > 0 :
loginUser = user.login.lower()
loginUser = loginUser.replace('"', '')
loginUser = loginUser.replace("'", '')
searchType = "any"
## search for a 'blank' user
if len(loginUser) == 0 and len(user.login) > 0:
searchType = "blank"
elif loginUser == "/:no user:/" or loginUser == "/:none:/":
searchType = "blank"
elif loginUser == "/:no user info:/":
searchType = "wildcard"
elif "*" in loginUser or "." in loginUser:
searchType = "wildcard"
else:
## no blank and no wildcard search
searchType = "exact"
if searchType == "blank":
log.debug('[TokenIterator::init] search for empty user: >%r<' % (user.login))
ucondition = and_(or_(Token.LinOtpUserid == u'',
Token.LinOtpUserid == None))
if searchType == "exact":
log.debug('[TokenIterator::init] search for exact user: %r' % (user))
serials = []
users = []
## if search for a realmuser 'user@realm' we can take the
## realm from the argument
if len(user.realm) > 0:
users.append(user)
else:
for realm in valid_realms:
users.append(User(user.login, realm))
for usr in users:
try:
tokens = getTokens4UserOrSerial(user=usr, _class=False)
for tok in tokens:
serials.append(tok.LinOtpTokenSerialnumber)
except UserError as ex:
## we get an exception if the user is not found
log.debug('[TokenIterator::init] no exact user: %r'
% (user))
log.debug('[TokenIterator::init] %r' % ex)
if len(serials) > 0:
## if tokens found, search for their serials
ucondition = and_(Token.LinOtpTokenSerialnumber.in_(serials))
else:
## if no token is found, block search for user
## and return nothing
ucondition = and_(or_(Token.LinOtpUserid == u'',
Token.LinOtpUserid == None))
## handle case, when nothing found in former cases
if searchType == "wildcard":
log.debug('[TokenIterator::init] wildcard search: %r' % (user))
serials = []
users = getAllTokenUsers()
logRe = None
lU = loginUser.replace('*', '.*')
#lU = lU.replace('..', '.')
logRe = re.compile(lU)
for ser in users:
userInfo = users.get(ser)
tokenUser = userInfo.get('username').lower()
try:
if logRe.match(u'' + tokenUser) is not None:
serials.append(ser)
except Exception as e:
log.error('error no express %r ' % e)
## to prevent a warning, we check if serials are found
## SAWarning: The IN-predicate on
## "Token.LinOtpTokenSerialnumber" was invoked with an
## empty sequence. This results in a contradiction, which
## nonetheless can be expensive to evaluate. Consider
## alternative strategies for improved performance.
if len(serials) > 0:
ucondition = and_(Token.LinOtpTokenSerialnumber.in_(serials))
else:
ucondition = and_(or_(Token.LinOtpUserid == u'',
Token.LinOtpUserid == None))
if filter is None:
condition = None
elif filter in ['/:active:/', '/:enabled:/',
'/:token is active:/', '/:token is enabled:/' ]:
condition = and_(Token.LinOtpIsactive == True)
elif filter in ['/:inactive:/', '/:disabled:/',
'/:token is inactive:/', '/:token is disabled:/']:
condition = and_(Token.LinOtpIsactive == False)
else:
## search in other columns
filter = linotp.lib.crypt.uencode(filter)
condition = or_(Token.LinOtpTokenDesc.contains(filter),
Token.LinOtpIdResClass.contains(filter),
Token.LinOtpTokenSerialnumber.contains(filter),
Token.LinOtpUserid.contains(filter),
Token.LinOtpTokenType.contains(filter))
###################################################################
## The condition for only getting certain realms!
if '*' in valid_realms:
log.debug("[TokenIterator::init] wildcard for realm '*' found."
" Tokens of all realms will be displayed")
elif len(valid_realms) > 0:
log.debug("[TokenIterator::init] adding filter condition"
" for realm %r" % valid_realms)
## get all matching realms
realm_id_tuples = Session.query(Realm.id).\
filter(Realm.name.in_(valid_realms)).all()
realm_ids = set()
for realm_tuple in realm_id_tuples:
realm_ids.add(realm_tuple[0])
## get all tokenrealm ids
token_id_tuples = Session.query(TokenRealm.token_id).\
filter(TokenRealm.realm_id.in_(realm_ids)).all()
token_ids = set()
for token_tuple in token_id_tuples:
token_ids.add(token_tuple[0])
## define the token id condition
r_condition = and_(Token.LinOtpTokenId.in_(token_ids))
elif ("''" in filterRealm or '""' in filterRealm or
"/:no realm:/" in filterRealm):
log.debug("[TokenIterator::init] search for all tokens, which are"
" in no realm")
## get all tokenrealm ids
token_id_tuples = Session.query(TokenRealm.token_id).all()
token_ids = set()
for token_tuple in token_id_tuples:
token_ids.add(token_tuple[0])
## define the token id not condition
r_condition = and_(not_(Token.LinOtpTokenId.in_(token_ids)))
## create the final condition as AND of all conditions
condTuple = ()
for conn in (condition, ucondition, scondition, r_condition):
if type(conn).__name__ != 'NoneType':
condTuple += (conn,)
condition = and_(*condTuple)
order = Token.LinOtpTokenDesc
## o LinOtp.TokenId: 17943
## o LinOtp.TokenInfo: ""
## o LinOtp.TokenType: "spass"
## o LinOtp.TokenSerialnumber: "spass0000FBA3"
## o User.description: "Cornelius Koelbel,cornelius.koelbel@lsexperts.de,local,"
## o LinOtp.IdResClass: "useridresolver.PasswdIdResolver.IdResolver._default_Passwd_"
## o User.username: "koelbel"
## o LinOtp.TokenDesc: "Always Authenticate"
## o User.userid: "1000"
## o LinOtp.IdResolver: "/etc/passwd"
## o LinOtp.Isactive: true
if sort == "TokenDesc":
order = Token.LinOtpTokenDesc
elif sort == "TokenId":
order = Token.LinOtpTokenId
elif sort == "TokenType":
order = Token.LinOtpTokenType
elif sort == "TokenSerialnumber":
order = Token.LinOtpTokenSerialnumber
elif sort == "TokenType":
order = Token.LinOtpTokenType
elif sort == "IdResClass":
order = Token.LinOtpIdResClass
elif sort == "IdResolver":
order = Token.LinOtpIdResolver
elif sort == "Userid":
order = Token.LinOtpUserid
elif sort == "FailCount":
order = Token.LinOtpFailCount
elif sort == "Userid":
order = Token.LinOtpUserid
elif sort == "Isactive":
order = Token.LinOtpIsactive
## care for the result sort order
if sortdir is not None and sortdir == "desc":
order = order.desc()
else:
order = order.asc()
## care for the result paging
if page is None:
self.toks = Session.query(Token).filter(condition).order_by(order).distinct()
self.tokens = self.toks.count()
log.debug("[TokenIterator] DB-Query returned # of objects: %i" % self.tokens)
self.pagesize = self.tokens
self.it = iter(self.toks)
return
try:
if psize is None:
pagesize = int(getFromConfig("pagesize", 50))
else:
pagesize = int(psize)
except:
pagesize = 20
try:
thePage = int (page) - 1
except:
thePage = 0
if thePage < 0:
thePage = 0
start = thePage * pagesize
stop = (thePage + 1) * pagesize
self.toks = Session.query(Token).filter(condition).order_by(order).distinct()
self.tokens = self.toks.count()
log.debug("[TokenIterator::init] DB-Query returned # of objects: %i" % self.tokens)
self.page = thePage + 1
fpages = float(self.tokens) / float(pagesize)
self.pages = int(fpages)
if fpages - int(fpages) > 0:
self.pages = self.pages + 1
self.pagesize = pagesize
self.toks = self.toks.slice(start, stop)
self.it = iter(self.toks)
log.debug('[TokenIterator::init] end. Token iterator created: %r' % \
(self.it))
return
def getResultSetInfo(self):
    """
    Return the paging summary of the current result set.

    :return: dict with the keys ``pages``, ``pagesize``, ``tokens`` and
             ``page``, each taken from the instance attribute of the
             same name
    """
    return {
        "pages": self.pages,
        "pagesize": self.pagesize,
        "tokens": self.tokens,
        "page": self.page,
    }
def getUserDetail(self, tok):
    """
    Build the ``User.*`` entries that are attached to a token description.

    Every entry starts out as an empty unicode string. If the token has
    an owner (``tok.LinOtpUserid`` is set), ``User.userid`` and
    ``User.username`` are preset to the ``/:no user info:/`` marker and
    then overwritten with whatever ``getUserInfo`` returns for that owner.

    :param tok: token object; its ``LinOtpUserid``, ``LinOtpIdResolver``
                and ``LinOtpIdResClass`` attributes are read
    :return: tuple ``(userInfo, uInfo)`` - the ``User.*`` dict and the
             value returned by ``getUserInfo`` (an empty dict if the
             token has no owner)
    """
    userInfo = {
        "User.description": u'',
        "User.userid": u'',
        "User.username": u'',
    }
    for field in self.user_fields:
        userInfo["User.%s" % field] = u''
    uInfo = {}

    ## token without an owner: nothing to look up
    if not tok.LinOtpUserid:
        return (userInfo, uInfo)

    ## markers stay in place if the lookup yields no data
    userInfo["User.userid"] = u'/:no user info:/'
    userInfo["User.username"] = u'/:no user info:/'

    uInfo = getUserInfo(tok.LinOtpUserid, tok.LinOtpIdResolver,
                        tok.LinOtpIdResClass)
    if not uInfo:
        return (userInfo, uInfo)

    def _to_unicode(value):
        ## decode byte strings, pass everything else through unchanged;
        ## 'bytes' is the same type as 'str' on Python 2
        if isinstance(value, bytes):
            return value.decode(ENCODING)
        return value

    ## the standard entries are only overwritten if the key is present
    for key in ("description", "userid", "username"):
        if key in uInfo:
            userInfo["User.%s" % key] = _to_unicode(uInfo.get(key))

    ## the additional fields are always written, defaulting to ""
    for field in self.user_fields:
        userInfo["User.%s" % field] = _to_unicode(uInfo.get(field, ""))

    return (userInfo, uInfo)
def next(self):
    """
    Return the description of the next token, enriched with user info.

    The token is taken from ``self.it``; its ``get_vars(save=True)``
    result is updated with the ``User.*`` entries from
    ``getUserDetail``.

    :return: the updated token description
    :raises StopIteration: propagated from ``self.it`` when no token
                           is left
    """
    log.debug("[next] TokenIterator finds next token")

    ## deliberately the method call, not the next() builtin: the name
    ## 'next' may refer to this very function
    tok = self.it.next()
    desc = tok.get_vars(save=True)

    ## add userinfo to token description
    (userInfo, _uInfo) = self.getUserDetail(tok)
    desc.update(userInfo)

    return desc
def __iter__(self):
    """
    Iterator protocol hook: the object serves as its own iterator.

    :return: self
    """
    log.debug("[__iter__] TokenIterator")

    return self
#eof###########################################################################