Source code for linotp.controllers.manage

# -*- coding: utf-8 -*-
#
#    LinOTP - the open source solution for two factor authentication
#    Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
#
#    This file is part of LinOTP server.
#
#    This program is free software: you can redistribute it and/or
#    modify it under the terms of the GNU Affero General Public
#    License, version 3, as published by the Free Software Foundation.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the
#               GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#
#    E-mail: linotp@lsexperts.de
#    Contact: www.linotp.org
#    Support: www.lsexperts.de
#

"""
manage controller - In provides the web gui management interface
"""

import os

try:
    import json
except ImportError:
    import simplejson as json

from pylons import request, response, config, tmpl_context as c
from linotp.lib.base import BaseController
from pylons.templating import render_mako as render
from mako.exceptions import CompileException

from paste.deploy.converters import asbool

# Our Token stuff
from linotp.lib.token   import TokenIterator
from linotp.lib.token   import getTokenType
from linotp.lib.token   import newToken


from linotp.lib.user    import getUserFromParam, getUserFromRequest
from linotp.lib.user    import getUserList, User

from linotp.lib.util    import getParam
from linotp.lib.util    import check_session
from linotp.lib.util    import get_version
from linotp.lib.util    import get_copyright_info
from linotp.lib.reply   import sendError

from linotp.lib.util    import remove_empty_lines
from linotp.lib.util import get_client
from linotp.model.meta import Session

from linotp.lib.policy import checkPolicyPre, PolicyException, getAdminPolicies, getPolicyDefinitions

from pylons.i18n.translation import _

audit = config.get('audit')

import traceback
import logging

log = logging.getLogger(__name__)

from linotp.lib.ImportOTP import getKnownTypes, getImportText
KNOWN_TYPES = getKnownTypes()
IMPORT_TEXT = getImportText()
log.info("importing linotp.lib. Known import types: %s" % IMPORT_TEXT)


optional = True
required = False

[docs]class ManageController(BaseController): def __before__(self, action, **params): log.debug("[__before__::%r] %r" % (action, params)) try: audit.initialize() c.audit['success'] = False c.audit['client'] = get_client() self.set_language() c.version = get_version() c.licenseinfo = get_copyright_info() c.polDefs = getPolicyDefinitions() # Session handling for the functions, that show data: # Also exclude custom-style.css, since the CSRF check # will always fail and return a HTTP 401 anyway. # A HTTP 404 makes more sense. if request.path.lower() in ['/manage/', '/manage', '/manage/logout', '/manage/audittrail', '/manage/policies', '/manage/tokenview', '/manage/userview', '/manage/help', '/manage/custom-style.css']: pass else: check_session() except Exception as exx: log.error("[__before__::%r] exception %r" % (action, exx)) log.error("[__before__] %s" % traceback.format_exc()) Session.rollback() Session.close() return sendError(response, exx, context='before') finally: log.debug("[__before__::%r] done" % (action)) def __after__(self): if c.audit['action'] in [ 'manage/tokenview_flexi', 'manage/userview_flexi' ]: c.audit['administrator'] = getUserFromRequest(request).get("login") if request.params.has_key('serial'): c.audit['serial'] = request.params['serial'] c.audit['token_type'] = getTokenType(request.params['serial']) audit.log(c.audit)
[docs] def index(self): ''' This is the main function of the management web UI ''' try: c.debug = asbool(config.get('debug', False)) c.title = "LinOTP Management" admin_user = getUserFromRequest(request) if admin_user.has_key('login'): c.admin = admin_user['login'] log.debug("[index] importers: %s" % IMPORT_TEXT) c.importers = IMPORT_TEXT c.help_url = config.get('help_url') ## add render info for token type config confs = _getTokenTypeConfig('config') token_config_tab = {} token_config_div = {} for conf in confs: tab = '' div = '' try: #loc = conf +'_token_settings' tab = confs.get(conf).get('title') #tab = '<li ><a href=#'+loc+'>'+tab+'</a></li>' div = confs.get(conf).get('html') #div = +div+'</div>' except Exception as e: log.debug('[index] no config info for token type %s (%r)' % (conf, e)) if tab is not None and div is not None and len(tab) > 0 and len(div) > 0: token_config_tab[conf] = tab token_config_div[conf] = div c.token_config_tab = token_config_tab c.token_config_div = token_config_div ## add the enrollment fragments from the token definition ## tab: <option value="ocra">${_("OCRA - challenge/response Token")}</option> ## div: "<div id='"+ tt + "'>"+enroll+"</div>" enrolls = _getTokenTypeConfig('init') token_enroll_tab = {} token_enroll_div = {} for conf in enrolls: tab = '' div = '' try: tab = enrolls.get(conf).get('title') div = enrolls.get(conf).get('html') except Exception as e: log.debug('[index] no enrollment info for token type %s (%r)' % (conf, e)) if tab is not None and div is not None and len(tab) > 0 and len(div) > 0: token_enroll_tab[conf] = tab token_enroll_div[conf] = div c.token_enroll_tab = token_enroll_tab c.token_enroll_div = token_enroll_div c.tokentypes = _getTokenTypes() http_host = request.environ.get("HTTP_HOST") url_scheme = request.environ.get("wsgi.url_scheme") c.logout_url = "%s://log-me-out:fake@%s/manage/logout" % (url_scheme, http_host) Session.commit() ren = render('/manage/manage-base.mako') return ren except PolicyException as pe: log.error("[index] Error during checking policies: %r" % pe) log.error("[index] %s" % traceback.format_exc()) Session.rollback() return sendError(response, unicode(pe), 1) except Exception as ex: log.error("[index] failed! %r" % ex) log.error("[index] %s" % traceback.format_exc()) Session.rollback() return sendError(response, ex) finally: Session.close() log.debug('[index] done')
[docs] def tokentype(self): ''' ''' c.title = 'TokenTypeInfo' g = config['pylons.app_globals'] tokens = g.tokenclasses ttinfo = [] ttinfo.extend(tokens.keys()) for tok in tokens: tclass = tokens.get(tok) tclass_object = newToken(tclass) if hasattr(tclass_object, 'getClassType'): ii = tclass_object.getClassType() ttinfo.append(ii) log.debug("[index] importers: %s" % IMPORT_TEXT) c.tokeninfo = ttinfo return render('/manage/tokentypeinfo.mako')
[docs] def policies(self): ''' This is the template for the policies TAB ''' c.title = "LinOTP Management - Policies" return render('/manage/policies.mako')
[docs] def audittrail(self): ''' This is the template for the audit trail TAB ''' c.title = "LinOTP Management - Audit Trail" return render('/manage/audit.mako')
[docs] def tokenview(self): ''' This is the template for the token TAB ''' c.title = "LinOTP Management" c.tokenArray = [] return render('/manage/tokenview.mako')
[docs] def userview(self): ''' This is the template for the token TAB ''' c.title = "LinOTP Management" c.tokenArray = [] return render('/manage/userview.mako')
[docs] def custom_style(self): ''' If this action was called, the user hasn't created a custom-style.css yet. To avoid hitting the debug console over and over, we serve an empty file. ''' response.headers['Content-type'] = 'text/css' return ''
def _flexi_error(self, error): return json.dumps({ "page": 1, "total": 1, "rows": [ { 'id' : 'error', 'cell' : ['E r r o r', error, '', '', '', '', '', '' ] } ] } , indent=3)
[docs] def tokenview_flexi(self): ''' This function is used to fill the flexigrid. Unlike the complex /admin/show function, it only returns a simple array of the tokens. ''' param = request.params try: #serial = getParam(param,"serial",optional) c.page = getParam(param, "page", optional) c.filter = getParam(param, "query", optional) c.qtype = getParam(param, "qtype", optional) c.sort = getParam(param, "sortname", optional) c.dir = getParam(param, "sortorder", optional) c.psize = getParam(param, "rp", optional) filter_all = None filter_realm = None user = User() if c.qtype == "loginname": if "@" in c.filter: (login, realm) = c.filter.split("@") user = User(login, realm) else: user = User(c.filter) elif c.qtype == "all": filter_all = c.filter elif c.qtype == "realm": filter_realm = c.filter # check admin authorization res = checkPolicyPre('admin', 'show', param , user=user) filterRealm = res['realms'] # check if policies are active at all # If they are not active, we are allowed to SHOW any tokens. pol = getAdminPolicies("show") # If there are no admin policies, we are allowed to see all realms if not pol['active']: filterRealm = ["*"] # check if we only want to see ONE realm or see all realms we are allowerd to see. if filter_realm: if filter_realm in filterRealm or '*' in filterRealm: filterRealm = [filter_realm] log.debug("[tokenview_flexi] admin >%s< may display the following realms: %s" % (pol['admin'], pol['realms'])) log.debug("[tokenview_flexi] page: %s, filter: %s, sort: %s, dir: %s" % (c.page, c.filter, c.sort, c.dir)) if c.page is None: c.page = 1 if c.psize is None: c.psize = 20 log.debug("[tokenview_flexi] calling TokenIterator for user=%s@%s, filter=%s, filterRealm=%s" % (user.login, user.realm, filter_all, filterRealm)) c.tokenArray = TokenIterator(user, None, c.page , c.psize, filter_all, c.sort, c.dir, filterRealm=filterRealm) c.resultset = c.tokenArray.getResultSetInfo() # If we have chosen a page to big! lines = [] for tok in c.tokenArray: lines.append( { 'id' : tok['LinOtp.TokenSerialnumber'], 'cell': [ tok['LinOtp.TokenSerialnumber'], tok['LinOtp.Isactive'], tok['User.username'], tok['LinOtp.RealmNames'], tok['LinOtp.TokenType'], tok['LinOtp.FailCount'], tok['LinOtp.TokenDesc'], tok['LinOtp.MaxFail'], tok['LinOtp.OtpLen'], tok['LinOtp.CountWindow'], tok['LinOtp.SyncWindow'], tok['LinOtp.Userid'], tok['LinOtp.IdResolver'], ] } ) # We need to return 'page', 'total', 'rows' response.content_type = 'application/json' res = { "page": int(c.page), "total": c.resultset['tokens'], "rows": lines } c.audit['success'] = True Session.commit() return json.dumps(res, indent=3) except PolicyException as pe: log.error("[tokenview_flexi] Error during checking policies: %r" % pe) log.error("[tokenview_flexi] %s" % traceback.format_exc()) Session.rollback() return sendError(response, unicode(pe), 1) except Exception as e: log.error("[tokenview_flexi] failed: %r" % e) log.error("[tokenview_flexi] %s" % traceback.format_exc()) Session.rollback() return sendError(response, e) finally: Session.close() log.debug("[tokenview_flexi] done")
[docs] def userview_flexi(self): ''' This function is used to fill the flexigrid. Unlike the complex /admin/userlist function, it only returns a simple array of the tokens. ''' param = request.params try: #serial = getParam(param,"serial",optional) c.page = getParam(param, "page", optional) c.filter = getParam(param, "query", optional) qtype = getParam(param, "qtype", optional) c.sort = getParam(param, "sortname", optional) c.dir = getParam(param, "sortorder", optional) c.psize = getParam(param, "rp", optional) c.realm = getParam(param, "realm", optional) user = getUserFromParam(param, optional) # check admin authorization # check if we got a realm or resolver, that is ok! checkPolicyPre('admin', 'userlist', { 'user': "dummy", 'realm' : c.realm }) if c.filter == "": c.filter = "*" log.debug("[userview_flexi] page: %s, filter: %s, sort: %s, dir: %s" % (c.page, c.filter, c.sort, c.dir)) if c.page is None: c.page = 1 if c.psize is None: c.psize = 20 c.userArray = getUserList({ qtype:c.filter, 'realm':c.realm }, user) c.userNum = len(c.userArray) lines = [] for u in c.userArray: # shorten the useridresolver, to get a better display value resolver_display = "" if "useridresolver" in u: if len(u['useridresolver'].split(".")) > 3: resolver_display = u['useridresolver'].split(".")[3] + " (" + u['useridresolver'].split(".")[1] + ")" else: resolver_display = u['useridresolver'] lines.append( { 'id' : u['username'], 'cell': [ (u['username']) if u.has_key('username') else (""), (resolver_display), (u['surname']) if u.has_key('surname') else (""), (u['givenname']) if u.has_key('givenname') else (""), (u['email']) if u.has_key('email') else (""), (u['mobile']) if u.has_key('mobile') else (""), (u['phone']) if u.has_key('phone') else (""), (u['userid']) if u.has_key('userid') else (""), ] } ) # sorting reverse = False sortnames = { 'username' : 0, 'useridresolver' : 1, 'surname' : 2, 'givenname' : 3, 'email' : 4, 'mobile' :5, 'phone' : 6, 'userid' : 7 } if c.dir == "desc": reverse = True lines = sorted(lines, key=lambda user: user['cell'][sortnames[c.sort]] , reverse=reverse) # end: sorting # reducing the page if c.page and c.psize: page = int(c.page) psize = int(c.psize) start = psize * (page - 1) end = start + psize lines = lines[start:end] # We need to return 'page', 'total', 'rows' response.content_type = 'application/json' res = { "page": int(c.page), "total": c.userNum, "rows": lines } c.audit['success'] = True Session.commit() return json.dumps(res, indent=3) except PolicyException as pe: log.error("[userview_flexi] Error during checking policies: %r" % pe) log.error("[userview_flexi] %s" % traceback.format_exc()) Session.rollback() return sendError(response, unicode(pe), 1) except Exception as e: log.error("[userview_flexi] failed: %r" % e) log.error("[userview_flexi] %s" % traceback.format_exc()) Session.rollback() return sendError(response, e) finally: Session.close() log.debug('[userview_flexi] done')
[docs] def tokeninfo(self): ''' this returns the contents of /admin/show?serial=xyz in a html format ''' param = request.params try: serial = getParam(param, 'serial', required) filterRealm = "" # check admin authorization res = checkPolicyPre('admin', 'show', param) filterRealm = res['realms'] # check if policies are active at all # If they are not active, we are allowed to SHOW any tokens. pol = getAdminPolicies("show") if not pol['active']: filterRealm = ["*"] log.info("[tokeninfo] admin >%s< may display the following realms: %s" % (res['admin'], filterRealm)) log.info("[tokeninfo] displaying tokens: serial: %s", serial) toks = TokenIterator(User("", "", ""), serial, filterRealm=filterRealm) ### now row by row lines = [] for tok in toks: lines.append(tok) if len(lines) > 0: c.tokeninfo = lines[0] else: c.tokeninfo = {} for k in c.tokeninfo: if "LinOtp.TokenInfo" == k: try: # Try to convert string to Dictionary c.tokeninfo['LinOtp.TokenInfo'] = json.loads(c.tokeninfo['LinOtp.TokenInfo']) except: pass return render('/manage/tokeninfo.mako') except PolicyException as pe: log.error("[tokeninfo] Error during checking policies: %r" % pe) log.error("[tokeninfo] %s" % traceback.format_exc()) Session.rollback() return sendError(response, unicode(pe), 1) except Exception as e: log.error("[tokeninfo] failed! %r" % e) log.error("[tokeninfo] %s" % traceback.format_exc()) Session.rollback() return sendError(response, e) finally: Session.close() log.debug('[tokeninfo] done')
[docs] def logout(self): ''' redirect logout ''' from pylons.controllers.util import redirect http_host = request.environ.get("HTTP_HOST") url_scheme = request.environ.get("wsgi.url_scheme", "https") redirect("%s://%s/manage/" % (url_scheme, http_host))
[docs] def help(self): ''' This downloads the Manual The filename will be the 3. part,ID https://172.16.200.6/manage/help/somehelp.pdf The file is downloaded through pylons! ''' try: directory = config.get("linotpManual.Directory", "/usr/share/doc/linotp") default_filename = config.get("linotpManual.File", "LinOTP_Manual-en.pdf") headers = [] route_dict = request.environ.get('pylons.routes_dict') filename = route_dict.get('id') if not filename: filename = default_filename + ".gz" headers = [('content-Disposition', 'attachment; filename=\"' + default_filename + '\"'), ('content-Type', 'application/x-gzip') ] from paste.fileapp import FileApp wsgi_app = FileApp("%s/%s" % (directory, filename), headers=headers) Session.commit() return wsgi_app(request.environ, self.start_response) except Exception as e: log.error("[help] Error loading helpfile: %r" % e) log.error("[help] %s" % traceback.format_exc()) Session.rollback() return sendError(response, e) finally: Session.close() log.debug("[help] done") ############################################################
def _getTokenTypes(): ''' _getTokenTypes - retrieve the list of dynamic tokens and their title section :return: dict with token type and title :rtype: dict ''' glo = config['pylons.app_globals'] tokenclasses = glo.tokenclasses tokens = [] tokens.extend(tokenclasses.keys()) tinfo = {} for tok in tokens: if tok in tokenclasses.keys(): tclass = tokenclasses.get(tok) tclass_object = newToken(tclass) if hasattr(tclass_object, 'getClassInfo'): ii = tclass_object.getClassInfo('title') or tok tinfo[tok] = _(ii) return tinfo def _getTokenTypeConfig(section='config'): ''' _getTokenTypeConfig - retrieve from the dynamic token the tokentype section, eg. config or enroll :param section: the section of the tokentypeconfig :type section: string :return: dict with tab and page definition (rendered) :rtype: dict ''' res = {} g = config['pylons.app_globals'] tokenclasses = g.tokenclasses for tok in tokenclasses.keys(): tclass = tokenclasses.get(tok) tclass_object = newToken(tclass) if hasattr(tclass_object, 'getClassInfo'): conf = tclass_object.getClassInfo(section, ret={}) ## set globale render scope, so that the mako ## renderer will return only a subsection from the template p_html = '' t_html = '' try: page = conf.get('page') c.scope = page.get('scope') p_html = render(os.path.sep + page.get('html')) p_html = remove_empty_lines(p_html) tab = conf.get('title') c.scope = tab.get('scope') t_html = render(os.path.sep + tab.get('html')) t_html = remove_empty_lines(t_html) except CompileException as ex: log.error("[_getTokenTypeConfig] compile error while processing %r.%r:" % (tok, section)) log.error("[_getTokenTypeConfig] %r" % ex) log.error("[_getTokenTypeConfig] %s" % traceback.format_exc()) raise Exception(ex) except Exception as e: log.debug('no config for token type %r (%r)' % (tok, e)) p_html = '' if len (p_html) > 0: res[tok] = { 'html' : p_html, 'title' : t_html} return res ############################################################