# -*- coding: utf-8 -*-
#
# LinOTP - the open source solution for two factor authentication
# Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
#
# This file is part of LinOTP server.
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public
# License, version 3, as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the
# GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#
# E-mail: linotp@lsexperts.de
# Contact: www.linotp.org
# Support: www.lsexperts.de
#
""" policy processing """
import logging
import linotp.lib.token
from pylons.i18n.translation import _
from pylons import request, config, tmpl_context as c
from linotp.lib.config import getLinotpConfig
from linotp.lib.config import removeFromConfig
from linotp.lib.config import storeConfig
from linotp.lib.realm import getDefaultRealm
from linotp.lib.realm import getRealms
from linotp.lib.user import getUserRealms
from linotp.lib.user import User, getUserFromParam, getUserFromRequest
from linotp.lib.user import getResolversOfUser
from linotp.lib.util import get_client
from linotp.lib.error import ServerError, LinotpError
from netaddr import IPAddress
from netaddr import IPNetwork
from configobj import ConfigObj
# for loading XML file
import re
# for generating random passwords
from linotp.lib.crypt import urandom
import string
from linotp.lib.util import getParam, uniquify
log = logging.getLogger(__name__)
optional = True
required = False
REG_POLICY_C = config.get("linotpPolicy.pin_c", "[a-zA-Z]")
REG_POLICY_N = config.get("linotpPolicy.pin_n", "[0-9]")
REG_POLICY_S = config.get("linotpPolicy.pin_s", "[.:,;-_<>+*!/()=?$§%&#~\^]")
# This dictionary maps the token_types to actions in the scope gettoken,
# that define the maximum allowed otp valies in case of getotp/getmultiotp
MAP_TYPE_GETOTP_ACTION = {"dpw": "max_count_dpw",
"hmac": "max_count_hotp",
"totp": "max_count_totp"}
[docs]class PolicyException(LinotpError):
def __init__(self, description="unspecified error!", id=410):
LinotpError.__init__(self, description=description, id=id)
[docs]class AuthorizeException(LinotpError):
def __init__(self, description="unspecified error!", id=510):
LinotpError.__init__(self, description=description, id=id)
[docs]def getPolicyDefinitions(scope=""):
'''
returns the policy definitions of
- allowed scopes
- allowed actions in scopes
- type of actions
'''
pol = {
'admin': {
'enable': {'type': 'bool'},
'disable': {'type': 'bool'},
'set': {'type': 'bool'},
'setOTPPIN': {'type': 'bool'},
'setMOTPPIN': {'type': 'bool'},
'setSCPIN': {'type': 'bool'},
'resync': {'type': 'bool'},
'reset': {'type': 'bool'},
'assign': {'type': 'bool'},
'unassign': {'type': 'bool'},
'import': {'type': 'bool'},
'remove': {'type': 'bool'},
'userlist': {'type': 'bool'},
'checkstatus': {'type': 'bool'},
'manageToken': {'type': 'bool'},
'getserial': {'type': 'bool'},
'copytokenpin': {'type': 'bool'},
'copytokenuser': {'type': 'bool'},
'losttoken': {'type': 'bool'},
'getotp': {
'type': 'bool',
'desc': 'allow the administrator to retrieve '
'OTP values for tokens.'
}
},
'gettoken': {
'max_count_dpw': {'type': 'int'},
'max_count_hotp': {'type': 'int'},
'max_count_totp': {'type': 'int'},
},
'selfservice': {
'assign': {
'type': 'bool',
'desc': "The user is allowed to assign an existing "
"token using the token serial number."},
'disable': {'type': 'bool'},
'enable': {'type': 'bool'},
'delete': {'type': 'bool'},
'unassign': {'type': 'bool'},
'resync': {'type': 'bool'},
'reset': {
'type': 'bool',
'desc': 'Allow to reset the failcounter of a token.'},
'setOTPPIN': {'type': 'bool'},
'setMOTPPIN': {'type': 'bool'},
'getotp': {'type': 'bool'},
'otp_pin_maxlength': {'type': 'int', 'value': range(0, 100)},
'otp_pin_minlength': {'type': 'int', 'value': range(0, 100)},
'otp_pin_contents': {'type': 'str'},
'activateQR': {'type': 'bool'},
'webprovisionOATH': {'type': 'bool'},
'webprovisionGOOGLE': {'type': 'bool'},
'webprovisionGOOGLEtime': {'type': 'bool'},
'max_count_dpw': {'type': 'int'},
'max_count_hotp': {'type': 'int'},
'max_count_totp': {'type': 'int'},
'history': {
'type': 'bool',
'desc': 'Allow the user to view his own token history'},
'getserial': {
'type': 'bool',
'desc': 'Allow to search an unassigned token by OTP value.'}
},
'system': {
'read': {'type': 'bool'},
'write': {'type': 'bool'},
},
'enrollment': {
'tokencount': {
'type': 'int',
'desc': 'Limit the number of tokens in a realm.'},
'maxtoken': {
'type': 'int',
'desc': 'Limit the number of tokens a user in the realm may '
'have assigned.'},
'otp_pin_random': {
'type': 'int',
'value': range(0, 100)},
'otp_pin_encrypt': {
'type': 'int',
'value': [0, 1]},
'tokenlabel': {
'type': 'str',
'desc': 'the label for the google authenticator.'},
'autoassignment': {
'type': 'int',
'value': [6, 8],
'desc': 'users can assign a token just by using the '
'unassigned token to authenticate.'},
'autoassignment': {
'type': 'int',
'value': [6, 8, 32, 48],
'desc' : 'users can assign a token just by using the unassigned token to authenticate.'},
'ignore_autoassignment_pin': {
'type': 'bool',
'desc' : "Do not set password from auto assignment as token pin."},
'lostTokenPWLen': {
'type': 'int',
'desc': 'The length of the password in case of '
'temporary token.'},
'lostTokenPWContents': {
'type': 'str',
'desc': 'The contents of the temporary password, '
'described by the characters C, c, n, s.'},
'lostTokenValid': {
'type': 'int',
'desc': 'The length of the validity for the temporary '
'token (in days).'},
},
'authentication': {
'smstext': {
'type': 'str',
'desc': 'The text that will be send via SMS for an SMS token. '
'Use <otp> and <serial> as parameters.'},
'otppin': {
'type': 'int',
'value': [0, 1, 2],
'desc': 'either use the Token PIN (0), use the Userstore '
'Password (1) or use no fixed password '
'component (2).'},
'autosms': {
'type': 'bool',
'desc': 'if set, a new SMS OTP will be sent after '
'successful authentication with one SMS OTP'},
'passthru': {
'type': 'bool',
'desc': 'If set, the user in this realm will be authenticated '
'against the UserIdResolver, if the user has no '
'tokens assigned.'
},
'passOnNoToken': {
'type': 'bool',
'desc': 'if the user has no token, the authentication request '
'for this user will always be true.'
},
'qrtanurl': {
'type': 'str',
'desc': 'The URL for the half automatic mode that should be '
'used in a QR Token'
},
'challenge_response': {
'type': 'str',
'desc': 'A list of tokentypes for which challenge response '
'should be used.'
}
},
'authorization': {
'authorize': {
'type': 'bool',
'desc': 'The user/realm will be authorized to login '
'to the clients IPs.'},
'tokentype': {
'type': 'str',
'desc': 'The user will only be authenticated with this '
'very tokentype.'},
'serial': {
'type': 'str',
'desc': 'The user will only be authenticated if the serial '
'number of the token matches this regexp.'},
'setrealm': {
'type': 'str',
'desc': 'The Realm of the user is set to this very realm. '
'This is important if the user is not contained in '
'the default realm and can not pass his realm.'},
'detail_on_success': {
'type': 'bool',
'desc': 'In case of successful authentication additional '
'detail information will be returned.'},
'detail_on_fail': {
'type': 'bool',
'desc': 'In case of failed authentication additional '
'detail information will be returned.'}
},
'audit': {
'view': {
'type': 'bool'}
},
'ocra': {
'request': {
'type': 'bool',
'desc': 'Allow to do a ocra/request'},
'status': {
'type': 'bool',
'desc': 'Allow to check the transaction status.'},
'activationcode': {
'type': 'bool',
'desc': 'Allow to do an ocra/getActivationCode.'},
'calcOTP': {
'type': 'bool',
'desc': 'Allow to do an ocra/calculateOtp.'}
}
}
## now add generic policies, which every token should provide:
## - init<TT>
## - enroll<TT>, but only, if there is a rendering section
token_type_list = linotp.lib.token.get_token_type_list()
for ttype in token_type_list:
pol['admin']["init%s" % ttype.upper()] = {'type': 'bool'}
# TODO: action=initETNG
# Cornelius Kölbel Apr 18 7: 31 PM
#
# Haben wir auch noch den die policy
#
# scope=admin, action=initETNG?
#
# Das ist nämlich eine spezialPolicy, die der HMAC-Token mitbringen
# muss.
# todo: if all tokens are dynamic, the token init must be only shown
# if there is a rendering section for:
# conf = linotp.lib.token.getTokenConfig(ttype, section='init')
# if len(conf) > 0:
# pol['admin']["init%s" % ttype.upper()]={'type': 'bool'}
conf = linotp.lib.token.getTokenConfig(ttype, section='selfservice')
if 'enroll' in conf:
pol['selfservice']["enroll%s" % ttype.upper()] = {
'type': 'bool',
'desc': "The user is allowed to enroll a %s token." % ttype}
## now merge the dynamic Token policy definition
## into the global definitions
policy = linotp.lib.token.getTokenConfig(ttype, section='policy')
## get all policy sections like: admin, selfservice . . '''
pol_keys = pol.keys()
for pol_section in policy.keys():
## if we have a dyn token definition of this section type
## add this to this section - and make sure, that it is
## then token type prefixed
if pol_section in pol_keys:
pol_entry = policy.get(pol_section)
for pol_def in pol_entry:
set_def = pol_def
if pol_def.startswith(ttype) is not True:
set_def = '%s_%s' % (ttype, pol_def)
pol[pol_section][set_def] = pol_entry.get(pol_def)
##return sub section, if scope is defined
## make sure that scope is in the policy key
## e.g. scope='_' is undefined and would break
if scope and scope in pol:
pol = pol[scope]
return pol
[docs]def setPolicy(param):
'''
Function to set a policy. It expects a dict of with the following keys:
* name
* action
* scope
* realm
* user
* time
* client
'''
ret = {}
name = param.get('name')
action = param.get('action')
scope = param.get('scope')
realm = param.get('realm')
user = param.get('user')
time = param.get('time')
client = param.get('client')
active = param.get('active', True)
ret["action"] = storeConfig("Policy.%s.action" % name,
action, "", "a policy definition")
ret["scope"] = storeConfig("Policy.%s.scope" % name,
scope, "", "a policy definition")
ret["realm"] = storeConfig("Policy.%s.realm" % name,
realm, "", "a policy definition")
ret["user"] = storeConfig("Policy.%s.user" % name,
user, "", "a policy definition")
ret["time"] = storeConfig("Policy.%s.time" % name,
time, "", "a policy definition")
ret["client"] = storeConfig("Policy.%s.client" % name,
client, "", "a policy definition")
ret["active"] = storeConfig("Policy.%s.active" % name,
active, "", "a policy definition")
return ret
[docs]def create_policy_export_file(policy, filename):
'''
This function takes a policy dictionary and creates an export file from it
'''
TMP_DIRECTORY = "/tmp"
filename = "%s/%s" % (TMP_DIRECTORY, filename)
if len(policy) == 0:
f = open(filename, "w")
f.write('')
f.close()
else:
for value in policy.values():
for k in value.keys():
value[k] = value[k] or ""
policy_file = ConfigObj(encoding="UTF-8")
policy_file.filename = filename
for name in policy.keys():
policy_file[name] = policy[name]
policy_file.write()
return filename
[docs]def getPolicy(param, display_inactive=False):
'''
Function to retrieve the list of policies.
attributes:
name: (optional) will only return the policy with the name
user: (optional) will only return the policies for this user
realm: (optional) will only return the policies of this realm
scope: (optional) will only return the policies within this scope
action: (optional) will only return the policies with this action
The action can also be something like "otppin" and will
return policies containing "otppin = 2"
returns:
a dictionary with the policies. The name of the policy being the key
'''
Policies = {}
#log.debug("[getPolicy] params %s" % str(param))
# First we load ALL policies from the Config
lConfig = getLinotpConfig()
for entry in lConfig:
if entry.startswith("linotp.Policy."):
#log.debug("[getPolicy] entry: %s" % entry )
policy = entry.split(".", 4)
if len(policy) == 4:
# check if we should return this named policy
insert_this = True
if param.get('name', None) is not None:
# If a named policy was requested, we do not want to add
# the policy if the name does not match!
insert_this = bool(param['name'].lower()
== policy[2].lower())
if insert_this:
name = policy[2]
key = policy[3]
value = lConfig.get(entry)
#log.debug("[getPolicy] found POL: %s, KEY: %s, VAL: %s"
# %(name, key, value))
if name in Policies:
if key == "realm":
if value is not None:
value = value.lower()
Policies[name][key] = value
else:
Policies[name] = {key: value}
#log.debug( Policies )
# Now we need to clean up policies, that are inactive
if not display_inactive:
pol2delete = []
for polname, policy in Policies.items():
pol_active = policy.get("active", "True")
if pol_active == "False":
pol2delete.append(polname)
for polname in pol2delete:
del Policies[polname]
# Now we need to clean up realms, that were not requested
pol2delete = []
if param.get('realm', None) is not None:
#log.debug("[getPolicy] cleanup acccording to realm %s"
# % param["realm"])
for polname, policy in Policies.items():
delete_it = True
#log.debug("[getPolicy] evaluating policy %s: %s"
# % (polname, str(policy)))
if policy.get("realm") is not None:
pol_realms = [p.strip()
for p in policy['realm'].lower().split(',')]
#log.debug("[getPolicy] realms in policy %s: %s"
# % (polname, str(pol_realms) ))
for r in pol_realms:
#log.debug("[getPolicy] Realm: %s" % r)
if r == param['realm'].lower() or r == '*':
#log.debug( "[getPolicy] Setting delete_it to false.
# Se we are using policy: %s" % str(polname))
delete_it = False
if delete_it:
pol2delete.append(polname)
for polname in pol2delete:
del Policies[polname]
pol2delete = []
if param.get('scope', None) is not None:
#log.debug("[getPolicy] cleanup acccording to scope %s"
# % param["scope"])
for polname, policy in Policies.items():
if policy['scope'].lower() != param['scope'].lower():
pol2delete.append(polname)
for polname in pol2delete:
del Policies[polname]
pol2delete = []
if param.get('action', None) is not None:
#log.debug("[getPolicy] cleanup acccording to action %s"
# % param["action"])
param_action = param['action'].strip().lower()
for polname, policy in Policies.items():
delete_it = True
#log.debug("[getPolicy] evaluating policy %s: %s"
# % (polname, str(policy)))
if policy.get("action") is not None:
pol_actions = [p.strip()
for p in policy.get('action', "").
lower().split(',')]
#log.debug("[getPolicy] actions in policy %s: %s "
# % (polname, str(pol_actions) ))
for policy_action in pol_actions:
if policy_action == '*' or policy_action == param_action:
# If any action (*) or the exact action we are looking
# for matches, then keep the policy
# e.g. otppin=1 matches when we search for 'otppin=1'
delete_it = False
elif policy_action.split('=')[0].strip() == param_action:
# If the first part of the action matches then keep the
# policy
# e.g. otppin=1 matches when we search for 'otppin'
delete_it = False
else:
# No match, delete_it = True
pass
if delete_it:
pol2delete.append(polname)
for polname in pol2delete:
del Policies[polname]
pol2delete = []
if param.get('user', None) is not None:
# log.debug("[getPolicy] cleanup acccording to user %s" % param["user"])
for polname, policy in Policies.items():
if policy.get('user'):
pol_users = [p.strip()
for p in policy.get('user').lower().split(',')]
# log.debug("[getPolicy] users in policy %s: %s"
# % (polname, str(pol_users) ))
else:
log.error("Empty userlist in policy '%s' not supported!" % polname)
raise Exception("Empty userlist in policy '%s' not supported!" % polname)
delete_it = True
for u in pol_users:
# log.debug("[getPolicy] User: %s" % u )
if u == param['user'].lower():
# log.debug("[getPolicy] setting delete_it to false."
# "We are using policy %s" % str(polname))
delete_it = False
if delete_it:
pol2delete.append(polname)
for polname in pol2delete:
del Policies[polname]
log.debug("[getPolicy] getting policies %s for "
"params %s" % (Policies, param))
return Policies
[docs]def deletePolicy(name):
'''
Function to delete one named policy
attributes:
name: (required) will only return the policy with the name
'''
res = {}
if not re.match('^[a-zA-Z0-9_]*$', name):
raise ServerError("policy name may only contain the "
"characters a-zA-Z0-9_", id=8888)
Config = getLinotpConfig()
delEntries = []
for entry in Config:
if entry.startswith("linotp.Policy.%s." % name):
delEntries.append(entry)
for entry in delEntries:
#delete this entry.
log.debug("[deletePolicy] removing key: %s" % entry)
ret = removeFromConfig(entry)
res[entry] = ret
return res
[docs]def getPolicyActionValue(policies, action, max=True, String=False):
'''
This function retrieves the int value of an action from a list of policies
input
policies: list of policies as returned from config.getPolicy
This is a list of dictionaries
action: an action, to be searched
max: if True, it will return the highest value, if there are
multiple policies
if False, it will return the lowest value, if there
are multiple policies
String: if True, the value is a string and not an integer
pol10: {
* action: "maxtoken = 10"
* scope: "enrollment"
* realm: "realm1"
* user: ""
* time: ""
}
'''
ret = -1
if String:
ret = ""
for _polname, pol in policies.items():
for a in [p.strip() for p in pol['action'].split(',')]:
log.debug("[getPolicyActionValue] Investigating %s (string=%s)"
% (a, unicode(String)))
split_action = [ca.strip() for ca in a.rsplit('=', 1)]
if len(split_action) > 1:
(name, value) = split_action
log.debug("[getPolicyActionValue] splitting <<%s>> <<%s>>"
% (name, unicode(value)))
if name == action:
if String:
ret = value
else:
if not String:
value = int(value)
if max:
if value > ret:
ret = value
else:
if value < ret or -1 == ret:
ret = value
return ret
[docs]def getAdminPolicies(action, lowerRealms=False):
"""
This internal function returns the admin policies (of scope=admin)
for the currently authenticated administrativ user.__builtins__
:param action: this is the action (like enable, disable, init...)
:param lowerRealms: if set to True, the list of realms returned will
be lower case.
:return: a dictionary with the following keys:
active (if policies are used)
realms (the realms, in which the admin is allowed to do this action)
resolvers (the resolvers in which the admin is allowed to perform
this action)
admin (the name of the authenticated admin user)
"""
active = True
# check if we got admin policies at all
p_at_all = getPolicy({'scope': 'admin'})
if len(p_at_all) == 0:
log.info("[getAdminPolicies] No policies in scope admin found."
" Admin authorization will be disabled.")
active = False
# We may change this later to other authetnication schemes
admin_user = getUserFromRequest(request)
log.info("[getAdminPolicies] Evaluating policies for the "
"user: %s" % admin_user['login'])
pol_request = {'user': admin_user['login'], 'scope': 'admin'}
if '' != action:
pol_request['action'] = action
policies = getPolicy(pol_request)
log.debug("[getAdminPolicies] Found the following "
"policies: %r" % policies)
# get all the realms from the policies:
realms = []
for _pol, val in policies.items():
## the val.get('realm') could return None
pol_realm = val.get('realm', '') or ''
pol_realm = pol_realm.split(',')
for r in pol_realm:
if lowerRealms:
realms.append(r.strip(" ").lower())
else:
realms.append(r.strip(" "))
log.debug("[getAdminPolicies] Found the following realms in the "
"policies: %r" % realms)
# get resolvers from realms
resolvers = []
all_realms = getRealms()
for realm, realm_conf in all_realms.items():
if realm in realms:
for r in realm_conf['useridresolver']:
resolvers.append(r.strip(" "))
log.debug("[getAdminPolicies] Found the following resolvers in the "
"policy: %r" % resolvers)
return {'active': active,
'realms': realms,
'resolvers': resolvers,
'admin': admin_user['login']}
[docs]def getAuthorization(scope, action):
"""
This internal function returns the Authrorizaition within some
the scope=system. for the currently authenticated
administrativ user. This does not take into account the REALMS!
arguments:
action - this is the action
scope = system
read
write
returns:
a dictionary with the following keys:
active (if policies are used)
admin (the name of the authenticated admin user)
auth (True if admin is authorized for this action)
"""
active = True
auth = False
p_at_all = getPolicy({'scope': scope})
if len(p_at_all) == 0:
log.info("[getAuthorization] No policies in scope %s found. Checking "
"of scope %s be disabled." % (scope, scope))
active = False
auth = True
# TODO: We may change this later to other authentication schemes
log.debug("[getAuthorization] now getting the admin user name")
admin_user = getUserFromRequest(request)
log.debug("[getAuthorization] Evaluating policies for the user: %s"
% admin_user['login'])
policies = getPolicy({'user': admin_user['login'],
'scope': scope,
'action': action})
log.debug("[getAuthorization] Found the following policies: "
"%r" % policies)
if len(policies.keys()) > 0:
auth = True
return {'active': active, 'auth': auth, 'admin': admin_user['login']}
[docs]def checkAdminAuthorization(policies, serial, user, fitAllRealms=False):
"""
This function checks if the token object defined by either "serial"
or "user" is in the corresponding realm, where the admin has access to /
fits to the given policy.
fitAllRealms: If set to True, then the administrator must have rights
in all realms of the token. e.g. for deleting tokens.
returns:
True: if admin is allowed
False: if admin is not allowed
"""
log.info("[checkAdminAuthorization] policies: %r" % policies)
# in case there are absolutely no policies
if not policies['active']:
return True
# If the policy is valid for all realms
if '*' in policies['realms']:
return True
# convert realms and resolvers to lowercase
policies['realms'] = [x.lower() for x in policies['realms']]
policies['resolvers'] = [x.lower() for x in policies['resolvers']]
# in case we got a serial
if serial != "" and serial is not None:
realms = linotp.lib.token.getTokenRealms(serial)
log.debug("[checkAdminAuthorization] the token %r is contained "
"in the realms: %r" % (serial, realms))
log.debug("[checkAdminAuthorization] the policy contains "
"the realms: %r" % policies['realms'])
for r in realms:
if fitAllRealms:
if r not in policies['realms']:
return False
else:
if r in policies['realms']:
return True
return fitAllRealms
# in case we got a user
if user.login != "":
# default realm user
if user.realm == "" and user.conf == "":
return getDefaultRealm() in policies['realms']
if not user.realm and not user.conf:
return getDefaultRealm() in policies['realms']
# we got a realm:
if user.realm != "":
return user.realm.lower() in policies['realms']
if user.conf != "":
return user.conf.lower() in policies['resolvers']
# catch all
return False
[docs]def getSelfserviceActions(user):
'''
This function returns the allowed actions in the self service portal
for the given user
'''
c.user = user.login
c.realm = user.realm
log.debug("[getSelfserviceActions] checking actions for scope=selfservice,"
" realm=%r" % c.realm)
client = get_client()
policies = get_client_policy(client, scope="selfservice", realm=c.realm,
user=c.user, userObj=user)
# Now we got a dictionary of all policies within the scope selfservice for
# this realm. as there can be more than one policy, we concatenate all
# their actions to a list later we might want to change this
all_actions = []
for pol in policies:
# remove whitespaces and split at the comma
action_list = policies[pol].\
get('action', '').\
replace(' ', '').split(',')
all_actions.extend(action_list)
for act in all_actions:
act.strip()
# return the list with all actions
return all_actions
[docs]def checkTokenNum(user=None, realm=None):
'''
This internal function checks if the number of the tokens is valid...
for a certain realm...
Therefor it checks the policy
"scope = enrollment", action = "tokencount = <number>"
'''
# If there is an empty user, we need to set it to None
if user:
if "" == user.login:
user = None
if user is None and realm is None:
# No user and realm given, so we check all the tokens
ret = True
tNum = linotp.lib.token.getTokenNumResolver()
log.debug("[checkTokenNum] Number of tokens in DB: %i" % int(tNum))
log.debug("[checkTokenNum] result of checking the token "
"number: %i" % ret)
return ret
else:
#allRealms = getRealms()
Realms = []
if user:
log.debug("[checkTokenNum] checking token num in realm: %s,"
" resolver: %s" % (user.realm, user.conf))
# 1. alle resolver aus dem Realm holen.
# 2. fuer jeden Resolver die tNum holen.
# 3. die Policy holen und gegen die tNum checken.
Realms = getUserRealms(user)
elif realm:
Realms = [realm]
log.debug("[checkTokenNum] checking token num in realm: %r" % Realms)
tokenInRealms = {}
for R in Realms:
tIR = linotp.lib.token.getTokenInRealm(R)
tokenInRealms[R] = tIR
log.debug("[checkTokenNum] There are %i tokens in realm %r"
% (tIR, R))
# Now we are checking the policy for every Realm! (if there are more)
policyFound = False
maxToken = 0
for R in Realms:
pol = getPolicy({'scope': 'enrollment', 'realm': R})
polTNum = getPolicyActionValue(pol, 'tokencount')
if polTNum > -1:
policyFound = True
if int(polTNum) > int(maxToken):
maxToken = int(polTNum)
log.info("[checkTokenNum] Realm: %r, max: %i, tokens in realm: "
" %i" % (R, int(maxToken), int(tokenInRealms[R])))
if int(maxToken) > int(tokenInRealms[R]):
return True
if policyFound is False:
log.debug("[checkTokenNum] there is no scope=enrollment, "
"action=tokencount policy for the realms %r" % Realms)
return True
log.info("[checkTokenNum] No policy available for realm %r, "
"where enough managable tokens were defined." % Realms)
return False
[docs]def checkTokenAssigned(user):
'''
This internal function checks the number of assigned tokens to a user
Therefor it checks the policy
"scope = enrollment", action = "maxtoken = <number>"
returns FALSE, if the user has to many tokens assigned
returns TRUE, if more tokens may be assigned to the user
'''
if user is None:
return True
if user.login == "":
return True
Realms = getUserRealms(user)
log.debug("[checkTokenAssigned] checking the already assigned tokens for"
" user %s, realms %s" % (user.login, Realms))
for R in Realms:
pol = get_client_policy(get_client(), scope='enrollment', realm=R,
user=user.login, userObj=user)
log.debug("[checkTokenAssigned] found policies %s" % pol)
if len(pol) == 0:
log.debug("[checkTokenAssigned] there is no scope=enrollment"
" policy for Realm %s" % R)
return True
maxTokenAssigned = getPolicyActionValue(pol, "maxtoken")
# get the tokens of the user
tokens = linotp.lib.token.getTokens4UserOrSerial(user, "")
# If there is a policy, where the tokennumber exceeds the tokens in
# the corresponding realm..
log.debug("[checkTokenAssigned] the user %r has %r tokens assigned. "
"The policy says a maximum of %r tokens."
% (user.login, len(tokens), maxTokenAssigned))
if (int(maxTokenAssigned) > int(len(tokens)) or
maxTokenAssigned == -1):
return True
return False
[docs]def get_tokenlabel(user="", realm="", serial=""):
'''
This internal function returns the naming of the token as defined in policy
scope = enrollment, action = tokenname = <string>
The string can have the following varaibles:
<u>: user
<r>: realm
<s>: token serial
This function is used by the creation of googleauthenticator url
'''
tokenlabel = ""
# TODO: What happens when we got no realms?
#pol = getPolicy( {'scope': 'enrollment', 'realm': realm} )
pol = get_client_policy(get_client(), scope="enrollment",
realm=realm, user=user)
if len(pol) == 0:
# No policy, so we use the serial number as label
log.debug("[get_tokenlabel] there is no scope=enrollment policy "
"for realm %r" % realm)
tokenlabel = serial
else:
string_label = getPolicyActionValue(pol, "tokenlabel", String=True)
if "" == string_label:
# empty label, so we use the serial
tokenlabel = serial
else:
string_label = re.sub('<u>', user, string_label)
string_label = re.sub('<r>', realm, string_label)
string_label = re.sub('<s>', serial, string_label)
tokenlabel = string_label
return tokenlabel
[docs]def get_autoassignment(user):
'''
this function checks the policy scope=enrollment, action=autoassignment
This is a boolean policy.
The function returns true, if autoassignment is defined.
'''
ret = False
otplen = 6
pol = get_client_policy(get_client(), scope='enrollment',
realm=user.realm, user=user.login, userObj=user)
if len(pol) > 0:
otplen = getPolicyActionValue(pol, "autoassignment")
log.debug("[get_autoassigmnet] got the otplen = %s" % str(otplen))
if type(otplen) == int and otplen > 0:
ret = True
return ret, otplen
[docs]def ignore_autoassignment_pin(user):
'''
This function checks the policy
scope=enrollment, action=ignore_autoassignment_pin
This is a boolean policy.
The function returns true, if the password used in the autoassignment
should not be set as token pin.
'''
ret = False
pol = get_client_policy(get_client(), scope='enrollment',
action="ignore_autoassignment_pin",
realm=user.realm, user=user.login, userObj=user)
if len(pol) > 0:
ret = True
return ret
[docs]def getRandomOTPPINLength(user):
'''
This internal function returns the length of the random otp pin that is
define in policy scope = enrollment, action = otp_pin_random = 111
'''
Realms = getUserRealms(user)
maxOTPPINLength = -1
for R in Realms:
pol = get_client_policy(get_client(),
scope='enrollment', action='otp_pin_random',
realm=R, user=user.login, userObj=user)
if len(pol) == 0:
log.debug("[getRandomOTPPINLength] there is no scope=enrollment "
"policy for Realm %r" % R)
return -1
OTPPINLength = getPolicyActionValue(pol, "otp_pin_random")
# If there is a policy, with a higher random pin length
log.debug("[getRandomOTPPINLength] found policy with "
"otp_pin_random = %r" % OTPPINLength)
if (int(OTPPINLength) > int(maxOTPPINLength)):
maxOTPPINLength = OTPPINLength
return maxOTPPINLength
[docs]def getOTPPINEncrypt(serial=None, user=None):
'''
This function returns, if the otppin should be stored as
an encrpyted value
'''
# do store as hashed value
encrypt_pin = 0
Realms = []
if serial:
Realms = linotp.lib.token.getTokenRealms(serial)
elif user:
Realms = getUserRealms(user)
log.debug("[getOTPPINEncrypt] checking realms: %r" % Realms)
for R in Realms:
pol = getPolicy({'scope': 'enrollment', 'realm': R})
log.debug("[getOTPPINEncrypt] realm: %r, pol: %r" % (R, pol))
if 1 == getPolicyActionValue(pol, 'otp_pin_encrypt'):
encrypt_pin = 1
return encrypt_pin
[docs]def getOTPPINPolicies(user, scope="selfservice"):
'''
This internal function returns the PIN policies for a realm.
These policies can either be in the scope "selfservice" or "admin"
The policy define when resettng an OTP PIN:
- what should be the length of the otp pin
- what should be the contents of the otp pin
by the actions:
otp_pin_minlength =
otp_pin_maxlength =
otp_pin_contents = [cns] (character, number, special character)
:return: dictionary like {contents: "cns", min: 7, max: 10}
'''
log.debug("[getOTPPINPolicies]")
Realms = getUserRealms(user)
ret = {'min':-1, 'max':-1, 'contents': ""}
log.debug("[getOTPPINPolicies] searching for OTP PIN policies in "
"scope=%r policies." % scope)
for R in Realms:
pol = get_client_policy(get_client(), scope=scope, realm=R,
user=user.login, userObj=user)
if len(pol) == 0:
log.debug("[getOTPPINPolicies] there is no "
"scope=%r policy for Realm %r" % (scope, R))
return ret
n_max = getPolicyActionValue(pol, "otp_pin_maxlength")
n_min = getPolicyActionValue(pol, "otp_pin_minlength", max=False)
n_contents = getPolicyActionValue(pol, "otp_pin_contents", String=True)
# find the maximum length
log.debug("[getOTPPINPolicies] find the maximum length for OTP PINs.")
if (int(n_max) > ret['max']):
ret['max'] = n_max
# find the minimum length
log.debug("[getOTPPINPolicies] find the minimum length for OTP_PINs")
if (not n_min == -1):
if (ret['min'] == -1):
ret['min'] = n_min
elif (n_min < ret['min']):
ret['min'] = n_min
# find all contents
log.debug("[getOTPPINPolicies] find the allowed contents for OTP PINs")
for k in n_contents:
if k not in ret['contents']:
ret['contents'] += k
return ret
[docs]def checkOTPPINPolicy(pin, user):
'''
This function checks the given PIN (OTP PIN) against the policy
returned by the function
getOTPPINPolicy
It returns a dictionary:
{'success': True/False,
'error': errortext}
At the moment this works for the selfservice portal
'''
log.debug("[checkOTPPINPolicy]")
pol = getOTPPINPolicies(user)
log.debug("[checkOTPPINPolicy] checking for otp_pin_minlength")
if pol['min'] != -1:
if pol['min'] > len(pin):
return {'success': False,
'error': _('The provided PIN is too short. It should be '
'at least %i characters.') % pol['min']}
log.debug("[checkOTPPINPolicy] checking for otp_pin_maxlength")
if pol['max'] != -1:
if pol['max'] < len(pin):
return {'success': False,
'error': (_('The provided PIN is too long. It should not '
'be longer than %i characters.') % pol['max'])}
log.debug("[checkOTPPINPolicy] checking for otp_pin_contents")
if pol['contents']:
policy_c = "c" in pol['contents']
policy_n = "n" in pol['contents']
policy_s = "s" in pol['contents']
policy_o = "o" in pol['contents']
contains_c = False
contains_n = False
contains_s = False
contains_other = False
for c in pin:
if re.search(REG_POLICY_C, c):
contains_c = True
elif re.search(REG_POLICY_N, c):
contains_n = True
elif re.search(REG_POLICY_S, c):
contains_s = True
else:
contains_other = True
if "+" == pol['contents'][0]:
log.debug("[checkOTPPINPolicy] checking for an additive character "
"group: %s" % pol['contents'])
if ((not (
(policy_c and contains_c) or
(policy_s and contains_s) or
(policy_o and contains_other) or
(policy_n and contains_n)
)
) or (
(not policy_c and contains_c) or
(not policy_s and contains_s) or
(not policy_n and contains_n) or
(not policy_o and contains_other))):
return {'success': False,
'error': _("The provided PIN does not contain characters"
" of the group or it does contains "
"characters that are not in the group %s")
% pol['contents']}
else:
log.debug("[checkOTPPINPolicy] normal check: %s" % pol['contents'])
if (policy_c and not contains_c):
return {'success': False,
'error': _('The provided PIN does not contain any '
'letters. Check policy otp_pin_contents.')}
if (policy_n and not contains_n):
return {'success': False,
'error': _('The provided PIN does not contain any '
'numbers. Check policy otp_pin_contents.')}
if (policy_s and not contains_s):
return {'success': False,
'error': _('The provided PIN does not contain any '
'special characters. It should contain '
'some of these characters like '
'.: ,;-_<>+*~!/()=?$. Check policy '
'otp_pin_contents.')}
if (policy_o and not contains_other):
return {'success': False,
'error': _('The provided PIN does not contain any '
'other characters. It should contain some of'
' these characters that are not contained '
'in letters, digits and the defined special '
'characters. Check policy otp_pin_contents.')}
# Additionally: in case of -cn the PIN must not contain "s" or "o"
if '-' == pol['contents'][0]:
if (not policy_c and contains_c):
return {'success': False,
'error': _("The PIN contains letters, although it "
"should not! (%s)") % pol['contents']}
if (not policy_n and contains_n):
return {'success': False,
'error': _("The PIN contains digits, although it "
"should not! (%s)") % pol['contents']}
if (not policy_s and contains_s):
return {'success': False,
'error': _("The PIN contains special characters, "
"although it should not! "
"(%s)") % pol['contents']}
if (not policy_o and contains_other):
return {'success': False,
'error': _("The PIN contains other characters, "
"although it should not! "
"(%s)") % pol['contents']}
return {'success': True,
'error': ''}
[docs]def getRandomPin(randomPINLength):
newpin = ""
log.debug("[getRandomPin] creating a random otp pin of "
"length %r" % randomPINLength)
chars = string.letters + string.digits
for _i in range(randomPINLength):
newpin = newpin + urandom.choice(chars)
return newpin
##### Pre and Post checks
[docs]def checkPolicyPre(controller, method, param={}, authUser=None, user=None):
'''
This function will check for all policy definition for a certain
controller/method It is run directly before doing the action in the
controller. I will raise an exception, if it fails.
:param param: This is a dictionary with the necessary parameters.
:return: dictionary with the necessary results. These depend on
the controller.
'''
ret = {}
log.debug("[checkPolicyPre] entering controller %s" % controller)
log.debug("[checkPolicyPre] entering method %s" % method)
if 'admin' == controller:
serial = getParam(param, "serial", optional)
if user is None:
user = getUserFromParam(param, optional)
realm = getParam(param, "realm", optional)
if realm is None or len(realm) == 0:
realm = getDefaultRealm()
if 'show' == method:
log.debug("[checkPolicyPre] entering method %s" % method)
# get the realms for this administrator
policies = getAdminPolicies('')
log.debug("[checkPolicyPre] The admin >%s< may manage the "
"following realms: %s" % (policies['admin'],
policies['realms']))
if policies['active'] and 0 == len(policies['realms']):
log.error("[checkPolicyPre] The admin >%s< has no rights in "
"any realms!" % policies['admin'])
raise PolicyException(_("You do not have any rights in any "
"realm! Check the policies."))
return {'realms': policies['realms'], 'admin': policies['admin']}
elif 'remove' == method:
policies = getAdminPolicies("remove")
# FIXME: A token that belongs to multiple realms should not be
# deleted. Should it? If an admin has the right on this
# token, he might be allowed to delete it,
# even if the token is in other realms.
# We could use fitAllRealms=True
if (policies['active'] and not
checkAdminAuthorization(policies, serial, user)):
log.warning("[remove] the admin >%s< is not allowed to remove "
"token %s for user %s@%s"
% (policies['admin'], serial,
user.login, user.realm))
raise PolicyException(_("You do not have the administrative "
"right to remove token %s. Check the "
"policies.") % serial)
elif 'enable' == method:
policies = getAdminPolicies("enable")
if (policies['active'] and not
checkAdminAuthorization(policies, serial, user)):
log.warning("[enable] the admin >%s< is not allowed to enable "
"token %s for user %s@%s"
% (policies['admin'], serial,
user.login, user.realm))
raise PolicyException(_("You do not have the administrative "
"right to enable token %s. Check the "
"policies.") % serial)
if not checkTokenNum():
log.error("[enable] The maximum token number "
"is reached!")
raise PolicyException(_("You may not enable any more tokens. "
"Your maximum token number is "
"reached!"))
# We need to check which realm the token will be in.
realmList = linotp.lib.token.getTokenRealms(serial)
for r in realmList:
if not checkTokenNum(realm=r):
log.warning("[enable] the maximum tokens for the realm "
"%s is exceeded." % r)
raise PolicyException(_("You may not enable any more tokens "
"in realm %s. Check the policy "
"'tokencount'") % r)
elif 'disable' == method:
policies = getAdminPolicies("disable")
if (policies['active'] and not
checkAdminAuthorization(policies, serial, user)):
log.warning("[disable] the admin >%s< is not allowed to "
"disable token %s for user %s@%s"
% (policies['admin'], serial,
user.login, user.realm))
raise PolicyException(_("You do not have the administrative "
"right to disable token %s. Check the "
"policies.") % serial)
elif 'copytokenpin' == method:
policies = getAdminPolicies("copytokenpin")
if (policies['active'] and not
checkAdminAuthorization(policies, serial, user)):
log.warning("[copytokenpin] the admin >%s< is not allowed to "
"copy token pin of token %s for user %s@%s"
% (policies['admin'], serial,
user.login, user.realm))
raise PolicyException(_("You do not have the administrative "
"right to copy pin of token %s. Check "
"the policies.") % serial)
elif 'copytokenuser' == method:
policies = getAdminPolicies("copytokenuser")
if (policies['active'] and not
checkAdminAuthorization(policies, serial, user)):
log.warning("[copytokenuser] the admin >%s< is not allowed to "
"copy token user of token %s for user %s@%s"
% (policies['admin'], serial,
user.login, user.realm))
raise PolicyException(_("You do not have the administrative "
"right to copy user of token %s. Check "
"the policies.") % serial)
elif 'losttoken' == method:
policies = getAdminPolicies("losttoken")
if (policies['active'] and not
checkAdminAuthorization(policies, serial, user)):
log.warning("[losttoken] the admin >%s< is not allowed to run "
"the losttoken workflow for token %s for "
"user %s@%s" % (policies['admin'], serial,
user.login, user.realm))
raise PolicyException(_("You do not have the administrative "
"right to run the losttoken workflow "
"for token %s. Check the "
"policies.") % serial)
elif 'getotp' == method:
policies = getAdminPolicies("getotp")
if (policies['active'] and not
checkAdminAuthorization(policies, serial, user)):
log.warning("[getotp] the admin >%s< is not allowed to run "
"the getotp workflow for token %s for user %s@%s"
% (policies['admin'], serial, user.login,
user.realm))
raise PolicyException(_("You do not have the administrative "
"right to run the getotp workflow for "
"token %s. Check the policies.") % serial)
elif 'getserial' == method:
policies = getAdminPolicies("getserial")
# check if we want to search the token in certain realms
if realm is not None:
dummy_user = User('dummy', realm, None)
else:
dummy_user = User('', '', '')
# We need to allow this, as no realm was passed at all.
policies['realms'] = '*'
if (policies['active'] and not
checkAdminAuthorization(policies, None, dummy_user)):
log.warning("[getserial] the admin >%s< is not allowed to get "
"serials for user %s@%s"
% (policies['admin'], user.login, user.realm))
raise PolicyException(_("You do not have the administrative "
"right to get serials by OTPs in "
"this realm!"))
elif 'init' == method:
ttype = getParam(param, "type", optional)
# possible actions are:
# initSPASS, initHMAC, initETNG, initSMS, initMOTP
policies = {}
# default: we got HMAC / ETNG
log.debug("[checkPolicyPre] checking init action")
if ((not ttype) or
(ttype and (ttype.lower() == "hmac"))):
p1 = getAdminPolicies("initHMAC")
p2 = getAdminPolicies("initETNG")
policies = {'active': p1['active'],
'admin': p1['admin'],
'realms': p1['realms'] + p2['realms'],
'resolvers': p1['resolvers'] + p2['resolvers']}
else:
# See if there is a policy like initSPASS or ....
token_type_list = linotp.lib.token.get_token_type_list()
token_type_found = False
for tt in token_type_list:
if tt.lower() == ttype.lower():
policies = getAdminPolicies("init%s" % tt.upper())
token_type_found = True
break
if not token_type_found:
policies = {}
log.error("[checkPolicyPre] Unknown token type:"
" %s" % ttype)
raise Exception(_("The tokentype '%s' could not be "
"found.") % ttype)
"""
We need to assure, that an admin does not enroll a token into a
realm were he has no ACCESS! : -(
The admin may not enroll a token with a serial, that is already
assigned to a user outside of his realm
"""
# if a user is given, we need to check the realm of this user
log.debug("[checkPolicyPre] checking realm of the user")
if (policies['active'] and
(user.login != "" and not
checkAdminAuthorization(policies, "", user))):
log.warning("[init] the admin >%s< is not allowed to enroll "
"token %s of type %s to user %s@%s"
% (policies['admin'], serial, ttype,
user.login, user.realm))
raise PolicyException(_("You do not have the administrative "
"right to init token %s of type %s to "
"user %s@%s. Check the policies.")
% (serial, ttype, user.login,
user.realm))
# no right to enroll token in any realm
log.debug("[checkPolicyPre] checking enroll token at all")
if policies['active'] and len(policies['realms']) == 0:
log.warning("[init] the admin >%s< is not allowed to enroll "
"a token at all."
% (policies['admin']))
raise PolicyException(_("You do not have the administrative "
"right to enroll tokens. Check the "
"policies."))
# the token is assigned to a user, not in the realm of the admin!
# we only need to check this, if the token already exists. If
# this is a new token, we do not need to check this.
log.debug("[checkPolicyPre] checking for token existens")
if policies['active'] and linotp.lib.token.tokenExist(serial):
if not checkAdminAuthorization(policies, serial, ""):
log.warning("[init] the admin >%s< is not allowed to "
"enroll token %s of type %s."
% (policies['admin'], serial, ttype))
raise PolicyException(_("You do not have the administrative "
"right to init token %s of type %s.")
% (serial, ttype))
# Here we check, if the tokennum exceeded
log.debug("[checkPolicyPre] checking number of tokens")
if not checkTokenNum():
log.error("[init] The maximum token number "
"is reached!")
raise PolicyException(_("You may not enroll any more tokens. "
"Your maximum token number "
"is reached!"))
# if a policy restricts the tokennumber for a realm
log.debug("[checkPolicyPre] checking tokens in realms "
"%s" % policies['realms'])
for R in policies['realms']:
if not checkTokenNum(realm=R):
log.warning("[init] the admin >%s< is not allowed to "
"enroll any more tokens for the realm %s"
% (policies['admin'], R))
raise PolicyException(_("The maximum allowed number of "
"tokens for the realm %s was "
"reached. You can not init any more "
"tokens. Check the policies "
"scope=enrollment, "
"action=tokencount.") % R)
log.debug("[checkPolicyPre] checking tokens in realm for "
"user %s" % user)
if not checkTokenNum(user=user):
log.warning("[init] the admin >%s< is not allowed to enroll "
"any more tokens for the realm %s"
% (policies['admin'], user.realm))
raise PolicyException(_("The maximum allowed number of tokens "
"for the realm %s was reached. You can "
"not init any more tokens. Check the "
"policies scope=enrollment, "
"action=tokencount.") % user.realm)
log.debug("[checkPolicyPre] checking tokens of user")
# if a policy restricts the tokennumber for the user in a realm
if not checkTokenAssigned(user):
log.warning("[init] the maximum number of allowed tokens per "
"user is exceeded. Check the policies")
raise PolicyException(_("The maximum number of allowed tokens "
"per user is exceeded. Check the "
"policies scope=enrollment, "
"action=maxtoken"))
# ==== End of policy check 'init' ======
ret['realms'] = policies['realms']
elif 'unassign' == method:
policies = getAdminPolicies("unassign")
if (policies['active'] and not
checkAdminAuthorization(policies, serial, user)):
log.warning("[unassign] the admin >%s< is not allowed to "
"unassign token %s for user %s@%s"
% (policies['admin'], serial, user.login,
user.realm))
raise PolicyException(_("You do not have the administrative "
"right to unassign token %s. Check the "
"policies.") % serial)
elif 'assign' == method:
policies = getAdminPolicies("assign")
# the token is assigned to a user, not in the realm of the admin!
if (policies['active'] and not
checkAdminAuthorization(policies, serial, "")):
log.warning("[assign] the admin >%s< is not allowed to assign "
"token %s. " % (policies['admin'], serial))
raise PolicyException(_("You do not have the administrative "
"right to assign token %s. "
"Check the policies.") % (serial))
# The user, the token should be assigned to,
# is not in the admins realm
if (policies['active'] and not
checkAdminAuthorization(policies, "", user)):
log.warning("[assign] the admin >%s< is not allowed to assign "
"token %s for user %s@%s" % (policies['admin'],
serial, user.login,
user.realm))
raise PolicyException(_("You do not have the administrative "
"right to assign token %s. Check the "
"policies.") % serial)
# if a policy restricts the tokennumber for the realm/user
if not checkTokenNum(user):
log.warning("[init] the admin >%s< is not allowed to assign "
"any more tokens for the realm %s(%s)"
% (policies['admin'], user.realm, user.conf))
raise PolicyException(_("The maximum allowed number of tokens "
"for the realm %s (%s) was reached. You "
"can not assign any more tokens. Check "
"the policies.")
% (user.realm, user.conf))
# check the number of assigned tokens
if not checkTokenAssigned(user):
log.warning("[assign] the maximum number of allowed tokens "
"is exceeded. Check the policies")
raise PolicyException(_("the maximum number of allowed tokens "
"is exceeded. Check the policies"))
elif 'setPin' == method:
if "userpin" in param:
getParam(param, "userpin", required)
# check admin authorization
policies1 = getAdminPolicies("setSCPIN")
policies2 = getAdminPolicies("setMOTPPIN")
if ((policies1['active'] and not
(checkAdminAuthorization(policies1, serial,
User("", "", ""))))
or (policies2['active'] and not
(checkAdminAuthorization(policies2, serial,
User("", "", ""))))):
log.warning("[setPin] the admin >%s< is not allowed to "
"set MOTP PIN/SC UserPIN for token %s."
% (policies['admin'], serial))
raise PolicyException(_("You do not have the administrative "
"right to set MOTP PIN/ SC UserPIN "
"for token %s. Check the policies.")
% serial)
if "sopin" in param:
getParam(param, "sopin", required)
# check admin authorization
policies = getAdminPolicies("setSCPIN")
if (policies['active'] and not
checkAdminAuthorization(policies, serial,
User("", "", ""))):
log.warning("[setPin] the admin >%s< is not allowed to "
"setPIN for token %s."
% (policies['admin'], serial))
raise PolicyException(_("You do not have the administrative "
"right to set Smartcard PIN for "
"token %s. Check the policies.")
% serial)
elif 'set' == method:
if "pin" in param:
policies = getAdminPolicies("setOTPPIN")
if (policies['active'] and not
checkAdminAuthorization(policies, serial, user)):
log.warning("[set] the admin >%s< is not allowed to set "
"OTP PIN for token %s for user %s@%s"
% (policies['admin'], serial, user.login,
user.realm))
raise PolicyException(_("You do not have the administrative "
"right to set OTP PIN for token %s. "
"Check the policies.") % serial)
if ("MaxFailCount".lower() in param or
"SyncWindow".lower() in param or
"CounterWindow".lower() in param or
"OtpLen".lower() in param):
policies = getAdminPolicies("set")
if (policies['active'] and not
checkAdminAuthorization(policies, serial, user)):
log.warning("[set] the admin >%s< is not allowed to set "
"token properites for %s for user %s@%s"
% (policies['admin'], serial,
user.login, user.realm))
raise PolicyException(_("You do not have the administrative "
"right to set token properties for "
"%s. Check the policies.") % serial)
elif 'resync' == method:
policies = getAdminPolicies("resync")
if (policies['active'] and not
checkAdminAuthorization(policies, serial, user)):
log.warning("[resync] the admin >%s< is not allowed to resync "
"token %s for user %s@%s"
% (policies['admin'], serial,
user.login, user.realm))
raise PolicyException(_("You do not have the administrative "
"right to resync token %s. Check the "
"policies.") % serial)
elif 'userlist' == method:
policies = getAdminPolicies("userlist")
# check if the admin may view the users in this realm
if (policies['active'] and
not checkAdminAuthorization(policies, "", user)):
log.warning("[userlist] the admin >%s< is not allowed to list"
" users in realm %s(%s)!"
% (policies['admin'], user.realm, user.conf))
raise PolicyException(_("You do not have the administrative"
" right to list users in realm %s(%s).")
% (user.realm, user.conf))
elif 'checkstatus' == method:
policies = getAdminPolicies("checkstatus")
# check if the admin may view the users in this realm
if (policies['active'] and not
checkAdminAuthorization(policies, "", user)):
log.warning("[checkstatus] the admin >%s< is not allowed to "
"show status of token challenges in realm %s(%s)!"
% (policies['admin'], user.realm, user.conf))
raise PolicyException(_("You do not have the administrative "
"right to show status of token "
"challenges in realm "
"%s(%s).") % (user.realm, user.conf))
elif 'tokenrealm' == method:
log.debug("[checkPolicyPre] entering method %s" % method)
# The admin needs to have the right "manageToken" for all realms,
# the token is currently in and all realm the Token should go into.
policies = getAdminPolicies("manageToken")
realms = getParam(param, "realms", required)
# List of the new realms
realmNewList = realms.split(',')
# List of existing realms
realmExistList = linotp.lib.token.getTokenRealms(serial)
for r in realmExistList:
if (policies['active'] and not
checkAdminAuthorization(policies, None,
User("dummy", r, None))):
log.warning("[tokenrealm] the admin >%s< is not allowed "
"to manage tokens in realm %s"
% (policies['admin'], r))
raise PolicyException(_("You do not have the administrative "
"right to remove tokens from realm "
"%s. Check the policies.") % r)
for r in realmNewList:
if (policies['active'] and not
checkAdminAuthorization(policies, None,
User("dummy", r, None))):
log.warning("[tokenrealm] the admin >%s< is not allowed "
"to manage tokens in realm %s"
% (policies['admin'], r))
raise PolicyException(_("You do not have the administrative "
"right to add tokens to realm %s. "
"Check the policies.") % r)
if not checkTokenNum(realm=r):
log.warning("[tokenrealm] the maximum tokens for the "
"realm %s is exceeded." % r)
raise PolicyException(_("You may not put any more tokens in "
"realm %s. Check the policy "
"'tokencount'") % r)
elif 'reset' == method:
policies = getAdminPolicies("reset")
if (policies['active'] and not
checkAdminAuthorization(policies, serial, user)):
log.warning("[reset] the admin >%s< is not allowed to reset "
"token %s for user %s@%s" % (policies['admin'],
serial, user.login,
user.realm))
raise PolicyException(_("You do not have the administrative "
"right to reset token %s. Check the "
"policies.") % serial)
elif 'import' == method:
policies = getAdminPolicies("import")
# no right to import token in any realm
log.debug("[checkPolicyPre] checking import token at all")
if policies['active'] and len(policies['realms']) == 0:
log.warning("[import] the admin >%s< is not allowed "
"to import a token at all."
% (policies['admin']))
raise PolicyException(_("You do not have the administrative "
"right to import tokens. Check the "
"policies."))
ret['realms'] = policies['realms']
elif 'loadtokens' == method:
tokenrealm = param.get('tokenrealm')
policies = getAdminPolicies("import")
if policies['active'] and tokenrealm not in policies['realms']:
log.warning("[loadtokens] the admin >%s< is not allowed to "
"import token files to realm %s: %s"
% (policies['admin'], tokenrealm, policies))
raise PolicyException(_("You do not have the administrative "
"right to import token files to realm %s"
". Check the policies.") % tokenrealm)
if not checkTokenNum(realm=tokenrealm):
log.warning("[loadtokens] the maximum tokens for the realm "
"%s is exceeded." % tokenrealm)
raise PolicyException(_("The maximum number of allowed tokens "
"in realm %s is exceeded. Check policy "
"tokencount!") % tokenrealm)
else:
# unknown method
log.error("[checkPolicyPre] an unknown method "
"<<%s>> was passed." % method)
raise PolicyException(_("Failed to run checkPolicyPre. "
"Unknown method: %s") % method)
elif 'gettoken' == controller:
if 'max_count' == method[0: len('max_count')]:
ret = 0
serial = getParam(param, "serial", optional)
ttype = linotp.lib.token.getTokenType(serial).lower()
trealms = linotp.lib.token.getTokenRealms(serial)
pol_action = MAP_TYPE_GETOTP_ACTION.get(ttype, "")
admin_user = getUserFromRequest(request)
if pol_action == "":
raise PolicyException( _("There is no policy gettoken/"
"max_count definable for the "
"tokentype %r") % ttype)
policies = {}
for realm in trealms:
pol = getPolicy({'scope': 'gettoken', 'realm': realm,
'user': admin_user['login']})
log.error("[checkPolicyPre][gettoken] got a policy: "
" %r" % policies)
policies.update(pol)
value = getPolicyActionValue(policies, pol_action)
log.debug("[checkPolicyPre][gettoken] got all "
"policies: %r: %r" % (policies, value))
ret = value
elif 'audit' == controller:
if 'view' == method:
auth = getAuthorization("audit", "view")
if auth['active'] and not auth['auth']:
log.warning("[audit view] the admin >%r< is not allowed to "
"view the audit trail" % auth['admin'])
ret = _("You do not have the administrative right to view the "
"audit trail. You are missing a policy "
"scope=audit, action=view")
raise PolicyException(ret)
else:
log.error("[checkPolicyPre] an unknown method was passed in :"
" %s" % method)
raise PolicyException(_("Failed to run checkPolicyPre. Unknown "
"method: %s") % method)
elif 'manage' == controller:
log.debug("[checkPolicyPre] entering controller %s" % controller)
elif 'selfservice' == controller:
log.debug("[checkPolicyPre] entering controller %s" % controller)
if 'max_count' == method[0: len('max_count')]:
ret = 0
serial = getParam(param, "serial", optional)
ttype = linotp.lib.token.getTokenType(serial).lower()
urealm = authUser.realm
pol_action = MAP_TYPE_GETOTP_ACTION.get(ttype, "")
if pol_action == "":
raise PolicyException(_("There is no policy selfservice/"
"max_count definable for the token "
"type %s.") % ttype)
policies = get_client_policy(get_client(), scope='selfservice',
realm=urealm, user=authUser.login,
userObj=authUser)
log.debug("[checkPolicyPre][seflservice][max_count] got a policy: "
" %r" % policies)
if policies == {}:
raise PolicyException(_("There is no policy selfservice/"
"max_count defined for the tokentype "
"%s in realm %s.") % (ttype, urealm))
value = getPolicyActionValue(policies, pol_action)
log.debug("[checkPolicyPre][seflservice][max_count] "
"got all policies: %r: %r" % (policies, value))
ret = value
elif 'usersetpin' == method:
if not 'setOTPPIN' in getSelfserviceActions(authUser):
log.warning("[usersetpin] user %s@%s is not allowed to call "
"this function!" % (authUser.login,
authUser.realm))
raise PolicyException(_('The policy settings do not allow you '
'to issue this request!'))
elif 'userreset' == method:
if not 'reset' in getSelfserviceActions(authUser):
log.warning("[userreset] user %s@%s is not allowed to call "
"this function!" % (authUser.login,
authUser.realm))
raise PolicyException(_('The policy settings do not allow you '
'to issue this request!'))
elif 'userresync' == method:
if not 'resync' in getSelfserviceActions(authUser):
log.warning("[userresync] user %s@%s is not allowed to call "
"this function!" % (authUser.login,
authUser.realm))
raise PolicyException(_('The policy settings do not allow you '
'to issue this request!'))
elif 'usersetmpin' == method:
if not 'setMOTPPIN' in getSelfserviceActions(authUser):
log.warning("[usersetmpin] user %r@%r is not allowed to call "
"this function!" % (authUser.login,
authUser.realm))
raise PolicyException(_('The policy settings do not allow you '
'to issue this request!'))
elif 'useractivateocratoken' == method:
user_selfservice_actions = getSelfserviceActions(authUser)
typ = param.get('type').lower()
if (typ == 'ocra'
and 'activateQR' not in user_selfservice_actions):
log.warning("[activateQR] user %r@%r is not allowed to call "
"this function!" % (authUser.login,
authUser.realm))
raise PolicyException(_('The policy settings do not allow you '
'to issue this request!'))
elif 'useractivateocra2token' == method:
user_selfservice_actions = getSelfserviceActions(authUser)
typ = param.get('type').lower()
if (typ == 'ocra2'
and 'activateQR2' not in user_selfservice_actions):
log.warning("[activateQR2 user %r@%r is not allowed to call "
"this function!" % (authUser.login,
authUser.realm))
raise PolicyException(_('The policy settings do not allow you '
'to issue this request!'))
elif 'userassign' == method:
if not 'assign' in getSelfserviceActions(authUser):
log.warning("[userassign] user %r@%r is not allowed to call "
"this function!" % (authUser.login,
authUser.realm))
raise PolicyException(_('The policy settings do not allow '
'you to issue this request!'))
# Here we check, if the tokennum exceeds the tokens
if not checkTokenNum():
log.error("[init] The maximum token number "
"is reached!")
raise PolicyException(_("You may not enroll any more tokens. "
"Your maximum token number "
"is reached!"))
if not checkTokenAssigned(authUser):
log.warning("[assign] the maximum number of allowed tokens is"
" exceeded. Check the policies")
raise PolicyException(_("The maximum number of allowed tokens "
"is exceeded. Check the policies"))
elif 'usergetserialbyotp' == method:
if not 'getserial' in getSelfserviceActions(authUser):
log.warning("[usergetserialbyotp] user %s@%s is not allowed to"
" call this function!" % (authUser.login,
authUser.realm))
raise PolicyException(_('The policy settings do not allow you to'
' request a serial by OTP!'))
elif 'userdisable' == method:
if not 'disable' in getSelfserviceActions(authUser):
log.warning("[userdisable] user %r@%r is not allowed to call "
"this function!"
% (authUser.login, authUser.realm))
raise PolicyException(_('The policy settings do not allow you '
'to issue this request!'))
elif 'userenable' == method:
if not 'enable' in getSelfserviceActions(authUser):
log.warning("[userenable] user %s@%s is not allowed to call "
"this function!"
% (authUser.login, authUser.realm))
raise PolicyException(_('The policy settings do not allow you to'
' issue this request!'))
elif 'userunassign' == method:
if not 'unassign' in getSelfserviceActions(authUser):
log.warning("[userunassign] user %r@%r is not allowed to call "
"this function!"
% (authUser.login, authUser.realm))
raise PolicyException(_('The policy settings do not allow you '
'to issue this request!'))
elif 'userdelete' == method:
if not 'delete' in getSelfserviceActions(authUser):
log.warning("[userdelete] user %r@%r is not allowed to call "
"this function!"
% (authUser.login, authUser.realm))
raise PolicyException(_('The policy settings do not allow you '
'to issue this request!'))
elif 'userwebprovision' == method:
user_selfservice_actions = getSelfserviceActions(authUser)
typ = param.get('type').lower()
if ((typ == 'oathtoken'
and 'webprovisionOATH' not in user_selfservice_actions)
or (typ == 'googleauthenticator_time'and
'webprovisionGOOGLEtime' not in user_selfservice_actions)
or (typ == 'googleauthenticator'
and 'webprovisionGOOGLE' not in user_selfservice_actions)):
log.warning("[userwebprovision] user %r@%r is not allowed to "
"call this function!" % (authUser.login,
authUser.realm))
raise PolicyException(_('The policy settings do not allow you '
'to issue this request!'))
# Here we check, if the tokennum exceeds the allowed tokens
if not checkTokenNum():
log.error("[userwebprovision] The maximum token "
"number is reached!")
raise PolicyException(_("You may not enroll any more tokens. "
"Your maximum token number "
"is reached!"))
if not checkTokenAssigned(authUser):
log.warning("[userwebprovision] the maximum number of allowed "
"tokens is exceeded. Check the policies")
raise PolicyException(_("The maximum number of allowed tokens "
"is exceeded. Check the policies"))
elif 'userhistory' == method:
if not 'history' in getSelfserviceActions(authUser):
log.warning("[userhistory] user %r@%r is not allowed to call "
"this function!"
% (authUser.login, authUser.realm))
raise PolicyException(_('The policy settings do not allow you '
'to issue this request!'))
elif 'userinit' == method:
allowed_actions = getSelfserviceActions(authUser)
typ = param['type'].lower()
meth = 'enroll' + typ.upper()
if meth not in allowed_actions:
log.warning("[userinit] user %r@%r is not allowed to "
"enroll %s!" % (authUser.login,
authUser.realm, typ))
raise PolicyException(_('The policy settings do not allow '
'you to issue this request!'))
# Here we check, if the tokennum exceeds the allowed tokens
if not checkTokenNum():
log.error("[userinit] The maximum token "
"number is reached!")
raise PolicyException(_("You may not enroll any more tokens. "
"Your maximum token number "
"is reached!"))
if not checkTokenAssigned(authUser):
log.warning("[userinit] the maximum number of allowed tokens "
"is exceeded. Check the policies")
raise PolicyException(_("The maximum number of allowed tokens "
"is exceeded. Check the policies"))
else:
log.error("[checkPolicyPre] Unknown method in "
"selfservice: %s" % method)
raise PolicyException(_("Unknown method in selfservice: %s") % method)
elif 'system' == controller:
actions = {
'setDefault': 'write',
'setConfig': 'write',
'delConfig': 'write',
'getConfig': 'read',
'getRealms': 'read',
'delResolver': 'write',
'getResolver': 'read',
'setResolver': 'write',
'getResolvers': 'read',
'setDefaultRealm': 'write',
'getDefaultRealm': 'read',
'setRealm': 'write',
'delRealm': 'write',
'setPolicy': 'write',
'importPolicy': 'write',
'policies_flexi': 'read',
'getPolicy': 'read',
'getPolicyDef': 'read',
'checkPolicy': "read",
'delPolicy': 'write',
'setSupport': 'write',
}
if not method in actions:
log.error("[checkPolicyPre] an unknown method was passed "
"in system: %s" % method)
raise PolicyException(_("Failed to run checkPolicyPre. "
"Unknown method: %s") % method)
auth = getAuthorization('system', actions[method])
if auth['active'] and not auth['auth']:
log.warning("[checkPolicyPre] admin >%s< is not authorited to %s."
" Missing policy scope=system, action=%s"
% (auth['admin'], method, actions[method]))
raise PolicyException(_("Policy check failed. You are not allowed "
"to %s system config.") % actions[method])
elif controller == 'ocra':
method_map = {'request': 'request', 'status': 'checkstatus',
'activationcode': 'getActivationCode',
'calcOTP': 'calculateOtp'}
admin_user = getUserFromRequest(request)
policies = getPolicy({'user': admin_user.get('login'), 'scope': 'ocra',
'action': method, 'client': get_client()})
if len(policies) == 0:
log.warning("[request] the admin >%r< is not allowed to do an ocra"
"/%r" % (admin_user.get('login'),
method_map.get(method)))
raise PolicyException(_("You do not have the administrative right to"
" do an ocra/%s") % method_map.get(method))
else:
# unknown controller
log.error("[checkPolicyPre] an unknown controller "
"<<%r>> was passed." % controller)
raise PolicyException(_("Failed to run getPolicyPre. Unknown "
"controller: %s") % controller)
return ret
[docs]def checkPolicyPost(controller, method, param=None, user=None):
'''
This function will check policies after a successful action in a
controller. E.g. this can be setting a random PIN after successfully
enrolling a token.
:param controller: the controller context
:param method: the calling action
:param param: This is a dictionary with the necessary parameters.
:param auth_user: This is the authenticated user. For the selfservice this
will be the user in the selfservice portal, for admin or
manage it will be the administrator
:return: It returns a dictionary with the necessary results. These depend
on the controller.
'''
ret = {}
if param is None:
param = {}
if 'admin' == controller:
log.debug("[checkPolicyPost] entering controller %s" % controller)
log.debug("[checkPolicyPost] entering method %s" % method)
log.debug("[checkPolicyPost] using params %s" % param)
serial = getParam(param, "serial", optional)
if user is None:
user = getUserFromParam(param, optional)
if 'init' == method:
# check if we are supposed to genereate a random OTP PIN
randomPINLength = -1
if user and user.login:
randomPINLength = getRandomOTPPINLength(user)
if randomPINLength > 0:
newpin = getRandomPin(randomPINLength)
log.debug("[init] setting random pin for token with serial "
"%s and user: %s" % (serial, user))
linotp.lib.token.setPin(newpin, None, serial)
log.debug("[init] pin set")
# TODO: This random PIN could be processed and
# printed in a PIN letter
elif 'getserial' == method:
# check if the serial/token, that was returned is in
# the realms of the admin!
policies = getAdminPolicies("getserial")
if (policies['active'] and not
checkAdminAuthorization(policies, serial,
User('', '', ''))):
log.warning("[getserial] the admin >%s< is not allowed to get "
"serial of token %s" % (policies['admin'], serial))
raise PolicyException(_("You do not have the administrative "
"right to get serials from this realm!"))
else:
# unknown method
log.error("[checkPolicyPost] an unknown method <<%s>>"
" was passed." % method)
raise PolicyException(_("Failed to run getPolicyPost. "
"Unknown method: %s") % method)
elif 'system' == controller:
log.debug("[cehckPolicyPost] entering controller %s" % controller)
if 'getRealms' == method:
systemReadRights = False
res = param['realms']
auth = getAuthorization('system', 'read')
if auth['auth']:
systemReadRights = True
if not systemReadRights:
# If the admin is not allowed to see all realms,
# (policy scope=system, action=read)
# the realms, where he has no administrative rights need,
# to be stripped.
pol = getAdminPolicies('')
if pol['active']:
log.debug("[getRealms] the admin has policies "
"in these realms: %r" % pol['realms'])
lowerRealms = uniquify(pol['realms'])
for realm, _v in res.items():
if ((not realm.lower() in lowerRealms)
and (not '*' in lowerRealms)):
log.debug("[getRealms] the admin has no policy in "
"realm %r. Deleting "
"it: %r" % (realm, res))
del res[realm]
else:
log.error("[checkPolicyPost] system: : getRealms: "
"The admin >%s< is not allowed to read system "
"config and has not realm administrative rights!"
% auth['admin'])
raise PolicyException(_("You do not have system config read "
"rights and not realm admin "
"policies."))
ret['realms'] = res
else:
# unknown controller
log.error("[checkPolicyPost] an unknown constroller <<%s>> "
"was passed." % controller)
raise PolicyException(_("Failed to run getPolicyPost. "
"Unknown controller: %s") % controller)
return ret
###############################################################################
#
# Client Policies
#
[docs]def get_client_policy(client, scope=None, action=None, realm=None, user=None,
find_resolver=True, userObj=None):
'''
This function returns the dictionary of policies for the given client.
1. First it searches for all policies matching (scope, action, realm) and
checks, whether the given client is contained in the policy field client.
If no policy for the given client is found it takes the policy without
a client
2. Then it strips down the returnable policies to those, that only contain
the username - UNLESS - none of the above policies contains a username
3. then we try to find resolvers in the username (OPTIONAL)
'''
Policies = {}
param = {}
if scope:
param["scope"] = scope
if action:
param["action"] = action
if realm:
param["realm"] = realm
log.debug("[get_client_policy] with params %r, "
"client %r and user %r" % (param, client, user))
Pols = getPolicy(param)
log.debug("[get_client_policy] got policies %s " % Pols)
def get_array(policy, attribute="client", marks=False):
## This function returns the parameter "client" or
## "user" in a policy as an array
attrs = policy.get(attribute, "")
if attrs == "None" or attrs is None:
attrs = ""
log.debug("[get_array] splitting <%s>" % attrs)
attrs_array = []
if marks:
attrs_array = [co.strip()[:-1] for co in attrs.split(',')
if len(co.strip()) and co.strip()[-1] == ":"]
else:
attrs_array = [co.strip()
for co in attrs.split(',')
if len(co.strip()) and co.strip()[-1] != ":"]
# if for some reason the first element is empty, delete it.
if len(attrs_array) and attrs_array[0] == "":
del attrs_array[0]
return attrs_array
## 1. Find a policy with this client
for pol, policy in Pols.items():
log.debug("[get_client_policy] checking policy %s" % pol)
clients_array = get_array(policy, attribute="client")
log.debug("[get_client_policy] the policy %s has these clients: %s. "
"checking against %s." % (pol, clients_array, client))
client_found = False
client_excluded = False
for cl in clients_array:
try:
if cl[0] in ['-', '!']:
if IPAddress(client) in IPNetwork(cl[1:]):
log.debug("[get_client_policy] the client %s is "
"excluded by %s in policy "
"%s" % (client, cl, pol))
client_excluded = True
if IPAddress(client) in IPNetwork(cl):
client_found = True
except Exception as e:
log.warning("[get_client_policy] authorization policy %s with "
"invalid client: %r" % (pol, e))
if client_found and not client_excluded:
Policies[pol] = policy
# No policy for this client was found, but maybe
# there is one without clients
if len(Policies) == 0:
log.debug("[get_client_policy] looking for policy without any client")
for pol, policy in Pols.items():
if len(get_array(policy, attribute="client")) == 0:
Policies[pol] = policy
## 2. Within those policies select the policy with the user.
## if there is a policy with this very user, return only
## these policies, otherwise return all policies
if user:
user_policy_found = False
own_policies = {}
default_policies = {}
for polname, pol in Policies.items():
users = get_array(pol, attribute="user")
log.debug("[get_client_policy] search user %s in users %s "
"of policy %s" % (user, users, polname))
if user in users or '*' in users:
log.debug("[get_client_policy] adding %s to "
"own_policies" % polname)
own_policies[polname] = pol
elif len(users) == 0:
log.debug("[get_client_policy] adding %s to "
"default_policies" % polname)
default_policies[polname] = pol
else:
log.debug("[get_client_policy] policy %s contains only users "
"(%s) other than %s" % (polname, users, user))
if len(own_policies):
Policies = own_policies
user_policy_found = True
else:
Policies = default_policies
##3. If no user specific policy was found, we now take a look,
## if we find a policy with the matching resolver.
if not user_policy_found and realm and find_resolver:
## get the resolver of the user in the realm and search for this
## resolver in the policies
if userObj is not None:
resolvers = getResolversOfUser(userObj)
else:
resolvers = getResolversOfUser(User(login=user, realm=realm))
own_policies = {}
default_policies = {}
for polname, pol in Policies.items():
resolvs = get_array(pol, attribute="user", marks=True)
for r in resolvers:
# trim the resolver useridresolver.LDAPIdResolver.\
# IdResolver.local to its name
r = r[r.rfind('.') + 1:]
if r in resolvs:
log.debug("[get_client_policy] adding %s to "
"own_policies" % polname)
own_policies[polname] = pol
elif len(resolvs) == 0:
log.debug("[get_client_policy] adding %s (no "
"resolvers) to default_policies" % polname)
default_policies[polname] = pol
else:
log.debug("[get_client_policy] policy %s contains "
"only resolvers (%s) other than %s" %
(polname, resolvs, r))
if len(own_policies):
Policies = own_policies
else:
Policies = default_policies
return Policies
[docs]def set_realm(login, realm, exception=False):
'''
this function reads the policy scope: authorization, client: x.y.z,
action: setrealm=new_realm and overwrites the existing realm of the user
with the new_realm.
This can be used, if the client is not able to pass a realm and the users
are not be located in the default realm.
returns:
realm - name of the new realm taken from the policy
'''
client = get_client()
log.debug("[set_realm] got the client %s" % client)
log.debug("[set_realm] users %s original realm is %s" % (login, realm))
policies = get_client_policy(client, scope="authorization",
action="setrealm", realm=realm,
user=login, find_resolver=False)
if len(policies):
realm = getPolicyActionValue(policies, "setrealm", String=True)
log.debug("[set_realm] users %s new realm is %s" % (login, realm))
return realm
[docs]def check_user_authorization(login, realm, exception=False):
'''
check if the given user/realm is in the given policy.
The realm may contain the wildcard '*', then the policy holds for
all realms. If no username or '*' is given, the policy holds for all users.
attributes:
login - loginname of the user
realm - realm of the user
exception - wether it should return True/False or raise an Exception
'''
res = False
# if there is absolutely NO policy in scope authorization,
# we return immediately
if len(getPolicy({"scope": "authorization", "action": "authorize"})) == 0:
log.debug("[check_user_authorization] absolutely "
"no authorization policy.")
return True
client = get_client()
log.debug("[check_user_authorization] got the client %s" % client)
policies = get_client_policy(client, scope="authorization",
action="authorize", realm=realm, user=login)
log.debug("[check_user_authorization] got policies %s for "
"user %s" % (policies, login))
if len(policies):
res = True
if res is False and exception:
raise AuthorizeException("Authorization on client %s failed "
"for %s@%s." % (client, login, realm))
return res
###############################################################################
#
# Authentication stuff
#
[docs]def get_auth_passthru(user):
'''
returns True, if the user in this realm should be authenticated against
the UserIdResolver in case the user has no tokens assigned.
'''
ret = False
client = get_client()
pol = get_client_policy(client, scope="authentication",
action="passthru", realm=user.realm,
user=user.login, userObj=user)
if len(pol) > 0:
ret = True
return ret
[docs]def get_auth_passOnNoToken(user):
'''
returns True, if the user in this realm should be always authenticated
in case the user has no tokens assigned.
'''
ret = False
client = get_client()
pol = get_client_policy(client, scope="authentication",
action="passOnNoToken", realm=user.realm,
user=user.login, userObj=user)
if len(pol) > 0:
ret = True
return ret
[docs]def get_auth_AutoSMSPolicy(realms=None):
'''
Returns true, if the autosms policy is set in one of the realms
return:
True or False
input:
list of realms
'''
log.debug("[get_auth_AutoSMSPolicy] checking realms %r " % realms)
client = get_client()
user = getUserFromParam(request.params, optional)
login = user.login
if realms is None:
realm = user.realm or getDefaultRealm()
realms = [realm]
ret = False
for realm in realms:
pol = get_client_policy(client, scope="authentication",
action="autosms", realm=realm,
user=login, userObj=user)
if len(pol) > 0:
log.debug("[get_auth_AutoSMSPolicy] found policy in "
"realm %s" % realm)
ret = True
return ret
[docs]def get_auth_challenge_response(user, ttype):
"""
returns True, if the user in this realm with this token type should be
authenticated via Challenge Response
:param user: the user object
:param ttype: the type of the token
:return: bool
"""
ret = False
p_user = None
p_realm = None
if user is not None:
p_user = user.login
p_realm = user.realm
client = get_client()
pol = get_client_policy(client, scope="authentication",
action="challenge_response",
realm=p_realm,
user=p_user, userObj=user)
log.debug("[get_auth_challenge_response] got policy %r for user "
"%r@%r from client %r" % (pol, p_user, p_realm, client))
Token_Types = getPolicyActionValue(pol, "challenge_response", String=True)
token_types = [t.lower() for t in Token_Types.split()]
if ttype.lower() in token_types or '*' in token_types:
log.debug("[get_auth_challenge_response] found matching "
"token type %s" % ttype)
ret = True
return ret
[docs]def get_auth_PinPolicy(realm=None, user=None):
'''
Returns the PIN policy, that defines, how the OTP PIN is to be verified
within the given realm
return:
0 - verify against fixed OTP PIN
1 - verify the password component against the
UserResolver (LPAP Password etc.)
2 - verify no OTP PIN at all! Only OTP value!
The policy is defined via
scope : authentication
realm : ....
action: otppin=0/1/2
client: IP
user : some user
'''
log.debug("[get_auth_PinPolicy]")
client = get_client()
if user is None:
user = getUserFromParam(request.params, optional)
login = user.login
if realm is None:
realm = user.realm or getDefaultRealm()
pol = get_client_policy(client, scope="authentication", action="otppin",
realm=realm, user=login, userObj=user)
log.debug("[get_auth_PinPolicy] got policy %s"
" for user %s@%s client %s" % (pol, login, realm, client))
pin_check = getPolicyActionValue(pol, "otppin", max=False)
if pin_check in [1, 2]:
return pin_check
return 0
[docs]def get_qrtan_url(realm):
'''
Returns the URL for the half automatic mode for the QR TAN token
for the given realm
:return: url string
'''
log.debug("[get_qrtan_url] getting url for realm %s" % realm)
pol = getPolicy({"scope": "authentication", "realm": realm})
url = getPolicyActionValue(pol, "qrtanurl", String=True)
log.debug("[get_qrtan_url] using url %s for realm %s" % (url, realm))
return url
###############################################################################
#
# Authorization
#
[docs]def check_auth_tokentype(serial, exception=False, user=None):
'''
Checks if the token type of the given serial matches the tokentype policy
:return: True/False - returns true or false or raises an exception
if exception=True
'''
log.debug("[check_auth_tokentype]")
if serial is None:
# if no serial is given, we return True right away
log.debug("[check_auth_tokentype] We have got no serial. "
"Obviously doing passthru.")
return True
client = get_client()
if user is None:
user = getUserFromParam(request.params, optional)
login = user.login
realm = user.realm or getDefaultRealm()
tokentypes = []
tokentype = ""
res = False
pol = get_client_policy(client, scope="authorization", action="tokentype",
realm=realm, user=login, userObj=user)
log.debug("[check_auth_tokentype] got policy %s"
" for user %s@%s client %s" % (pol, login, realm, client))
t_type = getPolicyActionValue(pol, "tokentype", max=False, String=True)
if len(t_type) > 0:
tokentypes = [t.strip() for t in t_type.lower().split(" ")]
log.debug("[check_auth_tokentype] found these "
"tokentypes: <%s>" % tokentypes)
toks = linotp.lib.token.getTokens4UserOrSerial(None, serial)
if len(toks) > 1:
log.error("[check_auth_tokentype] multiple tokens with serial %s found"
" - cannot get OTP!" % serial)
raise PolicyException(_("multiple tokens found - "
"cannot determine tokentype!"))
elif len(toks) == 1:
log.debug("[check_auth_tokentype] found one token with "
"serial %s" % serial)
tokentype = toks[0].getType().lower()
log.debug("[check_auth_tokentype] got the type %s for "
"token %s" % (tokentype, serial))
if (tokentype in tokentypes or '*' in tokentypes
or len(tokentypes) == 0):
res = True
elif len(toks) == 0:
## TODO if the user does not exis or does have no token
## ---- WHAT DO WE DO? ---
## At the moment we pass through: This is the old behaviour...
res = True
if res is False and exception:
c.audit["action_detail"] = \
"failed due to authorization/tokentype policy"
raise AuthorizeException("Authorization for token %s with type %s "
"failed on client %s" % (serial, tokentype,
client))
return res
[docs]def check_auth_serial(serial, exception=False, user=None):
'''
Checks if the token with the serial number matches the serial
authorize policy scope=authoriztaion, action=serial
:param serial: The serial number of the token to check
:type serial: string
:param exception: If "True" an exception is raised instead of
returning False
:type exception: boolean
:param user: User to narrow down the policy
:type user: User object
:return: result
:rtype: boolean
'''
log.debug("[check_auth_serial]")
if serial is None:
# if no serial is given, we return True right away
log.debug("[check_auth_serial] We have got no serial. "
"Obviously doing passthru.")
return True
client = get_client()
if user is None:
user = getUserFromParam(request.params, optional)
login = user.login
realm = user.realm or getDefaultRealm()
res = False
pol = get_client_policy(client, scope="authorization", action="serial",
realm=realm, user=login, userObj=user)
if len(pol) == 0:
# No policy found, so we skip the rest
log.debug("[check_auth_serial] No policy scope=authorize,"
"action=serial for user %r, realm %r, client %r"
% (login, realm, client))
return True
log.debug("[check_auth_serial] got policy %s"
" for user %s@%s client %s" % (pol, login, realm, client))
# extract the value from the policy
serial_regexp = getPolicyActionValue(pol, "serial", max=False, String=True)
log.debug("[check_auth_serial] found this regexp /%r/ for the serial %r"
% (serial_regexp, serial))
if re.search(serial_regexp, serial):
log.debug("[check_auth_serial] regexp matches.")
res = True
if res is False and exception:
c.audit["action_detail"] = "failed due to authorization/serial policy"
raise AuthorizeException("Authorization for token %s failed on "
"client %s" % (serial, client))
return res
[docs]def is_auth_return(success=True, user=None):
'''
returns True if the policy
scope = authorization
action = detail_on_success/detail_on_fail
is set.
:param success: Defines if we should check of the policy
detaul_on_success (True) or detail_on_fail (False)
:type success: bool
'''
ret = False
client = get_client()
if user is None:
user = getUserFromParam(request.params, optional)
login = user.login
realm = user.realm or getDefaultRealm()
if success:
pol = get_client_policy(client, scope="authorization",
action="detail_on_success", realm=realm,
user=login, userObj=user)
else:
pol = get_client_policy(client, scope="authorization",
action="detail_on_fail", realm=realm,
user=login, userObj=user)
if len(pol):
ret = True
return ret
### helper ################################
[docs]def get_pin_policies(user):
'''
lookup for the pin policies - the list of policies
is preserved for repeated lookups
: raises: exception, if more then one pin policies are matching
:param user: the policies which are applicable to the user
:return: list of otppin id's
'''
pin_policies = []
pin_policies.append(get_auth_PinPolicy(user=user))
pin_policies = list(set(pin_policies))
if len(pin_policies) > 1:
msg = ("conflicting authentication polices. "
"Check scope=authentication. policies: %r" % pin_policies)
log.error("[__checkToken] %r" % msg)
#self.context.audit['action_detail'] = msg
raise Exception('multiple pin policies found')
## former return -2
return pin_policies
#eof###########################################################################