Source code for linotp.lib.tokens.remotetoken

# -*- coding: utf-8 -*-
#
#    LinOTP - the open source solution for two factor authentication
#    Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
#
#    This file is part of LinOTP server.
#
#    This program is free software: you can redistribute it and/or
#    modify it under the terms of the GNU Affero General Public
#    License, version 3, as published by the Free Software Foundation.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the
#               GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#
#    E-mail: linotp@lsexperts.de
#    Contact: www.linotp.org
#    Support: www.lsexperts.de
#
"""This file contains the Remote token class"""

import logging
import copy
import traceback


import httplib2
import urllib
try:
    import json
except ImportError:
    import simplejson as json

from linotp.lib.config import getFromConfig
from linotp.lib.util import getParam
from linotp.lib.validate import split_pin_otp
from linotp.lib.validate import check_pin

# Flags handed to getParam() as its third positional argument when
# RemoteTokenClass.update() reads the request parameters: ``required`` is
# used for "remote.server", ``optional`` for all other "remote.*" keys.
# NOTE(review): presumably getParam's third parameter means "is optional"
# (hence required == False) - confirm against linotp.lib.util.getParam.
optional = True
required = False

from linotp.lib.tokenclass import TokenClass


# module level logger, named after this module
log = logging.getLogger(__name__)

###############################################


class RemoteTokenClass(TokenClass):
    """
    The Remote token forwards an authentication request to another LinOTP
    server. The request can be forwarded to a user on the other server or
    to a serial number on the other server. The PIN can be checked on the
    local LinOTP server or on the remote server.

    Using the Remote token you can assign one physical token to many
    different users.
    """

    def __init__(self, aToken):
        """
        constructor - create a token class object with its db token binding

        :param aToken: the db bound token
        """
        TokenClass.__init__(self, aToken)
        self.setType(u"remote")

        # values are filled in by update() from the request parameters
        self.remoteServer = ""
        self.remoteLocalCheckpin = None
        self.remoteSerial = None
        self.remoteUser = None
        self.remoteRealm = None
        self.remoteResConf = None

        self.mode = ['authenticate', 'challenge']

    @classmethod
    def getClassType(cls):
        """
        return the class type identifier
        """
        return "remote"

    @classmethod
    def getClassPrefix(cls):
        """
        return the token type prefix
        """
        return "LSRE"

    @classmethod
    def getClassInfo(cls, key=None, ret='all'):
        """
        getClassInfo - returns a subtree of the token definition

        :param key: subsection identifier
        :type key: string
        :param ret: default return value, if nothing is found
        :type ret: user defined

        :return: the subsection if ``key`` exists, the complete definition
                 if ``ret`` is 'all', otherwise the given ``ret``
        :rtype: dict or user defined
        """
        log.debug("[getClassInfo] begin. Get class render info for section: "
                  "key %r, ret %r " % (key, ret))

        res = {
            'type': 'remote',
            'title': 'Remote Token',
            'description': ('REMOTE token to forward the authentication'
                            ' request to another LinOTP server'),
            'init': {
                'page': {'html': 'remotetoken.mako',
                         'scope': 'enroll', },
                'title': {'html': 'remotetoken.mako',
                          'scope': 'enroll.title', },
            },
            'config': {
                'page': {'html': 'remotetoken.mako',
                         'scope': 'config', },
                'title': {'html': 'remotetoken.mako',
                          'scope': 'config.title', },
            },
            'selfservice': {},
            'policy': {},
        }

        if key is not None and key in res:
            ret = res.get(key)
        elif ret == 'all':
            ret = res

        # (ret,) - a tuple valued ret must not be unpacked by the % operator
        log.debug("[getClassInfo] end. Returned the configuration section: "
                  "ret %r " % (ret,))
        return ret

    def update(self, param):
        """
        second phase of the init process - updates parameters

        :param param: the request parameters
        :return: - nothing -
        """
        self.remoteServer = getParam(param, "remote.server", required)

        # if another OTP length would be specified in /admin/init this would
        # be overwritten by the parent class, which is ok.
        self.setOtpLen(6)

        val = getParam(param, "remote.local_checkpin", optional)
        if val is not None:
            self.remoteLocalCheckpin = val

        val = getParam(param, "remote.serial", optional)
        if val is not None:
            self.remoteSerial = val

        val = getParam(param, "remote.user", optional)
        if val is not None:
            self.remoteUser = val

        val = getParam(param, "remote.realm", optional)
        if val is not None:
            self.remoteRealm = val

        val = getParam(param, "remote.resConf", optional)
        if val is not None:
            self.remoteResConf = val

        TokenClass.update(self, param)

        # persist the remote settings in the token info; parameters that
        # were not supplied are stored with their current (None) value
        self.addToTokenInfo("remote.server", self.remoteServer)
        self.addToTokenInfo("remote.serial", self.remoteSerial)
        self.addToTokenInfo("remote.user", self.remoteUser)
        self.addToTokenInfo("remote.local_checkpin", self.remoteLocalCheckpin)
        self.addToTokenInfo("remote.realm", self.remoteRealm)
        self.addToTokenInfo("remote.resConf", self.remoteResConf)

        return

    def check_pin_local(self):
        """
        lookup if pin should be checked locally or on remote host

        A missing or non numeric "remote.local_checkpin" token info entry
        is treated as "do not check locally" instead of raising.

        :return: bool - True only if the stored value equals 1
        """
        raw_value = self.getFromTokenInfo("remote.local_checkpin")
        try:
            local_check = (1 == int(raw_value))
        except (TypeError, ValueError):
            # entry was never set (None) or holds something that is not
            # a number
            local_check = False

        log.debug(" local checking pin? %r" % local_check)
        return local_check

    def authenticate(self, passw, user, options=None):
        """
        do the authentication on base of password / otp and user and
        options, the request parameters.

        Here we contact the other LinOTP server to validate the OtpVal.

        :param passw: the password / otp
        :param user: the requesting user
        :param options: the additional request parameters

        :return: tuple of (success, otp_count - 0 or -1, reply)
        """
        log.debug("authenticate")

        otp_counter = -1
        reply = None
        otpval = passw

        # should we check the pin locally?
        if self.check_pin_local():
            (res, pin, otpval) = split_pin_otp(self, passw, user,
                                               options=options)
            res = TokenClass.checkPin(self, pin)
            if res is False:
                # wrong local pin - do not bother the remote server
                return (res, otp_counter, reply)

        (res, otp_count, reply) = self.do_request(otpval, user=user)
        return (res, otp_count, reply)

    def is_challenge_request(self, passw, user, options=None):
        """
        This method checks, if this is a request, that triggers a challenge.
        It depends on the way, the pin is checked - either locally or remote

        :param passw: password, which might be pin or pin+otp
        :type passw: string
        :param user: The user from the authentication request
        :type user: User object
        :param options: dictionary of additional request parameters
        :type options: dict

        :return: true or false
        """
        request_is_valid = False

        if self.check_pin_local():
            pin_match = check_pin(self, passw, user=user, options=options)
            if pin_match is True:
                request_is_valid = True

        return request_is_valid

    def do_request(self, passw, transactionid=None, user=None):
        """
        run the http request against the remote host

        :param passw: the password which should be checked on the remote host
        :param transactionid: provided, if this is a challenge response
        :param user: the requesting user - used if no remote serial or
                     remote user is provided

        :return: Tuple of (success, otp_count= -1 or 0, reply=remote response)
                 On any error while talking to the remote server the failure
                 tuple (False, -1, {}) is returned and the error is logged.
        """
        # failure defaults - they must exist before the try block, as the
        # final return is reached on every error path as well
        res = False
        otp_count = -1
        reply = {}

        otpval = passw.encode("utf-8")

        remoteServer = self.getFromTokenInfo("remote.server") or ""
        remoteServer = remoteServer.encode("utf-8")

        # in preparation of the ability to relocate linotp urls,
        # we introduce the remote url path
        remotePath = self.getFromTokenInfo("remote.path") or ""
        remotePath = remotePath.strip().encode('utf-8')

        remoteSerial = self.getFromTokenInfo("remote.serial") or ""
        remoteSerial = remoteSerial.encode('utf-8')

        remoteUser = self.getFromTokenInfo("remote.user") or ""
        remoteUser = remoteUser.encode('utf-8')

        remoteRealm = self.getFromTokenInfo("remote.realm") or ""
        remoteRealm = remoteRealm.encode('utf-8')

        remoteResConf = self.getFromTokenInfo("remote.resConf") or ""
        remoteResConf = remoteResConf.encode('utf-8')

        # the config value may be a string - only "true" (any case)
        # switches the certificate verification on
        ssl_verify = getFromConfig("remote.verify_ssl_certificate",
                                   False) or False
        if isinstance(ssl_verify, (str, unicode)):
            ssl_verify = (ssl_verify.lower() == "true")

        # here we also need to check for remote.user and so on....
        log.debug("[checkOtp] checking OTP len:%r remotely on server: %r,"
                  " serial: %r, user: %r" % (len(otpval), remoteServer,
                                             remoteSerial, remoteUser))

        params = {}
        if len(remoteSerial) > 0:
            params['serial'] = remoteSerial
            if len(remotePath) == 0:
                remotePath = "/validate/check_s"
        elif len(remoteUser) > 0:
            params['user'] = remoteUser
            params['realm'] = remoteRealm
            params['resConf'] = remoteResConf
            if len(remotePath) == 0:
                remotePath = "/validate/check"
        else:
            # There is no remote.serial and no remote.user, so we will
            # try to pass the requesting user.
            # NOTE(review): no default remotePath is set in this branch, so
            # without "remote.path" the request goes to the bare server url.
            if user is None:
                log.warning("FIXME: We do not know the user at the moment!")
            else:
                params['user'] = user.login
                params['realm'] = user.realm

        params['pass'] = otpval
        if transactionid is not None:
            params['state'] = transactionid

        # use a POST request to check the token
        data = urllib.urlencode(params)
        request_url = "%s%s" % (remoteServer, remotePath)

        try:
            # prepare the submit and receive headers
            headers = {"Content-type": "application/x-www-form-urlencoded",
                       "Accept": "text/plain"}

            # submit the request
            try:
                # is httplib compiled with ssl?
                http = httplib2.Http(
                    disable_ssl_certificate_validation=not(ssl_verify))
            except TypeError as exx:
                # not so on squeeze:
                # TypeError: __init__() got an unexpected keyword argument
                # 'disable_ssl_certificate_validation'
                log.warning("httplib2 'disable_ssl_certificate_validation' "
                            "attribute error: %r" % exx)
                # so we run in fallback mode
                http = httplib2.Http()

            (resp, content) = http.request(request_url,
                                           method="POST",
                                           body=data,
                                           headers=headers)
            result = json.loads(content)
            log.debug(result)
            status = result['result']['status']
            log.debug(status)

            if True == status:
                if True == result['result']['value']:
                    res = True
                    otp_count = 0

            # a "detail" section is handed back as reply and the request
            # is not counted as a successful authentication
            # NOTE(review): nesting of this check relative to the status
            # check above should be confirmed against the upstream source
            if "detail" in result:
                reply = copy.deepcopy(result["detail"])
                otp_count = -1
                res = False

        except Exception as exx:
            # best effort: every failure (network, json, missing keys) is
            # logged and reported as a failed authentication
            log.error("[do_request] [RemoteToken] Error getting response from "
                      "remote Server (%r): %r" % (request_url, exx))
            log.error("[do_request] %r" % (traceback.format_exc()))

        return (res, otp_count, reply)

    def checkResponse4Challenge(self, user, passw, options=None,
                                challenges=None):
        '''
        This method verifies if the given ``passw`` matches any
        existing ``challenge`` of the token.

        It then returns the new otp_counter of the token and the
        list of the matching challenges.

        In case of success the otp_counter needs to be >= 0.
        The matching_challenges is passed to the method
        :py:meth:`~linotp.lib.tokenclass.TokenClass.challenge_janitor`
        to clean up challenges.

        :param user: the requesting user
        :type user: User object
        :param passw: the password (pin+otp)
        :type passw: string
        :param options: additional arguments from the request, which
                        could be token specific
        :type options: dict
        :param challenges: A sorted list of valid challenges for this token.
        :type challenges: list

        :return: tuple of (otpcounter and the list of matching challenges)
        '''
        otp_counter = -1
        matching_challenge = None
        matching_challenges = []

        # both arguments default to None - treat that as "nothing given"
        options = options or {}
        challenges = challenges or []

        # fetch the transactionid, 'state' is accepted as an alias
        transid = options.get('transactionid', options.get('state', None))

        if transid is not None:
            # in case of a local pin check, the transaction is a local one
            # and we must not forward this!!
            if self.check_pin_local():
                # check if transaction id is in list of challenges
                for challenge in challenges:
                    if challenge.transid == transid:
                        matching_challenge = challenge
                        break

                if matching_challenge is not None:
                    (res, otp_counter, _reply) = \
                        self.do_request(passw, user=user)
                    # everything is ok, we remove the challenge
                    if res is True and otp_counter >= 0:
                        matching_challenges.append(matching_challenge)

            # in case of remote check pin, we just forward everything;
            # no local challenge is reported as matching in this case
            else:
                (res, otp_counter, _reply) = \
                    self.do_request(passw, transactionid=transid, user=user)

        return (otp_counter, matching_challenges)

    def checkPin(self, pin, options=None):
        """
        check the pin - either remote or locally
        - in case of remote, we return true, as the
          splitPinPass will put the passw then in the otpVal

        :param pin: the pin to verify
        :param options: additional request parameters (not used here)
        :return: result of the local pin check, or True if the pin is
                 checked on the remote server
        """
        res = True

        # only, if pin should be checked locally
        if self.check_pin_local():
            res = TokenClass.checkPin(self, pin)

        return res

    def splitPinPass(self, passw):
        """
        Split the PIN and the OTP value.
        Only if it is locally checked and not remotely.

        :param passw: the password with pin and otp
        :return: tuple of the (success, pin and otpvalue)
        """
        res = 0

        if self.check_pin_local():
            (res, pin, otpval) = TokenClass.splitPinPass(self, passw)
        else:
            # the remote server gets the complete password
            pin = ""
            otpval = passw

        log.debug("[splitPinPass] [remotetoken] returnung (len:%r) (len:%r)"
                  % (len(pin), len(otpval)))
        return (res, pin, otpval)

###eof#########################################################################