Changeset - 3cab6bc45cc3
[Not reviewed]
stable
0 7 0
Mads Kiilerich - 6 years ago 2019-12-29 15:11:13
mads@kiilerich.com
Grafted from: 7e353be4c536
ssh: use fingerprint when deleting public keys

Avoid relying on a database index of the full public key string.
7 files changed with 13 insertions and 13 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/admin/my_account.py
Show inline comments
 
@@ -276,21 +276,21 @@ class MyAccountController(BaseController
 
        try:
 
            new_ssh_key = SshKeyModel().create(request.authuser.user_id,
 
                                               description, public_key)
 
            Session().commit()
 
            SshKeyModel().write_authorized_keys()
 
            h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        raise HTTPFound(location=url('my_account_ssh_keys'))
 

	
 
    @IfSshEnabled
 
    def my_account_ssh_keys_delete(self):
 
        public_key = request.POST.get('del_public_key')
 
        fingerprint = request.POST.get('del_public_key_fingerprint')
 
        try:
 
            SshKeyModel().delete(public_key, request.authuser.user_id)
 
            SshKeyModel().delete(fingerprint, request.authuser.user_id)
 
            Session().commit()
 
            SshKeyModel().write_authorized_keys()
 
            h.flash(_("SSH key successfully deleted"), category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        raise HTTPFound(location=url('my_account_ssh_keys'))
kallithea/controllers/admin/users.py
Show inline comments
 
@@ -453,21 +453,21 @@ class UsersController(BaseController):
 
                                               description, public_key)
 
            Session().commit()
 
            SshKeyModel().write_authorized_keys()
 
            h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        raise HTTPFound(location=url('edit_user_ssh_keys', id=c.user.user_id))
 

	
 
    @IfSshEnabled
 
    def ssh_keys_delete(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 

	
 
        public_key = request.POST.get('del_public_key')
 
        fingerprint = request.POST.get('del_public_key_fingerprint')
 
        try:
 
            SshKeyModel().delete(public_key, c.user.user_id)
 
            SshKeyModel().delete(fingerprint, c.user.user_id)
 
            Session().commit()
 
            SshKeyModel().write_authorized_keys()
 
            h.flash(_("SSH key successfully deleted"), category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        raise HTTPFound(location=url('edit_user_ssh_keys', id=c.user.user_id))
kallithea/model/ssh_key.py
Show inline comments
 
@@ -63,39 +63,39 @@ class SshKeyModel(object):
 
        new_ssh_key.user_id = user.user_id
 
        new_ssh_key.description = description
 
        new_ssh_key.public_key = public_key
 

	
 
        for ssh_key in UserSshKeys.query().filter(UserSshKeys.fingerprint == new_ssh_key.fingerprint).all():
 
            raise SshKeyModelException(_('SSH key %s is already used by %s') %
 
                                       (new_ssh_key.fingerprint, ssh_key.user.username))
 

	
 
        Session().add(new_ssh_key)
 

	
 
        return new_ssh_key
 

	
 
    def delete(self, public_key, user=None):
 
    def delete(self, fingerprint, user=None):
 
        """
 
        Deletes given public_key, if user is set it also filters the object for
 
        deletion by given user.
 
        Deletes ssh key with given fingerprint. If user is set, it also filters
 
        the object for deletion by given user.
 
        Will raise SshKeyModelException on errors
 
        """
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys._public_key == public_key)
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys.fingerprint == fingerprint)
 

	
 
        if user:
 
            user = User.guess_instance(user)
 
            ssh_key = ssh_key.filter(UserSshKeys.user_id == user.user_id)
 

	
 
        ssh_key = ssh_key.scalar()
 
        if ssh_key is None:
 
            raise SshKeyModelException(_('SSH key %r not found') % safe_str(public_key))
 
            raise SshKeyModelException(_('SSH key with fingerprint %r found') % safe_str(fingerprint))
 
        Session().delete(ssh_key)
 

	
 
    def get_ssh_keys(self, user):
 
        user = User.guess_instance(user)
 
        user_ssh_keys = UserSshKeys.query() \
 
            .filter(UserSshKeys.user_id == user.user_id).all()
 
        return user_ssh_keys
 

	
 
    def write_authorized_keys(self):
 
        if not str2bool(config.get('ssh_enabled', False)):
 
            log.error("Will not write SSH authorized_keys file - ssh_enabled is not configured")
 
            return
kallithea/templates/admin/my_account/my_account_ssh_keys.html
Show inline comments
 
@@ -14,25 +14,25 @@
 
            <td>
 
                ${ssh_key.description}
 
            </td>
 
            <td>
 
              %if ssh_key.last_seen:
 
                ${h.fmt_date(ssh_key.last_seen)}
 
              %else:
 
                ${_('Never')}
 
              %endif
 
            </td>
 
            <td>
 
                ${h.form(url('my_account_ssh_keys_delete'))}
 
                    ${h.hidden('del_public_key', ssh_key.public_key)}
 
                    ${h.hidden('del_public_key_fingerprint', ssh_key.fingerprint)}
 
                    <button class="btn btn-danger btn-xs" type="submit"
 
                            onclick="return confirm('${_('Confirm to remove this SSH key: %s') % ssh_key.fingerprint}');">
 
                        <i class="icon-trashcan"></i>
 
                        ${_('Remove')}
 
                    </button>
 
                ${h.end_form()}
 
            </td>
 
          </tr>
 
        %endfor
 
    %else:
 
        <tr>
 
            <td>
kallithea/templates/admin/users/user_edit_ssh_keys.html
Show inline comments
 
@@ -14,25 +14,25 @@
 
            <td>
 
                ${ssh_key.description}
 
            </td>
 
            <td>
 
              %if ssh_key.last_seen:
 
                ${h.fmt_date(ssh_key.last_seen)}
 
              %else:
 
                ${_('Never')}
 
              %endif
 
            </td>
 
            <td>
 
                ${h.form(url('edit_user_ssh_keys_delete', id=c.user.user_id))}
 
                    ${h.hidden('del_public_key', ssh_key.public_key)}
 
                    ${h.hidden('del_public_key_fingerprint', ssh_key.fingerprint)}
 
                    <button class="btn btn-danger btn-xs" type="submit"
 
                            onclick="return confirm('${_('Confirm to remove this SSH key: %s') % ssh_key.fingerprint}');">
 
                        <i class="icon-trashcan"></i>
 
                        ${_('Remove')}
 
                    </button>
 
                ${h.end_form()}
 
            </td>
 
          </tr>
 
        %endfor
 
    %else:
 
        <tr>
 
            <td>
kallithea/tests/functional/test_admin_users.py
Show inline comments
 
@@ -547,25 +547,25 @@ class TestAdminUsersController(TestContr
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_ssh_keys', id=user_id),
 
                                 {'description': description,
 
                                  'public_key': public_key,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key %s successfully added' % fingerprint)
 
        response.follow()
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys.user_id == user_id).one()
 
        assert ssh_key.description == u'me@localhost'
 

	
 
        response = self.app.post(url('edit_user_ssh_keys_delete', id=user_id),
 
                                 {'del_public_key': ssh_key.public_key,
 
                                 {'del_public_key_fingerprint': ssh_key.fingerprint,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key successfully deleted')
 
        keys = UserSshKeys.query().all()
 
        assert 0 == len(keys)
 

	
 

	
 
class TestAdminUsersController_unittest(TestController):
 
    """ Unit tests for the users controller """
 

	
 
    def test_get_user_or_raise_if_default(self, monkeypatch, test_context_fixture):
 
        # flash complains about an non-existing session
 
        def flash_mock(*args, **kwargs):
kallithea/tests/functional/test_my_account.py
Show inline comments
 
@@ -280,17 +280,17 @@ class TestMyAccountController(TestContro
 
        self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        response = self.app.post(url('my_account_ssh_keys'),
 
                                 {'description': description,
 
                                  'public_key': public_key,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key %s successfully added' % fingerprint)
 
        response.follow()
 
        user_id = response.session['authuser']['user_id']
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys.user_id == user_id).one()
 
        assert ssh_key.description == u'me@localhost'
 

	
 
        response = self.app.post(url('my_account_ssh_keys_delete'),
 
                                 {'del_public_key': ssh_key.public_key,
 
                                 {'del_public_key_fingerprint': ssh_key.fingerprint,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key successfully deleted')
 
        keys = UserSshKeys.query().all()
 
        assert 0 == len(keys)
0 comments (0 inline, 0 general)