Files
@ ce5d4c582a82
Branch filter:
Location: kallithea/kallithea/controllers/admin/admin.py
ce5d4c582a82
5.3 KiB
text/x-python
py3: cleanup map usage and avoid py3 ambiguity
Based on 2to3 -f map ... but replace map with something more explicit (unless
born and raised in a lisp world) (but sometimes slightly more verbose).
Based on 2to3 -f map ... but replace map with something more explicit (unless
born and raised in a lisp world) (but sometimes slightly more verbose).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 | # -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.controllers.admin.admin
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Controller for Admin panel of Kallithea
This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Apr 7, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""
import logging
from sqlalchemy.orm import joinedload
from sqlalchemy.sql.expression import and_, func, or_
from tg import request
from tg import tmpl_context as c
from whoosh import query
from whoosh.qparser.dateparse import DateParserPlugin
from whoosh.qparser.default import QueryParser
from kallithea.lib.auth import HasPermissionAnyDecorator, LoginRequired
from kallithea.lib.base import BaseController, render
from kallithea.lib.indexers import JOURNAL_SCHEMA
from kallithea.lib.page import Page
from kallithea.lib.utils2 import remove_prefix, remove_suffix, safe_int
from kallithea.model.db import UserLog
log = logging.getLogger(__name__)
def _journal_filter(user_log, search_term):
"""
Filters sqlalchemy user_log based on search_term with whoosh Query language
http://packages.python.org/Whoosh/querylang.html
:param user_log:
:param search_term:
"""
log.debug('Initial search term: %r', search_term)
qry = None
if search_term:
qp = QueryParser('repository', schema=JOURNAL_SCHEMA)
qp.add_plugin(DateParserPlugin())
qry = qp.parse(unicode(search_term))
log.debug('Filtering using parsed query %r', qry)
def wildcard_handler(col, wc_term):
if wc_term.startswith('*') and not wc_term.endswith('*'):
# postfix == endswith
wc_term = remove_prefix(wc_term, prefix='*')
return func.lower(col).endswith(func.lower(wc_term))
elif wc_term.startswith('*') and wc_term.endswith('*'):
# wildcard == ilike
wc_term = remove_prefix(wc_term, prefix='*')
wc_term = remove_suffix(wc_term, suffix='*')
return func.lower(col).contains(func.lower(wc_term))
def get_filterion(field, val, term):
if field == 'repository':
field = getattr(UserLog, 'repository_name')
elif field == 'ip':
field = getattr(UserLog, 'user_ip')
elif field == 'date':
field = getattr(UserLog, 'action_date')
elif field == 'username':
field = getattr(UserLog, 'username')
else:
field = getattr(UserLog, field)
log.debug('filter field: %s val=>%s', field, val)
# sql filtering
if isinstance(term, query.Wildcard):
return wildcard_handler(field, val)
elif isinstance(term, query.Prefix):
return func.lower(field).startswith(func.lower(val))
elif isinstance(term, query.DateRange):
return and_(field >= val[0], field <= val[1])
return func.lower(field) == func.lower(val)
if isinstance(qry, (query.And, query.Term, query.Prefix, query.Wildcard,
query.DateRange)):
if not isinstance(qry, query.And):
qry = [qry]
for term in qry:
field = term.fieldname
val = (term.text if not isinstance(term, query.DateRange)
else [term.startdate, term.enddate])
user_log = user_log.filter(get_filterion(field, val, term))
elif isinstance(qry, query.Or):
filters = []
for term in qry:
field = term.fieldname
val = (term.text if not isinstance(term, query.DateRange)
else [term.startdate, term.enddate])
filters.append(get_filterion(field, val, term))
user_log = user_log.filter(or_(*filters))
return user_log
class AdminController(BaseController):
@LoginRequired(allow_default_user=True)
def _before(self, *args, **kwargs):
super(AdminController, self)._before(*args, **kwargs)
@HasPermissionAnyDecorator('hg.admin')
def index(self):
users_log = UserLog.query() \
.options(joinedload(UserLog.user)) \
.options(joinedload(UserLog.repository))
# FILTERING
c.search_term = request.GET.get('filter')
users_log = _journal_filter(users_log, c.search_term)
users_log = users_log.order_by(UserLog.action_date.desc())
p = safe_int(request.GET.get('page'), 1)
c.users_log = Page(users_log, page=p, items_per_page=10,
filter=c.search_term)
if request.environ.get('HTTP_X_PARTIAL_XHR'):
return render('admin/admin_log.html')
return render('admin/admin.html')
|