# -*- coding: utf-8 -*-
#
# LinOTP - the open source solution for two factor authentication
# Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
#
# This file is part of LinOTP server.
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public
# License, version 3, as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the
# GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#
# E-mail: linotp@lsexperts.de
# Contact: www.linotp.org
# Support: www.lsexperts.de
#
"""
This file contains the database definition / database model for linotp objects
"""
import binascii
import logging
import traceback
from datetime import datetime
try:
import json
except ImportError:
import simplejson as json
"""The application's model objects"""
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relation
import linotp
from linotp.model import meta
from linotp.model.meta import Session
from linotp.model.meta import MetaData
from linotp.lib.crypt import geturandom
from linotp.lib.crypt import encrypt, hash, SecretObj
from linotp.lib.crypt import encryptPin
from linotp.lib.crypt import decryptPin
from linotp.lib.crypt import get_rand_digit_str
from pylons import config
log = logging.getLogger(__name__)
implicit_returning = config.get('linotpSQL.implicit_returning', True)
## for oracle we need a mapping of columns
## due to reserved keywords 'session' and 'timestamp'
COL_PREFIX = ""
SQLU = config.get("sqlalchemy.url", "")
if SQLU.startswith("oracle:"):
COL_PREFIX = config.get("oracle.sql.column_prefix", "lino")
session_column = "%ssession" % COL_PREFIX
timestamp_column = "%stimestamp" % COL_PREFIX
[docs]def init_model(engine):
"""
init_model binds the table objects to the class objects
- to be called before using any of the tables or classes in the model!!!
:param engine: the sql engine
"""
log.debug('model init: init_model')
meta.engine = engine
meta.Session.configure(bind=engine)
log.debug('model init: init_model')
return
token_table = sa.Table('Token', meta.metadata,
sa.Column('LinOtpTokenId', sa.types.Integer(), sa.Sequence('token_seq_id', optional=True), primary_key=True, nullable=False),
sa.Column('LinOtpTokenDesc', sa.types.Unicode(80), default=u''),
sa.Column('LinOtpTokenSerialnumber', sa.types.Unicode(40), default=u'', unique=True, nullable=False, index=True),
sa.Column('LinOtpTokenType', sa.types.Unicode(30), default=u'HMAC', index=True),
sa.Column('LinOtpTokenInfo', sa.types.Unicode(2000), default=u''),
sa.Column('LinOtpTokenPinUser', sa.types.Unicode(512), default=u''), ## encrypt
sa.Column('LinOtpTokenPinUserIV', sa.types.Unicode(32), default=u''), ## encrypt
sa.Column('LinOtpTokenPinSO', sa.types.Unicode(512), default=u''), ## encrypt
sa.Column('LinOtpTokenPinSOIV', sa.types.Unicode(32), default=u''), ## encrypt
sa.Column('LinOtpIdResolver', sa.types.Unicode(120), default=u'', index=True),
sa.Column('LinOtpIdResClass', sa.types.Unicode(120), default=u''),
sa.Column('LinOtpUserid', sa.types.Unicode(320), default=u'', index=True),
sa.Column('LinOtpSeed', sa.types.Unicode(32), default=u''),
sa.Column('LinOtpOtpLen', sa.types.Integer(), default=6),
sa.Column('LinOtpPinHash', sa.types.Unicode(512), default=u''), ## hashed
sa.Column('LinOtpKeyEnc', sa.types.Unicode(1024), default=u''), ## encrypt
sa.Column('LinOtpKeyIV', sa.types.Unicode(32), default=u''),
sa.Column('LinOtpMaxFail', sa.types.Integer(), default=10),
sa.Column('LinOtpIsactive', sa.types.Boolean(), default=True),
sa.Column('LinOtpFailCount', sa.types.Integer(), default=0),
sa.Column('LinOtpCount', sa.types.Integer(), default=0),
sa.Column('LinOtpCountWindow', sa.types.Integer(), default=10),
sa.Column('LinOtpSyncWindow', sa.types.Integer(), default=1000),
implicit_returning=implicit_returning,
)
TOKEN_ENCODE = ["LinOtpTokenDesc", "LinOtpTokenSerialnumber",
"LinOtpTokenInfo", "LinOtpUserid", "LinOtpIdResClass",
"LinOtpIdResolver"]
[docs]class Token(object):
def __init__(self, serial):
log.debug(' __init__(%s)' % serial)
## self.LinOtpTokenId - will be generated DBType serial
self.LinOtpTokenSerialnumber = u'' + serial
self.LinOtpTokenType = u''
self.LinOtpCount = 0
self.LinOtpFailCount = 0
# get maxFail should have a configurable default
self.LinOtpMaxFail = 10
self.LinOtpIsactive = True
self.LinOtpCountWindow = 10
self.LinOtpOtpLen = 6
self.LinOtpSeed = u''
self.LinOtpIdResolver = None
self.LinOtpIdResClass = None
self.LinOtpUserid = None
# will be assigned automaticaly
# self.LinOtpTokenId = 0
log.debug('Token init done')
def __setattr__(self, name, value):
"""
to support unicode on all backends, we use the json encoder with
the assci encode default
:param name: db column name or class memeber
:param value: the corresponding value
:return: - nothing -
"""
if name in TOKEN_ENCODE:
## encode data
if value:
value = linotp.lib.crypt.uencode(value)
super(Token, self).__setattr__(name, value)
def __getattribute__(self, name):
"""
to support unicode on all backends, we use the json decoder with
the assci decode default
:param name: db column name or class memeber
:return: the corresponding value
"""
#Default behaviour
value = object.__getattribute__(self, name)
if name in TOKEN_ENCODE:
if value:
value = linotp.lib.crypt.udecode(value)
else:
value = ""
# port of the 2.6. resolver to 2.7
if name in ['LinOtpIdResClass']:
if value[:len('useridresolveree.')] == 'useridresolveree.':
value = "useridresolver.%s" % value[len('useridreseolveree.') - 1:]
return value
def _fix_spaces(self, data):
'''
On MS SQL server empty fields ("") like the LinOtpTokenInfo
are returned as a string with a space (" ").
This functions helps fixing this.
Also avoids running into errors, if the data is a None Type.
:param data: a string from teh database
:type data: usually a string
:return: a stripped string
'''
if data:
data = data.strip()
return data
[docs] def getSerial(self):
return self.LinOtpTokenSerialnumber
[docs] def setHKey(self, hOtpKey, reset_failcount=True):
log.debug('setHKey()')
iv = geturandom(16)
#bhOtpKey = binascii.unhexlify(hOtpKey)
enc_otp_key = encrypt(hOtpKey, iv)
self.LinOtpKeyEnc = unicode(binascii.hexlify(enc_otp_key))
self.LinOtpKeyIV = unicode(binascii.hexlify(iv))
self.LinOtpCount = 0
if True == reset_failcount:
self.LinOtpFailCount = 0
[docs] def setUserPin(self, userPin):
log.debug('setUserPin()')
iv = geturandom(16)
enc_userPin = encrypt(userPin, iv)
self.LinOtpTokenPinUser = unicode(binascii.hexlify(enc_userPin))
self.LinOtpTokenPinUserIV = unicode(binascii.hexlify(iv))
[docs] def getHOtpKey(self):
log.debug('getHOtpKey()')
key = binascii.unhexlify(self.LinOtpKeyEnc or '')
iv = binascii.unhexlify(self.LinOtpKeyIV or '')
secret = SecretObj(key, iv)
return secret
[docs] def getOtpCounter(self):
return self.LinOtpCount or 0
[docs] def getUserPin(self):
log.debug('getHOtpKey()')
pu = self.LinOtpTokenPinUser or ''
puiv = self.LinOtpTokenPinUserIV or ''
key = binascii.unhexlify(pu)
iv = binascii.unhexlify(puiv)
secret = SecretObj(key, iv)
return secret
[docs] def setHashedPin(self, pin):
log.debug('setHashedPin()')
seed = geturandom(16)
self.LinOtpSeed = unicode(binascii.hexlify(seed))
self.LinOtpPinHash = unicode(binascii.hexlify(hash(pin, seed)))
return self.LinOtpPinHash
[docs] def getHashedPin(self, pin):
# TODO: we could log the PIN here.
log.debug('getHashedPin()')
## calculate a hash from a pin
# Fix for working with MS SQL servers
# MS SQL servers sometimes return a '<space>' when the column is empty: ''
seed_str = self._fix_spaces(self.LinOtpSeed or '')
seed = binascii.unhexlify(seed_str)
hPin = hash(pin, seed)
log.debug("[getHashedPin] hPin: %s, pin: %s, seed: %s" %
(binascii.hexlify(hPin), pin, self.LinOtpSeed or ''))
return binascii.hexlify(hPin)
[docs] def setDescription(self, desc):
log.debug('setDescription(%s)' % desc)
if desc is None:
desc = ""
self.LinOtpTokenDesc = unicode(desc)
return self.LinOtpTokenDesc
[docs] def setOtpLen(self, otplen):
log.debug('setOtpLen %i' % int(otplen))
self.LinOtpOtpLen = int(otplen)
[docs] def setPin(self, pin, hashed=True):
# TODO: we could log the PIN here
log.debug("setPin()")
upin = ""
if pin != "" and pin is not None:
upin = pin
if hashed == True:
self.setHashedPin(upin)
log.debug("setPin(HASH:%r)" % self.LinOtpPinHash)
elif hashed == False:
self.LinOtpPinHash = "@@" + encryptPin(upin)
log.debug("setPin(ENCR:%r)" % self.LinOtpPinHash)
return self.LinOtpPinHash
[docs] def comparePin(self, pin):
log.debug("[comparePin] entering comparePin")
res = False
## check for a valid input
if pin is None:
log.error("[comparePin] no valid PIN!")
return res
if self.LinOtpPinHash:
if self.isPinEncrypted():
log.debug("[comparePin] we got an encrypted PIN!")
tokenPin = self.LinOtpPinHash[2:]
decryptTokenPin = decryptPin(tokenPin)
if decryptTokenPin == pin:
res = True
else:
log.debug("[comparePin] we got a hashed PIN!")
## is there a hashed pin
hashed_pin = self.getHashedPin(pin)
if hashed_pin == self.LinOtpPinHash:
res = True
else: ## token hashed pin is empyt or none
log.debug("[comparePin] there is no pin for this token!")
if pin == '':
res = True
return res
[docs] def deleteToken(self):
log.debug('deleteToken()')
## some dbs (eg. DB2) runs in deadlock, if the TokenRealm entry
## is deleteted via foreign key relation
## so we delete it explicit
Session.query(TokenRealm).filter(TokenRealm.token_id == self.LinOtpTokenId).delete()
Session.delete(self)
log.debug('delete token success')
return True
[docs] def isPinEncrypted(self, pin=None):
ret = False
if pin is None:
pin = self.LinOtpPinHash
if pin and pin.startswith("@@"):
ret = True
return ret
[docs] def getPin(self):
ret = -1
if self.isPinEncrypted() == True:
tokenPin = self.LinOtpPinHash[2:]
ret = decryptPin(tokenPin)
return ret
[docs] def setSoPin(self, soPin):
# TODO: we could log the PIN here
log.debug('setSoPin()')
iv = geturandom(16)
enc_soPin = encrypt(soPin, iv)
self.LinOtpTokenPinSO = unicode(binascii.hexlify(enc_soPin))
self.LinOtpTokenPinSOIV = unicode(binascii.hexlify(iv))
def __unicode__(self):
return self.LinOtpTokenDesc
[docs] def get(self, key=None, fallback=None, save=False):
'''
simulate the dict behaviour to make challenge processing
easier, as this will have to deal as well with
'dict only challenges'
:param key: the attribute name - in case of key is not provided, a dict
of all class attributes are returned
:param fallback: if the attribute is not found, the fallback is returned
:param save: in case of all attributes and save==True, the timestamp is
converted to a string representation
'''
if key == None:
return self.get_vars(save=save)
if hasattr(self, key):
kMethod = "get" + key.capitalize()
if hasattr(self, kMethod):
return getattr(self, kMethod)()
else:
return getattr(self, key) or ''
else:
return fallback
[docs] def get_vars(self, save=False):
log.debug('get_vars()')
ret = {}
ret['LinOtp.TokenId'] = self.LinOtpTokenId or ''
ret['LinOtp.TokenDesc'] = self.LinOtpTokenDesc or ''
ret['LinOtp.TokenSerialnumber'] = self.LinOtpTokenSerialnumber or ''
ret['LinOtp.TokenType'] = self.LinOtpTokenType or 'hmac'
ret['LinOtp.TokenInfo'] = self._fix_spaces(self.LinOtpTokenInfo or '')
# ret['LinOtpTokenPinUser'] = self.LinOtpTokenPinUser
# ret['LinOtpTokenPinSO'] = self.LinOtpTokenPinSO
ret['LinOtp.IdResolver'] = self.LinOtpIdResolver or ''
ret['LinOtp.IdResClass'] = self.LinOtpIdResClass or ''
ret['LinOtp.Userid'] = self.LinOtpUserid or ''
ret['LinOtp.OtpLen'] = self.LinOtpOtpLen or 6
# ret['LinOtp.PinHash'] = self.LinOtpPinHash
ret['LinOtp.MaxFail'] = self.LinOtpMaxFail
ret['LinOtp.Isactive'] = self.LinOtpIsactive
ret['LinOtp.FailCount'] = self.LinOtpFailCount
ret['LinOtp.Count'] = self.LinOtpCount
ret['LinOtp.CountWindow'] = self.LinOtpCountWindow
ret['LinOtp.SyncWindow'] = self.LinOtpSyncWindow
# list of Realm names
ret['LinOtp.RealmNames'] = self.getRealmNames()
return ret
__str__ = __unicode__
def __repr__(self):
'''
return the token state as text
:return: token state as string representation
:rtype: string
'''
ldict = {}
for attr in self.__dict__:
key = "%r" % attr
val = "%r" % getattr(self, attr)
ldict[key] = val
res = "<%r %r>" % (self.__class__, ldict)
return res
[docs] def getSyncWindow(self):
return self.LinOtpSyncWindow
[docs] def setCountWindow(self, counter):
self.LinOtpCountWindow = counter
[docs] def getCountWindow(self):
return self.LinOtpCountWindow
[docs] def getInfo(self):
# Fix for working with MS SQL servers
# MS SQL servers sometimes return a '<space>' when the column is empty: ''
return self._fix_spaces(self.LinOtpTokenInfo or '')
[docs] def setInfo(self, info):
self.LinOtpTokenInfo = info
def _setPin(self, pin, hashed=True):
log.debug("_setPin(%s)" % pin)
if pin is None or pin == "":
log.debug("Token pin was None")
else:
self.setPin(pin, hashed)
[docs] def storeToken(self):
if self.LinOtpUserid is None:
self.LinOtpUserid = u''
if self.LinOtpIdResClass is None:
self.LinOtpIdResClass = ''
if self.LinOtpIdResolver is None:
self.LinOtpIdResolver = ''
log.debug('storeToken()')
Session.add(self)
Session.flush()
Session.commit()
log.debug('store token success')
return True
[docs] def setType(self, typ):
self.LinOtpTokenType = typ
return
[docs] def getType(self):
return self.LinOtpTokenType or 'hmac'
[docs] def updateType(self, typ):
#in case the prevoius has been different type
# we must reset the counters
# But be aware, ray, this could also be upper and lower case mixing...
if self.LinOtpTokenType.lower() != typ.lower() :
self.LinOtpCount = 0
self.LinOtpFailCount = 0
self.LinOtpTokenType = typ
return
[docs] def updateOtpKey(self, otpKey):
#in case of a new hOtpKey we have to do some more things
if (otpKey is not None):
secretObj = self.getHOtpKey()
if secretObj.compare(otpKey) == False:
log.debug('update token OtpKey - counter reset')
self.setHKey(otpKey)
[docs] def updateToken(self, tokenDesc, otpKey, pin):
log.debug('updateToken()')
self.setDescription(tokenDesc)
self._setPin(pin)
self.updateOtpKey(otpKey)
[docs] def getRealms(self):
return self.realms or ''
[docs] def getRealmNames(self):
r_list = []
for r in self.realms:
r_list.append(r.name)
return r_list
[docs] def addRealm(self, realm):
if realm is not None:
self.realms.append(realm)
else:
log.error("adding empty realm!")
[docs] def setRealms(self, realms):
if realms is not None:
self.realms = realms
else:
log.error("assigning empty realm!")
[docs]def createToken(serial):
log.debug('createToken(%s)' % serial)
serial = u'' + serial
token = Token(serial)
log.debug('token object created')
return token
###############################################################################
config_table = sa.Table('Config', meta.metadata,
sa.Column('Key', sa.types.Unicode(255), primary_key=True, nullable=False),
sa.Column('Value', sa.types.Unicode(2000), default=u''),
sa.Column('Type', sa.types.Unicode(2000), default=u''),
sa.Column('Description', sa.types.Unicode(2000), default=u''),
implicit_returning=implicit_returning,
)
log.debug('config table append_column')
CONFIG_ENCODE = ["Key", "Value", "Description"]
[docs]class Config(object):
def __init__(self, Key, Value, Type=u'', Description=u''):
log.debug('__init__')
if (not Key.startswith("linotp.") and not Key.startswith("enclinotp.")):
Key = "linotp." + Key
self.Key = unicode(Key)
self.Value = unicode(Value)
self.Type = unicode(Type)
self.Description = unicode(Description)
log.debug('Config init')
def __unicode__(self):
return self.Description
def __setattr__(self, name, value):
"""
to support unicode on all backends, we use the json encoder with
the assci encode default
:param name: db column name or class memeber
:param value: the corresponding value
:return: - nothing -
"""
if name in CONFIG_ENCODE:
## encode data
if value:
value = linotp.lib.crypt.uencode(value)
super(Config, self).__setattr__(name, value)
def __getattribute__(self, name):
"""
to support unicode on all backends, we use the json decoder with
the assci decode default
:param name: db column name or class memeber
:return: the corresponding value
"""
#Default behaviour
value = object.__getattribute__(self, name)
if name in CONFIG_ENCODE:
if value:
value = linotp.lib.crypt.udecode(value)
else:
value = ""
return value
__str__ = __unicode__
# This table connect a token to several realms
tokenrealm_table = sa.Table('TokenRealm', meta.metadata,
sa.Column('id', sa.types.Integer(), sa.Sequence('tokenrealm_seq_id', optional=True), primary_key=True, nullable=False),
sa.Column('token_id', sa.types.Integer(), ForeignKey('Token.LinOtpTokenId')),
#sa.Column('realm_id', sa.types.Integer())
sa.Column('realm_id', sa.types.Integer(), ForeignKey('Realm.id')),
implicit_returning=implicit_returning,
)
[docs]class TokenRealm(object):
def __init__(self, realmid):
log.debug("setting realm_id to %i" % realmid)
self.realm_id = realmid
self.token_id = 0
realm_table = sa.Table('Realm', meta.metadata,
sa.Column('id', sa.types.Integer(), sa.Sequence('realm_seq_id', optional=True), primary_key=True, nullable=False),
sa.Column('name', sa.types.Unicode(255), default=u'', unique=True, nullable=False),
sa.Column('default', sa.types.Boolean(), default=False),
sa.Column('option', sa.types.Unicode(40), default=u''),
implicit_returning=implicit_returning,
)
REALM_ENCODE = ["name", "option"]
[docs]class Realm(object):
def __setattr__(self, name, value):
"""
to support unicode on all backends, we use the json encoder with
the assci encode default
:param name: db column name or class memeber
:param value: the corresponding value
:return: - nothing -
"""
if name in REALM_ENCODE:
## encode data
if value:
value = linotp.lib.crypt.uencode(value)
super(Realm, self).__setattr__(name, value)
def __getattribute__(self, name):
"""
to support unicode on all backends, we use the json decoder with
the assci decode default
:param name: db column name or class memeber
:return: the corresponding value
"""
#Default behaviour
value = object.__getattribute__(self, name)
if name in REALM_ENCODE:
if value:
value = linotp.lib.crypt.udecode(value)
else:
value = ""
return value
def __init__(self, realm):
log.debug("setting realm name to %s" % realm)
self.name = realm
if realm is not None:
self.name = realm.lower()
#self.id = 0
[docs] def storeRealm(self):
if self.name is None:
self.name = ''
self.name = self.name.lower()
log.debug('storeRealm()')
Session.add(self)
Session.commit()
log.debug('store realm success')
return True
''' ''' '''
ocra challenges are stored
''' ''' '''
log.debug('creating ocra table')
ocra_table = sa.Table('ocra', meta.metadata,
sa.Column('id', sa.types.Integer(), sa.Sequence('token_seq_id', optional=True), primary_key=True, nullable=False),
sa.Column('transid', sa.types.Unicode(20), unique=True,
nullable=False, index=True),
sa.Column('data', sa.types.Unicode(512), default=u''),
sa.Column('challenge', sa.types.Unicode(256), default=u''),
sa.Column(session_column, sa.types.Unicode(512), default=u''),
sa.Column('tokenserial', sa.types.Unicode(64), default=u''),
sa.Column(timestamp_column, sa.types.DateTime, default=datetime.now()),
sa.Column('received_count', sa.types.Integer(), default=0),
sa.Column('received_tan', sa.types.Boolean, default=False),
sa.Column('valid_tan', sa.types.Boolean, default=False),
implicit_returning=implicit_returning,
)
OCRA_ENCODE = ["data", "challenge", "tokenserial"]
[docs]class OcraChallenge(object):
'''
'''
def __init__(self, transId, challenge, tokenserial, data, session=u''):
log.debug('OcraChallenge: __init__ ')
self.transid = u'' + transId
self.challenge = u'' + challenge
self.tokenserial = u'' + tokenserial
self.data = u'' + data
self.timestamp = datetime.now()
self.session = u'' + session
self.received_count = 0
self.received_tan = False
self.valid_tan = False
log.debug('OcraChallenge: init done!')
def __setattr__(self, name, value):
"""
to support unicode on all backends, we use the json encoder with
the assci encode default
:param name: db column name or class memeber
:param value: the corresponding value
:return: - nothing -
"""
if name in OCRA_ENCODE:
## encode data
if value:
value = linotp.lib.crypt.uencode(value)
super(OcraChallenge, self).__setattr__(name, value)
def __getattribute__(self, name):
"""
to support unicode on all backends, we use the json decoder with
the assci decode default
:param name: db column name or class memeber
:return: the corresponding value
"""
#Default behaviour
value = object.__getattribute__(self, name)
if name in OCRA_ENCODE:
if value:
value = linotp.lib.crypt.udecode(value)
else:
value = ""
return value
[docs] def setData(self, data):
self.data = unicode(data)
[docs] def getData(self):
return self.data
[docs] def getSession(self):
return self.session
[docs] def setSession(self, session):
self.session = unicode(session)
[docs] def setChallenge(self, challenge):
self.challenge = unicode(challenge)
[docs] def setTanStatus(self, received=False, valid=False):
self.received_tan = received
self.received_count += 1
self.valid_tan = valid
[docs] def getTanStatus(self):
return (self.received_tan, self.valid_tan)
[docs] def getChallenge(self):
return self.challenge
[docs] def getTransactionId(self):
return self.transid
[docs] def save(self):
log.debug('save ocra challenge')
Session.add(self)
Session.commit()
log.debug('save ocra challenge : success')
return self.transid
def __unicode__(self):
descr = {}
descr['id'] = self.id
descr['transid'] = self.transid
descr['challenge'] = self.challenge
descr['tokenserial'] = self.tokenserial
descr['data'] = self.data
descr['timestamp'] = self.timestamp
descr['received_tan'] = self.received_tan
descr['valid_tan'] = self.valid_tan
return "%s" % unicode(descr)
__str__ = __unicode__
''' ''' '''
challenges are stored
''' ''' '''
log.debug('creating challenges table')
challenges_table = sa.Table('challenges', meta.metadata,
sa.Column('id', sa.types.Integer(),
sa.Sequence('token_seq_id', optional=True),
primary_key=True, nullable=False),
sa.Column('transid', sa.types.Unicode(64),
unique=True, nullable=False,
index=True),
sa.Column('data', sa.types.Unicode(512), default=u''),
sa.Column('challenge', sa.types.Unicode(512), default=u''),
sa.Column(session_column, sa.types.Unicode(512), default=u''),
sa.Column('tokenserial', sa.types.Unicode(64), default=u''),
sa.Column(timestamp_column, sa.types.DateTime,
default=datetime.now()),
sa.Column('received_count', sa.types.Integer(), default=0),
sa.Column('received_tan', sa.types.Boolean, default=False),
sa.Column('valid_tan', sa.types.Boolean, default=False),
implicit_returning=implicit_returning,
)
CHALLENGE_ENCODE = ["data", "challenge", 'tokenserial']
[docs]class Challenge(object):
'''
the generic challange handling
'''
def __init__(self, transid, tokenserial, challenge=u'', data=u'', session=u''):
log.debug('Challenge: __init__ ')
self.transid = u'' + transid
self.challenge = u'' + challenge
self.tokenserial = u'' + tokenserial
self.data = u'' + data
self.timestamp = datetime.now()
self.session = u'' + session
self.received_count = 0
self.received_tan = False
self.valid_tan = False
log.debug('Challenge: init done!')
def __setattr__(self, name, value):
"""
to support unicode on all backends, we use the json encoder with
the assci encode default
:param name: db column name or class memeber
:param value: the corresponding value
:return: - nothing -
"""
if name in CHALLENGE_ENCODE:
## encode data
if value:
value = linotp.lib.crypt.uencode(value)
super(Challenge, self).__setattr__(name, value)
def __getattribute__(self, name):
"""
to support unicode on all backends, we use the json decoder with
the assci decode default
:param name: db column name or class memeber
:return: the corresponding value
"""
#Default behaviour
value = object.__getattribute__(self, name)
if name in CHALLENGE_ENCODE:
if value:
value = linotp.lib.crypt.udecode(value)
else:
value = ""
return value
@classmethod
[docs] def createTransactionId(cls , length=20):
return get_rand_digit_str(length)
[docs] def setData(self, data):
if type(data) in [dict, list]:
self.data = json.dumps(data)
else:
self.data = unicode(data)
[docs] def getData(self):
data = {}
try:
data = json.loads(self.data)
except:
data = self.data
return data
[docs] def get(self, key=None, fallback=None, save=False):
'''
simulate the dict behaviour to make challenge processing
easier, as this will have to deal as well with
'dict only challenges'
:param key: the attribute name - in case of key is not provided, a dict
of all class attributes are returned
:param fallback: if the attribute is not found, the fallback is returned
:param save: in case of all attributes and save==True, the timestamp is
converted to a string representation
'''
if key == None:
return self.get_vars(save=save)
if hasattr(self, key):
kMethod = "get" + key.capitalize()
if hasattr(self, kMethod):
return getattr(self, kMethod)()
else:
return getattr(self, key)
else:
return fallback
[docs] def getId(self):
return self.id
[docs] def getSession(self):
return self.session
[docs] def setSession(self, session):
self.session = unicode(session)
[docs] def setChallenge(self, challenge):
self.challenge = unicode(challenge)
[docs] def setTanStatus(self, received=False, valid=False):
self.received_tan = received
self.received_count += 1
self.valid_tan = valid
[docs] def getTanStatus(self):
return (self.received_tan, self.valid_tan)
[docs] def getTanCount(self):
return self.received_count
[docs] def getChallenge(self):
return self.challenge
[docs] def getTransactionId(self):
return self.transid
[docs] def getTokenSerial(self):
return self.tokenserial
[docs] def save(self):
'''
enforce the saveing of a challenge
- will guarentee the uniqness of the transaction id
:return: transaction id of the stored challeng
'''
log.debug('[save] save challenge')
try:
Session.add(self)
Session.commit()
log.debug('save challenge : success')
except Exception as exce:
log.error('[save]Error during saving challenge: %r' % exce)
log.error("[save] %s" % traceback.format_exc())
return self.transid
[docs] def get_vars(self, save=False):
'''
return a dictionary of all vars in the challenge class
:return: dict of vars
'''
descr = {}
descr['id'] = self.id
descr['transid'] = self.transid
descr['challenge'] = self.challenge
descr['tokenserial'] = self.tokenserial
descr['data'] = self.getData()
if save is True:
descr['timestamp'] = "%s" % self.timestamp
else:
descr['timestamp'] = self.timestamp
descr['received_tan'] = self.received_tan
descr['valid_tan'] = self.valid_tan
return descr
def __unicode__(self):
descr = self.get_vars()
return "%s" % unicode(descr)
__str__ = __unicode__
log.debug('calling ORM Mapper')
# config_table.append_column( sa.Column('IV', sa.types.Unicode(2000), default=u''),)
# see: http://www.sqlalchemy.org/docs/orm/relationships.html#sqlalchemy.orm.relationship
# http://www.sqlalchemy.org/docs/05/reference/orm/mapping.html
# The realms of a token will be stored in the additional attribute "realms"
# and the token, to which the realms belong will be stored in the backed "token"
#orm.mapper(Token, token_table, properties={
# #'realms':relation(Realm, secondary=tokenrealm_table)
# 'realms':relation(TokenRealm, backref=backref('token'))
# })
orm.mapper(Token, token_table, properties={
'realms':relation(Realm, secondary=tokenrealm_table,
primaryjoin=token_table.c.LinOtpTokenId == tokenrealm_table.c.token_id,
secondaryjoin=tokenrealm_table.c.realm_id == realm_table.c.id)
})
orm.mapper(Realm, realm_table)
orm.mapper(TokenRealm, tokenrealm_table)
orm.mapper(Config, config_table)
## for oracle and the SQLAlchemy 0.7 we need a mapping of columns
## due to reserved keywords session and timestamp
mapping = {}
mapping['session'] = "%ssession" % COL_PREFIX
mapping['timestamp'] = "%stimestamp" % COL_PREFIX
## create challenges ORM mapping to the Challenge class
challenge_properties = {}
if len(COL_PREFIX) > 0:
for key, value in mapping.items():
challenge_properties[key] = challenges_table.columns[value]
orm.mapper(Challenge, challenges_table, properties=challenge_properties)
## create Ocra ORM mapping to the Ocra class
ocra_properties = {}
if len(COL_PREFIX) > 0:
for key, value in mapping.items():
ocra_properties[key] = ocra_table.columns[value]
orm.mapper(OcraChallenge, ocra_table, properties=ocra_properties)
##eof#########################################################################