# -*- coding: utf-8 -*-
#
# LinOTP - the open source solution for two factor authentication
# Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
#
# This file is part of LinOTP server.
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public
# License, version 3, as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the
# GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#
# E-mail: linotp@lsexperts.de
# Contact: www.linotp.org
# Support: www.lsexperts.de
#
'''
OcraSuite = CryptoFunction(K, DataInput)
CryptoFunction = HOTP-H-n
H in { SHA-1, SHA256, SHA512 }
n - truncation 4-10 or 0 (= no truncation)
DataInput = OCRASuite | 00 | C | Q | P | S | T
OCRASuite - mode of operation
C - Counter (optional)
Q - Challenge
P - hashed password/pin (optional)
S - session information (optional)
T - timestamp (optional)
[C] | QFxx | [PH | Snnn | TG] : Challenge-Response computation
[C] | QFxx | [PH | TG] : Plain Signature computation
Each input that is used for the computation is represented by a
single letter (except Q), and they are separated by a hyphen.
+ Q: Challenge +
The input for challenge is further qualified by the formats supported
by the client for challenge question(s). Supported values can be:
+------------------+-------------------+
| Format (F) | Up to Length (xx) |
+------------------+-------------------+
| A (alphanumeric) | 04-64 |
| N (numeric) | 04-64 |
| H (hexadecimal) | 04-64 |
+------------------+-------------------+
Table 2: Challenge Format Table
The default challenge format is N08, numeric and up to 8 digits.
+ P: Pin +
The input for P is further qualified by the hash function used for
the PIN/password. Supported values for hash function can be:
Hash function (H) - SHA1, SHA256, SHA512.
The default hash function for P is SHA1.
+ S: Session +
The input for S is further qualified by the length of the session
data in bytes. The client and server could agree to any length but
the typical values are:
Length (nnn) - 064, 128, 256, and 512.
The default length is 064 bytes.
The input for timestamps is further qualified by G, size of the time-
step. G can be specified in number of seconds, minutes, or hours:
+--------------------+------------------------------+
| Time-Step Size (G) | Examples |
+--------------------+------------------------------+
| [1-59]S | number of seconds, e.g., 20S |
| [1-59]M | number of minutes, e.g., 5M |
| [0-48]H | number of hours, e.g., 24H |
+--------------------+------------------------------+
Table 3: Time-step Size Table
Default value for G is 1M, i.e., time-step size is one minute and the
T represents the number of minutes since epoch time [UT].
OCRASuite = 'Algorithm:CryptoFunction:DataInput'
"OCRA-1:HOTP-SHA512-8:C-QN08-PSHA1"
OCRA-1: OCRA version 1
HOTP-SHA512-8: HOTP with SHA512, truncated to 8 digits
C-QN08-PSHA1:
C: with counter
QN08: numeric challenge of up to 8 digits
PSHA1: SHA1 hash of the password
"OCRA-1:HOTP-SHA256-6:QA10-T1M"
OCRA-1: OCRA version 1
HOTP-SHA256-6: HOTP with SHA256, truncated to 6 digits
QA10-T1M:
QA10: alphanumeric challenge of up to 10 characters
T1M: timestamp counter (time step = 1 minute)
'ocrasuite' in unit tests:
OcraSuite-1:HOTP-SHA1-6:QN08
OcraSuite-1:HOTP-SHA256-8:QA08
OcraSuite-1:HOTP-SHA256-8:QN08-PSHA1
OcraSuite-1:HOTP-SHA512-8:C-QN08
OcraSuite-1:HOTP-SHA256-8:C-QN08-PSHA1
OcraSuite-1:HOTP-SHA512-8:QN08-T1M
OcraSuite-1:HOTP-SHA512-8:QA10-T1M
When computing a response, the concatenation order is always the
following:
C | OTHER-PARTY-GENERATED-CHALLENGE-QUESTION | YOUR-GENERATED-CHALLENGE-QUESTION | P| S | T
If a value is empty (i.e., a certain input is not used in the
computation) then the value is simply not represented in the string.
The counter on the token or client MUST be incremented every time a
new computation is requested by the user. The server's counter value
MUST only be incremented after a successful OcraSuite authentication.
CLIENT SERVER
(PROVER) (VERIFIER)
| |
| Verifier sends challenge to prover |
| Challenge = Q |
|<---------------------------------------|
| |
| Prover Computes Response |
| R = OcraSuite(K, {[C] | Q | [P | S | T]}) |
| Prover sends Response = R |
|--------------------------------------->|
| |
| Verifier Validates Response |
| If Response is valid, Server sends OK |
| If Response is not, Server sends NOK |
|<---------------------------------------|
| |
CLIENT SERVER
(PROVER) (VERIFIER)
| |
| 1. Client sends client-challenge |
| QC = Client-challenge |
|------------------------------------------------->|
| |
| 2. Server computes server-response |
| and sends server-challenge |
| RS = OcraSuite(K, [C] | QC | QS | [S | T]) |
| QS = Server-challenge |
| Response = RS, QS |
|<-------------------------------------------------|
| |
| 3. Client verifies server-response |
| and computes client-response |
| OcraSuite(K, [C] | QC | QS | [S | T]) != RS -> STOP |
| RC = OcraSuite(K, [C] | QS | QC | [P | S | T]) |
| Response = RC |
|------------------------------------------------->|
| |
| 4. Server verifies client-response |
| OcraSuite(K, [C] | QS | QC | [P|S|T]) != RC -> STOP |
| Response = OK |
|<-------------------------------------------------|
| |
'''
import binascii
from datetime import datetime
import hashlib
import hmac
import re
from linotp.lib.crypt import urandom
## The HMAC handling depends on the interpreter version, so the major and
## minor version numbers are folded into one float (major + minor / 10,
## e.g. 2.7); OcraSuite.compute() compares this value against 2.6.
## NOTE(review): a minor version of 10 or more carries into the integer
## part (3.10 gives 4.0), so pver only orders versions correctly for
## single-digit minor versions.
import sys
(ma, mi, _, _, _,) = sys.version_info
pver = float(int(ma) + int(mi) * 0.1)
import logging
## module-wide logger, named after this module
log = logging.getLogger(__name__)
def is_int(v):
    '''
    Tell whether a value can be converted by ``int()``.

    :param v: any value, e.g. a number or a numeric string
    :return: True if ``int(v)`` succeeds, False otherwise
    '''
    try:
        int(v)
    except (TypeError, ValueError):
        ## TypeError covers None and other non-numeric types, which must
        ## yield False instead of escaping to the caller
        return False
    return True
def truncated_value(h):
    '''
    Extract a 31 bit integer from a binary digest by dynamic truncation.

    The low four bits of the last byte select an offset; the four bytes
    starting there are read as a big endian number with the top bit masked.

    :param h: binary digest (byte string)
    :return: the truncated value as integer
    '''
    ## bytearray yields integers for a byte string on Python 2 and 3 alike
    octets = bytearray(h)
    offset = octets[-1] & 0x0f
    return ((octets[offset] & 0x7f) << 24
            | octets[offset + 1] << 16
            | octets[offset + 2] << 8
            | octets[offset + 3])
def dec(h, p):
    '''
    Render the truncated value of a digest as a fixed number of digits.

    :param h: binary digest, handed to truncated_value()
    :param p: number of decimal digits to return
    :return: the last ``p`` digits of the decimal value, left padded
             with '0' if the value has fewer digits
    '''
    digits = unicode(truncated_value(h)).zfill(p)
    return digits[len(digits) - p:]
def int2beint64(i):
    '''
    Encode a non negative integer as 8 bytes in big endian order.

    :param i: value to encode, anything accepted by ``int()``
    :return: byte string of length 8
    :raises ValueError: if the value is negative or needs more than 64 bit
    '''
    value = int(i)
    if value < 0 or value > 0xFFFFFFFFFFFFFFFF:
        raise ValueError('value does not fit into 64 bit: %r' % (i,))
    ## '%x' formatting never emits the 'L' suffix that hex() appends to
    ## Python 2 longs, so no slicing of the text is required
    return binascii.unhexlify('%016x' % value)
def bytearray_to_bytes(a_bytearray):
    '''
    Convert a bytearray (or any iterable of byte values) to a byte string.

    :param a_bytearray: the bytes to convert
    :return: immutable byte string with the same content
    '''
    ## On Python 2 ``bytes`` is ``str``, so bytes([1, 2]) would return the
    ## text '[1, 2]'; going through bytearray copies the raw bytes instead
    return bytes(bytearray(a_bytearray))
## number of seconds per time-step unit letter of the 'T' data input
PERIODS = dict(S=1, M=60, H=3600)
class OcraSuite(object):
    '''
    Parsed OCRA suite description together with the operations that build
    the data input and compute or verify a response. Examples of accepted
    descriptions:

    OCRA-1:HOTP-SHA1-6:QN08
    OCRA-1:HOTP-SHA256-8:QA08
    OCRA-1:HOTP-SHA256-8:QN08-PSHA1
    OCRA-1:HOTP-SHA512-8:C-QN08
    OCRA-1:HOTP-SHA256-8:C-QN08-PSHA1
    OCRA-1:HOTP-SHA512-8:QN08-T1M
    OCRA-1:HOTP-SHA512-8:QA10-T1M
    '''

    def __init__(self, ocrasuite, secretObject=None):
        '''
        Parse the suite description and remember the configured inputs.

        After construction the attributes hold:
        C - the counter element or None, Q - tuple (format, max length) or
        None, P - pin hash constructor or None, S - session length or None,
        T - time step in seconds or None.

        :param ocrasuite: string 'Algorithm:CryptoFunction:DataInput'
        :param secretObject: optional object offering
                             hmac_digest(data, hash_algo); used by
                             compute() and signData()
        :raises Exception: for an unsupported version or crypto function
        :raises ValueError: for an invalid element of the description
        '''
        self.secretObj = secretObject
        self.C = None
        self.Q = None
        self.P = None
        self.S = None
        self.T = None
        self.ocrasuite_description = ocrasuite

        (version, crypto, caller) = ocrasuite.split(':')

        ## version check
        if version.upper() != 'OCRA-1':
            raise Exception('unsupported ocra version')

        ## crypto algo
        (hotpStr, hash_name, trunc) = crypto.split('-')
        if hotpStr.upper() != 'HOTP':
            raise Exception('unsupported hash version: %s'
                            % (unicode(hotpStr)))
        self.hash_algo = self._getCrypto(hash_name.lower())
        self.truncation = self._getTruncation(trunc)

        ## communication QA10-T1M or C-QN08-PSHA1 . . .
        for param in caller.split('-'):
            if not param:
                ## e.g. a trailing or doubled hyphen
                raise ValueError('Empty data input element', caller)
            kind, detail = param[0], param[1:]

            if kind == 'C':
                ## the counter has no further qualifier
                self.C = param

            elif kind == 'Q':
                ## default challenge format: numeric, up to 8 digits
                self.Q = ('N', 8)
                if detail:
                    if detail[0] not in 'ANH':
                        raise ValueError('Unknown challenge format', param)
                    length = int(detail[1:])
                    ## the spec says only max 64
                    if length < 4 or length > 64:
                        raise ValueError('Invalid challenge length', param)
                    self.Q = (detail[0], length)

            elif kind == 'S':
                ## a bare 'S' selects the default length, in the same way
                ## as the defaults of the 'P' and 'T' elements
                session_len = detail or '064'
                if session_len not in ('064', '128', '256', '512'):
                    raise ValueError('Unknown session length %s'
                                     % (session_len))
                self.S = int(session_len)

            elif kind == 'P':
                ## verify and set Pin hash algo
                self.P = self._getCrypto(detail or 'SHA1')

            elif kind == 'T':
                ## verify and set timestep parameter, stored in seconds
                complement = detail or '1M'
                if not re.match(r'^(\d+[HMS])+$', complement):
                    raise ValueError('Invalid timestamp descriptor',
                                     complement)
                length = 0
                for part in re.findall(r'\d+[HMS]', complement):
                    length += int(part[:-1]) * PERIODS[part[-1]]
                self.T = length

    def compute(self, data, key=None):
        '''
        Compute an HOTP digest using the given key and data input and
        following the current crypto function description.

        :param data: data input as built by combineData()
        :param key: binary key, only used if no secret object was given
                    to the constructor
        :return: the response as string; cut to the configured number of
                 digits, or the full truncated value if truncation is 0
        '''
        h_data = binascii.hexlify(data)
        try:
            ## the hashed message is: suite description, NUL byte, data
            data_input = bytearray(str(self.ocrasuite_description + '\0'))
            for d in data:
                data_input.append(d)
        except Exception as e:
            log.error('Failed to encode data %r: \n%r' % (e, h_data))
            ## never continue with a missing or partially built message
            raise

        if pver <= 2.6:
            data_input = str(data_input)

        if self.secretObj is not None:
            h = self.secretObj.hmac_digest(data_input, self.hash_algo)
        else:
            h = hmac.new(key, data_input, self.hash_algo).digest()

        if self.truncation:
            return dec(h, self.truncation)
        return str(truncated_value(h))

    def signData(self, data, key=None):
        '''
        Compute the HMAC of the data with the configured hash algorithm.

        :param data: the data to be signed
        :param key: binary key; if None the secret object is used
        :return: the hex encoded digest
        '''
        if key is None:
            h = self.secretObj.hmac_digest(data, self.hash_algo)
        else:
            h = hmac.new(key, data, self.hash_algo).digest()
        return binascii.hexlify(h)

    def _getCrypto(self, description):
        '''
        Convert the name of a hash algorithm as described in the OATH
        specifications, to a python object handling the digest algorithm
        interface

        :param description: name of the algorithm, e.g. 'SHA256'
        :return: the callable of that name from hashlib
        :raises ValueError: if hashlib offers no such callable
        '''
        algo = getattr(hashlib, description.lower(), None)
        if not callable(algo):
            raise ValueError('Unknown hash algorithm', description)
        return algo

    def _getTruncation(self, trunc):
        '''
        Parse the truncation length of the crypto function.

        :param trunc: the length as string
        :return: the length as integer between 0 and 10
        :raises ValueError: if it is no number or out of range
        '''
        try:
            truncation_length = int(trunc)
        except ValueError:
            raise ValueError('Invalid truncation length', trunc)
        if truncation_length < 0 or truncation_length > 10:
            raise ValueError('Invalid truncation length', trunc)
        return truncation_length

    ###########################################################################
    ## runtime method
    ###########################################################################
    def combineData(self, C=None, Q=None, P=None, P_digest=None, S=None,
                    T=None, T_precomputed=None, Qsc=None):
        '''
        Concatenate the data inputs in the order C | Q | P | S | T.

        C, P, S and T are added if the suite description contains them,
        the challenge is added if Q is given.

        :param C: counter value
        :param Q: challenge
        :param P: pin / password, hashed with the configured pin hash
        :param P_digest: already hashed pin, binary or hex encoded
        :param S: session information
        :param T: time in seconds, divided by the time step
        :param T_precomputed: number of time steps, taken as is
        :param Qsc: not used
        :return: the combined data input
        '''
        datainput = ''
        if self.C is not None:
            datainput += self._addCounter(C)
        if Q is not None:
            datainput += self._addChallenge(Q)
        if self.P is not None:
            datainput += self._addPin(P, P_digest)
        if self.S is not None:
            datainput += self._addSession(S)
        if self.T is not None:
            datainput += self._addTimeStr(T, T_precomputed)
        return datainput

    def _addCounter(self, C):
        '''
        Encode the counter as 8 bytes, big endian.

        :param C: counter value
        :return: the encoded counter, '' if the suite uses no counter
        :raises ValueError: if C is no number or does not fit into 64 bit
        '''
        datainput = ''
        if self.C:
            try:
                counter = int(C)
            except (TypeError, ValueError):
                raise ValueError('Invalid counter value', C)
            ## 2 ** 64 itself can not be stored in 8 bytes
            if counter < 0 or counter >= 2 ** 64:
                raise ValueError('Invalid counter value', C)
            datainput = int2beint64(counter)
        return datainput

    def _addChallenge(self, Q):
        '''
        Validate the challenge against the configured format and encode
        it, padded with NUL bytes to 128 bytes.

        :param Q: the challenge as string
        :return: the encoded challenge, '' if the suite uses no challenge
        :raises ValueError: if the challenge is empty, too long or does
                            not match the configured format
        '''
        datainput = ''
        if self.Q:
            q_format, max_length = self.Q

            ## do some sanity checks
            if Q is None or len(Q) == 0:
                raise ValueError('challenge is empty : %s' % (str(Q)))
            if isinstance(Q, unicode):
                ## this might raise an ascii conversion exception
                Q = str(Q)
            if not isinstance(Q, str):
                raise ValueError('challenge is no string: %s' % (str(Q)))
            if len(Q) > max_length:
                raise ValueError('challenge is to long: %s' % (str(Q)))

            ## now check and encode the challenge accordingly
            if q_format == 'N':
                if not Q.isdigit():
                    raise ValueError('challenge is not digits only: %s'
                                     % (Q))
                ## '%x' never appends the 'L' suffix of Python 2 longs
                hex_q = '%x' % int(Q)
                hex_q += '0' * (len(hex_q) % 2)
                Q = binascii.unhexlify(hex_q)
            elif q_format == 'A':
                if not Q.isalnum():
                    raise ValueError('challenge is not alpha-numeric: %s'
                                     % (Q))
                ## nothing to encode - take and append
            elif q_format == 'H':
                ## only plain hex digits can be decoded below
                if not re.match(r'^[0-9a-fA-F]+$', Q):
                    raise ValueError('challenge is hex only: %s' % (Q))
                ## an odd number of digits is padded on the right, the
                ## same way as it is done for the numeric format
                hex_q = Q + '0' * (len(Q) % 2)
                Q = binascii.unhexlify(hex_q)

            datainput = Q + '\0' * (128 - len(Q))
        return datainput

    def _addPin(self, P=None, P_digest=None):
        '''
        Provide the hashed pin.

        :param P: the pin / password, hashed with the configured algorithm
        :param P_digest: the already hashed pin, either binary or hex
                         encoded; takes precedence over P
        :return: the binary digest, '' if the suite uses no pin
        :raises ValueError: if neither is given or the digest has the
                            wrong length
        '''
        datainput = ''
        if self.P:
            if P_digest:
                ## self.P is a hashlib constructor; the digest size is an
                ## attribute of the hash object it creates
                digest_size = self.P().digest_size
                if len(P_digest) == digest_size:
                    datainput = P_digest
                elif len(P_digest) == 2 * digest_size:
                    datainput = binascii.unhexlify(P_digest)
                else:
                    raise ValueError('Pin/Password digest invalid',
                                     P_digest)
            elif P is None:
                raise ValueError('Pin/Password missing')
            else:
                datainput = self.P(P).digest()
        return datainput

    def _addSession(self, S):
        '''
        Pad the session information with NUL bytes to the session length.

        :param S: the session information
        :return: the padded session, '' if the suite uses no session
        :raises ValueError: if S is missing or longer than configured
        '''
        datainput = ''
        if self.S:
            if S is None or len(S) > self.S:
                raise ValueError('session')
            datainput = S + '\0' * (self.S - len(S))
            ## the session content is only of interest for debugging
            log.debug("S %s", binascii.hexlify(datainput))
        return datainput

    def _addTimeStr(self, T=None, T_precomputed=None):
        '''
        Encode the number of time steps as 8 bytes, big endian.

        :param T: time in seconds, divided by the time step of the suite
        :param T_precomputed: number of time steps; takes precedence
        :return: the encoded time, '' if the suite uses no time
        :raises ValueError: if neither value is a number
        '''
        datainput = ''
        if self.T:
            if T_precomputed is not None and is_int(T_precomputed):
                datainput = int2beint64(int(T_precomputed))
            elif T is not None and is_int(T):
                ## convert first, so that numeric strings work as well
                datainput = int2beint64(int(T) // self.T)
            else:
                raise ValueError('time format error')
        return datainput

    def data2hashChallenge(self, data):
        '''
        Derive a challenge from the hash of the data.

        :param data: the data to be hashed with the suite hash algorithm
        :return: challenge of the configured length as unicode
        '''
        c_type, c_len = self.Q
        challenge_hex = self.hash_algo(data).hexdigest()

        if c_type == 'A':
            challenge = challenge_hex[:c_len]
        elif c_type == 'H':
            challenge = challenge_hex[:c_len * 2]
        elif c_type == 'N':
            challenge = unicode(int(challenge_hex[:c_len], 16))

        ## NOTE(review): a short challenge is filled with NUL characters,
        ## which _addChallenge() rejects for the formats N and A -
        ## confirm whether '0' was intended
        if len(challenge) < c_len:
            challenge += '\0' * (c_len - len(challenge))
        return unicode(challenge[:c_len])

    def data2rawChallenge(self, data):
        '''
        Build a challenge from those characters of the data that fit the
        challenge format, filled up to the challenge length.

        :param data: the data, which should become part of the challenge
        :return: challenge of the configured length as unicode
        '''
        c_type, c_len = self.Q
        chall = self.data2randomChallenge(data)

        if c_type == 'A':
            accepted = [c for c in data if c.isalnum() and ord(c) < 128]
        elif c_type == 'N':
            accepted = [c for c in data if c.isdigit()]
        elif c_type == 'H':
            accepted = [c for c in data
                        if c.isdigit() or c.lower() in 'abcdef']
        else:
            accepted = []
        challenge = ''.join(accepted)

        ## at most four '0' are appended as separator ...
        challenge += '0' * min(4, max(0, c_len - len(challenge)))
        ## ... the remainder is taken from the random challenge
        challenge += chall[len(challenge):]
        return unicode(challenge[:c_len])

    def data2randomChallenge(self, data):
        '''
        build a random challenge according to the challenge definition

        :param data: not used
        :return: random challenge of the configured length as unicode
        '''
        digits = '0123456789'
        alphabets = {
            'A': ('abcdefghijklmnopqrstuvwxyz'
                  'ABCDEFGHIJKLMNOPQRSTUVWXYZ' + digits),
            'N': digits,
            'H': digits + 'abcdef',
        }
        c_type, c_len = self.Q

        challenge = ''
        alphabet = alphabets.get(c_type)
        if alphabet is not None:
            for _ in range(c_len):
                challenge += urandom.choice(alphabet)
        return unicode(challenge[:c_len])

    def checkOtp(self, passw, counter, window, ocraChallenge, pin='',
                 options=None, timeshift=0):
        '''
        check the given passw

        :param passw: the otp to be verified
        :param counter: the start counter from the token
        :param window: the range, within the counter should be checked
        :param ocraChallenge: dict with the ocra 'challenge', which goes
                              into the otp calculation, and an optional
                              'session'
        :param pin: the ocra token pin
        :param options: a 'transactionid' entry allows the nonsequential
                        otp verification
        :param timeshift: for timebased tokens we support time offsets
        :return: counter of match - otherwise -1
        '''
        ret = -1
        ## stays None if the search range turns out to be empty
        otp = None
        has_transaction = (options is not None
                           and 'transactionid' in options)

        ## std counter for tokens w.o. timer or counter
        start = counter
        end = counter + 1
        step = 1

        ## calculate the start for the timer based tokens
        if self.T is not None:
            ## seconds since the epoch; strftime("%s") is not available
            ## on every platform
            since_epoch = datetime.utcnow() - datetime(1970, 1, 1)
            otime = (since_epoch.days * 86400 + since_epoch.seconds
                     + timeshift)
            start = otime - (window * self.T)
            end = otime + (window * self.T)
            ## counter preserves the last access time, so we dont allow
            ## lookup in the past - unless a transaction is verified
            ## asynchronously
            if start < counter and not has_transaction:
                start = counter

        ## calculate the start for the counter based tokens
        if self.C is not None:
            start = counter
            end = counter + window
            if has_transaction:
                ## in case of a provided transactionid, we scroll back in
                ## counter to support asynchronous transaction verification
                if counter > window // 2:
                    start = counter - window // 2
                else:
                    start = 0

        if self.T is not None:
            log.debug('[OcraSuite:checkOtp] lookup for timerange: %r - %r ',
                      datetime.fromtimestamp(start),
                      datetime.fromtimestamp(end))
        else:
            log.debug('[OcraSuite:checkOtp] lookup for range: %r - %r',
                      start, end)

        ## finally do the check of the otps
        session = ocraChallenge.get('session', '')

        ## required - will raise exception, if not present
        challenge = ocraChallenge.get('challenge')
        idx = challenge.find(':')
        if idx != -1:
            ## only the part behind the first colon is the challenge
            challenge = challenge[idx + 1:]

        param = {
            'Q': unicode(challenge),
            'P': unicode(pin),
            'S': unicode(session),
        }

        for count in range(start, end, step):
            if self.C is not None:
                param['C'] = count
            if self.T is not None:
                param['T'] = count

            otp = self.compute(self.combineData(**param))
            if passw == otp:
                ret = count
                break

        ## support some logging at the end
        if ret == -1:
            if self.T is not None:
                log.info('[OcraSuite:checkOtp] failed for otp val %r '
                         ':(exp %r) for timerange: %r - %r ',
                         otp, passw, datetime.fromtimestamp(start),
                         datetime.fromtimestamp(end))
            else:
                log.info('[OcraSuite:checkOtp] failed for otp val %r '
                         ':(exp %r) for range: %r - %r',
                         otp, passw, start, end)
        return ret
def main():
    '''
    Development helper: compute one OCRA value for fixed example inputs
    and print it together with the packed counter.
    '''
    import struct

    ocrasuite = 'OCRA-1:HOTP-SHA256-8:QA08'
    key = binascii.unhexlify(
        '3132333435363738393031323334353637383930313233343536373839303132')
    pin = '1234'
    session = 'Kontonummer:1234568|BLZ:5675522|Betrag:1234343434,99'
    challenge = '12345678'
    counter = 11

    ## inputs the suite does not contain are ignored by combineData()
    param = {
        'C': counter,
        'Q': challenge,
        'P': pin,
        'S': session,
    }

    ocra = OcraSuite(ocrasuite)
    data = ocra.combineData(**param)
    ## compute() takes the data input first and the key second
    otp = ocra.compute(data, key)

    data = struct.pack(">Q", counter)
    print("data %r" % binascii.hexlify(data))
    print("otp %r" % otp)
if __name__ == '__main__':
    ## devel hook - to be removed later
    main()
#eof###########################################################################