Changeset - 3cab6bc45cc3
[Not reviewed]
stable
0 7 0
Mads Kiilerich - 6 years ago 2019-12-29 15:11:13
mads@kiilerich.com
Grafted from: 7e353be4c536
ssh: use fingerprint when deleting public keys

Avoid relying on a database index of the full public key string.
7 files changed with 13 insertions and 13 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/admin/my_account.py
Show inline comments
 
@@ -192,105 +192,105 @@ class MyAccountController(BaseController
 

	
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_emails(self):
 
        c.active = 'emails'
 
        self.__load_data()
 

	
 
        c.user_email_map = UserEmailMap.query() \
 
            .filter(UserEmailMap.user == c.user).all()
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_emails_add(self):
 
        email = request.POST.get('new_email')
 

	
 
        try:
 
            UserModel().add_extra_email(request.authuser.user_id, email)
 
            Session().commit()
 
            h.flash(_("Added email %s to user") % email, category='success')
 
        except formencode.Invalid as error:
 
            msg = error.error_dict['email']
 
            h.flash(msg, category='error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during email saving'),
 
                    category='error')
 
        raise HTTPFound(location=url('my_account_emails'))
 

	
 
    def my_account_emails_delete(self):
 
        email_id = request.POST.get('del_email_id')
 
        user_model = UserModel()
 
        user_model.delete_extra_email(request.authuser.user_id, email_id)
 
        Session().commit()
 
        h.flash(_("Removed email from user"), category='success')
 
        raise HTTPFound(location=url('my_account_emails'))
 

	
 
    def my_account_api_keys(self):
 
        c.active = 'api_keys'
 
        self.__load_data()
 
        show_expired = True
 
        c.lifetime_values = [
 
            (str(-1), _('Forever')),
 
            (str(5), _('5 minutes')),
 
            (str(60), _('1 hour')),
 
            (str(60 * 24), _('1 day')),
 
            (str(60 * 24 * 30), _('1 month')),
 
        ]
 
        c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
 
        c.user_api_keys = ApiKeyModel().get_api_keys(request.authuser.user_id,
 
                                                     show_expired=show_expired)
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_api_keys_add(self):
 
        lifetime = safe_int(request.POST.get('lifetime'), -1)
 
        description = request.POST.get('description')
 
        ApiKeyModel().create(request.authuser.user_id, description, lifetime)
 
        Session().commit()
 
        h.flash(_("API key successfully created"), category='success')
 
        raise HTTPFound(location=url('my_account_api_keys'))
 

	
 
    def my_account_api_keys_delete(self):
 
        api_key = request.POST.get('del_api_key')
 
        if request.POST.get('del_api_key_builtin'):
 
            user = User.get(request.authuser.user_id)
 
            user.api_key = generate_api_key()
 
            Session().commit()
 
            h.flash(_("API key successfully reset"), category='success')
 
        elif api_key:
 
            ApiKeyModel().delete(api_key, request.authuser.user_id)
 
            Session().commit()
 
            h.flash(_("API key successfully deleted"), category='success')
 

	
 
        raise HTTPFound(location=url('my_account_api_keys'))
 

	
 
    @IfSshEnabled
 
    def my_account_ssh_keys(self):
 
        c.active = 'ssh_keys'
 
        self.__load_data()
 
        c.user_ssh_keys = SshKeyModel().get_ssh_keys(request.authuser.user_id)
 
        return render('admin/my_account/my_account.html')
 

	
 
    @IfSshEnabled
 
    def my_account_ssh_keys_add(self):
 
        description = request.POST.get('description')
 
        public_key = request.POST.get('public_key')
 
        try:
 
            new_ssh_key = SshKeyModel().create(request.authuser.user_id,
 
                                               description, public_key)
 
            Session().commit()
 
            SshKeyModel().write_authorized_keys()
 
            h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        raise HTTPFound(location=url('my_account_ssh_keys'))
 

	
 
    @IfSshEnabled
 
    def my_account_ssh_keys_delete(self):
 
        public_key = request.POST.get('del_public_key')
 
        fingerprint = request.POST.get('del_public_key_fingerprint')
 
        try:
 
            SshKeyModel().delete(public_key, request.authuser.user_id)
 
            SshKeyModel().delete(fingerprint, request.authuser.user_id)
 
            Session().commit()
 
            SshKeyModel().write_authorized_keys()
 
            h.flash(_("SSH key successfully deleted"), category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        raise HTTPFound(location=url('my_account_ssh_keys'))
kallithea/controllers/admin/users.py
Show inline comments
 
@@ -369,105 +369,105 @@ class UsersController(BaseController):
 
            msg = error.error_dict['email']
 
            h.flash(msg, category='error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during email saving'),
 
                    category='error')
 
        raise HTTPFound(location=url('edit_user_emails', id=id))
 

	
 
    def delete_email(self, id):
 
        user = self._get_user_or_raise_if_default(id)
 
        email_id = request.POST.get('del_email_id')
 
        user_model = UserModel()
 
        user_model.delete_extra_email(id, email_id)
 
        Session().commit()
 
        h.flash(_("Removed email from user"), category='success')
 
        raise HTTPFound(location=url('edit_user_emails', id=id))
 

	
 
    def edit_ips(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'ips'
 
        c.user_ip_map = UserIpMap.query() \
 
            .filter(UserIpMap.user == c.user).all()
 

	
 
        c.default_user_ip_map = UserIpMap.query() \
 
            .filter(UserIpMap.user == User.get_default_user()).all()
 

	
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def add_ip(self, id):
 
        ip = request.POST.get('new_ip')
 
        user_model = UserModel()
 

	
 
        try:
 
            user_model.add_extra_ip(id, ip)
 
            Session().commit()
 
            h.flash(_("Added IP address %s to user whitelist") % ip, category='success')
 
        except formencode.Invalid as error:
 
            msg = error.error_dict['ip']
 
            h.flash(msg, category='error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred while adding IP address'),
 
                    category='error')
 

	
 
        if 'default_user' in request.POST:
 
            raise HTTPFound(location=url('admin_permissions_ips'))
 
        raise HTTPFound(location=url('edit_user_ips', id=id))
 

	
 
    def delete_ip(self, id):
 
        ip_id = request.POST.get('del_ip_id')
 
        user_model = UserModel()
 
        user_model.delete_extra_ip(id, ip_id)
 
        Session().commit()
 
        h.flash(_("Removed IP address from user whitelist"), category='success')
 

	
 
        if 'default_user' in request.POST:
 
            raise HTTPFound(location=url('admin_permissions_ips'))
 
        raise HTTPFound(location=url('edit_user_ips', id=id))
 

	
 
    @IfSshEnabled
 
    def edit_ssh_keys(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'ssh_keys'
 
        c.user_ssh_keys = SshKeyModel().get_ssh_keys(c.user.user_id)
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @IfSshEnabled
 
    def ssh_keys_add(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 

	
 
        description = request.POST.get('description')
 
        public_key = request.POST.get('public_key')
 
        try:
 
            new_ssh_key = SshKeyModel().create(c.user.user_id,
 
                                               description, public_key)
 
            Session().commit()
 
            SshKeyModel().write_authorized_keys()
 
            h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        raise HTTPFound(location=url('edit_user_ssh_keys', id=c.user.user_id))
 

	
 
    @IfSshEnabled
 
    def ssh_keys_delete(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 

	
 
        public_key = request.POST.get('del_public_key')
 
        fingerprint = request.POST.get('del_public_key_fingerprint')
 
        try:
 
            SshKeyModel().delete(public_key, c.user.user_id)
 
            SshKeyModel().delete(fingerprint, c.user.user_id)
 
            Session().commit()
 
            SshKeyModel().write_authorized_keys()
 
            h.flash(_("SSH key successfully deleted"), category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        raise HTTPFound(location=url('edit_user_ssh_keys', id=c.user.user_id))
kallithea/model/ssh_key.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.ssh_key
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
SSH key model for Kallithea
 

	
 
"""
 

	
 
import errno
 
import logging
 
import os
 
import stat
 
import tempfile
 

	
 
from tg import config
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib import ssh
 
from kallithea.lib.utils2 import safe_str, str2bool
 
from kallithea.model.db import User, UserSshKeys
 
from kallithea.model.meta import Session
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class SshKeyModelException(Exception):
 
    """Exception raised by SshKeyModel methods to report errors"""
 

	
 

	
 
class SshKeyModel(object):
 

	
 
    def create(self, user, description, public_key):
 
        """
 
        :param user: user or user_id
 
        :param description: description of SshKey
 
        :param publickey: public key text
 
        Will raise SshKeyModelException on errors
 
        """
 
        try:
 
            keytype, pub, comment = ssh.parse_pub_key(public_key)
 
        except ssh.SshKeyParseError as e:
 
            raise SshKeyModelException(_('SSH key %r is invalid: %s') % (safe_str(public_key), e.message))
 
        if not description.strip():
 
            description = comment.strip()
 

	
 
        user = User.guess_instance(user)
 

	
 
        new_ssh_key = UserSshKeys()
 
        new_ssh_key.user_id = user.user_id
 
        new_ssh_key.description = description
 
        new_ssh_key.public_key = public_key
 

	
 
        for ssh_key in UserSshKeys.query().filter(UserSshKeys.fingerprint == new_ssh_key.fingerprint).all():
 
            raise SshKeyModelException(_('SSH key %s is already used by %s') %
 
                                       (new_ssh_key.fingerprint, ssh_key.user.username))
 

	
 
        Session().add(new_ssh_key)
 

	
 
        return new_ssh_key
 

	
 
    def delete(self, public_key, user=None):
 
    def delete(self, fingerprint, user=None):
 
        """
 
        Deletes given public_key, if user is set it also filters the object for
 
        deletion by given user.
 
        Deletes ssh key with given fingerprint. If user is set, it also filters
 
        the object for deletion by given user.
 
        Will raise SshKeyModelException on errors
 
        """
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys._public_key == public_key)
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys.fingerprint == fingerprint)
 

	
 
        if user:
 
            user = User.guess_instance(user)
 
            ssh_key = ssh_key.filter(UserSshKeys.user_id == user.user_id)
 

	
 
        ssh_key = ssh_key.scalar()
 
        if ssh_key is None:
 
            raise SshKeyModelException(_('SSH key %r not found') % safe_str(public_key))
 
            raise SshKeyModelException(_('SSH key with fingerprint %r found') % safe_str(fingerprint))
 
        Session().delete(ssh_key)
 

	
 
    def get_ssh_keys(self, user):
 
        user = User.guess_instance(user)
 
        user_ssh_keys = UserSshKeys.query() \
 
            .filter(UserSshKeys.user_id == user.user_id).all()
 
        return user_ssh_keys
 

	
 
    def write_authorized_keys(self):
 
        if not str2bool(config.get('ssh_enabled', False)):
 
            log.error("Will not write SSH authorized_keys file - ssh_enabled is not configured")
 
            return
 
        authorized_keys = config.get('ssh_authorized_keys')
 
        kallithea_cli_path = config.get('kallithea_cli_path', 'kallithea-cli')
 
        if not authorized_keys:
 
            log.error('Cannot write SSH authorized_keys file - ssh_authorized_keys is not configured')
 
            return
 
        log.info('Writing %s', authorized_keys)
 

	
 
        authorized_keys_dir = os.path.dirname(authorized_keys)
 
        try:
 
            os.makedirs(authorized_keys_dir)
 
            os.chmod(authorized_keys_dir, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) # ~/.ssh/ must be 0700
 
        except OSError as exception:
 
            if exception.errno != errno.EEXIST:
 
                raise
 
        # Now, test that the directory is or was created in a readable way by previous.
 
        if not (os.path.isdir(authorized_keys_dir) and
 
                os.access(authorized_keys_dir, os.W_OK)):
 
            raise Exception("Directory of authorized_keys cannot be written to so authorized_keys file %s cannot be written" % (authorized_keys))
 

	
 
        # Make sure we don't overwrite a key file with important content
 
        if os.path.exists(authorized_keys):
 
            with open(authorized_keys) as f:
 
                for l in f:
 
                    if not l.strip() or l.startswith('#'):
 
                        pass # accept empty lines and comments
 
                    elif ssh.SSH_OPTIONS in l and ' ssh-serve ' in l:
 
                        pass # Kallithea entries are ok to overwrite
 
                    else:
 
                        raise Exception("Safety check failed, found %r in %s - please review and remove it" % (l.strip(), authorized_keys))
 

	
 
        fh, tmp_authorized_keys = tempfile.mkstemp('.authorized_keys', dir=os.path.dirname(authorized_keys))
 
        with os.fdopen(fh, 'w') as f:
 
            for key in UserSshKeys.query().join(UserSshKeys.user).filter(User.active == True):
 
                f.write(ssh.authorized_keys_line(kallithea_cli_path, config['__file__'], key))
 
        os.chmod(tmp_authorized_keys, stat.S_IRUSR | stat.S_IWUSR)
 
        # This preliminary remove is needed for Windows, not for Unix.
 
        # TODO In Python 3, the remove+rename sequence below should become os.replace.
 
        if os.path.exists(authorized_keys):
 
            os.remove(authorized_keys)
 
        os.rename(tmp_authorized_keys, authorized_keys)
kallithea/templates/admin/my_account/my_account_ssh_keys.html
Show inline comments
 
<table class="table">
 
    %if c.user_ssh_keys:
 
        <tr>
 
            <th>${_('Fingerprint')}</th>
 
            <th>${_('Description')}</th>
 
            <th>${_('Last Used')}</th>
 
            <th>${_('Action')}</th>
 
        </tr>
 
        %for ssh_key in c.user_ssh_keys:
 
          <tr>
 
            <td>
 
                ${ssh_key.fingerprint}
 
            </td>
 
            <td>
 
                ${ssh_key.description}
 
            </td>
 
            <td>
 
              %if ssh_key.last_seen:
 
                ${h.fmt_date(ssh_key.last_seen)}
 
              %else:
 
                ${_('Never')}
 
              %endif
 
            </td>
 
            <td>
 
                ${h.form(url('my_account_ssh_keys_delete'))}
 
                    ${h.hidden('del_public_key', ssh_key.public_key)}
 
                    ${h.hidden('del_public_key_fingerprint', ssh_key.fingerprint)}
 
                    <button class="btn btn-danger btn-xs" type="submit"
 
                            onclick="return confirm('${_('Confirm to remove this SSH key: %s') % ssh_key.fingerprint}');">
 
                        <i class="icon-trashcan"></i>
 
                        ${_('Remove')}
 
                    </button>
 
                ${h.end_form()}
 
            </td>
 
          </tr>
 
        %endfor
 
    %else:
 
        <tr>
 
            <td>
 
                <div class="ip">${_('No SSH keys have been added')}</div>
 
            </td>
 
        </tr>
 
    %endif
 
</table>
 

	
 
<div>
 
    ${h.form(url('my_account_ssh_keys'))}
 
    <div class="form">
 
            <div class="form-group">
 
                <label class="control-label">${_('New SSH key')}</label>
 
            </div>
 
            <div class="form-group">
 
                <label class="control-label" for="public_key">${_('Public key')}:</label>
 
                <div>
 
                    ${h.textarea('public_key', '', class_='form-control', placeholder=_('Public key (contents of e.g. ~/.ssh/id_rsa.pub)'), cols=80, rows=5)}
 
                </div>
 
            </div>
 
            <div class="form-group">
 
                <label class="control-label" for="description">${_('Description')}:</label>
 
                <div>
 
                    ${h.text('description', class_='form-control', placeholder=_('Description'))}
 
                </div>
 
            </div>
 
            <div class="form-group">
 
                <div class="buttons">
 
                    ${h.submit('save', _('Add'), class_="btn btn-default")}
 
                    ${h.reset('reset', _('Reset'), class_="btn btn-default")}
 
                </div>
 
            </div>
 
    </div>
 
    ${h.end_form()}
 
</div>
kallithea/templates/admin/users/user_edit_ssh_keys.html
Show inline comments
 
<table class="table">
 
    %if c.user_ssh_keys:
 
        <tr>
 
            <th>${_('Fingerprint')}</th>
 
            <th>${_('Description')}</th>
 
            <th>${_('Last Used')}</th>
 
            <th>${_('Action')}</th>
 
        </tr>
 
        %for ssh_key in c.user_ssh_keys:
 
          <tr>
 
            <td>
 
                ${ssh_key.fingerprint}
 
            </td>
 
            <td>
 
                ${ssh_key.description}
 
            </td>
 
            <td>
 
              %if ssh_key.last_seen:
 
                ${h.fmt_date(ssh_key.last_seen)}
 
              %else:
 
                ${_('Never')}
 
              %endif
 
            </td>
 
            <td>
 
                ${h.form(url('edit_user_ssh_keys_delete', id=c.user.user_id))}
 
                    ${h.hidden('del_public_key', ssh_key.public_key)}
 
                    ${h.hidden('del_public_key_fingerprint', ssh_key.fingerprint)}
 
                    <button class="btn btn-danger btn-xs" type="submit"
 
                            onclick="return confirm('${_('Confirm to remove this SSH key: %s') % ssh_key.fingerprint}');">
 
                        <i class="icon-trashcan"></i>
 
                        ${_('Remove')}
 
                    </button>
 
                ${h.end_form()}
 
            </td>
 
          </tr>
 
        %endfor
 
    %else:
 
        <tr>
 
            <td>
 
                <div class="ip">${_('No SSH keys have been added')}</div>
 
            </td>
 
        </tr>
 
    %endif
 
</table>
 

	
 
<div>
 
    ${h.form(url('edit_user_ssh_keys', id=c.user.user_id))}
 
    <div class="form">
 
            <div class="form-group">
 
                <label class="control-label">${_('New SSH key')}</label>
 
            </div>
 
            <div class="form-group">
 
                <label class="control-label" for="public_key">${_('Public key')}:</label>
 
                <div>
 
                    ${h.textarea('public_key', '', class_='form-control', placeholder=_('Public key (contents of e.g. ~/.ssh/id_rsa.pub)'), cols=80, rows=5)}
 
                </div>
 
            </div>
 
            <div class="form-group">
 
                <label class="control-label" for="description">${_('Description')}:</label>
 
                <div>
 
                    ${h.text('description', class_='form-control', placeholder=_('Description'))}
 
                </div>
 
            </div>
 
            <div class="form-group">
 
                <div class="buttons">
 
                    ${h.submit('save', _('Add'), class_="btn btn-default")}
 
                    ${h.reset('reset', _('Reset'), class_="btn btn-default")}
 
                </div>
 
            </div>
 
    </div>
 
    ${h.end_form()}
 
</div>
kallithea/tests/functional/test_admin_users.py
Show inline comments
 
@@ -463,191 +463,191 @@ class TestAdminUsersController(TestContr
 
    ])
 
    def test_add_api_keys(self, desc, lifetime):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_api_keys_update', id=user_id),
 
                 {'description': desc, 'lifetime': lifetime, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        try:
 
            response = response.follow()
 
            user = User.get(user_id)
 
            for api_key in user.api_keys:
 
                response.mustcontain(api_key)
 
        finally:
 
            for api_key in UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all():
 
                Session().delete(api_key)
 
                Session().commit()
 

	
 
    def test_remove_api_key(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_api_keys_update', id=user_id),
 
                {'description': 'desc', 'lifetime': -1, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        response = response.follow()
 

	
 
        # now delete our key
 
        keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
 
        assert 1 == len(keys)
 

	
 
        response = self.app.post(url('edit_user_api_keys_delete', id=user_id),
 
                 {'del_api_key': keys[0].api_key, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully deleted')
 
        keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
 
        assert 0 == len(keys)
 

	
 
    def test_reset_main_api_key(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 
        api_key = user.api_key
 
        response = self.app.get(url('edit_user_api_keys', id=user_id))
 
        response.mustcontain(api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
        response = self.app.post(url('edit_user_api_keys_delete', id=user_id),
 
                 {'del_api_key_builtin': api_key, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully reset')
 
        response = response.follow()
 
        response.mustcontain(no=[api_key])
 

	
 
    def test_add_ssh_key(self):
 
        description = u'something'
 
        public_key = u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== me@localhost'
 
        fingerprint = u'Ke3oUCNJM87P0jJTb3D+e3shjceP2CqMpQKVd75E9I8'
 

	
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_ssh_keys', id=user_id),
 
                                 {'description': description,
 
                                  'public_key': public_key,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key %s successfully added' % fingerprint)
 

	
 
        response = response.follow()
 
        response.mustcontain(fingerprint)
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys.user_id == user_id).one()
 
        assert ssh_key.fingerprint == fingerprint
 
        assert ssh_key.description == description
 
        Session().delete(ssh_key)
 
        Session().commit()
 

	
 
    def test_remove_ssh_key(self):
 
        description = u''
 
        public_key = u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== me@localhost'
 
        fingerprint = u'Ke3oUCNJM87P0jJTb3D+e3shjceP2CqMpQKVd75E9I8'
 

	
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_ssh_keys', id=user_id),
 
                                 {'description': description,
 
                                  'public_key': public_key,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key %s successfully added' % fingerprint)
 
        response.follow()
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys.user_id == user_id).one()
 
        assert ssh_key.description == u'me@localhost'
 

	
 
        response = self.app.post(url('edit_user_ssh_keys_delete', id=user_id),
 
                                 {'del_public_key': ssh_key.public_key,
 
                                 {'del_public_key_fingerprint': ssh_key.fingerprint,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key successfully deleted')
 
        keys = UserSshKeys.query().all()
 
        assert 0 == len(keys)
 

	
 

	
 
class TestAdminUsersController_unittest(TestController):
 
    """ Unit tests for the users controller """
 

	
 
    def test_get_user_or_raise_if_default(self, monkeypatch, test_context_fixture):
 
        # flash complains about an non-existing session
 
        def flash_mock(*args, **kwargs):
 
            pass
 
        monkeypatch.setattr(h, 'flash', flash_mock)
 

	
 
        u = UsersController()
 
        # a regular user should work correctly
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        assert u._get_user_or_raise_if_default(user.user_id) == user
 
        # the default user should raise
 
        with pytest.raises(HTTPNotFound):
 
            u._get_user_or_raise_if_default(User.get_default_user().user_id)
 

	
 

	
 
class TestAdminUsersControllerForDefaultUser(TestController):
 
    """
 
    Edit actions on the default user are not allowed.
 
    Validate that they throw a 404 exception.
 
    """
 
    def test_edit_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user', id=user.user_id), status=404)
 

	
 
    def test_edit_advanced_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_advanced', id=user.user_id), status=404)
 

	
 
    # API keys
 
    def test_edit_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_api_keys', id=user.user_id), status=404)
 

	
 
    def test_add_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_api_keys_update', id=user.user_id),
 
                 {'_session_csrf_secret_token': self.session_csrf_secret_token()}, status=404)
 

	
 
    def test_delete_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_api_keys_delete', id=user.user_id),
 
                 {'_session_csrf_secret_token': self.session_csrf_secret_token()}, status=404)
 

	
 
    # Permissions
 
    def test_edit_perms_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_perms', id=user.user_id), status=404)
 

	
 
    def test_update_perms_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_perms_update', id=user.user_id),
 
                 {'_session_csrf_secret_token': self.session_csrf_secret_token()}, status=404)
 

	
 
    # Emails
 
    def test_edit_emails_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_emails', id=user.user_id), status=404)
 

	
 
    def test_add_emails_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_emails_update', id=user.user_id),
 
                 {'_session_csrf_secret_token': self.session_csrf_secret_token()}, status=404)
 

	
 
    def test_delete_emails_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_emails_delete', id=user.user_id),
 
                 {'_session_csrf_secret_token': self.session_csrf_secret_token()}, status=404)
 

	
 
    # IP addresses
 
    # Add/delete of IP addresses for the default user is used to maintain
 
    # the global IP whitelist and thus allowed. Only 'edit' is forbidden.
 
    def test_edit_ip_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_ips', id=user.user_id), status=404)
kallithea/tests/functional/test_my_account.py
Show inline comments
 
@@ -196,101 +196,101 @@ class TestMyAccountController(TestContro
 
        user = User.get(usr['user_id'])
 
        response = self.app.get(url('my_account_api_keys'))
 
        response.mustcontain(user.api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
    @parametrize('desc,lifetime', [
 
        ('forever', -1),
 
        ('5mins', 60*5),
 
        ('30days', 60*60*24*30),
 
    ])
 
    def test_my_account_add_api_keys(self, desc, lifetime):
 
        usr = self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        user = User.get(usr['user_id'])
 
        response = self.app.post(url('my_account_api_keys'),
 
                                 {'description': desc, 'lifetime': lifetime, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        try:
 
            response = response.follow()
 
            user = User.get(usr['user_id'])
 
            for api_key in user.api_keys:
 
                response.mustcontain(api_key)
 
        finally:
 
            for api_key in UserApiKeys.query().all():
 
                Session().delete(api_key)
 
                Session().commit()
 

	
 
    def test_my_account_remove_api_key(self):
 
        usr = self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        user = User.get(usr['user_id'])
 
        response = self.app.post(url('my_account_api_keys'),
 
                                 {'description': 'desc', 'lifetime': -1, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        response = response.follow()
 

	
 
        # now delete our key
 
        keys = UserApiKeys.query().all()
 
        assert 1 == len(keys)
 

	
 
        response = self.app.post(url('my_account_api_keys_delete'),
 
                 {'del_api_key': keys[0].api_key, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully deleted')
 
        keys = UserApiKeys.query().all()
 
        assert 0 == len(keys)
 

	
 
    def test_my_account_reset_main_api_key(self):
 
        usr = self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        user = User.get(usr['user_id'])
 
        api_key = user.api_key
 
        response = self.app.get(url('my_account_api_keys'))
 
        response.mustcontain(api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
        response = self.app.post(url('my_account_api_keys_delete'),
 
                 {'del_api_key_builtin': api_key, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully reset')
 
        response = response.follow()
 
        response.mustcontain(no=[api_key])
 

	
 
    def test_my_account_add_ssh_key(self):
 
        description = u'something'
 
        public_key = u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== me@localhost'
 
        fingerprint = u'Ke3oUCNJM87P0jJTb3D+e3shjceP2CqMpQKVd75E9I8'
 

	
 
        self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        response = self.app.post(url('my_account_ssh_keys'),
 
                                 {'description': description,
 
                                  'public_key': public_key,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key %s successfully added' % fingerprint)
 

	
 
        response = response.follow()
 
        response.mustcontain(fingerprint)
 
        user_id = response.session['authuser']['user_id']
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys.user_id == user_id).one()
 
        assert ssh_key.fingerprint == fingerprint
 
        assert ssh_key.description == description
 
        Session().delete(ssh_key)
 
        Session().commit()
 

	
 
    def test_my_account_remove_ssh_key(self):
 
        description = u''
 
        public_key = u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== me@localhost'
 
        fingerprint = u'Ke3oUCNJM87P0jJTb3D+e3shjceP2CqMpQKVd75E9I8'
 

	
 
        self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        response = self.app.post(url('my_account_ssh_keys'),
 
                                 {'description': description,
 
                                  'public_key': public_key,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key %s successfully added' % fingerprint)
 
        response.follow()
 
        user_id = response.session['authuser']['user_id']
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys.user_id == user_id).one()
 
        assert ssh_key.description == u'me@localhost'
 

	
 
        response = self.app.post(url('my_account_ssh_keys_delete'),
 
                                 {'del_public_key': ssh_key.public_key,
 
                                 {'del_public_key_fingerprint': ssh_key.fingerprint,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key successfully deleted')
 
        keys = UserSshKeys.query().all()
 
        assert 0 == len(keys)
0 comments (0 inline, 0 general)