Source code for linotp.controllers.ocra

# -*- coding: utf-8 -*-
#
#    LinOTP - the open source solution for two factor authentication
#    Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
#
#    This file is part of LinOTP server.
#
#    This program is free software: you can redistribute it and/or
#    modify it under the terms of the GNU Affero General Public
#    License, version 3, as published by the Free Software Foundation.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the
#               GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#
#    E-mail: linotp@lsexperts.de
#    Contact: www.linotp.org
#    Support: www.lsexperts.de
#
"""
ocra controller - Interface for the Challenge Response Token (OCRA)
"""

import traceback
import logging
from urllib import urlencode


from pylons import request, response, config, tmpl_context as c
from linotp.model.meta import Session

from linotp.lib.base import BaseController
from linotp.lib.util  import check_session
from linotp.lib.util import get_client
from linotp.lib.error   import ParameterError

from linotp.lib.util    import getParam, getLowerParams
from linotp.lib.reply   import sendResult, sendError
from linotp.lib.reply   import sendQRImageResult


from linotp.lib.realm   import getDefaultRealm
from linotp.lib.user    import getUserFromRequest
from linotp.lib.user    import getUserInfo
from linotp.lib.user    import User

from linotp.lib.policy  import checkPolicyPre
from linotp.lib.policy  import PolicyException

from linotp.lib.token import getTokens4UserOrSerial
from linotp.lib.tokenclass import OcraTokenClass

from linotp.lib.token import checkSerialPass
from linotp.lib.user import  getUserFromParam
import webob



audit = config.get('audit')

log = logging.getLogger(__name__)

optional = True
required = False


[docs]class OcraController(BaseController): ''' The OcraController implements challenges/response tokens according to RFC 6287 ''' def __before__(self, action, **params): ''' Here we see, what action is to be called and check the authorization ''' log.debug("[__before__::%r] %r" % (action, params)) try: audit.initialize() c.audit['success'] = False c.audit['client'] = get_client() if action != "check_t": check_session() return response except webob.exc.HTTPUnauthorized as acc: ## the exception, when an abort() is called if forwarded log.error("[__before__::%r] webob.exception %r" % (action, acc)) log.error("[__before__] %s" % traceback.format_exc()) Session.rollback() Session.close() raise acc except Exception as exx: log.error("[__before__::%r] exception %r" % (action, exx)) log.error("[__before__] %s" % traceback.format_exc()) Session.rollback() Session.close() return sendError(response, exx, context='before') finally: log.debug("[__before__::%r] done" % (action)) def __after__(self): c.audit['administrator'] = getUserFromRequest(request).get("login") audit.log(c.audit) return response ########################################################
[docs] def request(self): """ method: ocra/request description: request a challenge for a user or for a serial number (token). arguments: * serial: (required - string) Serial number of the token, for which a challenge should be generated (either serial or user is required) * user: (required - string) The user for whose token a challenge should be generated If the user has more than one token, an error is returend. (either serial or user is required) * data: (required - String: URLendoced) These are the display data, that can be used to generate the challenge remark: the app will report a wrong qrcode, if the policy:: {'authentication' : qrtanurl=https://localhost } is not defined !! returns: A JSON respone:: { "version": "LinOTP 2.4", "jsonrpc": "2.0", "result": { "status": true, "value": false, }, "detail": { "transactionid" : TRANSAKTIONSID, "data" : DATAOBJECT, } } * transactionid: This is the transaction ID, that is used later for verifying the Return code /TAN. * data: This is an object (URL) which can be used to generate a QR-Code to be displayed to the QRTAN App """ res = {} description = 'ocra/request: request a challenge for a given user or token (serial). You must either provide a parameter "user" or a parameter "serial".' dataobj = "" try: param = getLowerParams(request.params) log.info("[request] saving default configuration: %r" % param) checkPolicyPre('ocra', "request") serial = getParam(param, 'serial', optional) user = getUserFromParam(param, optional) if user.isEmpty() and serial is None: ## raise exception log.error("[request] user or serial is required") raise ParameterError("Usage: %s" % description, id=77) message = getParam(param, 'data' , optional) if message is None: message = '' ## ocra token tokens = getTokens4UserOrSerial(user, serial) if len(tokens) > 1 : error = ('More than one token found: unable to create challenge ' 'for (u:%r,s:%r)!' % (user, serial)) log.error(error) raise Exception(error) if len(tokens) == 0: error = ('No token found: unable to create challenge for' ' (u:%r,s:%r)!' % (user, serial)) log.error(error) raise Exception(error) ocra = tokens[0] (transId, challenge, res, url) = ocra.challenge(message) u = urlencode({'u':str(url.encode("utf-8"))}) uInfo = {'tr': transId, 'ch' : challenge, 'me': str(message.encode("utf-8")), 'u': u[2:]} detail = {"transactionid" : transId, 'challenge' : challenge, 'message' : str(message.encode("utf-8")), 'url' : str(url.encode("utf-8")), } ## create the app_url from the data''' dataobj = 'lseqr://req?%s' % (str(urlencode(uInfo))) ## append the signature to the url ''' signature = {'si' : ocra.signData(dataobj)} uInfo['si'] = signature dataobj = '%s&%s' % (dataobj, str(urlencode(signature))) detail["data"] = dataobj c.audit['success'] = res #c.audit['info'] += "%s=%s, " % (k, value) Session.commit() qr = getParam(param, 'qr', optional) if qr is not None: param['alt'] = detail return sendQRImageResult(response, dataobj, param) else: return sendResult(response, res, 1, opt=detail) except PolicyException as pe: log.error("[request] policy failed: %r" % pe) log.error("[request] %s" % traceback.format_exc()) Session.rollback() return sendError(response, unicode(pe)) except Exception as exx: log.error("[request] failed: %r" % exx) log.error("[request] %s" % traceback.format_exc()) Session.rollback() return sendError(response, unicode(exx)) finally: Session.close() log.debug("[request] done") ## https://linotpserver/ocra/check_t?transactionid=TRANSACTIONID&pass=TAN
[docs] def check_t(self): """ method: orcra/check_t description: verify the response of the ocra token arguments: * transactionid: (required - string) Dies ist eine Transaktions-ID, die bei der Challenge ausgegeben wurde. * pass: (required - string) die response, die der OCRA Token auf Grund der Challenge berechnet hat returns: A JSON response:: { "version": "LinOTP 2.4", "jsonrpc": "2.0", "result": { "status": true, "value": { "failcount" : 3, "result": false } }, "id": 0 } exception: """ res = {} description = 'ocra/check_t: validate a token request.' try: param = getLowerParams(request.params) log.info("[check_t] check OCRA token: %r" % param) #checkPolicyPre('ocra', "check_t" ) passw = getParam(param, 'pass' , optional) if passw is None: ## raise exception''' log.error("[check_t] missing pass ") raise ParameterError("Usage: %s Missing parameter 'pass'." % description, id=77) transid = getParam(param, 'transactionid', optional) if transid is None: ## raise exception''' log.error("[check_t] missing transactionid, user or serial number of token") raise ParameterError("Usage: %s Missing parameter 'transactionid'." % description, id=77) ## if we have a transaction, get serial from this challenge value = {} ocraChallenge = OcraTokenClass.getTransaction(transid) if ocraChallenge is not None: serial = ocraChallenge.tokenserial tokens = getTokens4UserOrSerial(serial=serial) if len(tokens) == 0 or len(tokens) > 1: raise Exception('tokenmismatch for token serial: %s' % (unicode(serial))) theToken = tokens[0] tok = theToken.token desc = tok.get() realms = desc.get('LinOtp.RealmNames') if realms is None or len(realms) == 0: realm = getDefaultRealm() elif len(realms) > 0: realm = realms[0] userInfo = getUserInfo(tok.LinOtpUserid, tok.LinOtpIdResolver, tok.LinOtpIdResClass) user = User(login=userInfo.get('username'), realm=realm) (ok, opt) = checkSerialPass(serial, passw, user=user, options={'transactionid':transid}) failcount = theToken.getFailCount() value['result'] = ok value['failcount'] = int(failcount) else: ## no challenge found for this transid value['result'] = False value['failure'] = 'No challenge for transaction %r found'\ % transid c.audit['success'] = res #c.audit['info'] += "%s=%s, " % (k, value) Session.commit() return sendResult(response, value, 1) except Exception as e : log.error("[check_t] failed: %r" % e) log.error("[check_t] %s" % traceback.format_exc()) Session.rollback() return sendResult(response, unicode(e), 0) finally: Session.close() log.debug("[check_t] done")
''' https://linotpserver/ocra/checkstatus?transactionid=TRANSACTIONID https://linotpserver/ocra/checkstatus?serial=SERIENNUMMER https://linotpserver/ocra/checkstatus?user=BENUTZER '''
[docs] def checkstatus(self): """ method: orcra/checkstatus description: Methode zur assynchronen Ueberpruefungen eines Challenge Response Valiadation requests arguments: * transactionid: (required one of - string - (hex)) Dies ist eine Transaktions-ID, die bei der Challenge ausgegeben wurde. * serial: (required one of - string) die Serien Nummer des OCRA Token * user: (required one of - string) die Benutzer eines Tokens required is one of (user,serial,transactionid) returns: A JSON response:: { "version": "LinOTP 2.4", "jsonrpc": "2.0", "result": { "status": true, "value": [ { "serial": SERIENNUMMER1, "transactionid": TRANSACTIONID1, "received_tan": true, "valid_tan": true, "failcount": 0 }, { "serial": SERIENNUMMER1, "transactionid": TRANSACTIONID2, "received_tan": false, "valid_tan": false, "failcount": 0 }, { "serial": SERIENNUMMER2, "transactionid": TRANSACTIONID3, "received_tan": true, "valid_tan": false, "failcount": 2 }, ] }, "id": 0 } exception: """ res = {} description = 'ocra/checkstatus: check the token status - for assynchronous verification. Missing parameter: You need to provide one of the parameters "transactionid", "user" or "serial"' try: param = getLowerParams(request.params) log.debug("[checkstatus] check OCRA token status: %r" % param) checkPolicyPre('ocra', "status") transid = getParam(param, 'transactionid' , optional) user = getUserFromParam(param, optional) #user = getParam(param, 'user' ,optional) serial = getParam(param, 'serial' , optional) if transid is None and user.isEmpty() and serial is None: ## raise exception log.error("[ocra/checkstatus] : missing transactionid, user or serial number for token") raise ParameterError("Usage: %s" % description, id=77) tokens = [] serials = set() status = [] if serial is not None: serials.add(serial) ## if we have a transaction, get serial from this challenge if transid is not None : ocraChallenge = None try: ocraChallenge = OcraTokenClass.getTransaction(transid) except: pass if ocraChallenge is not None: serials.add(ocraChallenge.tokenserial) ## if we have a serial number of token if len(serials) > 0: for serial in serials: tokens.extend(getTokens4UserOrSerial(serial=serial)) ## if we have a user if user.isEmpty() == False: try: tokens.extend(getTokens4UserOrSerial(user=user)) except: log.warning("no token or user %r found!" % user) for token in tokens: if token.getType() == 'ocra': challenges = [] if transid is None: serial = token.getSerial() challenges = OcraTokenClass.getTransactions4serial(serial) else: challenges.append(OcraTokenClass.getTransaction(transid)) for challenge in challenges: stat = token.getStatus(challenge.transid) if stat is not None and len(stat) > 0: status.append(stat) res['values'] = status c.audit['success'] = res Session.commit() return sendResult(response, res, 1) except PolicyException as pe: log.error("[checkstatus] policy failed: %r" % pe) log.error("[checkstatus] %s" % traceback.format_exc()) Session.rollback() return sendError(response, unicode(pe)) except Exception as exx: log.error("[checkstatus] failed: %r" % exx) log.error("[checkstatus] %s" % traceback.format_exc()) Session.rollback() return sendResult(response, unicode(exx), 0) finally: Session.close() log.debug('[ocra/checkstatus] done')
[docs] def getActivationCode(self): ''' method: orcra/getActivationCode description: returns an valid example activcation code arguments: ./. returns: JSON with "activationcode": "JZXW4ZI=2A" ''' from linotp.lib.crypt import createActivationCode res = {} #description = 'ocra/getActivationCode' try: params = getLowerParams(request.params) log.debug("[getActivationCode]: %r" % params) checkPolicyPre('ocra', "activationcode") ac = str(params.get('activationcode')) activationCode = createActivationCode(acode=ac) res = {'activationcode':activationCode} Session.commit() return sendResult(response, res, 1) except PolicyException as pe: log.error("[getActivationCode] policy failed: %r" % pe) log.error("[getActivationCode] %s" % traceback.format_exc()) Session.rollback() return sendError(response, unicode(pe)) except Exception as exx: log.error("[getActivationCode] failed: %r" % exx) log.error("[getActivationCode] %s" % traceback.format_exc()) Session.rollback() return sendError(response, unicode(exx), 0) finally: Session.close() log.debug('[getActivationCode] done')
[docs] def calculateOtp(self): ''' ''' from linotp.lib.crypt import kdf2 from linotp.lib.ocra import OcraSuite from datetime import datetime from urlparse import urlparse from urlparse import parse_qs res = {} #description = 'ocra/calculateOtp: calculate the first otp from the given init2 response ' try: params = getLowerParams(request.params) log.debug("[calculateOtp]: %r" % params) checkPolicyPre('ocra', "calcOTP") sharedsecret = params.get('sharedsecret') activationcode = params.get('activationcode') nonce = params.get('nonce') ocrasuite = params.get('ocrasuite') challenge = params.get('challenge') counter = params.get('counter') ocrapin = params.get('ocrapin') nonce3 = params.get('no') ocrasuite3 = params.get('os') #serial3 = params.get('se') challenge = params.get('challenge') counter = params.get('counter') ocrapin = params.get('ocrapin') init1 = params.get('init1') init2 = params.get('init2') ## parse init1 ''' if init1 is not None: ## now parse the appurl for the ocrasuite ''' uri = urlparse(init1.replace('lseqr://', 'http://')) qs = uri.query qdict = parse_qs(qs) ocrasuite2 = qdict.get('os', None) if ocrasuite2 is not None and len(ocrasuite2) > 0: ocrasuite2 = ocrasuite2[0] if ocrasuite is None: ocrasuite = ocrasuite2 sharedsecret2 = qdict.get('sh', None) if sharedsecret2 is not None and len(sharedsecret2) > 0: sharedsecret2 = sharedsecret2[0] if sharedsecret is None: sharedsecret = sharedsecret2 ## parse init1 if init2 is not None: ## now parse the appurl for the ocrasuite uri = urlparse(init2.replace('lseqr://', 'http://')) qs = uri.query qdict = parse_qs(qs) challenge2 = qdict.get('ch', None) if challenge2 is not None and len(challenge2) > 0: challenge2 = challenge2[0] if challenge is None: challenge = challenge2 nonce2 = qdict.get('no', None) if nonce2 is not None and len(nonce2) > 0: nonce2 = nonce2[0] if nonce is None: nonce = nonce2 if ocrapin is None: ocrapin = '' if counter is None: counter = 0 if nonce3 is not None: nonce = unicode(nonce3) if ocrasuite3 is not None: ocrasuite = unicode(ocrasuite3) ## now we have all in place for the key derivation to create the new key ## sharedsecret, activationcode and nonce key_len = 20 if ocrasuite.find('-SHA256'): key_len = 32 elif ocrasuite.find('-SHA512'): key_len = 64 if sharedsecret is not None: sharedsecret = unicode(sharedsecret) if nonce is not None: nonce = unicode(nonce) if activationcode is not None: activationcode = unicode(activationcode) newkey = kdf2(sharedsecret, nonce, activationcode, len=key_len) ## hnewkey = binascii.hexlify(newkey) ocra = OcraSuite(ocrasuite) param = {} param['C'] = int(counter) param['Q'] = unicode(challenge) param['P'] = unicode(ocrapin) param['S'] = '' if ocra.T is not None: ## Default value for G is 1M, i.e., time-step size is one minute and the ## T represents the number of minutes since epoch time [UT]. now = datetime.now() stime = now.strftime("%s") itime = int(stime) param['T'] = itime data = ocra.combineData(**param) otp = ocra.compute(data, newkey) res = {'otp':otp} Session.commit() return sendResult(response, res, 1) except PolicyException as pe: log.error("[ocra/calculateOtp] policy failed: %r" % pe) log.error("[ocra/calculateOtp] %s" % traceback.format_exc()) Session.rollback() return sendError(response, pe) except Exception as e: log.error("[ocra/calculateOtp] failed: %r" % e) log.error("[ocra/calculateOtp] %s" % traceback.format_exc()) Session.rollback() return sendError(response, unicode(e), 0) finally: Session.close() log.debug('[ocra/calculateOtp] done') #eof###########################################################################