Changeset - 3cab6bc45cc3
[Not reviewed]
stable
0 7 0
Mads Kiilerich - 6 years ago 2019-12-29 15:11:13
mads@kiilerich.com
Grafted from: 7e353be4c536
ssh: use fingerprint when deleting public keys

Avoid relying on a database index of the full public key string.
7 files changed with 13 insertions and 13 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/admin/my_account.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.admin.my_account
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
my account controller for Kallithea admin
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: August 20, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
import formencode
 
from formencode import htmlfill
 
from tg import request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPFound
 

	
 
from kallithea.config.routing import url
 
from kallithea.lib import auth_modules
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import AuthUser, LoginRequired
 
from kallithea.lib.base import BaseController, IfSshEnabled, render
 
from kallithea.lib.utils2 import generate_api_key, safe_int
 
from kallithea.model.api_key import ApiKeyModel
 
from kallithea.model.db import Repository, User, UserEmailMap, UserFollowing
 
from kallithea.model.forms import PasswordChangeForm, UserForm
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.ssh_key import SshKeyModel, SshKeyModelException
 
from kallithea.model.user import UserModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class MyAccountController(BaseController):
 
    """REST Controller styled on the Atom Publishing Protocol"""
 
    # To properly map this controller, ensure your config/routing.py
 
    # file has a resource setup:
 
    #     map.resource('setting', 'settings', controller='admin/settings',
 
    #         path_prefix='/admin', name_prefix='admin_')
 

	
 
    @LoginRequired()
 
    def _before(self, *args, **kwargs):
 
        super(MyAccountController, self)._before(*args, **kwargs)
 

	
 
    def __load_data(self):
 
        c.user = User.get(request.authuser.user_id)
 
        if c.user.is_default_user:
 
            h.flash(_("You can't edit this user since it's"
 
                      " crucial for entire application"), category='warning')
 
            raise HTTPFound(location=url('users'))
 

	
 
    def _load_my_repos_data(self, watched=False):
 
        if watched:
 
            admin = False
 
            repos_list = Session().query(Repository) \
 
                         .join(UserFollowing) \
 
                         .filter(UserFollowing.user_id ==
 
                                 request.authuser.user_id).all()
 
        else:
 
            admin = True
 
            repos_list = Session().query(Repository) \
 
                         .filter(Repository.owner_id ==
 
                                 request.authuser.user_id).all()
 

	
 
        return RepoModel().get_repos_as_dict(repos_list, admin=admin)
 

	
 
    def my_account(self):
 
        c.active = 'profile'
 
        self.__load_data()
 
        c.perm_user = AuthUser(user_id=request.authuser.user_id)
 
        managed_fields = auth_modules.get_managed_fields(c.user)
 
        def_user_perms = AuthUser(dbuser=User.get_default_user()).permissions['global']
 
        if 'hg.register.none' in def_user_perms:
 
            managed_fields.extend(['username', 'firstname', 'lastname', 'email'])
 

	
 
        c.readonly = lambda n: 'readonly' if n in managed_fields else None
 

	
 
        defaults = c.user.get_dict()
 
        update = False
 
        if request.POST:
 
            _form = UserForm(edit=True,
 
                             old_data={'user_id': request.authuser.user_id,
 
                                       'email': request.authuser.email})()
 
            form_result = {}
 
            try:
 
                post_data = dict(request.POST)
 
                post_data['new_password'] = ''
 
                post_data['password_confirmation'] = ''
 
                form_result = _form.to_python(post_data)
 
                # skip updating those attrs for my account
 
                skip_attrs = ['admin', 'active', 'extern_type', 'extern_name',
 
                              'new_password', 'password_confirmation',
 
                             ] + managed_fields
 

	
 
                UserModel().update(request.authuser.user_id, form_result,
 
                                   skip_attrs=skip_attrs)
 
                h.flash(_('Your account was updated successfully'),
 
                        category='success')
 
                Session().commit()
 
                update = True
 

	
 
            except formencode.Invalid as errors:
 
                return htmlfill.render(
 
                    render('admin/my_account/my_account.html'),
 
                    defaults=errors.value,
 
                    errors=errors.error_dict or {},
 
                    prefix_error=False,
 
                    encoding="UTF-8",
 
                    force_defaults=False)
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                h.flash(_('Error occurred during update of user %s')
 
                        % form_result.get('username'), category='error')
 
        if update:
 
            raise HTTPFound(location='my_account')
 
        return htmlfill.render(
 
            render('admin/my_account/my_account.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def my_account_password(self):
 
        c.active = 'password'
 
        self.__load_data()
 

	
 
        managed_fields = auth_modules.get_managed_fields(c.user)
 
        c.can_change_password = 'password' not in managed_fields
 

	
 
        if request.POST and c.can_change_password:
 
            _form = PasswordChangeForm(request.authuser.username)()
 
            try:
 
                form_result = _form.to_python(request.POST)
 
                UserModel().update(request.authuser.user_id, form_result)
 
                Session().commit()
 
                h.flash(_("Successfully updated password"), category='success')
 
            except formencode.Invalid as errors:
 
                return htmlfill.render(
 
                    render('admin/my_account/my_account.html'),
 
                    defaults=errors.value,
 
                    errors=errors.error_dict or {},
 
                    prefix_error=False,
 
                    encoding="UTF-8",
 
                    force_defaults=False)
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                h.flash(_('Error occurred during update of user password'),
 
                        category='error')
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_repos(self):
 
        c.active = 'repos'
 
        self.__load_data()
 

	
 
        # data used to render the grid
 
        c.data = self._load_my_repos_data()
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_watched(self):
 
        c.active = 'watched'
 
        self.__load_data()
 

	
 
        # data used to render the grid
 
        c.data = self._load_my_repos_data(watched=True)
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_perms(self):
 
        c.active = 'perms'
 
        self.__load_data()
 
        c.perm_user = AuthUser(user_id=request.authuser.user_id)
 

	
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_emails(self):
 
        c.active = 'emails'
 
        self.__load_data()
 

	
 
        c.user_email_map = UserEmailMap.query() \
 
            .filter(UserEmailMap.user == c.user).all()
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_emails_add(self):
 
        email = request.POST.get('new_email')
 

	
 
        try:
 
            UserModel().add_extra_email(request.authuser.user_id, email)
 
            Session().commit()
 
            h.flash(_("Added email %s to user") % email, category='success')
 
        except formencode.Invalid as error:
 
            msg = error.error_dict['email']
 
            h.flash(msg, category='error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during email saving'),
 
                    category='error')
 
        raise HTTPFound(location=url('my_account_emails'))
 

	
 
    def my_account_emails_delete(self):
 
        email_id = request.POST.get('del_email_id')
 
        user_model = UserModel()
 
        user_model.delete_extra_email(request.authuser.user_id, email_id)
 
        Session().commit()
 
        h.flash(_("Removed email from user"), category='success')
 
        raise HTTPFound(location=url('my_account_emails'))
 

	
 
    def my_account_api_keys(self):
 
        c.active = 'api_keys'
 
        self.__load_data()
 
        show_expired = True
 
        c.lifetime_values = [
 
            (str(-1), _('Forever')),
 
            (str(5), _('5 minutes')),
 
            (str(60), _('1 hour')),
 
            (str(60 * 24), _('1 day')),
 
            (str(60 * 24 * 30), _('1 month')),
 
        ]
 
        c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
 
        c.user_api_keys = ApiKeyModel().get_api_keys(request.authuser.user_id,
 
                                                     show_expired=show_expired)
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_api_keys_add(self):
 
        lifetime = safe_int(request.POST.get('lifetime'), -1)
 
        description = request.POST.get('description')
 
        ApiKeyModel().create(request.authuser.user_id, description, lifetime)
 
        Session().commit()
 
        h.flash(_("API key successfully created"), category='success')
 
        raise HTTPFound(location=url('my_account_api_keys'))
 

	
 
    def my_account_api_keys_delete(self):
 
        api_key = request.POST.get('del_api_key')
 
        if request.POST.get('del_api_key_builtin'):
 
            user = User.get(request.authuser.user_id)
 
            user.api_key = generate_api_key()
 
            Session().commit()
 
            h.flash(_("API key successfully reset"), category='success')
 
        elif api_key:
 
            ApiKeyModel().delete(api_key, request.authuser.user_id)
 
            Session().commit()
 
            h.flash(_("API key successfully deleted"), category='success')
 

	
 
        raise HTTPFound(location=url('my_account_api_keys'))
 

	
 
    @IfSshEnabled
 
    def my_account_ssh_keys(self):
 
        c.active = 'ssh_keys'
 
        self.__load_data()
 
        c.user_ssh_keys = SshKeyModel().get_ssh_keys(request.authuser.user_id)
 
        return render('admin/my_account/my_account.html')
 

	
 
    @IfSshEnabled
 
    def my_account_ssh_keys_add(self):
 
        description = request.POST.get('description')
 
        public_key = request.POST.get('public_key')
 
        try:
 
            new_ssh_key = SshKeyModel().create(request.authuser.user_id,
 
                                               description, public_key)
 
            Session().commit()
 
            SshKeyModel().write_authorized_keys()
 
            h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        raise HTTPFound(location=url('my_account_ssh_keys'))
 

	
 
    @IfSshEnabled
 
    def my_account_ssh_keys_delete(self):
 
        public_key = request.POST.get('del_public_key')
 
        fingerprint = request.POST.get('del_public_key_fingerprint')
 
        try:
 
            SshKeyModel().delete(public_key, request.authuser.user_id)
 
            SshKeyModel().delete(fingerprint, request.authuser.user_id)
 
            Session().commit()
 
            SshKeyModel().write_authorized_keys()
 
            h.flash(_("SSH key successfully deleted"), category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        raise HTTPFound(location=url('my_account_ssh_keys'))
kallithea/controllers/admin/users.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.admin.users
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Users crud controller
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 4, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
import formencode
 
from formencode import htmlfill
 
from sqlalchemy.sql.expression import func
 
from tg import app_globals, config, request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPFound, HTTPNotFound
 

	
 
import kallithea
 
from kallithea.config.routing import url
 
from kallithea.lib import auth_modules
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import AuthUser, HasPermissionAnyDecorator, LoginRequired
 
from kallithea.lib.base import BaseController, IfSshEnabled, render
 
from kallithea.lib.exceptions import DefaultUserException, UserCreationError, UserOwnsReposException
 
from kallithea.lib.utils import action_logger
 
from kallithea.lib.utils2 import datetime_to_time, generate_api_key, safe_int
 
from kallithea.model.api_key import ApiKeyModel
 
from kallithea.model.db import User, UserEmailMap, UserIpMap, UserToPerm
 
from kallithea.model.forms import CustomDefaultPermissionsForm, UserForm
 
from kallithea.model.meta import Session
 
from kallithea.model.ssh_key import SshKeyModel, SshKeyModelException
 
from kallithea.model.user import UserModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UsersController(BaseController):
 
    """REST Controller styled on the Atom Publishing Protocol"""
 

	
 
    @LoginRequired()
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def _before(self, *args, **kwargs):
 
        super(UsersController, self)._before(*args, **kwargs)
 
        c.available_permissions = config['available_permissions']
 

	
 
    def index(self, format='html'):
 
        c.users_list = User.query().order_by(User.username) \
 
                        .filter_by(is_default_user=False) \
 
                        .order_by(func.lower(User.username)) \
 
                        .all()
 

	
 
        users_data = []
 
        total_records = len(c.users_list)
 
        _tmpl_lookup = app_globals.mako_lookup
 
        template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
 

	
 
        grav_tmpl = '<div class="gravatar">%s</div>'
 

	
 
        username = lambda user_id, username: (
 
                template.get_def("user_name")
 
                .render(user_id, username, _=_, h=h, c=c))
 

	
 
        user_actions = lambda user_id, username: (
 
                template.get_def("user_actions")
 
                .render(user_id, username, _=_, h=h, c=c))
 

	
 
        for user in c.users_list:
 
            users_data.append({
 
                "gravatar": grav_tmpl % h.gravatar(user.email, size=20),
 
                "raw_name": user.username,
 
                "username": username(user.user_id, user.username),
 
                "firstname": h.escape(user.name),
 
                "lastname": h.escape(user.lastname),
 
                "last_login": h.fmt_date(user.last_login),
 
                "last_login_raw": datetime_to_time(user.last_login),
 
                "active": h.boolicon(user.active),
 
                "admin": h.boolicon(user.admin),
 
                "extern_type": user.extern_type,
 
                "extern_name": user.extern_name,
 
                "action": user_actions(user.user_id, user.username),
 
            })
 

	
 
        c.data = {
 
            "sort": None,
 
            "dir": "asc",
 
            "records": users_data
 
        }
 

	
 
        return render('admin/users/users.html')
 

	
 
    def create(self):
 
        c.default_extern_type = User.DEFAULT_AUTH_TYPE
 
        c.default_extern_name = ''
 
        user_model = UserModel()
 
        user_form = UserForm()()
 
        try:
 
            form_result = user_form.to_python(dict(request.POST))
 
            user = user_model.create(form_result)
 
            action_logger(request.authuser, 'admin_created_user:%s' % user.username,
 
                          None, request.ip_addr)
 
            h.flash(_('Created user %s') % user.username,
 
                    category='success')
 
            Session().commit()
 
        except formencode.Invalid as errors:
 
            return htmlfill.render(
 
                render('admin/users/user_add.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 
        except UserCreationError as e:
 
            h.flash(e, 'error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during creation of user %s')
 
                    % request.POST.get('username'), category='error')
 
        raise HTTPFound(location=url('edit_user', id=user.user_id))
 

	
 
    def new(self, format='html'):
 
        c.default_extern_type = User.DEFAULT_AUTH_TYPE
 
        c.default_extern_name = ''
 
        return render('admin/users/user_add.html')
 

	
 
    def update(self, id):
 
        user_model = UserModel()
 
        user = user_model.get(id)
 
        _form = UserForm(edit=True, old_data={'user_id': id,
 
                                              'email': user.email})()
 
        form_result = {}
 
        try:
 
            form_result = _form.to_python(dict(request.POST))
 
            skip_attrs = ['extern_type', 'extern_name',
 
                         ] + auth_modules.get_managed_fields(user)
 

	
 
            user_model.update(id, form_result, skip_attrs=skip_attrs)
 
            usr = form_result['username']
 
            action_logger(request.authuser, 'admin_updated_user:%s' % usr,
 
                          None, request.ip_addr)
 
            h.flash(_('User updated successfully'), category='success')
 
            Session().commit()
 
        except formencode.Invalid as errors:
 
            defaults = errors.value
 
            e = errors.error_dict or {}
 
            defaults.update({
 
                'create_repo_perm': user_model.has_perm(id,
 
                                                        'hg.create.repository'),
 
                'fork_repo_perm': user_model.has_perm(id, 'hg.fork.repository'),
 
            })
 
            return htmlfill.render(
 
                self._render_edit_profile(user),
 
                defaults=defaults,
 
                errors=e,
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during update of user %s')
 
                    % form_result.get('username'), category='error')
 
        raise HTTPFound(location=url('edit_user', id=id))
 

	
 
    def delete(self, id):
 
        usr = User.get_or_404(id)
 
        try:
 
            UserModel().delete(usr)
 
            Session().commit()
 
            h.flash(_('Successfully deleted user'), category='success')
 
        except (UserOwnsReposException, DefaultUserException) as e:
 
            h.flash(e, category='warning')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during deletion of user'),
 
                    category='error')
 
        raise HTTPFound(location=url('users'))
 

	
 
    def _get_user_or_raise_if_default(self, id):
 
        try:
 
            return User.get_or_404(id, allow_default=False)
 
        except DefaultUserException:
 
            h.flash(_("The default user cannot be edited"), category='warning')
 
            raise HTTPNotFound
 

	
 
    def _render_edit_profile(self, user):
 
        c.user = user
 
        c.active = 'profile'
 
        c.perm_user = AuthUser(dbuser=user)
 
        managed_fields = auth_modules.get_managed_fields(user)
 
        c.readonly = lambda n: 'readonly' if n in managed_fields else None
 
        return render('admin/users/user_edit.html')
 

	
 
    def edit(self, id, format='html'):
 
        user = self._get_user_or_raise_if_default(id)
 
        defaults = user.get_dict()
 

	
 
        return htmlfill.render(
 
            self._render_edit_profile(user),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def edit_advanced(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'advanced'
 
        c.perm_user = AuthUser(dbuser=c.user)
 

	
 
        umodel = UserModel()
 
        defaults = c.user.get_dict()
 
        defaults.update({
 
            'create_repo_perm': umodel.has_perm(c.user, 'hg.create.repository'),
 
            'create_user_group_perm': umodel.has_perm(c.user,
 
                                                      'hg.usergroup.create.true'),
 
            'fork_repo_perm': umodel.has_perm(c.user, 'hg.fork.repository'),
 
        })
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def edit_api_keys(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'api_keys'
 
        show_expired = True
 
        c.lifetime_values = [
 
            (str(-1), _('Forever')),
 
            (str(5), _('5 minutes')),
 
            (str(60), _('1 hour')),
 
            (str(60 * 24), _('1 day')),
 
            (str(60 * 24 * 30), _('1 month')),
 
        ]
 
        c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
 
        c.user_api_keys = ApiKeyModel().get_api_keys(c.user.user_id,
 
                                                     show_expired=show_expired)
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def add_api_key(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 

	
 
        lifetime = safe_int(request.POST.get('lifetime'), -1)
 
        description = request.POST.get('description')
 
        ApiKeyModel().create(c.user.user_id, description, lifetime)
 
        Session().commit()
 
        h.flash(_("API key successfully created"), category='success')
 
        raise HTTPFound(location=url('edit_user_api_keys', id=c.user.user_id))
 

	
 
    def delete_api_key(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 

	
 
        api_key = request.POST.get('del_api_key')
 
        if request.POST.get('del_api_key_builtin'):
 
            c.user.api_key = generate_api_key()
 
            Session().commit()
 
            h.flash(_("API key successfully reset"), category='success')
 
        elif api_key:
 
            ApiKeyModel().delete(api_key, c.user.user_id)
 
            Session().commit()
 
            h.flash(_("API key successfully deleted"), category='success')
 

	
 
        raise HTTPFound(location=url('edit_user_api_keys', id=c.user.user_id))
 

	
 
    def update_account(self, id):
 
        pass
 

	
 
    def edit_perms(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'perms'
 
        c.perm_user = AuthUser(dbuser=c.user)
 

	
 
        umodel = UserModel()
 
        defaults = c.user.get_dict()
 
        defaults.update({
 
            'create_repo_perm': umodel.has_perm(c.user, 'hg.create.repository'),
 
            'create_user_group_perm': umodel.has_perm(c.user,
 
                                                      'hg.usergroup.create.true'),
 
            'fork_repo_perm': umodel.has_perm(c.user, 'hg.fork.repository'),
 
        })
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def update_perms(self, id):
 
        user = self._get_user_or_raise_if_default(id)
 

	
 
        try:
 
            form = CustomDefaultPermissionsForm()()
 
            form_result = form.to_python(request.POST)
 

	
 
            user_model = UserModel()
 

	
 
            defs = UserToPerm.query() \
 
                .filter(UserToPerm.user == user) \
 
                .all()
 
            for ug in defs:
 
                Session().delete(ug)
 

	
 
            if form_result['create_repo_perm']:
 
                user_model.grant_perm(id, 'hg.create.repository')
 
            else:
 
                user_model.grant_perm(id, 'hg.create.none')
 
            if form_result['create_user_group_perm']:
 
                user_model.grant_perm(id, 'hg.usergroup.create.true')
 
            else:
 
                user_model.grant_perm(id, 'hg.usergroup.create.false')
 
            if form_result['fork_repo_perm']:
 
                user_model.grant_perm(id, 'hg.fork.repository')
 
            else:
 
                user_model.grant_perm(id, 'hg.fork.none')
 
            h.flash(_("Updated permissions"), category='success')
 
            Session().commit()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during permissions saving'),
 
                    category='error')
 
        raise HTTPFound(location=url('edit_user_perms', id=id))
 

	
 
    def edit_emails(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'emails'
 
        c.user_email_map = UserEmailMap.query() \
 
            .filter(UserEmailMap.user == c.user).all()
 

	
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def add_email(self, id):
 
        user = self._get_user_or_raise_if_default(id)
 
        email = request.POST.get('new_email')
 
        user_model = UserModel()
 

	
 
        try:
 
            user_model.add_extra_email(id, email)
 
            Session().commit()
 
            h.flash(_("Added email %s to user") % email, category='success')
 
        except formencode.Invalid as error:
 
            msg = error.error_dict['email']
 
            h.flash(msg, category='error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during email saving'),
 
                    category='error')
 
        raise HTTPFound(location=url('edit_user_emails', id=id))
 

	
 
    def delete_email(self, id):
 
        user = self._get_user_or_raise_if_default(id)
 
        email_id = request.POST.get('del_email_id')
 
        user_model = UserModel()
 
        user_model.delete_extra_email(id, email_id)
 
        Session().commit()
 
        h.flash(_("Removed email from user"), category='success')
 
        raise HTTPFound(location=url('edit_user_emails', id=id))
 

	
 
    def edit_ips(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'ips'
 
        c.user_ip_map = UserIpMap.query() \
 
            .filter(UserIpMap.user == c.user).all()
 

	
 
        c.default_user_ip_map = UserIpMap.query() \
 
            .filter(UserIpMap.user == User.get_default_user()).all()
 

	
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def add_ip(self, id):
 
        ip = request.POST.get('new_ip')
 
        user_model = UserModel()
 

	
 
        try:
 
            user_model.add_extra_ip(id, ip)
 
            Session().commit()
 
            h.flash(_("Added IP address %s to user whitelist") % ip, category='success')
 
        except formencode.Invalid as error:
 
            msg = error.error_dict['ip']
 
            h.flash(msg, category='error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred while adding IP address'),
 
                    category='error')
 

	
 
        if 'default_user' in request.POST:
 
            raise HTTPFound(location=url('admin_permissions_ips'))
 
        raise HTTPFound(location=url('edit_user_ips', id=id))
 

	
 
    def delete_ip(self, id):
 
        ip_id = request.POST.get('del_ip_id')
 
        user_model = UserModel()
 
        user_model.delete_extra_ip(id, ip_id)
 
        Session().commit()
 
        h.flash(_("Removed IP address from user whitelist"), category='success')
 

	
 
        if 'default_user' in request.POST:
 
            raise HTTPFound(location=url('admin_permissions_ips'))
 
        raise HTTPFound(location=url('edit_user_ips', id=id))
 

	
 
    @IfSshEnabled
 
    def edit_ssh_keys(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'ssh_keys'
 
        c.user_ssh_keys = SshKeyModel().get_ssh_keys(c.user.user_id)
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @IfSshEnabled
 
    def ssh_keys_add(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 

	
 
        description = request.POST.get('description')
 
        public_key = request.POST.get('public_key')
 
        try:
 
            new_ssh_key = SshKeyModel().create(c.user.user_id,
 
                                               description, public_key)
 
            Session().commit()
 
            SshKeyModel().write_authorized_keys()
 
            h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        raise HTTPFound(location=url('edit_user_ssh_keys', id=c.user.user_id))
 

	
 
    @IfSshEnabled
 
    def ssh_keys_delete(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 

	
 
        public_key = request.POST.get('del_public_key')
 
        fingerprint = request.POST.get('del_public_key_fingerprint')
 
        try:
 
            SshKeyModel().delete(public_key, c.user.user_id)
 
            SshKeyModel().delete(fingerprint, c.user.user_id)
 
            Session().commit()
 
            SshKeyModel().write_authorized_keys()
 
            h.flash(_("SSH key successfully deleted"), category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        raise HTTPFound(location=url('edit_user_ssh_keys', id=c.user.user_id))
kallithea/model/ssh_key.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.ssh_key
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
SSH key model for Kallithea
 

	
 
"""
 

	
 
import errno
 
import logging
 
import os
 
import stat
 
import tempfile
 

	
 
from tg import config
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib import ssh
 
from kallithea.lib.utils2 import safe_str, str2bool
 
from kallithea.model.db import User, UserSshKeys
 
from kallithea.model.meta import Session
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class SshKeyModelException(Exception):
 
    """Exception raised by SshKeyModel methods to report errors"""
 

	
 

	
 
class SshKeyModel(object):
 

	
 
    def create(self, user, description, public_key):
 
        """
 
        :param user: user or user_id
 
        :param description: description of SshKey
 
        :param publickey: public key text
 
        Will raise SshKeyModelException on errors
 
        """
 
        try:
 
            keytype, pub, comment = ssh.parse_pub_key(public_key)
 
        except ssh.SshKeyParseError as e:
 
            raise SshKeyModelException(_('SSH key %r is invalid: %s') % (safe_str(public_key), e.message))
 
        if not description.strip():
 
            description = comment.strip()
 

	
 
        user = User.guess_instance(user)
 

	
 
        new_ssh_key = UserSshKeys()
 
        new_ssh_key.user_id = user.user_id
 
        new_ssh_key.description = description
 
        new_ssh_key.public_key = public_key
 

	
 
        for ssh_key in UserSshKeys.query().filter(UserSshKeys.fingerprint == new_ssh_key.fingerprint).all():
 
            raise SshKeyModelException(_('SSH key %s is already used by %s') %
 
                                       (new_ssh_key.fingerprint, ssh_key.user.username))
 

	
 
        Session().add(new_ssh_key)
 

	
 
        return new_ssh_key
 

	
 
    def delete(self, public_key, user=None):
 
    def delete(self, fingerprint, user=None):
 
        """
 
        Deletes given public_key, if user is set it also filters the object for
 
        deletion by given user.
 
        Deletes ssh key with given fingerprint. If user is set, it also filters
 
        the object for deletion by given user.
 
        Will raise SshKeyModelException on errors
 
        """
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys._public_key == public_key)
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys.fingerprint == fingerprint)
 

	
 
        if user:
 
            user = User.guess_instance(user)
 
            ssh_key = ssh_key.filter(UserSshKeys.user_id == user.user_id)
 

	
 
        ssh_key = ssh_key.scalar()
 
        if ssh_key is None:
 
            raise SshKeyModelException(_('SSH key %r not found') % safe_str(public_key))
 
            raise SshKeyModelException(_('SSH key with fingerprint %r found') % safe_str(fingerprint))
 
        Session().delete(ssh_key)
 

	
 
    def get_ssh_keys(self, user):
 
        user = User.guess_instance(user)
 
        user_ssh_keys = UserSshKeys.query() \
 
            .filter(UserSshKeys.user_id == user.user_id).all()
 
        return user_ssh_keys
 

	
 
    def write_authorized_keys(self):
 
        if not str2bool(config.get('ssh_enabled', False)):
 
            log.error("Will not write SSH authorized_keys file - ssh_enabled is not configured")
 
            return
 
        authorized_keys = config.get('ssh_authorized_keys')
 
        kallithea_cli_path = config.get('kallithea_cli_path', 'kallithea-cli')
 
        if not authorized_keys:
 
            log.error('Cannot write SSH authorized_keys file - ssh_authorized_keys is not configured')
 
            return
 
        log.info('Writing %s', authorized_keys)
 

	
 
        authorized_keys_dir = os.path.dirname(authorized_keys)
 
        try:
 
            os.makedirs(authorized_keys_dir)
 
            os.chmod(authorized_keys_dir, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) # ~/.ssh/ must be 0700
 
        except OSError as exception:
 
            if exception.errno != errno.EEXIST:
 
                raise
 
        # Now, test that the directory is or was created in a readable way by previous.
 
        if not (os.path.isdir(authorized_keys_dir) and
 
                os.access(authorized_keys_dir, os.W_OK)):
 
            raise Exception("Directory of authorized_keys cannot be written to so authorized_keys file %s cannot be written" % (authorized_keys))
 

	
 
        # Make sure we don't overwrite a key file with important content
 
        if os.path.exists(authorized_keys):
 
            with open(authorized_keys) as f:
 
                for l in f:
 
                    if not l.strip() or l.startswith('#'):
 
                        pass # accept empty lines and comments
 
                    elif ssh.SSH_OPTIONS in l and ' ssh-serve ' in l:
 
                        pass # Kallithea entries are ok to overwrite
 
                    else:
 
                        raise Exception("Safety check failed, found %r in %s - please review and remove it" % (l.strip(), authorized_keys))
 

	
 
        fh, tmp_authorized_keys = tempfile.mkstemp('.authorized_keys', dir=os.path.dirname(authorized_keys))
 
        with os.fdopen(fh, 'w') as f:
 
            for key in UserSshKeys.query().join(UserSshKeys.user).filter(User.active == True):
 
                f.write(ssh.authorized_keys_line(kallithea_cli_path, config['__file__'], key))
 
        os.chmod(tmp_authorized_keys, stat.S_IRUSR | stat.S_IWUSR)
 
        # This preliminary remove is needed for Windows, not for Unix.
 
        # TODO In Python 3, the remove+rename sequence below should become os.replace.
 
        if os.path.exists(authorized_keys):
 
            os.remove(authorized_keys)
 
        os.rename(tmp_authorized_keys, authorized_keys)
kallithea/templates/admin/my_account/my_account_ssh_keys.html
Show inline comments
 
<table class="table">
 
    %if c.user_ssh_keys:
 
        <tr>
 
            <th>${_('Fingerprint')}</th>
 
            <th>${_('Description')}</th>
 
            <th>${_('Last Used')}</th>
 
            <th>${_('Action')}</th>
 
        </tr>
 
        %for ssh_key in c.user_ssh_keys:
 
          <tr>
 
            <td>
 
                ${ssh_key.fingerprint}
 
            </td>
 
            <td>
 
                ${ssh_key.description}
 
            </td>
 
            <td>
 
              %if ssh_key.last_seen:
 
                ${h.fmt_date(ssh_key.last_seen)}
 
              %else:
 
                ${_('Never')}
 
              %endif
 
            </td>
 
            <td>
 
                ${h.form(url('my_account_ssh_keys_delete'))}
 
                    ${h.hidden('del_public_key', ssh_key.public_key)}
 
                    ${h.hidden('del_public_key_fingerprint', ssh_key.fingerprint)}
 
                    <button class="btn btn-danger btn-xs" type="submit"
 
                            onclick="return confirm('${_('Confirm to remove this SSH key: %s') % ssh_key.fingerprint}');">
 
                        <i class="icon-trashcan"></i>
 
                        ${_('Remove')}
 
                    </button>
 
                ${h.end_form()}
 
            </td>
 
          </tr>
 
        %endfor
 
    %else:
 
        <tr>
 
            <td>
 
                <div class="ip">${_('No SSH keys have been added')}</div>
 
            </td>
 
        </tr>
 
    %endif
 
</table>
 

	
 
<div>
 
    ${h.form(url('my_account_ssh_keys'))}
 
    <div class="form">
 
            <div class="form-group">
 
                <label class="control-label">${_('New SSH key')}</label>
 
            </div>
 
            <div class="form-group">
 
                <label class="control-label" for="public_key">${_('Public key')}:</label>
 
                <div>
 
                    ${h.textarea('public_key', '', class_='form-control', placeholder=_('Public key (contents of e.g. ~/.ssh/id_rsa.pub)'), cols=80, rows=5)}
 
                </div>
 
            </div>
 
            <div class="form-group">
 
                <label class="control-label" for="description">${_('Description')}:</label>
 
                <div>
 
                    ${h.text('description', class_='form-control', placeholder=_('Description'))}
 
                </div>
 
            </div>
 
            <div class="form-group">
 
                <div class="buttons">
 
                    ${h.submit('save', _('Add'), class_="btn btn-default")}
 
                    ${h.reset('reset', _('Reset'), class_="btn btn-default")}
 
                </div>
 
            </div>
 
    </div>
 
    ${h.end_form()}
 
</div>
kallithea/templates/admin/users/user_edit_ssh_keys.html
Show inline comments
 
<table class="table">
 
    %if c.user_ssh_keys:
 
        <tr>
 
            <th>${_('Fingerprint')}</th>
 
            <th>${_('Description')}</th>
 
            <th>${_('Last Used')}</th>
 
            <th>${_('Action')}</th>
 
        </tr>
 
        %for ssh_key in c.user_ssh_keys:
 
          <tr>
 
            <td>
 
                ${ssh_key.fingerprint}
 
            </td>
 
            <td>
 
                ${ssh_key.description}
 
            </td>
 
            <td>
 
              %if ssh_key.last_seen:
 
                ${h.fmt_date(ssh_key.last_seen)}
 
              %else:
 
                ${_('Never')}
 
              %endif
 
            </td>
 
            <td>
 
                ${h.form(url('edit_user_ssh_keys_delete', id=c.user.user_id))}
 
                    ${h.hidden('del_public_key', ssh_key.public_key)}
 
                    ${h.hidden('del_public_key_fingerprint', ssh_key.fingerprint)}
 
                    <button class="btn btn-danger btn-xs" type="submit"
 
                            onclick="return confirm('${_('Confirm to remove this SSH key: %s') % ssh_key.fingerprint}');">
 
                        <i class="icon-trashcan"></i>
 
                        ${_('Remove')}
 
                    </button>
 
                ${h.end_form()}
 
            </td>
 
          </tr>
 
        %endfor
 
    %else:
 
        <tr>
 
            <td>
 
                <div class="ip">${_('No SSH keys have been added')}</div>
 
            </td>
 
        </tr>
 
    %endif
 
</table>
 

	
 
<div>
 
    ${h.form(url('edit_user_ssh_keys', id=c.user.user_id))}
 
    <div class="form">
 
            <div class="form-group">
 
                <label class="control-label">${_('New SSH key')}</label>
 
            </div>
 
            <div class="form-group">
 
                <label class="control-label" for="public_key">${_('Public key')}:</label>
 
                <div>
 
                    ${h.textarea('public_key', '', class_='form-control', placeholder=_('Public key (contents of e.g. ~/.ssh/id_rsa.pub)'), cols=80, rows=5)}
 
                </div>
 
            </div>
 
            <div class="form-group">
 
                <label class="control-label" for="description">${_('Description')}:</label>
 
                <div>
 
                    ${h.text('description', class_='form-control', placeholder=_('Description'))}
 
                </div>
 
            </div>
 
            <div class="form-group">
 
                <div class="buttons">
 
                    ${h.submit('save', _('Add'), class_="btn btn-default")}
 
                    ${h.reset('reset', _('Reset'), class_="btn btn-default")}
 
                </div>
 
            </div>
 
    </div>
 
    ${h.end_form()}
 
</div>
kallithea/tests/functional/test_admin_users.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import pytest
 
from sqlalchemy.orm.exc import NoResultFound
 
from tg.util.webtest import test_context
 
from webob.exc import HTTPNotFound
 

	
 
from kallithea.controllers.admin.users import UsersController
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import check_password
 
from kallithea.model import validators
 
from kallithea.model.db import Permission, RepoGroup, User, UserApiKeys, UserSshKeys
 
from kallithea.model.meta import Session
 
from kallithea.model.user import UserModel
 
from kallithea.tests.base import *
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
@pytest.fixture
 
def user_and_repo_group_fail():
 
    username = 'repogrouperr'
 
    groupname = u'repogroup_fail'
 
    user = fixture.create_user(name=username)
 
    repo_group = fixture.create_repo_group(name=groupname, cur_user=username)
 
    yield user, repo_group
 
    # cleanup
 
    if RepoGroup.get_by_group_name(groupname):
 
        fixture.destroy_repo_group(repo_group)
 

	
 

	
 
class TestAdminUsersController(TestController):
 
    test_user_1 = 'testme'
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        if User.get_by_username(cls.test_user_1):
 
            UserModel().delete(cls.test_user_1)
 
            Session().commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('users'))
 
        # TODO: Test response...
 

	
 
    def test_create(self):
 
        self.log_user()
 
        username = 'newtestuser'
 
        password = 'test12'
 
        password_confirmation = password
 
        name = u'name'
 
        lastname = u'lastname'
 
        email = 'mail@example.com'
 

	
 
        response = self.app.post(url('new_user'),
 
            {'username': username,
 
             'password': password,
 
             'password_confirmation': password_confirmation,
 
             'firstname': name,
 
             'active': True,
 
             'lastname': lastname,
 
             'extern_name': 'internal',
 
             'extern_type': 'internal',
 
             'email': email,
 
             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        # 302 Found
 
        # The resource was found at http://localhost/_admin/users/5/edit; you should be redirected automatically.
 

	
 
        self.checkSessionFlash(response, '''Created user %s''' % username)
 

	
 
        response = response.follow()
 
        response.mustcontain("""%s user settings""" % username) # in <title>
 

	
 
        new_user = Session().query(User). \
 
            filter(User.username == username).one()
 

	
 
        assert new_user.username == username
 
        assert check_password(password, new_user.password) == True
 
        assert new_user.name == name
 
        assert new_user.lastname == lastname
 
        assert new_user.email == email
 

	
 
    def test_create_err(self):
 
        self.log_user()
 
        username = 'new_user'
 
        password = ''
 
        name = u'name'
 
        lastname = u'lastname'
 
        email = 'errmail.example.com'
 

	
 
        response = self.app.post(url('new_user'),
 
            {'username': username,
 
             'password': password,
 
             'name': name,
 
             'active': False,
 
             'lastname': lastname,
 
             'email': email,
 
             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        with test_context(self.app):
 
            msg = validators.ValidUsername(False, {})._messages['system_invalid_username']
 
        msg = h.html_escape(msg % {'username': 'new_user'})
 
        response.mustcontain("""<span class="error-message">%s</span>""" % msg)
 
        response.mustcontain("""<span class="error-message">Please enter a value</span>""")
 
        response.mustcontain("""<span class="error-message">An email address must contain a single @</span>""")
 

	
 
        def get_user():
 
            Session().query(User).filter(User.username == username).one()
 

	
 
        with pytest.raises(NoResultFound):
 
            get_user(), 'found user in database'
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_user'))
 

	
 
    @parametrize('name,attrs',
 
        [('firstname', {'firstname': 'new_username'}),
 
         ('lastname', {'lastname': 'new_username'}),
 
         ('admin', {'admin': True}),
 
         ('admin', {'admin': False}),
 
         ('extern_type', {'extern_type': 'ldap'}),
 
         ('extern_type', {'extern_type': None}),
 
         ('extern_name', {'extern_name': 'test'}),
 
         ('extern_name', {'extern_name': None}),
 
         ('active', {'active': False}),
 
         ('active', {'active': True}),
 
         ('email', {'email': 'someemail@example.com'}),
 
        # ('new_password', {'new_password': 'foobar123',
 
        #                   'password_confirmation': 'foobar123'})
 
        ])
 
    def test_update(self, name, attrs):
 
        self.log_user()
 
        usr = fixture.create_user(self.test_user_1, password='qweqwe',
 
                                  email='testme@example.com',
 
                                  extern_type='internal',
 
                                  extern_name=self.test_user_1,
 
                                  skip_if_exists=True)
 
        Session().commit()
 
        params = usr.get_api_data(True)
 
        params.update({'password_confirmation': ''})
 
        params.update({'new_password': ''})
 
        params.update(attrs)
 
        if name == 'email':
 
            params['emails'] = [attrs['email']]
 
        if name == 'extern_type':
 
            # cannot update this via form, expected value is original one
 
            params['extern_type'] = "internal"
 
        if name == 'extern_name':
 
            # cannot update this via form, expected value is original one
 
            params['extern_name'] = self.test_user_1
 
            # special case since this user is not logged in yet his data is
 
            # not filled so we use creation data
 

	
 
        params.update({'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        response = self.app.post(url('update_user', id=usr.user_id), params)
 
        self.checkSessionFlash(response, 'User updated successfully')
 
        params.pop('_session_csrf_secret_token')
 

	
 
        updated_user = User.get_by_username(self.test_user_1)
 
        updated_params = updated_user.get_api_data(True)
 
        updated_params.update({'password_confirmation': ''})
 
        updated_params.update({'new_password': ''})
 

	
 
        assert params == updated_params
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        username = 'newtestuserdeleteme'
 

	
 
        fixture.create_user(name=username)
 

	
 
        new_user = Session().query(User) \
 
            .filter(User.username == username).one()
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_delete_repo_err(self):
 
        self.log_user()
 
        username = 'repoerr'
 
        reponame = u'repoerr_fail'
 

	
 
        fixture.create_user(name=username)
 
        fixture.create_repo(name=reponame, cur_user=username)
 

	
 
        new_user = Session().query(User) \
 
            .filter(User.username == username).one()
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'User "%s" still '
 
                               'owns 1 repositories and cannot be removed. '
 
                               'Switch owners or remove those repositories: '
 
                               '%s' % (username, reponame))
 

	
 
        response = self.app.post(url('delete_repo', repo_name=reponame),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'Deleted repository %s' % reponame)
 

	
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_delete_repo_group_err(self, user_and_repo_group_fail):
 
        new_user, repo_group = user_and_repo_group_fail
 
        username = new_user.username
 
        groupname = repo_group.group_name
 

	
 
        self.log_user()
 

	
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'User "%s" still '
 
                               'owns 1 repository groups and cannot be removed. '
 
                               'Switch owners or remove those repository groups: '
 
                               '%s' % (username, groupname))
 

	
 
        # Relevant _if_ the user deletion succeeded to make sure we can render groups without owner
 
        # rg = RepoGroup.get_by_group_name(group_name=groupname)
 
        # response = self.app.get(url('repos_groups', id=rg.group_id))
 

	
 
        response = self.app.post(url('delete_repo_group', group_name=groupname),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'Removed repository group %s' % groupname)
 

	
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_delete_user_group_err(self):
 
        self.log_user()
 
        username = 'usergrouperr'
 
        groupname = u'usergroup_fail'
 

	
 
        fixture.create_user(name=username)
 
        ug = fixture.create_user_group(name=groupname, cur_user=username)
 

	
 
        new_user = Session().query(User) \
 
            .filter(User.username == username).one()
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'User "%s" still '
 
                               'owns 1 user groups and cannot be removed. '
 
                               'Switch owners or remove those user groups: '
 
                               '%s' % (username, groupname))
 

	
 
        # TODO: why do this fail?
 
        #response = self.app.delete(url('delete_users_group', id=groupname))
 
        #self.checkSessionFlash(response, 'Removed user group %s' % groupname)
 

	
 
        fixture.destroy_user_group(ug.users_group_id)
 

	
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_edit(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        response = self.app.get(url('edit_user', id=user.user_id))
 

	
 
    def test_add_perm_create_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.create.none')
 
        perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
        user = UserModel().create_or_update(username='dummy', password='qwe',
 
                                            email='dummy', firstname=u'a',
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(user, perm_none) == False
 
            assert UserModel().has_perm(user, perm_create) == False
 

	
 
            response = self.app.post(url('edit_user_perms_update', id=uid),
 
                                     params=dict(create_repo_perm=True,
 
                                                 _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(uid, perm_none) == False
 
            assert UserModel().has_perm(uid, perm_create) == True
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_revoke_perm_create_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.create.none')
 
        perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
        user = UserModel().create_or_update(username='dummy', password='qwe',
 
                                            email='dummy', firstname=u'a',
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(user, perm_none) == False
 
            assert UserModel().has_perm(user, perm_create) == False
 

	
 
            response = self.app.post(url('edit_user_perms_update', id=uid),
 
                                     params=dict(_session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(uid, perm_none) == True
 
            assert UserModel().has_perm(uid, perm_create) == False
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_add_perm_fork_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.fork.none')
 
        perm_fork = Permission.get_by_key('hg.fork.repository')
 

	
 
        user = UserModel().create_or_update(username='dummy', password='qwe',
 
                                            email='dummy', firstname=u'a',
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(user, perm_none) == False
 
            assert UserModel().has_perm(user, perm_fork) == False
 

	
 
            response = self.app.post(url('edit_user_perms_update', id=uid),
 
                                     params=dict(create_repo_perm=True,
 
                                                 _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(uid, perm_none) == False
 
            assert UserModel().has_perm(uid, perm_create) == True
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_revoke_perm_fork_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.fork.none')
 
        perm_fork = Permission.get_by_key('hg.fork.repository')
 

	
 
        user = UserModel().create_or_update(username='dummy', password='qwe',
 
                                            email='dummy', firstname=u'a',
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(user, perm_none) == False
 
            assert UserModel().has_perm(user, perm_fork) == False
 

	
 
            response = self.app.post(url('edit_user_perms_update', id=uid),
 
                                     params=dict(_session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(uid, perm_none) == True
 
            assert UserModel().has_perm(uid, perm_create) == False
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_ips(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        response = self.app.get(url('edit_user_ips', id=user.user_id))
 
        response.mustcontain('All IP addresses are allowed')
 

	
 
    @parametrize('test_name,ip,ip_range,failure', [
 
        ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
 
        ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
 
        ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
 
        ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
 
        ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
 
        ('127_bad_ip', 'foobar', 'foobar', True),
 
    ])
 
    def test_add_ip(self, test_name, ip, ip_range, failure, auto_clear_ip_permissions):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_ips_update', id=user_id),
 
                                 params=dict(new_ip=ip, _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        if failure:
 
            self.checkSessionFlash(response, 'Please enter a valid IPv4 or IPv6 address')
 
            response = self.app.get(url('edit_user_ips', id=user_id))
 
            response.mustcontain(no=[ip])
 
            response.mustcontain(no=[ip_range])
 

	
 
        else:
 
            response = self.app.get(url('edit_user_ips', id=user_id))
 
            response.mustcontain(ip)
 
            response.mustcontain(ip_range)
 

	
 
    def test_delete_ip(self, auto_clear_ip_permissions):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 
        ip = '127.0.0.1/32'
 
        ip_range = '127.0.0.1 - 127.0.0.1'
 
        with test_context(self.app):
 
            new_ip = UserModel().add_extra_ip(user_id, ip)
 
            Session().commit()
 
        new_ip_id = new_ip.ip_id
 

	
 
        response = self.app.get(url('edit_user_ips', id=user_id))
 
        response.mustcontain(ip)
 
        response.mustcontain(ip_range)
 

	
 
        self.app.post(url('edit_user_ips_delete', id=user_id),
 
                      params=dict(del_ip_id=new_ip_id, _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        response = self.app.get(url('edit_user_ips', id=user_id))
 
        response.mustcontain('All IP addresses are allowed')
 
        response.mustcontain(no=[ip])
 
        response.mustcontain(no=[ip_range])
 

	
 
    def test_api_keys(self):
 
        self.log_user()
 

	
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        response = self.app.get(url('edit_user_api_keys', id=user.user_id))
 
        response.mustcontain(user.api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
    @parametrize('desc,lifetime', [
 
        ('forever', -1),
 
        ('5mins', 60*5),
 
        ('30days', 60*60*24*30),
 
    ])
 
    def test_add_api_keys(self, desc, lifetime):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_api_keys_update', id=user_id),
 
                 {'description': desc, 'lifetime': lifetime, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        try:
 
            response = response.follow()
 
            user = User.get(user_id)
 
            for api_key in user.api_keys:
 
                response.mustcontain(api_key)
 
        finally:
 
            for api_key in UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all():
 
                Session().delete(api_key)
 
                Session().commit()
 

	
 
    def test_remove_api_key(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_api_keys_update', id=user_id),
 
                {'description': 'desc', 'lifetime': -1, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        response = response.follow()
 

	
 
        # now delete our key
 
        keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
 
        assert 1 == len(keys)
 

	
 
        response = self.app.post(url('edit_user_api_keys_delete', id=user_id),
 
                 {'del_api_key': keys[0].api_key, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully deleted')
 
        keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
 
        assert 0 == len(keys)
 

	
 
    def test_reset_main_api_key(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 
        api_key = user.api_key
 
        response = self.app.get(url('edit_user_api_keys', id=user_id))
 
        response.mustcontain(api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
        response = self.app.post(url('edit_user_api_keys_delete', id=user_id),
 
                 {'del_api_key_builtin': api_key, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully reset')
 
        response = response.follow()
 
        response.mustcontain(no=[api_key])
 

	
 
    def test_add_ssh_key(self):
 
        description = u'something'
 
        public_key = u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== me@localhost'
 
        fingerprint = u'Ke3oUCNJM87P0jJTb3D+e3shjceP2CqMpQKVd75E9I8'
 

	
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_ssh_keys', id=user_id),
 
                                 {'description': description,
 
                                  'public_key': public_key,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key %s successfully added' % fingerprint)
 

	
 
        response = response.follow()
 
        response.mustcontain(fingerprint)
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys.user_id == user_id).one()
 
        assert ssh_key.fingerprint == fingerprint
 
        assert ssh_key.description == description
 
        Session().delete(ssh_key)
 
        Session().commit()
 

	
 
    def test_remove_ssh_key(self):
 
        description = u''
 
        public_key = u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== me@localhost'
 
        fingerprint = u'Ke3oUCNJM87P0jJTb3D+e3shjceP2CqMpQKVd75E9I8'
 

	
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_ssh_keys', id=user_id),
 
                                 {'description': description,
 
                                  'public_key': public_key,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key %s successfully added' % fingerprint)
 
        response.follow()
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys.user_id == user_id).one()
 
        assert ssh_key.description == u'me@localhost'
 

	
 
        response = self.app.post(url('edit_user_ssh_keys_delete', id=user_id),
 
                                 {'del_public_key': ssh_key.public_key,
 
                                 {'del_public_key_fingerprint': ssh_key.fingerprint,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key successfully deleted')
 
        keys = UserSshKeys.query().all()
 
        assert 0 == len(keys)
 

	
 

	
 
class TestAdminUsersController_unittest(TestController):
 
    """ Unit tests for the users controller """
 

	
 
    def test_get_user_or_raise_if_default(self, monkeypatch, test_context_fixture):
 
        # flash complains about an non-existing session
 
        def flash_mock(*args, **kwargs):
 
            pass
 
        monkeypatch.setattr(h, 'flash', flash_mock)
 

	
 
        u = UsersController()
 
        # a regular user should work correctly
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        assert u._get_user_or_raise_if_default(user.user_id) == user
 
        # the default user should raise
 
        with pytest.raises(HTTPNotFound):
 
            u._get_user_or_raise_if_default(User.get_default_user().user_id)
 

	
 

	
 
class TestAdminUsersControllerForDefaultUser(TestController):
 
    """
 
    Edit actions on the default user are not allowed.
 
    Validate that they throw a 404 exception.
 
    """
 
    def test_edit_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user', id=user.user_id), status=404)
 

	
 
    def test_edit_advanced_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_advanced', id=user.user_id), status=404)
 

	
 
    # API keys
 
    def test_edit_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_api_keys', id=user.user_id), status=404)
 

	
 
    def test_add_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_api_keys_update', id=user.user_id),
 
                 {'_session_csrf_secret_token': self.session_csrf_secret_token()}, status=404)
 

	
 
    def test_delete_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_api_keys_delete', id=user.user_id),
 
                 {'_session_csrf_secret_token': self.session_csrf_secret_token()}, status=404)
 

	
 
    # Permissions
 
    def test_edit_perms_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_perms', id=user.user_id), status=404)
 

	
 
    def test_update_perms_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_perms_update', id=user.user_id),
 
                 {'_session_csrf_secret_token': self.session_csrf_secret_token()}, status=404)
 

	
 
    # Emails
 
    def test_edit_emails_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_emails', id=user.user_id), status=404)
 

	
 
    def test_add_emails_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_emails_update', id=user.user_id),
 
                 {'_session_csrf_secret_token': self.session_csrf_secret_token()}, status=404)
 

	
 
    def test_delete_emails_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_emails_delete', id=user.user_id),
 
                 {'_session_csrf_secret_token': self.session_csrf_secret_token()}, status=404)
 

	
 
    # IP addresses
 
    # Add/delete of IP addresses for the default user is used to maintain
 
    # the global IP whitelist and thus allowed. Only 'edit' is forbidden.
 
    def test_edit_ip_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_ips', id=user.user_id), status=404)
kallithea/tests/functional/test_my_account.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
from tg.util.webtest import test_context
 

	
 
from kallithea.lib import helpers as h
 
from kallithea.model.db import Repository, User, UserApiKeys, UserFollowing, UserSshKeys
 
from kallithea.model.meta import Session
 
from kallithea.model.user import UserModel
 
from kallithea.tests.base import *
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestMyAccountController(TestController):
 
    test_user_1 = 'testme'
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        if User.get_by_username(cls.test_user_1):
 
            UserModel().delete(cls.test_user_1)
 
            Session().commit()
 

	
 
    def test_my_account(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account'))
 

	
 
        response.mustcontain('value="%s' % TEST_USER_ADMIN_LOGIN)
 

	
 
    def test_my_account_my_repos(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_repos'))
 
        cnt = Repository.query().filter(Repository.owner ==
 
                           User.get_by_username(TEST_USER_ADMIN_LOGIN)).count()
 
        response.mustcontain('"raw_name": "%s"' % HG_REPO)
 
        response.mustcontain('"just_name": "%s"' % GIT_REPO)
 

	
 
    def test_my_account_my_watched(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_watched'))
 

	
 
        cnt = UserFollowing.query().filter(UserFollowing.user ==
 
                            User.get_by_username(TEST_USER_ADMIN_LOGIN)).count()
 
        response.mustcontain('"raw_name": "%s"' % HG_REPO)
 
        response.mustcontain('"just_name": "%s"' % GIT_REPO)
 

	
 
    def test_my_account_my_emails(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_emails'))
 
        response.mustcontain('No additional emails specified')
 

	
 
    def test_my_account_my_emails_add_existing_email(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_emails'))
 
        response.mustcontain('No additional emails specified')
 
        response = self.app.post(url('my_account_emails'),
 
                                 {'new_email': TEST_USER_REGULAR_EMAIL, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'This email address is already in use')
 

	
 
    def test_my_account_my_emails_add_missing_email_in_form(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_emails'))
 
        response.mustcontain('No additional emails specified')
 
        response = self.app.post(url('my_account_emails'),
 
            {'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'Please enter an email address')
 

	
 
    def test_my_account_my_emails_add_remove(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_emails'))
 
        response.mustcontain('No additional emails specified')
 

	
 
        response = self.app.post(url('my_account_emails'),
 
                                 {'new_email': 'barz@example.com', '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        response = self.app.get(url('my_account_emails'))
 

	
 
        from kallithea.model.db import UserEmailMap
 
        email_id = UserEmailMap.query() \
 
            .filter(UserEmailMap.user == User.get_by_username(TEST_USER_ADMIN_LOGIN)) \
 
            .filter(UserEmailMap.email == 'barz@example.com').one().email_id
 

	
 
        response.mustcontain('barz@example.com')
 
        response.mustcontain('<input id="del_email_id" name="del_email_id" type="hidden" value="%s" />' % email_id)
 

	
 
        response = self.app.post(url('my_account_emails_delete'),
 
                                 {'del_email_id': email_id, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'Removed email from user')
 
        response = self.app.get(url('my_account_emails'))
 
        response.mustcontain('No additional emails specified')
 

	
 

	
 
    @parametrize('name,attrs',
 
        [('firstname', {'firstname': 'new_username'}),
 
         ('lastname', {'lastname': 'new_username'}),
 
         ('admin', {'admin': True}),
 
         ('admin', {'admin': False}),
 
         ('extern_type', {'extern_type': 'ldap'}),
 
         ('extern_type', {'extern_type': None}),
 
         #('extern_name', {'extern_name': 'test'}),
 
         #('extern_name', {'extern_name': None}),
 
         ('active', {'active': False}),
 
         ('active', {'active': True}),
 
         ('email', {'email': 'someemail@example.com'}),
 
        # ('new_password', {'new_password': 'foobar123',
 
        #                   'password_confirmation': 'foobar123'})
 
        ])
 
    def test_my_account_update(self, name, attrs):
 
        usr = fixture.create_user(self.test_user_1, password='qweqwe',
 
                                  email='testme@example.com',
 
                                  extern_type='internal',
 
                                  extern_name=self.test_user_1,
 
                                  skip_if_exists=True)
 
        params = usr.get_api_data(True)  # current user data
 
        user_id = usr.user_id
 
        self.log_user(username=self.test_user_1, password='qweqwe')
 

	
 
        params.update({'password_confirmation': ''})
 
        params.update({'new_password': ''})
 
        params.update({'extern_type': 'internal'})
 
        params.update({'extern_name': self.test_user_1})
 
        params.update({'_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        params.update(attrs)
 
        response = self.app.post(url('my_account'), params)
 

	
 
        self.checkSessionFlash(response,
 
                               'Your account was updated successfully')
 

	
 
        updated_user = User.get_by_username(self.test_user_1)
 
        updated_params = updated_user.get_api_data(True)
 
        updated_params.update({'password_confirmation': ''})
 
        updated_params.update({'new_password': ''})
 

	
 
        params['last_login'] = updated_params['last_login']
 
        if name == 'email':
 
            params['emails'] = [attrs['email']]
 
        if name == 'extern_type':
 
            # cannot update this via form, expected value is original one
 
            params['extern_type'] = "internal"
 
        if name == 'extern_name':
 
            # cannot update this via form, expected value is original one
 
            params['extern_name'] = str(user_id)
 
        if name == 'active':
 
            # my account cannot deactivate account
 
            params['active'] = True
 
        if name == 'admin':
 
            # my account cannot make you an admin !
 
            params['admin'] = False
 

	
 
        params.pop('_session_csrf_secret_token')
 
        assert params == updated_params
 

	
 
    def test_my_account_update_err_email_exists(self):
 
        self.log_user()
 

	
 
        new_email = TEST_USER_REGULAR_EMAIL  # already existing email
 
        response = self.app.post(url('my_account'),
 
                                params=dict(
 
                                    username=TEST_USER_ADMIN_LOGIN,
 
                                    new_password=TEST_USER_ADMIN_PASS,
 
                                    password_confirmation='test122',
 
                                    firstname=u'NewName',
 
                                    lastname=u'NewLastname',
 
                                    email=new_email,
 
                                    _session_csrf_secret_token=self.session_csrf_secret_token())
 
                                )
 

	
 
        response.mustcontain('This email address is already in use')
 

	
 
    def test_my_account_update_err(self):
 
        self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 

	
 
        new_email = 'newmail.pl'
 
        response = self.app.post(url('my_account'),
 
                                 params=dict(
 
                                            username=TEST_USER_ADMIN_LOGIN,
 
                                            new_password=TEST_USER_ADMIN_PASS,
 
                                            password_confirmation='test122',
 
                                            firstname=u'NewName',
 
                                            lastname=u'NewLastname',
 
                                            email=new_email,
 
                                            _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        from kallithea.model import validators
 
        with test_context(self.app):
 
            msg = validators.ValidUsername(edit=False, old_data={}) \
 
                    ._messages['username_exists']
 
        msg = h.html_escape(msg % {'username': TEST_USER_ADMIN_LOGIN})
 
        response.mustcontain(msg)
 

	
 
    def test_my_account_api_keys(self):
 
        usr = self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        user = User.get(usr['user_id'])
 
        response = self.app.get(url('my_account_api_keys'))
 
        response.mustcontain(user.api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
    @parametrize('desc,lifetime', [
 
        ('forever', -1),
 
        ('5mins', 60*5),
 
        ('30days', 60*60*24*30),
 
    ])
 
    def test_my_account_add_api_keys(self, desc, lifetime):
 
        usr = self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        user = User.get(usr['user_id'])
 
        response = self.app.post(url('my_account_api_keys'),
 
                                 {'description': desc, 'lifetime': lifetime, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        try:
 
            response = response.follow()
 
            user = User.get(usr['user_id'])
 
            for api_key in user.api_keys:
 
                response.mustcontain(api_key)
 
        finally:
 
            for api_key in UserApiKeys.query().all():
 
                Session().delete(api_key)
 
                Session().commit()
 

	
 
    def test_my_account_remove_api_key(self):
 
        usr = self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        user = User.get(usr['user_id'])
 
        response = self.app.post(url('my_account_api_keys'),
 
                                 {'description': 'desc', 'lifetime': -1, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        response = response.follow()
 

	
 
        # now delete our key
 
        keys = UserApiKeys.query().all()
 
        assert 1 == len(keys)
 

	
 
        response = self.app.post(url('my_account_api_keys_delete'),
 
                 {'del_api_key': keys[0].api_key, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully deleted')
 
        keys = UserApiKeys.query().all()
 
        assert 0 == len(keys)
 

	
 
    def test_my_account_reset_main_api_key(self):
 
        usr = self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        user = User.get(usr['user_id'])
 
        api_key = user.api_key
 
        response = self.app.get(url('my_account_api_keys'))
 
        response.mustcontain(api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
        response = self.app.post(url('my_account_api_keys_delete'),
 
                 {'del_api_key_builtin': api_key, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully reset')
 
        response = response.follow()
 
        response.mustcontain(no=[api_key])
 

	
 
    def test_my_account_add_ssh_key(self):
 
        description = u'something'
 
        public_key = u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== me@localhost'
 
        fingerprint = u'Ke3oUCNJM87P0jJTb3D+e3shjceP2CqMpQKVd75E9I8'
 

	
 
        self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        response = self.app.post(url('my_account_ssh_keys'),
 
                                 {'description': description,
 
                                  'public_key': public_key,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key %s successfully added' % fingerprint)
 

	
 
        response = response.follow()
 
        response.mustcontain(fingerprint)
 
        user_id = response.session['authuser']['user_id']
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys.user_id == user_id).one()
 
        assert ssh_key.fingerprint == fingerprint
 
        assert ssh_key.description == description
 
        Session().delete(ssh_key)
 
        Session().commit()
 

	
 
    def test_my_account_remove_ssh_key(self):
 
        description = u''
 
        public_key = u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== me@localhost'
 
        fingerprint = u'Ke3oUCNJM87P0jJTb3D+e3shjceP2CqMpQKVd75E9I8'
 

	
 
        self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        response = self.app.post(url('my_account_ssh_keys'),
 
                                 {'description': description,
 
                                  'public_key': public_key,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key %s successfully added' % fingerprint)
 
        response.follow()
 
        user_id = response.session['authuser']['user_id']
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys.user_id == user_id).one()
 
        assert ssh_key.description == u'me@localhost'
 

	
 
        response = self.app.post(url('my_account_ssh_keys_delete'),
 
                                 {'del_public_key': ssh_key.public_key,
 
                                 {'del_public_key_fingerprint': ssh_key.fingerprint,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key successfully deleted')
 
        keys = UserSshKeys.query().all()
 
        assert 0 == len(keys)
0 comments (0 inline, 0 general)