Source code for linotp.lib.tokens.totptoken

# -*- coding: utf-8 -*-
#
#    LinOTP - the open source solution for two factor authentication
#    Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
#
#    This file is part of LinOTP server.
#
#    This program is free software: you can redistribute it and/or
#    modify it under the terms of the GNU Affero General Public
#    License, version 3, as published by the Free Software Foundation.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the
#               GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#
#    E-mail: linotp@lsexperts.de
#    Contact: www.linotp.org
#    Support: www.lsexperts.de
#
"""This file containes the dynamic time based hmac token implementation"""

import logging
import time
import math
import datetime

import traceback


from linotp.lib.HMAC    import HmacOtp
from linotp.lib.util    import getParam
from linotp.lib.util    import generate_otpkey
from linotp.lib.config  import getFromConfig


optional = True
required = False

from linotp.lib.tokenclass import TokenClass
from linotp.lib.tokens.hmactoken import HmacTokenClass

keylen = {'sha1' : 20,
          'sha256' : 32,
          'sha512' : 64
          }

log = logging.getLogger(__name__)



###############################################
###############################################
"""
TOTP Algorithm

   This variant of the HOTP algorithm specifies the calculation of a
   one-time password value, based on a representation of the counter as
   a time factor.

4.1.  Notations

   - X represents the time step in seconds (default value X = 30
   seconds) and is a system parameter;

   - T0 is the Unix time to start counting time steps (default value is
   0, Unix epoch) and is also a system parameter.

4.2.  Description

   Basically, we define TOTP as TOTP = HOTP(K, T) where T is an integer
   and represents the number of time steps between the initial counter
   time T0 and the current Unix time (i.e. the number of seconds elapsed
   since midnight UTC of January 1, 1970).

   More specifically T = (Current Unix time - T0) / X where:

   - X represents the time step in seconds (default value X = 30
   seconds) and is a system parameter;

   - T0 is the Unix time to start counting time steps (default value is
   0, Unix epoch) and is also a system parameter;

   - The default floor function is used in the computation.  For
   example, with T0 = 0 and time step X = 30, T = 1 if the current Unix
   time is 59 seconds and T = 2 if the current Unix time is 60 seconds.

M'Raihi, et al.          Expires March 12, 2011                 [Page 5]

Internet-Draft                HOTPTimeBased               September 2010


"""

###############################################
[docs]class TimeHmacTokenClass(HmacTokenClass): resyncDiffLimit = 3 def __init__(self, aToken): ''' constructor - create a token object :param aToken: instance of the orm db object :type aToken: orm object ''' log.debug("[init] begin. Create a token object with: a_token %r" % (aToken)) TokenClass.__init__(self, aToken) self.setType(u"TOTP") self.hKeyRequired = True ''' timeStep defines the granularity: ''' self.timeStep = getFromConfig("totp.timeStep", 30) ''' window size in seconds: 30 seconds with as step width of 30 seconds results in a window of 1 which is one attempt ''' self.timeWindow = getFromConfig("totp.timeWindow", 180) '''the time shift is specified in seconds - and could be positive and negative ''' self.timeShift = getFromConfig("totp.timeShift", 0) '''we support various hashlib methods, but only on create which is effectively set in the update ''' self.hashlibStr = getFromConfig("totp.hashlib", u'sha1') log.debug("[init] end. Token object created") return @classmethod
[docs] def getClassType(cls): ''' getClassType - return the token type shortname :return: 'totp' :rtype: string ''' return "totp"
@classmethod
[docs] def getClassPrefix(cls): return "TOTP"
@classmethod
[docs] def getClassInfo(cls, key=None, ret='all'): ''' getClassInfo - returns a subtree of the token definition :param key: subsection identifier :type key: string :param ret: default return value, if nothing is found :type ret: user defined :return: subsection if key exists or user defined :rtype: s.o. ''' log.debug("[getClassInfo] begin. Get class render info for section: key %r, ret %r " % (key, ret)) res = { 'type' : 'totp', 'title' : 'HMAC Time Token', 'description' : ('time based otp token using the hmac algorithm'), 'init' : {'page' : {'html' : 'totptoken.mako', 'scope' : 'enroll', }, 'title' : {'html' : 'totptoken.mako', 'scope' : 'enroll.title', }, }, 'config' : { 'page' : {'html' : 'totptoken.mako', 'scope' : 'config', }, 'title' : {'html' : 'totptoken.mako', 'scope' : 'config.title', }, }, 'selfservice' : { 'enroll' : {'page' : {'html' : 'totptoken.mako', 'scope' : 'selfservice.enroll', }, 'title' : { 'html' : 'totptoken.mako', 'scope' : 'selfservice.title.enroll', }, }, }, 'policy' : { 'selfservice' : { 'totp_timestep': { 'type':'int', 'value' : [30, 60], 'desc' : 'Specify the time step of the timebased OTP token.' }, 'totp_hashlib' : {'type':'int', 'value' : [1, 2], 'desc' : 'Specify the hashlib to be used. Can be sha1 (1) or sha2-256 (2).' }, }, }, } if key is not None and res.has_key(key): ret = res.get(key) else: if ret == 'all': ret = res log.debug("[getClassInfo] end. Returned the configuration section: ret %r " % (ret)) return ret
[docs] def update(self, param): ''' update - process the initialization parameters :param param: dict of initialization parameters :type param: dict :return: nothing ''' log.debug("[update] begin. Process the initialization parameters: param %r" % (param)) ## check for the required parameters val = getParam(param, "hashlib", optional) if val is not None: self.hashlibStr = val else: self.hashlibStr = 'sha1' otpKey = '' if (self.hKeyRequired == True): genkey = int(getParam(param, "genkey", optional) or 0) if 1 == genkey: # if hashlibStr not in keylen dict, this will raise an Exception otpKey = generate_otpkey(keylen.get(self.hashlibStr)) del param['genkey'] else: # genkey not set: check otpkey is given # this will raise an exception if otpkey is not present otpKey = getParam(param, "otpkey", required) # finally set the values for the update param['otpkey'] = otpKey param['hashlib'] = self.hashlibStr val = getParam(param, "otplen", optional) if val is not None: self.setOtpLen(int(val)) else: self.setOtpLen(getFromConfig("DefaultOtpLen")) val = getParam(param, "timeStep", optional) if val is not None: self.timeStep = val val = getParam(param, "timeWindow", optional) if val is not None: self.timeWindow = val val = getParam(param, "timeShift", optional) if val is not None: self.timeShift = val HmacTokenClass.update(self, param) self.addToTokenInfo("timeWindow", self.timeWindow) self.addToTokenInfo("timeShift", self.timeShift) self.addToTokenInfo("timeStep", self.timeStep) self.addToTokenInfo("hashlib", self.hashlibStr) log.debug("[update] end. Processing the initialization parameters done.") return
[docs] def check_otp_exist(self, otp, window=10): ''' checks if the given OTP value is/are values of this very token. This is used to autoassign and to determine the serial number of a token. :param otp: the to be verified otp value :type otp: string :param window: the lookahead window for the counter :type window: int :return: counter or -1 if otp does not exist :rtype: int ''' log.debug("[check_otp_exist] begin. checks if the given OTP value exists: otp %r, window %r " % (otp, window)) res = -1 try: counter = int(self.token.LinOtpCount) except ValueError as ex: log.warning("[check_otp_exist] a value error occurred while converting: counter %r : ValueError: %r ret: %r " % (self.token.LinOtpCount, ex, res)) return res res = self.checkOtp(otp, counter, range) return res
def _time2counter_(self, T0, timeStepping=60): rnd = 0.5 counter = int((T0 / timeStepping) + rnd) return counter def _counter2time_(self, counter, timeStepping=60): rnd = 0.5 T0 = (float(counter) - rnd) * timeStepping return T0 def _getTimeFromCounter(self, counter, timeStepping=30, rnd=1): idate = int(counter - rnd) * timeStepping ddate = datetime.datetime.fromtimestamp(idate / 1.0) return ddate
[docs] def time2float(self, curTime): ''' time2float - convert a datetime object or an datetime sting into a float s. http://bugs.python.org/issue12750 :param curTime: time in datetime format :type curTime: datetime object :return: time as float :rtype: float ''' log.debug("[time2float] begin. Convert a datetime object: curTime %r" % (curTime)) dt = datetime.datetime.now() if type(curTime) == datetime.datetime: dt = curTime elif type(curTime) == unicode: if '.' in curTime: tFormat = "%Y-%m-%d %H:%M:%S.%f" else: tFormat = "%Y-%m-%d %H:%M:%S" try: dt = datetime.datetime.strptime(curTime, tFormat) except Exception as ex: log.error('[time2float] Error during conversion of datetime: %r' % (ex)) log.error("[time2float] %r" % traceback.format_exc()) raise Exception(ex) else: log.error("[time2float] invalid curTime: %s. You need to specify a datetime.datetime" % type(curTime)) raise Exception("[time2float] invalid curTime: %s. You need to specify a datetime.datetime" % type(curTime)) td = (dt - datetime.datetime(1970, 1, 1)) ## for python 2.6 compatibility, we have to implement 2.7 .total_seconds():: ## TODO: fix to float!!!! tCounter = ((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) * 1.0) / 10 ** 6 log.debug("[time2float] end. Datetime object converted: tCounter %r" % (tCounter)) return tCounter
[docs] def checkOtp(self, anOtpVal, counter, window, options=None): ''' checkOtp - validate the token otp against a given otpvalue :param anOtpVal: the to be verified otpvalue @type anOtpVal: string :param counter: the counter state, that should be verified :type counter: int :param window: the counter +window, which should be checked :type window: int :param options: the dict, which could contain token specific info :type options: dict :return: the counter state or -1 :rtype: int ''' log.debug("[checkOtp] begin. Validate the token otp: anOtpVal: %r ,\ counter: %r,window: %r, options: %r " % (anOtpVal, counter, window, options)) try: otplen = int(self.token.LinOtpOtpLen) except ValueError as e: raise e secretHOtp = self.token.getHOtpKey() self.hashlibStr = self.getFromTokenInfo("hashlib", self.hashlibStr) timeStepping = int(self.getFromTokenInfo("timeStep", self.timeStep)) window = int(self.getFromTokenInfo("timeWindow", self.timeWindow)) shift = int(self.getFromTokenInfo("timeShift", self.timeShift)) ## oldCounter we have to remove one, as the normal otp handling will increment oCount = self.getOtpCount() - 1 initTime = -1 if options != None and type(options) == dict: initTime = int(options.get('initTime', -1)) if oCount < 0: oCount = 0 log.debug("[checkOTP] timestep: %i, timeWindow: %i, timeShift: %i" % (timeStepping, window, shift)) inow = int(time.time()) T0 = time.time() + shift if initTime != -1: T0 = int(initTime) log.debug("[checkOTP] T0 : %i" % T0) counter = self._time2counter_(T0, timeStepping=timeStepping) otime = self._getTimeFromCounter(oCount, timeStepping=timeStepping) ttime = self._getTimeFromCounter(counter, timeStepping=timeStepping) log.debug("[checkOTP] last log: %r :: %r" % (oCount, otime)) log.debug("[checkOTP] counter : %r :: %r <==> %r" % (counter, ttime, datetime.datetime.now())) log.debug("[checkOTP] shift : %r " % (shift)) hmac2Otp = HmacOtp(secretHOtp, counter, otplen, self.getHashlib(self.hashlibStr)) res = hmac2Otp.checkOtp(anOtpVal, int (window / timeStepping), symetric=True) log.debug("[checkOTP] comparing the result %i to the old counter %i." % (res, oCount)) if res != -1 and oCount != 0 and res <= oCount: if initTime == -1: log.warning("[checkOTP] a previous OTP value was used again!\n former tokencounter: %i, presented counter %i" % (oCount, res)) res = -1 return res if -1 == res : ## autosync: test if two consecutive otps have been provided res = self.autosync(hmac2Otp, anOtpVal) if res != -1: ## on success, we have to save the last attempt self.setOtpCount(counter) # # here we calculate the new drift/shift between the server time and the tokentime # tokentime = self._counter2time_(res, timeStepping) tokenDt = datetime.datetime.fromtimestamp(tokentime / 1.0) nowDt = datetime.datetime.fromtimestamp(inow / 1.0) lastauth = self._counter2time_(oCount, timeStepping) lastauthDt = datetime.datetime.fromtimestamp(lastauth / 1.0) log.debug("[checkOTP] last auth : %r" % (lastauthDt)) log.debug("[checkOTP] tokentime : %r" % (tokenDt)) log.debug("[checkOTP] now : %r" % (nowDt)) log.debug("[checkOTP] delta : %r" % (tokentime - inow)) new_shift = (tokentime - inow) log.debug("[checkOTP] the counter %r matched. New shift: %r" % (res, new_shift)) self.addToTokenInfo('timeShift', new_shift) log.debug("[checkOtp] end. otp verification result was: res %r" % (res)) return res
[docs] def autosync(self, hmac2Otp, anOtpVal): ''' auto - sync the token based on two otp values - internal method to realize the autosync within the checkOtp method :param hmac2Otp: the hmac object (with reference to the token secret) :type hmac2Otp: hmac object :param anOtpVal: the actual otp value :type anOtpVal: string :return: counter or -1 if otp does not exist :rtype: int ''' log.debug("[autosync] begin. Autosync the token, based on: hmac2Otp: %r, anOtpVal: %r" % (hmac2Otp, anOtpVal)) res = -1 autosync = False try: async = getFromConfig("AutoResync") if async is None: autosync = False elif "true" == async.lower(): autosync = True elif "false" == async.lower(): autosync = False except Exception as e: log.error('autosync check failed %r' % e) return res ' if autosync is not enabled: do nothing ' if False == autosync: return res info = self.getTokenInfo(); syncWindow = self.getSyncWindow() #check if the otpval is valid in the sync scope res = hmac2Otp.checkOtp(anOtpVal, syncWindow, symetric=True) log.debug("[autosync] found otpval %r in syncwindow (%r): %r" % (anOtpVal, syncWindow, res)) #if yes: if res != -1: # if former is defined if (info.has_key("otp1c")): #check if this is consecutive otp1c = info.get("otp1c"); otp2c = res log.debug("[autosync] otp1c: %r, otp2c: %r" % (otp1c, otp2c)) diff = math.fabs(otp2c - otp1c) if (diff > self.resyncDiffLimit): res = -1 else: T0 = time.time() timeStepping = int(self.getFromTokenInfo("timeStep")) counter = int((T0 / timeStepping) + 0.5) shift = otp2c - counter info["timeShift"] = shift self.setTokenInfo(info) ## now clean the resync data del info["otp1c"] self.setTokenInfo(info) else: log.debug("[autosync] setting otp1c: %s" % res) info["otp1c"] = res self.setTokenInfo(info) res = -1 if res == -1: msg = "call was not successful" else: msg = "call was successful" log.debug("[autosync] end. %r: res %r" % (msg, res)) return res
[docs] def resync(self, otp1, otp2, options=None): ''' resync the token based on two otp values - external method to do the resync of the token :param otp1: the first otp value :type otp1: string :param otp2: the second otp value :type otp2: string :param options: optional token specific parameters :type options: dict or None :return: counter or -1 if otp does not exist :rtype: int ''' log.debug("[resync] .begin. Resync the token based on: %r, anOtpVal: %r, options: %r" % (otp1, otp2, options)) ret = False try: otplen = int(self.token.LinOtpOtpLen) except ValueError: return ret secretHOtp = self.token.getHOtpKey() self.hashlibStr = self.getFromTokenInfo("hashlib", 'sha1') timeStepping = int(self.getFromTokenInfo("timeStep", 30)) shift = int(self.getFromTokenInfo("timeShift", 0)) try: window = int(self.token.LinOtpSyncWindow) * timeStepping except: window = 10 * timeStepping log.debug("[resync] timestep: %r, syncWindow: %r, timeShift: %r" % (timeStepping, window, shift)) T0 = time.time() + shift log.debug("[resync] T0 : %i" % T0) counter = int((T0 / timeStepping) + 0.5) # T = (Current Unix time - T0) / timeStepping log.debug("[resync] counter (current time): %i" % counter) oCount = self.getOtpCount() log.debug("[resync] tokenCounter: %r" % oCount) log.debug("[resync] now checking window %s, timeStepping %s" % (window, timeStepping)) # check 2nd value hmac2Otp = HmacOtp(secretHOtp, counter, otplen, self.getHashlib(self.hashlibStr)) log.debug("[resync] %s in otpkey: %s " % (otp2, secretHOtp)) res2 = hmac2Otp.checkOtp(otp2, int (window / timeStepping), symetric=True) #TEST -remove the 10 log.debug("[resync] res 2: %r" % res2) # check 1st value hmac2Otp = HmacOtp(secretHOtp, counter - 1, otplen, self.getHashlib(self.hashlibStr)) log.debug("[resync] %s in otpkey: %s " % (otp1, secretHOtp)) res1 = hmac2Otp.checkOtp(otp1, int (window / timeStepping), symetric=True) #TEST -remove the 10 log.debug("[resync] res 1: %r" % res1) if res1 < oCount: # A previous OTP value was used again! log.warning("[resync] a previous OTP value was used again! tokencounter: %i, presented counter %i" % (oCount, res1)) res1 = -1 if res1 != -1 and res1 + 1 == res2: # here we calculate the new drift/shift between the server time and the tokentime tokentime = (res2 + 0.5) * timeStepping currenttime = T0 - shift new_shift = (tokentime - currenttime) log.debug("[resync] the counters %r and %r matched. New shift: %r" % (res1, res2, new_shift)) self.addToTokenInfo('timeShift', new_shift) # The OTP value that was used for resync must not be used again! self.setOtpCount(res2 + 1) ret = True if ret == True: msg = "resync was successful" else: msg = "resync was not successful" log.debug("[resync] end. %s: ret: %r" % (msg, ret)) return ret
[docs] def getSyncTimeOut(self): ''' get the token sync timeout value :return: timeout value in seconds :rtype: int ''' timeOut = int(getFromConfig("AutoResyncTimeout", 5 * 60)) return timeOut
[docs] def getOtp(self, curTime=None): ''' get the next OTP value :return: next otp value :rtype: string ''' log.debug("[getOtp] begin. Get the next OTP value for: curTime: %r" % (curTime)) res = (-1, 0, 0, 0) otplen = int(self.token.LinOtpOtpLen) secretHOtp = self.token.getHOtpKey() self.hashlibStr = self.getFromTokenInfo("hashlib", "sha1") timeStepping = int(self.getFromTokenInfo("timeStep", 30)) shift = int(self.getFromTokenInfo("timeShift", 0)) hmac2Otp = HmacOtp(secretHOtp, self.getOtpCount(), otplen, self.getHashlib(self.hashlibStr)) tCounter = self.time2float(datetime.datetime.now()) if curTime: tCounter = self.time2float(curTime) ## we don't need to round here as we have alread float counter = int(((tCounter - shift) / timeStepping)) otpval = hmac2Otp.generate(counter=counter, inc_counter=False) pin = self.token.getPin() combined = "%s%s" % (otpval, pin) if getFromConfig("PrependPin") == "True" : combined = "%s%s" % (pin, otpval) log.debug("[getOtp] end. Return opt is: (pin: %r, otpval: %r, combined: %r) " % (pin, otpval, combined)) return (1, pin, otpval, combined)
[docs] def get_multi_otp(self, count=0, epoch_start=0, epoch_end=0, curTime=None): ''' return a dictionary of multiple future OTP values of the HOTP/HMAC token :param count: how many otp values should be returned :type count: int :return: tuple of status: boolean, error: text and the OTP dictionary ''' log.debug("[get_multi_otp] begin. Get a dictionary of multiple future OTP values for: count: %r, epoch_start: %r, epoch_end: %r, curTime: %r" % (count, epoch_start, epoch_end, curTime)) otp_dict = {"type" : "TOTP", "otp": {}} ret = False error = "No count specified" try: otplen = int(self.token.LinOtpOtpLen) except ValueError: return ret secretHOtp = self.token.getHOtpKey() self.hashlibStr = self.getFromTokenInfo("hashlib", "sha1") timeStepping = int(self.getFromTokenInfo("timeStep", 30)) shift = int(self.getFromTokenInfo("timeShift", 0)) hmac2Otp = HmacOtp(secretHOtp, self.getOtpCount(), otplen, self.getHashlib(self.hashlibStr)) tCounter = self.time2float(datetime.datetime.now()) if curTime: tCounter = self.time2float(curTime) ## we don't need to round here as we have alread float counter = int(((tCounter - shift) / timeStepping)) otp_dict["shift"] = shift otp_dict["timeStepping"] = timeStepping if count > 0: for i in range(0, count): otpval = hmac2Otp.generate(counter=counter + i, inc_counter=False) timeCounter = ((counter + i) * timeStepping) + shift otp_dict["otp"][ counter + i] = { 'otpval' : otpval, 'time' : datetime.datetime.fromtimestamp(timeCounter).strftime("%Y-%m-%d %H:%M:%S"), } ret = True log.debug("[get_multi_otp] end. dictionary of multiple future OTP is: otp_dict: %r - status: %r - error %r" % (ret, error, otp_dict)) return (ret, error, otp_dict)