Changeset - 3b147c38b674
[Not reviewed]
default
0 3 1
Christian Oyarzun - 11 years ago 2014-11-17 20:42:45
oyarzun@gmail.com
ssh: error checking for ssh key management

Based on work by Ilya Beda <ir4y.ix@gmail.com> on
https://bitbucket.org/ir4y/rhodecode/commits/branch/ssh_server_support , and
also heavily modified by Mads Kiilerich.
4 files changed with 142 insertions and 16 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/admin/my_account.py
Show inline comments
 
@@ -46,7 +46,7 @@ from kallithea.model.forms import UserFo
 
from kallithea.model.user import UserModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.api_key import ApiKeyModel
 
from kallithea.model.ssh_key import SshKeyModel
 
from kallithea.model.ssh_key import SshKeyModel, SshKeyModelException
 
from kallithea.model.meta import Session
 

	
 
log = logging.getLogger(__name__)
 
@@ -272,16 +272,22 @@ class MyAccountController(BaseController
 
    def my_account_ssh_keys_add(self):
 
        description = request.POST.get('description')
 
        public_key = request.POST.get('public_key')
 
        new_ssh_key = SshKeyModel().create(request.authuser.user_id,
 
                                           description, public_key)
 
        Session().commit()
 
        h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success')
 
        try:
 
            new_ssh_key = SshKeyModel().create(request.authuser.user_id,
 
                                               description, public_key)
 
            Session().commit()
 
            h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        raise HTTPFound(location=url('my_account_ssh_keys'))
 

	
 
    @IfSshEnabled
 
    def my_account_ssh_keys_delete(self):
 
        public_key = request.POST.get('del_public_key')
 
        SshKeyModel().delete(public_key, request.authuser.user_id)
 
        Session().commit()
 
        h.flash(_("SSH key successfully deleted"), category='success')
 
        try:
 
            SshKeyModel().delete(public_key, request.authuser.user_id)
 
            Session().commit()
 
            h.flash(_("SSH key successfully deleted"), category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        raise HTTPFound(location=url('my_account_ssh_keys'))
kallithea/controllers/admin/users.py
Show inline comments
 
@@ -45,7 +45,7 @@ from kallithea.lib.auth import LoginRequ
 
from kallithea.lib import auth_modules
 
from kallithea.lib.base import BaseController, render, IfSshEnabled
 
from kallithea.model.api_key import ApiKeyModel
 
from kallithea.model.ssh_key import SshKeyModel
 
from kallithea.model.ssh_key import SshKeyModel, SshKeyModelException
 
from kallithea.model.db import User, UserEmailMap, UserIpMap, UserToPerm
 
from kallithea.model.forms import UserForm, CustomDefaultPermissionsForm
 
from kallithea.model.user import UserModel
 
@@ -448,10 +448,13 @@ class UsersController(BaseController):
 

	
 
        description = request.POST.get('description')
 
        public_key = request.POST.get('public_key')
 
        new_ssh_key = SshKeyModel().create(c.user.user_id,
 
                                       description, public_key)
 
        Session().commit()
 
        h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success')
 
        try:
 
            new_ssh_key = SshKeyModel().create(c.user.user_id,
 
                                               description, public_key)
 
            Session().commit()
 
            h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        raise HTTPFound(location=url('edit_user_ssh_keys', id=c.user.user_id))
 

	
 
    @IfSshEnabled
 
@@ -459,7 +462,10 @@ class UsersController(BaseController):
 
        c.user = self._get_user_or_raise_if_default(id)
 

	
 
        public_key = request.POST.get('del_public_key')
 
        SshKeyModel().delete(public_key, c.user.user_id)
 
        Session().commit()
 
        h.flash(_("SSH key successfully deleted"), category='success')
 
        try:
 
            SshKeyModel().delete(public_key, c.user.user_id)
 
            Session().commit()
 
            h.flash(_("SSH key successfully deleted"), category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        raise HTTPFound(location=url('edit_user_ssh_keys', id=c.user.user_id))
kallithea/lib/ssh.py
Show inline comments
 
new file 100644
 
# -*- coding: utf-8 -*-
 
"""
 
    kallithea.lib.ssh
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    :created_on: Dec 10, 2012
 
    :author: ir4y
 
    :copyright: (C) 2012 Ilya Beda <ir4y.ix@gmail.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 
import binascii
 
import re
 

	
 
from tg.i18n import ugettext as _
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class SshKeyParseError(Exception):
 
    """Exception raised by parse_pub_key"""
 

	
 

	
 
def parse_pub_key(ssh_key):
 
    r"""Parse SSH public key string, raise SshKeyParseError or return decoded keytype, data and comment
 

	
 
    >>> getfixture('doctest_mock_ugettext')
 
    >>> parse_pub_key('')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: SSH key is missing
 
    >>> parse_pub_key('''AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - it must have both a key type and a base64 part
 
    >>> parse_pub_key('''abc AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - it must start with 'ssh-(rsa|dss|ed25519)'
 
    >>> parse_pub_key('''ssh-rsa  AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - failed to decode base64 part 'AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ'
 
    >>> parse_pub_key('''ssh-rsa  AAAAB2NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ==''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - base64 part is not 'ssh-rsa' as claimed but 'csh-rsa'
 
    >>> parse_pub_key('''ssh-rsa  AAAAB3NzaC1yc2EAAAA'LVGhpcyBpcyBmYWtlIQ''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - unexpected characters in base64 part "AAAAB3NzaC1yc2EAAAA'LVGhpcyBpcyBmYWtlIQ"
 
    >>> parse_pub_key(''' ssh-rsa  AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ== and a comment
 
    ... ''')
 
    ('ssh-rsa', '\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x0bThis is fake!', 'and a comment\n')
 
    """
 
    if not ssh_key:
 
        raise SshKeyParseError(_("SSH key is missing"))
 

	
 
    parts = ssh_key.split(None, 2)
 
    if len(parts) < 2:
 
        raise SshKeyParseError(_("Incorrect SSH key - it must have both a key type and a base64 part"))
 

	
 
    keytype, keyvalue, comment = (parts + [''])[:3]
 
    if keytype not in ('ssh-rsa', 'ssh-dss', 'ssh-ed25519'):
 
        raise SshKeyParseError(_("Incorrect SSH key - it must start with 'ssh-(rsa|dss|ed25519)'"))
 

	
 
    if re.search(r'[^a-zA-Z0-9+/=]', keyvalue):
 
        raise SshKeyParseError(_("Incorrect SSH key - unexpected characters in base64 part %r") % keyvalue)
 

	
 
    try:
 
        decoded = keyvalue.decode('base64')
 
    except binascii.Error:
 
        raise SshKeyParseError(_("Incorrect SSH key - failed to decode base64 part %r") % keyvalue)
 

	
 
    if not decoded.startswith('\x00\x00\x00\x07' + str(keytype) + '\x00'):
 
        raise SshKeyParseError(_("Incorrect SSH key - base64 part is not %r as claimed but %r") % (str(keytype), str(decoded[4:].split('\0', 1)[0])))
 

	
 
    return keytype, decoded, comment
kallithea/model/ssh_key.py
Show inline comments
 
@@ -21,11 +21,20 @@ SSH key model for Kallithea
 

	
 
import logging
 

	
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.model.db import UserSshKeys, User
 
from kallithea.model.meta import Session
 
from kallithea.lib import ssh
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class SshKeyModelException(Exception):
 
    """Exception raised by SshKeyModel methods to report errors"""
 

	
 

	
 
class SshKeyModel(object):
 

	
 
    def create(self, user, description, public_key):
 
@@ -33,13 +42,24 @@ class SshKeyModel(object):
 
        :param user: user or user_id
 
        :param description: description of SshKey
 
        :param publickey: public key text
 
        Will raise SshKeyModelException on errors
 
        """
 
        try:
 
            ssh.parse_pub_key(public_key)
 
        except ssh.SshKeyParseError as e:
 
            raise SshKeyModelException(_('SSH key %r is invalid: %s') % (safe_str(public_key), e.message))
 

	
 
        user = User.guess_instance(user)
 

	
 
        new_ssh_key = UserSshKeys()
 
        new_ssh_key.user_id = user.user_id
 
        new_ssh_key.description = description
 
        new_ssh_key.public_key = public_key
 

	
 
        for ssh_key in UserSshKeys.query().filter(UserSshKeys.fingerprint == new_ssh_key.fingerprint).all():
 
            raise SshKeyModelException(_('SSH key %s is already used by %s') %
 
                                       (new_ssh_key.fingerprint, ssh_key.user.username))
 

	
 
        Session().add(new_ssh_key)
 

	
 
        return new_ssh_key
 
@@ -48,6 +68,7 @@ class SshKeyModel(object):
 
        """
 
        Deletes given public_key, if user is set it also filters the object for
 
        deletion by given user.
 
        Will raise SshKeyModelException on errors
 
        """
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys._public_key == public_key)
 

	
 
@@ -56,6 +77,8 @@ class SshKeyModel(object):
 
            ssh_key = ssh_key.filter(UserSshKeys.user_id == user.user_id)
 

	
 
        ssh_key = ssh_key.scalar()
 
        if ssh_key is None:
 
            raise SshKeyModelException(_('SSH key %r not found') % safe_str(public_key))
 
        Session().delete(ssh_key)
 

	
 
    def get_ssh_keys(self, user):
0 comments (0 inline, 0 general)