Changeset - 95c01895c006
[Not reviewed]
default
0 1 3
Tim Freund - 11 years ago 2014-11-17 20:40:35
tim@freunds.net
ssh: db models for ssh key management

Add database components for SSH based access. Actual use of this will be added
soon.

The work in this commit is based heavily off of the existing API key
code for the sake of consistency.

The original code has been heavily modified by Mads Kiilerich.

Updates to use User.guess_instance by Anton Schur <tonich.sh@gmail.com>.
4 files changed with 168 insertions and 0 deletions:
0 comments (0 inline, 0 general)
kallithea/alembic/versions/b74907136bc1_create_table_for_ssh_keys.py
Show inline comments
 
new file 100644
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""Create table for ssh keys
 

	
 
Revision ID: b74907136bc1
 
Revises: a020f7044fd6
 
Create Date: 2017-04-03 18:54:24.490346
 

	
 
"""
 

	
 
# The following opaque hexadecimal identifiers ("revisions") are used
 
# by Alembic to track this migration script and its relations to others.
 
revision = 'b74907136bc1'
 
down_revision = 'ad357ccd9521'
 
branch_labels = None
 
depends_on = None
 

	
 
from alembic import op
 
import sqlalchemy as sa
 

	
 

	
 
def upgrade():
 
    op.create_table('user_ssh_keys',
 
        sa.Column('user_ssh_key_id', sa.Integer(), nullable=False),
 
        sa.Column('user_id', sa.Integer(), nullable=False),
 
        sa.Column('public_key', sa.UnicodeText(), nullable=False),
 
        sa.Column('description', sa.UnicodeText(), nullable=False),
 
        sa.Column('fingerprint', sa.String(length=255), nullable=False),
 
        sa.Column('created_on', sa.DateTime(), nullable=False),
 
        sa.Column('last_seen', sa.DateTime(), nullable=True),
 
        sa.ForeignKeyConstraint(['user_id'], ['users.user_id'], name=op.f('fk_user_ssh_keys_user_id')),
 
        sa.PrimaryKeyConstraint('user_ssh_key_id', name=op.f('pk_user_ssh_keys')),
 
        sa.UniqueConstraint('fingerprint', name=op.f('uq_user_ssh_keys_fingerprint')),
 
    )
 
    with op.batch_alter_table('user_ssh_keys', schema=None) as batch_op:
 
        batch_op.create_index('usk_fingerprint_idx', ['fingerprint'], unique=False)
 

	
 
def downgrade():
 
    with op.batch_alter_table('user_ssh_keys', schema=None) as batch_op:
 
        batch_op.drop_index('usk_fingerprint_idx')
 
    op.drop_table('user_ssh_keys')
kallithea/model/db.py
Show inline comments
 
@@ -446,12 +446,13 @@ class User(Base, BaseDbModel):
 
    # comments created by this user
 
    user_comments = relationship('ChangesetComment', cascade='all')
 
    # extra emails for this user
 
    user_emails = relationship('UserEmailMap', cascade='all')
 
    # extra API keys
 
    user_api_keys = relationship('UserApiKeys', cascade='all')
 
    ssh_keys = relationship('UserSshKeys', cascade='all')
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
@@ -2512,6 +2513,39 @@ class Gist(Base, BaseDbModel):
 
    @property
 
    def scm_instance(self):
 
        from kallithea.lib.vcs import get_repo
 
        base_path = self.base_path()
 
        return get_repo(os.path.join(*map(safe_str,
 
                                          [base_path, self.gist_access_id])))
 

	
 

	
 
class UserSshKeys(Base, BaseDbModel):
 
    __tablename__ = 'user_ssh_keys'
 
    __table_args__ = (
 
        Index('usk_public_key_idx', 'public_key'),
 
        Index('usk_fingerprint_idx', 'fingerprint'),
 
        UniqueConstraint('fingerprint'),
 
        _table_args_default_dict
 
    )
 
    __mapper_args__ = {}
 

	
 
    user_ssh_key_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    _public_key = Column('public_key', UnicodeText(), nullable=False)
 
    description = Column(UnicodeText(), nullable=False)
 
    fingerprint = Column(String(255), nullable=False, unique=True)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    last_seen = Column(DateTime(timezone=False), nullable=True)
 

	
 
    user = relationship('User')
 

	
 
    @property
 
    def public_key(self):
 
        return self._public_key
 

	
 
    @public_key.setter
 
    def public_key(self, full_key):
 
        # the full public key is too long to be suitable as database key - instead,
 
        # use fingerprints similar to 'ssh-keygen -E sha256 -lf ~/.ssh/id_rsa.pub'
 
        self._public_key = full_key
 
        enc_key = full_key.split(" ")[1]
 
        self.fingerprint = hashlib.sha256(enc_key.decode('base64')).digest().encode('base64').replace('\n', '').rstrip('=')
kallithea/model/ssh_key.py
Show inline comments
 
new file 100644
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.ssh_key
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
SSH key model for Kallithea
 

	
 
"""
 

	
 
import logging
 

	
 
from kallithea.model.db import UserSshKeys, User
 
from kallithea.model.meta import Session
 

	
 
log = logging.getLogger(__name__)
 

	
 
class SshKeyModel(object):
 

	
 
    def create(self, user, description, public_key):
 
        """
 
        :param user: user or user_id
 
        :param description: description of SshKey
 
        :param publickey: public key text
 
        """
 
        user = User.guess_instance(user)
 

	
 
        new_ssh_key = UserSshKeys()
 
        new_ssh_key.user_id = user.user_id
 
        new_ssh_key.description = description
 
        new_ssh_key.public_key = public_key
 
        Session().add(new_ssh_key)
 

	
 
        return new_ssh_key
 

	
 
    def delete(self, public_key, user=None):
 
        """
 
        Deletes given public_key, if user is set it also filters the object for
 
        deletion by given user.
 
        """
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys._public_key == public_key)
 

	
 
        if user:
 
            user = User.guess_instance(user)
 
            ssh_key = ssh_key.filter(UserSshKeys.user_id == user.user_id)
 

	
 
        ssh_key = ssh_key.scalar()
 
        Session().delete(ssh_key)
 

	
 
    def get_ssh_keys(self, user):
 
        user = User.guess_instance(user)
 
        user_ssh_keys = UserSshKeys.query() \
 
            .filter(UserSshKeys.user_id == user.user_id).all()
 
        return user_ssh_keys
kallithea/tests/models/test_user_ssh_keys.py
Show inline comments
 
new file 100644
 
from kallithea.model.db import UserSshKeys
 

	
 
from kallithea.tests.base import TestController
 
from kallithea.tests.fixture import Fixture
 

	
 
fixture = Fixture()
 

	
 
public_key = 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== kallithea@localhost'
 

	
 

	
 
class TestUserSshKeys(TestController):
 

	
 
    def test_fingerprint_generation(self):
 
        key_model = UserSshKeys()
 
        key_model.public_key = public_key
 
        expected = 'Ke3oUCNJM87P0jJTb3D+e3shjceP2CqMpQKVd75E9I8'
 
        assert expected == key_model.fingerprint
0 comments (0 inline, 0 general)