Source code for linotp.controllers.audit

# -*- coding: utf-8 -*-
#
#    LinOTP - the open source solution for two factor authentication
#    Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
#
#    This file is part of LinOTP server.
#
#    This program is free software: you can redistribute it and/or
#    modify it under the terms of the GNU Affero General Public
#    License, version 3, as published by the Free Software Foundation.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the
#               GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#
#    E-mail: linotp@lsexperts.de
#    Contact: www.linotp.org
#    Support: www.lsexperts.de
#
"""
audit controller - to search the audit trail
"""




import logging


from pylons import tmpl_context as c
from pylons import request, response, config
from linotp.lib.base import BaseController


from linotp.lib.util import  check_session
from linotp.lib.user import  getUserFromRequest
from linotp.lib.policy import checkPolicyPre, PolicyException

from linotp.lib.reply import sendError
from linotp.lib.audit.base import search as audit_search
from linotp.lib.audit.iterator import AuditQuery
from linotp.lib.audit.iterator import CSVAuditIterator
from linotp.lib.audit.iterator import JSONAuditIterator

from linotp.lib.util import getParam
from linotp.lib.util import get_client

from linotp.model.meta import Session

import traceback


audit = config.get('audit')

optional = True
required = False

log = logging.getLogger(__name__)


[docs]class AuditController(BaseController): ''' this is the controller for doing some audit stuff https://server/audit/<functionname> ''' def __before__(self, action, **params): log.debug("[__before__::%r] %r" % (action, params)) try: audit.initialize() c.audit['client'] = get_client() check_session() except Exception as exx: log.error("[__before__::%r] exception %r" % (action, exx)) log.error("[__before__] %s" % traceback.format_exc()) Session.rollback() Session.close() return sendError(response, exx, context='before') finally: log.debug("[__before__::%r] done" % (action)) def __after__(self): c.audit['administrator'] = getUserFromRequest(request).get("login") audit.log(c.audit)
[docs] def search(self): ''' This functions searches within the audit trail It returns the audit information for the given search pattern method: audit/search arguments: key, value pairs as search patterns. * outform - optional: if set to "csv", than the token list will be given in CSV or: Usually the key=values will be locally AND concatenated. it a parameter or=true is passed, the filters will be OR concatenated. The Flexigrid provides us the following parameters: ('page', u'1'), ('rp', u'25'), ('sortname', u'number'), ('sortorder', u'asc'), ('query', u''), ('qtype', u'serial')] returns: JSON response or csv format ''' param = {} try: param.update(request.params) log.debug("[search] params: %s" % param) checkPolicyPre('audit', 'view', {}) log.debug("[search] params %r" % param) # remove the param outform (and other parameters that should not # be used for search! search_params = {} search_params.update(param) for key in ["outform", 'delimiter']: if key in search_params: del search_params[key] output_format = param.get("outform", 'json') or 'json' delimiter = param.get('delimiter', ',') or ',' audit_iterator = None log.debug("[search] search params %r" % search_params) audit_query = AuditQuery(search_params, audit) if output_format == "csv": filename = "linotp-audit.csv" response.content_type = "application/force-download" response.headers['Content-disposition'] = ( 'attachment; filename=%s' % filename) audit_iterator = CSVAuditIterator(audit_query, delimiter) else: response.content_type = 'application/json' audit_iterator = JSONAuditIterator(audit_query) c.audit['success'] = True Session.commit() return audit_iterator except PolicyException as pe: log.error("[getotp] gettoken/getotp policy failed: %r" % pe) log.error("[getotp] %s" % traceback.format_exc()) Session.rollback() return sendError(response, unicode(pe), 1) except Exception as e: log.error("[search] audit/search failed: %r" % e) log.error("[search] %s" % traceback.format_exc()) Session.rollback() return sendError(response, "audit/search failed: %s" % unicode(e), 0) finally: Session.close() log.debug('[search] done') #eof###########################################################################