diff --git a/kallithea/alembic/versions/b74907136bc1_create_table_for_ssh_keys.py b/kallithea/alembic/versions/b74907136bc1_create_table_for_ssh_keys.py new file mode 100644 --- /dev/null +++ b/kallithea/alembic/versions/b74907136bc1_create_table_for_ssh_keys.py @@ -0,0 +1,52 @@ +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Create table for ssh keys + +Revision ID: b74907136bc1 +Revises: a020f7044fd6 +Create Date: 2017-04-03 18:54:24.490346 + +""" + +# The following opaque hexadecimal identifiers ("revisions") are used +# by Alembic to track this migration script and its relations to others. +revision = 'b74907136bc1' +down_revision = 'ad357ccd9521' +branch_labels = None +depends_on = None + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.create_table('user_ssh_keys', + sa.Column('user_ssh_key_id', sa.Integer(), nullable=False), + sa.Column('user_id', sa.Integer(), nullable=False), + sa.Column('public_key', sa.UnicodeText(), nullable=False), + sa.Column('description', sa.UnicodeText(), nullable=False), + sa.Column('fingerprint', sa.String(length=255), nullable=False), + sa.Column('created_on', sa.DateTime(), nullable=False), + sa.Column('last_seen', sa.DateTime(), nullable=True), + sa.ForeignKeyConstraint(['user_id'], ['users.user_id'], name=op.f('fk_user_ssh_keys_user_id')), + sa.PrimaryKeyConstraint('user_ssh_key_id', name=op.f('pk_user_ssh_keys')), + sa.UniqueConstraint('fingerprint', name=op.f('uq_user_ssh_keys_fingerprint')), + ) + with op.batch_alter_table('user_ssh_keys', schema=None) as batch_op: + batch_op.create_index('usk_fingerprint_idx', ['fingerprint'], unique=False) + +def downgrade(): + with op.batch_alter_table('user_ssh_keys', schema=None) as batch_op: + batch_op.drop_index('usk_fingerprint_idx') + op.drop_table('user_ssh_keys') diff --git a/kallithea/model/db.py b/kallithea/model/db.py --- a/kallithea/model/db.py +++ b/kallithea/model/db.py @@ -449,6 +449,7 @@ class User(Base, BaseDbModel): user_emails = relationship('UserEmailMap', cascade='all') # extra API keys user_api_keys = relationship('UserApiKeys', cascade='all') + ssh_keys = relationship('UserSshKeys', cascade='all') @hybrid_property def email(self): @@ -2515,3 +2516,36 @@ class Gist(Base, BaseDbModel): base_path = self.base_path() return get_repo(os.path.join(*map(safe_str, [base_path, self.gist_access_id]))) + + +class UserSshKeys(Base, BaseDbModel): + __tablename__ = 'user_ssh_keys' + __table_args__ = ( + Index('usk_public_key_idx', 'public_key'), + Index('usk_fingerprint_idx', 'fingerprint'), + UniqueConstraint('fingerprint'), + _table_args_default_dict + ) + __mapper_args__ = {} + + user_ssh_key_id = Column(Integer(), primary_key=True) + user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False) + _public_key = Column('public_key', UnicodeText(), nullable=False) + description = Column(UnicodeText(), nullable=False) + fingerprint = Column(String(255), nullable=False, unique=True) + created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now) + last_seen = Column(DateTime(timezone=False), nullable=True) + + user = relationship('User') + + @property + def public_key(self): + return self._public_key + + @public_key.setter + def public_key(self, full_key): + # the full public key is too long to be suitable as database key - instead, + # use fingerprints similar to 'ssh-keygen -E sha256 -lf ~/.ssh/id_rsa.pub' + self._public_key = full_key + enc_key = full_key.split(" ")[1] + self.fingerprint = hashlib.sha256(enc_key.decode('base64')).digest().encode('base64').replace('\n', '').rstrip('=') diff --git a/kallithea/model/ssh_key.py b/kallithea/model/ssh_key.py new file mode 100644 --- /dev/null +++ b/kallithea/model/ssh_key.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +kallithea.model.ssh_key +~~~~~~~~~~~~~~~~~~~~~~~ + +SSH key model for Kallithea + +""" + +import logging + +from kallithea.model.db import UserSshKeys, User +from kallithea.model.meta import Session + +log = logging.getLogger(__name__) + +class SshKeyModel(object): + + def create(self, user, description, public_key): + """ + :param user: user or user_id + :param description: description of SshKey + :param publickey: public key text + """ + user = User.guess_instance(user) + + new_ssh_key = UserSshKeys() + new_ssh_key.user_id = user.user_id + new_ssh_key.description = description + new_ssh_key.public_key = public_key + Session().add(new_ssh_key) + + return new_ssh_key + + def delete(self, public_key, user=None): + """ + Deletes given public_key, if user is set it also filters the object for + deletion by given user. + """ + ssh_key = UserSshKeys.query().filter(UserSshKeys._public_key == public_key) + + if user: + user = User.guess_instance(user) + ssh_key = ssh_key.filter(UserSshKeys.user_id == user.user_id) + + ssh_key = ssh_key.scalar() + Session().delete(ssh_key) + + def get_ssh_keys(self, user): + user = User.guess_instance(user) + user_ssh_keys = UserSshKeys.query() \ + .filter(UserSshKeys.user_id == user.user_id).all() + return user_ssh_keys diff --git a/kallithea/tests/models/test_user_ssh_keys.py b/kallithea/tests/models/test_user_ssh_keys.py new file mode 100644 --- /dev/null +++ b/kallithea/tests/models/test_user_ssh_keys.py @@ -0,0 +1,17 @@ +from kallithea.model.db import UserSshKeys + +from kallithea.tests.base import TestController +from kallithea.tests.fixture import Fixture + +fixture = Fixture() + +public_key = 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== kallithea@localhost' + + +class TestUserSshKeys(TestController): + + def test_fingerprint_generation(self): + key_model = UserSshKeys() + key_model.public_key = public_key + expected = 'Ke3oUCNJM87P0jJTb3D+e3shjceP2CqMpQKVd75E9I8' + assert expected == key_model.fingerprint