Source code for linotp.controllers.validate

# -*- coding: utf-8 -*-
#
#    LinOTP - the open source solution for two factor authentication
#    Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
#
#    This file is part of LinOTP server.
#
#    This program is free software: you can redistribute it and/or
#    modify it under the terms of the GNU Affero General Public
#    License, version 3, as published by the Free Software Foundation.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the
#               GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#
#    E-mail: linotp@lsexperts.de
#    Contact: www.linotp.org
#    Support: www.lsexperts.de
#

"""
validate controller - to check the authentication request
"""





import logging
import webob

from pylons import tmpl_context as c
from pylons import request, response, config
from pylons.controllers.util import abort
from linotp.lib.base import BaseController

from linotp.lib.util import get_client
from linotp.lib.util import  getParam
from linotp.lib.user import  getUserFromParam
from linotp.lib.realm import  getDefaultRealm
from linotp.lib.user import  getUserInfo
from linotp.lib.user import  getUserId
from linotp.lib.user    import User

from linotp.lib.config  import getFromConfig

from linotp.lib.token import checkUserPass, checkSerialPass
from linotp.lib.token import get_tokenserial_of_transaction

from linotp.lib.reply import sendResult, sendError
from linotp.lib.reply import sendQRImageResult

from linotp.model.meta import Session
from linotp.lib.selftest import isSelfTest
from linotp.lib.policy import check_user_authorization
from linotp.lib.policy import check_auth_tokentype
from linotp.lib.policy import check_auth_serial
from linotp.lib.policy import AuthorizeException
from linotp.lib.policy import set_realm
from linotp.lib.policy import is_auth_return

from linotp.lib.token import checkYubikeyPass
from linotp.lib.token import getTokens4UserOrSerial

from linotp.lib.error import ParameterError

import traceback

audit = config.get('audit')

optional = True
required = False

log = logging.getLogger(__name__)


#from paste.debug.profile import profile_decorator


[docs]class ValidateController(BaseController): ''' The linotp.controllers are the implementation of the web-API to talk to the LinOTP server. The ValidateController is used to validate the username with its given OTP value. An Authentication module like pam_linotp2 or rlm_linotp2 uses this ValidateController. The functions of the ValidateController are invoked like this https://server/validate/<functionname> The functions are described below in more detail. ''' def __before__(self, action, **params): log.debug("[__before__::%r] %r" % (action, params)) try: audit.initialize() c.audit['client'] = get_client() return response except Exception as exx: log.error("[__before__::%r] exception %r" % (action, exx)) log.error("[__before__] %s" % traceback.format_exc()) Session.rollback() Session.close() return sendError(response, exx, context='before') finally: log.debug("[__before__::%r] done" % (action)) def __after__(self, action, **params): audit.log(c.audit) return response def _check(self, param): ''' basic check function, that can be used by different controllers :param param: dict of all caller parameters :type param: dict :return: Tuple of True or False and opt :rtype: Tuple(boolean, opt) ''' opt = None options = {} ## put everythin in the options but the user, pass, init options.update(param) for para in ["pass", "user", "init"]: if options.has_key(para): del options[para] passw = getParam(param, "pass", optional) user = getUserFromParam(param, optional) # support for ocra application challenge verification challenge = getParam(param, "challenge", optional) if challenge is not None: options = {} options['challenge'] = challenge c.audit['user'] = user.login realm = user.realm or getDefaultRealm() c.audit['realm'] = realm # AUTHORIZATION Pre Check # we need to overwrite the user.realm in case the user does not exist in the original realm (setrealm-policy) user.realm = set_realm(user.login, realm, exception=True) check_user_authorization(user.login, user.realm, exception=True) if isSelfTest() == True: initTime = getParam(param, "init", optional) if initTime is not None: if options is None: options = {} options['initTime'] = initTime (ok, opt) = checkUserPass(user, passw, options=options) c.audit['success'] = ok if ok: # AUTHORIZATION post check check_auth_tokentype(c.audit['serial'], exception=True, user=user) check_auth_serial(c.audit['serial'], exception=True, user=user) # add additional details if is_auth_return(ok, user=user): if opt == None: opt = {} if ok: opt['realm'] = c.audit.get('realm') opt['user'] = c.audit.get('user') opt['tokentype'] = c.audit.get('token_type') opt['serial'] = c.audit.get('serial') else: opt['error'] = c.audit.get('action_detail') return (ok, opt) # @profile_decorator(log_file="/tmp/validate.prof")
[docs] def check(self): ''' This function is used to validate the username and the otp value/password. method: validate/check arguments: * user: The username or loginname * pass: The password that consist of a possible fixed password component and the OTP value * realm (optional): An optional realm to match the user to a useridresolver * challenge (optional): optional challenge + otp verification for challenge response token. This indicates, that tis request is a challenge request. * data (optional): optional challenge + otp verification for challenge response token. This indicates, that tis request is a challenge request. * state (optional): The optional id to respond to a previous challenge. * transactionid (optional): The optional id to respond to a previous challenge. returns: JSON response:: { "version": "LinOTP 2.4", "jsonrpc": "2.0", "result": { "status": true, "value": false }, "id": 0 } If ``status`` is ``true`` the request was handled successfully. If ``value`` is ``true`` the user was authenticated successfully. ''' param = {} ok = False opt = None try: param.update(request.params) # prevent the detection if a user exist # by sending a request w.o. pass parameter try: (ok, opt) = self._check(param) except (AuthorizeException, ParameterError) as exx: log.warning("[check] authorization failed for validate/check: %r" % exx) c.audit['success'] = False c.audit['info'] = unicode(exx) ok = False if is_auth_return(ok): if opt == None: opt = {} opt['error'] = c.audit.get('info') Session.commit() qr = getParam(param, 'qr', optional) if qr is not None and opt is not None and opt.has_key('message'): try: dataobj = opt.get('message') param['alt'] = "%s" % opt return sendQRImageResult(response, dataobj, param) except Exception as exc: log.warning("failed to send QRImage: %r " % exc) return sendQRImageResult(response, opt, param) else: return sendResult(response, ok, 0, opt=opt) except Exception as exx: log.error("[check] validate/check failed: %r" % exx) log.error("[check] %s" % traceback.format_exc()) # If an internal error occurs or the SMS gateway did not send the SMS, we write this to the detail info. c.audit['info'] = unicode(exx) Session.rollback() return sendError(response, u"validate/check failed: %s" % unicode(exx), 0) finally: Session.close() log.debug('[check] done')
[docs] def check_yubikey(self): ''' This function is used to validate the output of a yubikey method: validate/check_yubikey :param pass: The password that consist of the static yubikey prefix and the otp :type pass: string :return: JSON Object returns: JSON response:: { "version": "LinOTP 2.4", "jsonrpc": "2.0", "result": { "status": true, "value": false }, "detail" : { "username": username, "realm": realm }, "id": 0 } ''' param = request.params passw = getParam(param, "pass", required) try: ok = False try: ok, opt = checkYubikeyPass(passw) c.audit['success'] = ok except AuthorizeException as exx: log.warning("[check_yubikey] authorization failed for validate/check_yubikey: %r" % exx) c.audit['success'] = False c.audit['info'] = unicode(exx) ok = False Session.commit() return sendResult(response, ok, 0, opt=opt) except Exception as exx: log.error("[check_yubikey] validate/check_yubikey failed: %r" % exx) log.error("[check_yubikey] %s" % traceback.format_exc()) c.audit['info'] = unicode(exx) Session.rollback() return sendError(response, u"validate/check_yubikey failed: %s" % unicode(exx), 0) finally: Session.close() log.debug('[check_yubikey] done')
[docs] def check_url(self): ''' This function works with pam_url. ''' ok = False param = {} try: param.update(request.params) try: (ok, opt) = self._check(param) except AuthorizeException as acc: log.warning("[check_url] authorization failed for validate/check_url: %r" % acc) c.audit['success'] = False c.audit['action_detail'] = unicode(acc) ok = False Session.commit() response.headers['blablafoo'] = 'application/json' ## TODO: this code seems not to be finished if not ok: abort(403) else: return "Preshared Key Todo" except webob.exc.HTTPUnauthorized as acc: ## the exception, when an abort() is called if forwarded log.error("[__before__::%r] webob.exception %r" % acc) log.error("[__before__] %s" % traceback.format_exc()) Session.rollback() raise acc except Exception as exx: log.error("[check_url] validate/check_url failed: %r" % exx) log.error("[check_url] %s" % traceback.format_exc()) Session.rollback() return sendError(response, u"validate/check_url failed: %s" % unicode(exx), 0) finally: Session.close() log.debug("[check_url] done")
[docs] def samlcheck(self): ''' This function is used to validate the username and the otp value/password in a SAML environment. If ``linotp.allowSamlAttributes = True`` then the attributes of the authenticated users are also contained in the response. method: validate/samlcheck arguments: * user: username / loginname * pass: the password that consists of a possible fixes password component and the OTP value * realm: optional realm to match the user to a useridresolver returns: JSON response ''' try: opt = None param = request.params (ok, opt) = self._check(param) attributes = {} if True == ok: allowSAML = False try: allowSAML = getFromConfig("allowSamlAttributes") except: log.warning("[samlcheck] Calling controller samlcheck. But allowSamlAttributes == False.") if "True" == allowSAML: ## Now we get the attributes of the user user = getUserFromParam(param, optional) (uid, resId, resIdC) = getUserId(user) userInfo = getUserInfo(uid, resId, resIdC) #users = getUserList({ 'username':user.getUser()} , user) log.debug("[samlcheck] getting attributes for: %s@%s" % (user.getUser(), user.getRealm())) res = userInfo for key in ['username', 'surname', 'mobile', 'phone', 'givenname', 'email']: if key in res: attributes[key] = res[key] Session.commit() return sendResult(response, { 'auth': ok, 'attributes' : attributes } , 0, opt) except Exception as exx: log.error("[samlcheck] validate/check failed: %r" % exx) log.error("[samlcheck] %s" % traceback.format_exc()) Session.rollback() return sendError(response, "validate/samlcheck failed: %s" % unicode(exx), 0) finally: Session.close() log.debug('[samlcheck] done')
def check_t(self): param = {} value = {} ok = False opt = None try: param.update(request.params) passw = getParam(param, "pass", required) transid = param.get('state', None) if transid is not None: param['transactionid'] = transid del param['state'] if transid is None: transid = param.get('transactionid', None) if transid is None: raise Exception("missing parameter: state or transactionid!") serial = get_tokenserial_of_transaction(transId=transid) if serial is None: value['value'] = False value['failure'] = 'No challenge for transaction %r found'\ % transid else: param['serial'] = serial tokens = getTokens4UserOrSerial(serial=serial) if len(tokens) == 0 or len(tokens) > 1: raise Exception('tokenmismatch for token serial: %s' % (unicode(serial))) theToken = tokens[0] tok = theToken.token realms = tok.getRealmNames() if realms is None or len(realms) == 0: realm = getDefaultRealm() elif len(realms) > 0: realm = realms[0] userInfo = getUserInfo(tok.LinOtpUserid, tok.LinOtpIdResolver, tok.LinOtpIdResClass) user = User(login=userInfo.get('username'), realm=realm) (ok, opt) = checkSerialPass(serial, passw, user=user, options=param) value['value'] = ok failcount = theToken.getFailCount() value['failcount'] = int(failcount) c.audit['success'] = ok #c.audit['info'] += "%s=%s, " % (k, value) Session.commit() qr = getParam(param, 'qr', optional) if qr is not None and opt is not None and opt.has_key('message'): try: dataobj = opt.get('message') param['alt'] = "%s" % opt return sendQRImageResult(response, dataobj, param) except Exception as exc: log.warning("failed to send QRImage: %r " % exc) return sendQRImageResult(response, opt, param) else: return sendResult(response, value, 1, opt=opt) except Exception as exx: log.error("[check_t] validate/check_t failed: %r" % exx) log.error("[check_t] %s" % traceback.format_exc()) c.audit['info'] = unicode(exx) Session.rollback() return sendError(response, "validate/check_t failed: %s" % unicode(exx), 0) finally: Session.close() log.debug('[check_t] done')
[docs] def check_s(self): ''' This function is used to validate the serial and the otp value/password. method: validate/check_s arguments: * serial: the serial number of the token * pass: the password that consists of a possible fixes password component and the OTP value returns: JSON response ''' param = {} param.update(request.params) options = {} options.update(param) for k in ['user', 'serial', "pass", "init"]: if k in options: del options[k] if 'init' in param: if isSelfTest() == True: options['initTime'] = param.get('init') try: passw = getParam(param, "pass", optional) serial = getParam(param, 'serial', optional) if serial is None: user = getParam(param, 'user', optional) if user is not None: user = getUserFromParam(param, optional) toks = getTokens4UserOrSerial(user=user) if len(toks) == 0: raise Exception("No token found!") elif len(toks) > 1: raise Exception("More than one token found!") else: tok = toks[0].token desc = tok.get() realms = desc.get('LinOtp.RealmNames') if realms is None or len(realms) == 0: realm = getDefaultRealm() elif len(realms) > 0: realm = realms[0] userInfo = getUserInfo(tok.LinOtpUserid, tok.LinOtpIdResolver, tok.LinOtpIdResClass) user = User(login=userInfo.get('username'), realm=realm) serial = tok.getSerial() c.audit['serial'] = serial if isSelfTest() == True: initTime = getParam(param, "init", optional) if initTime is not None: if options is None: options = {} options['initTime'] = initTime (ok, opt) = checkSerialPass(serial, passw, options=options) c.audit['success'] = ok Session.commit() qr = getParam(param, 'qr', optional) if qr is not None and opt is not None and opt.has_key('message'): try: dataobj = opt.get('message') param['alt'] = "%s" % opt return sendQRImageResult(response, dataobj, param) except Exception as exc: log.warning("failed to send QRImage: %r " % exc) return sendQRImageResult(response, opt, param) else: return sendResult(response, ok, 0, opt=opt) except Exception as exx: log.error("[check_s] validate/check_s failed: %r" % exx) log.error("[check_s] %s" % traceback.format_exc()) c.audit['info'] = unicode(exx) Session.rollback() return sendError(response, "validate/check_s failed: %s" % unicode(exx), 0) finally: Session.close() log.debug('[check_s] done')
[docs] def simplecheck(self): ''' This function is used to validate the username and the otp value/password. method: validate/simplecheck arguments: * user: username / loginname * pass: the password that consists of a possible fixes password component and the OTP value * realm: additional realm to match the user to a useridresolver returns: Simple ascii response: :-) in case of success :-( in case of failed authentication :-/ in case of any error ''' opt = None param = request.params res = [] try: try: (ok, opt) = self._check(param) except AuthorizeException as e: log.warning("[simplecheck] validate/simplecheck: %r" % e) c.audit['success'] = False c.audit['action_detail'] = unicode(e) ok = False Session.commit() if ok == True: ret = u":-)" else: ret = u":-(" res.append(ret) if opt != None: if 'state' in opt or 'transactionid' in opt: stat = opt.get('transactionid') or opt.get('state') res.append(stat) if "data" in opt or "message" in opt: msg = opt.get('data') or opt.get('message') res.append(msg) return " ".join(res).strip() except Exception as exx: log.error("[simplecheck] failed: %r" % exx) log.error("[simplecheck] %s" % traceback.format_exc()) Session.rollback() return u":-/" finally: Session.close() log.debug('[simplecheck] done')
def ok(self): return sendResult(response, "TRUE", 0) def fail(self): return sendResult(response, "FALSE", 0)
[docs] def smspin(self): ''' This function is used in conjunction with an SMS token: the user authenticates with user and pin (pass) and will receive on his mobile an OTP as message method: validate/smspin arguments: * user: username / loginname * pass: the password that consists of a possible fixed password * realm: additional realm to match the user to a useridresolver returns: JSON response ''' ret = False param = request.params state = '' message = 'No sms message defined!' try: user = getUserFromParam(param, optional) c.audit['user'] = user.login c.audit['realm'] = user.realm or getDefaultRealm() c.audit['success'] = 0 (ret, opt) = self._check(param) ## here we build some backward compatibility if type(opt) is dict: state = opt.get('state', '') or '' message = opt.get('message', '') or 'No sms message defined!' # sucessfull submit if (message in ['sms with otp already submitted', 'sms submitted'] and len(state) > 0): ret = True c.audit['success'] = 1 # sending sms failed should be an error elif message in ['sending sms failed']: ret = True c.audit['success'] = 0 # anything else is an exception else: raise Exception(message) Session.commit() return sendResult(response, ret, opt) except Exception as exx: log.error("[smspin] validate/smspin failed: %r" % exx) log.error("[smspin] %s" % traceback.format_exc()) # If an internal error occurs or the SMS gateway did not send the SMS, we write this to the detail info. c.audit['info'] = unicode(exx) Session.rollback() return sendError(response, "validate/smspin failed: %s" % unicode(exx), 0) finally: Session.close() log.debug("[smspin] done") #eof###########################################################################