Changeset - 5923d7474287
[Not reviewed]
default
0 3 0
Mads Kiilerich - 11 years ago 2015-02-06 03:35:40
madski@unity3d.com
[security fix] api: don't send internal data unless asked for it

This changeset fixes CVE-2015-0260.
See <https://kallithea-scm.org/security/cve-2015-0260.html>; for
more details.
3 files changed with 11 insertions and 8 deletions:
0 comments (0 inline, 0 general)
kallithea/model/db.py
Show inline comments
 
@@ -582,117 +582,120 @@ class User(Base, BaseModel):
 
            q = q.options(joinedload(UserEmailMap.user))
 
            if cache:
 
                q = q.options(FromCache("sql_cache_short",
 
                                        "get_email_map_key_%s" % email))
 
            ret = getattr(q.scalar(), 'user', None)
 

	
 
        return ret
 

	
 
    @classmethod
 
    def get_from_cs_author(cls, author):
 
        """
 
        Tries to get User objects out of commit author string
 

	
 
        :param author:
 
        """
 
        from kallithea.lib.helpers import email, author_name
 
        # Valid email in the attribute passed, see if they're in the system
 
        _email = email(author)
 
        if _email:
 
            user = cls.get_by_email(_email, case_insensitive=True)
 
            if user:
 
                return user
 
        # Maybe we can match by username?
 
        _author = author_name(author)
 
        user = cls.get_by_username(_author, case_insensitive=True)
 
        if user:
 
            return user
 

	
 
    def update_lastlogin(self):
 
        """Update user lastlogin"""
 
        self.last_login = datetime.datetime.now()
 
        Session().add(self)
 
        log.debug('updated user %s lastlogin' % self.username)
 

	
 
    @classmethod
 
    def get_first_admin(cls):
 
        user = User.query().filter(User.admin == True).first()
 
        if user is None:
 
            raise Exception('Missing administrative account!')
 
        return user
 

	
 
    @classmethod
 
    def get_default_user(cls, cache=False):
 
        user = User.get_by_username(User.DEFAULT_USER, cache=cache)
 
        if user is None:
 
            raise Exception('Missing default account!')
 
        return user
 

	
 
    def get_api_data(self):
 
    def get_api_data(self, details=False):
 
        """
 
        Common function for generating user related data for API
 
        """
 
        user = self
 
        data = dict(
 
            user_id=user.user_id,
 
            username=user.username,
 
            firstname=user.name,
 
            lastname=user.lastname,
 
            email=user.email,
 
            emails=user.emails,
 
            api_key=user.api_key,
 
            api_keys=user.api_keys,
 
            active=user.active,
 
            admin=user.admin,
 
        )
 
        if details:
 
            data.update(dict(
 
            extern_type=user.extern_type,
 
            extern_name=user.extern_name,
 
                api_key=user.api_key,
 
                api_keys=user.api_keys,
 
            last_login=user.last_login,
 
            ip_addresses=user.ip_addresses
 
        )
 
                ))
 
        return data
 

	
 
    def __json__(self):
 
        data = dict(
 
            full_name=self.full_name,
 
            full_name_or_username=self.full_name_or_username,
 
            short_contact=self.short_contact,
 
            full_contact=self.full_contact
 
        )
 
        data.update(self.get_api_data())
 
        return data
 

	
 

	
 
class UserApiKeys(Base, BaseModel):
 
    __tablename__ = 'user_api_keys'
 
    __table_args__ = (
 
        Index('uak_api_key_idx', 'api_key'),
 
        Index('uak_api_key_expires_idx', 'api_key', 'expires'),
 
        UniqueConstraint('api_key'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8', 'sqlite_autoincrement': True}
 
    )
 
    __mapper_args__ = {}
 

	
 
    user_api_key_id = Column("user_api_key_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=True, unique=None, default=None)
 
    api_key = Column("api_key", String(255, convert_unicode=False), nullable=False, unique=True)
 
    description = Column('description', UnicodeText(1024))
 
    expires = Column('expires', Float(53), nullable=False)
 
    created_on = Column('created_on', DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    user = relationship('User', lazy='joined')
 

	
 
    @property
 
    def expired(self):
 
        if self.expires == -1:
 
            return False
 
        return time.time() > self.expires
 

	
 

	
 
class UserEmailMap(Base, BaseModel):
 
    __tablename__ = 'user_email_map'
 
    __table_args__ = (
 
        Index('uem_email_idx', 'email'),
 
        UniqueConstraint('email'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8', 'sqlite_autoincrement': True}
 
    )
kallithea/tests/functional/test_admin_users.py
Show inline comments
 
@@ -84,117 +84,117 @@ class TestAdminUsersController(TestContr
 
        lastname = 'lastname'
 
        email = 'errmail.com'
 

	
 
        response = self.app.post(url('users'), {'username': username,
 
                                               'password': password,
 
                                               'name': name,
 
                                               'active': False,
 
                                               'lastname': lastname,
 
                                               'email': email})
 

	
 
        msg = validators.ValidUsername(False, {})._messages['system_invalid_username']
 
        msg = h.html_escape(msg % {'username': 'new_user'})
 
        response.mustcontain("""<span class="error-message">%s</span>""" % msg)
 
        response.mustcontain("""<span class="error-message">Please enter a value</span>""")
 
        response.mustcontain("""<span class="error-message">An email address must contain a single @</span>""")
 

	
 
        def get_user():
 
            Session().query(User).filter(User.username == username).one()
 

	
 
        self.assertRaises(NoResultFound, get_user), 'found user in database'
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_user'))
 

	
 
    @parameterized.expand(
 
        [('firstname', {'firstname': 'new_username'}),
 
         ('lastname', {'lastname': 'new_username'}),
 
         ('admin', {'admin': True}),
 
         ('admin', {'admin': False}),
 
         ('extern_type', {'extern_type': 'ldap'}),
 
         ('extern_type', {'extern_type': None}),
 
         ('extern_name', {'extern_name': 'test'}),
 
         ('extern_name', {'extern_name': None}),
 
         ('active', {'active': False}),
 
         ('active', {'active': True}),
 
         ('email', {'email': 'some@email.com'}),
 
        # ('new_password', {'new_password': 'foobar123',
 
        #                   'password_confirmation': 'foobar123'})
 
        ])
 
    def test_update(self, name, attrs):
 
        self.log_user()
 
        usr = fixture.create_user(self.test_user_1, password='qweqwe',
 
                                  email='testme@example.com',
 
                                  extern_type='internal',
 
                                  extern_name=self.test_user_1,
 
                                  skip_if_exists=True)
 
        Session().commit()
 
        params = usr.get_api_data()
 
        params = usr.get_api_data(True)
 
        params.update({'password_confirmation': ''})
 
        params.update({'new_password': ''})
 
        params.update(attrs)
 
        if name == 'email':
 
            params['emails'] = [attrs['email']]
 
        if name == 'extern_type':
 
            #cannot update this via form, expected value is original one
 
            params['extern_type'] = "internal"
 
        if name == 'extern_name':
 
            #cannot update this via form, expected value is original one
 
            params['extern_name'] = self.test_user_1
 
            # special case since this user is not
 
                                          # logged in yet his data is not filled
 
                                          # so we use creation data
 

	
 
        response = self.app.put(url('user', id=usr.user_id), params)
 
        self.checkSessionFlash(response, 'User updated successfully')
 

	
 
        updated_user = User.get_by_username(self.test_user_1)
 
        updated_params = updated_user.get_api_data()
 
        updated_params = updated_user.get_api_data(True)
 
        updated_params.update({'password_confirmation': ''})
 
        updated_params.update({'new_password': ''})
 

	
 
        self.assertEqual(params, updated_params)
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        username = 'newtestuserdeleteme'
 

	
 
        fixture.create_user(name=username)
 

	
 
        new_user = Session().query(User)\
 
            .filter(User.username == username).one()
 
        response = self.app.delete(url('user', id=new_user.user_id))
 

	
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_delete_repo_err(self):
 
        self.log_user()
 
        username = 'repoerr'
 
        reponame = 'repoerr_fail'
 

	
 
        fixture.create_user(name=username)
 
        fixture.create_repo(name=reponame, cur_user=username)
 

	
 
        new_user = Session().query(User)\
 
            .filter(User.username == username).one()
 
        response = self.app.delete(url('user', id=new_user.user_id))
 
        self.checkSessionFlash(response, 'User "%s" still '
 
                               'owns 1 repositories and cannot be removed. '
 
                               'Switch owners or remove those repositories: '
 
                               '%s' % (username, reponame))
 

	
 
        response = self.app.delete(url('repo', repo_name=reponame))
 
        self.checkSessionFlash(response, 'Deleted repository %s' % reponame)
 

	
 
        response = self.app.delete(url('user', id=new_user.user_id))
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_delete_repo_group_err(self):
 
        self.log_user()
 
        username = 'repogrouperr'
 
        groupname = 'repogroup_fail'
 

	
 
        fixture.create_user(name=username)
 
        fixture.create_repo_group(name=groupname, cur_user=username)
 

	
 
        new_user = Session().query(User)\
kallithea/tests/functional/test_my_account.py
Show inline comments
 
@@ -61,113 +61,113 @@ class TestMyAccountController(TestContro
 
        self.checkSessionFlash(response, 'Please enter an email address')
 

	
 
    def test_my_account_my_emails_add_remove(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_emails'))
 
        response.mustcontain('No additional emails specified')
 

	
 
        response = self.app.post(url('my_account_emails'),
 
                                 {'new_email': 'foo@barz.com'})
 

	
 
        response = self.app.get(url('my_account_emails'))
 

	
 
        from kallithea.model.db import UserEmailMap
 
        email_id = UserEmailMap.query()\
 
            .filter(UserEmailMap.user == User.get_by_username(TEST_USER_ADMIN_LOGIN))\
 
            .filter(UserEmailMap.email == 'foo@barz.com').one().email_id
 

	
 
        response.mustcontain('foo@barz.com')
 
        response.mustcontain('<input id="del_email_id" name="del_email_id" type="hidden" value="%s" />' % email_id)
 

	
 
        response = self.app.post(url('my_account_emails'),
 
                                 {'del_email_id': email_id, '_method': 'delete'})
 
        self.checkSessionFlash(response, 'Removed email from user')
 
        response = self.app.get(url('my_account_emails'))
 
        response.mustcontain('No additional emails specified')
 

	
 

	
 
    @parameterized.expand(
 
        [('firstname', {'firstname': 'new_username'}),
 
         ('lastname', {'lastname': 'new_username'}),
 
         ('admin', {'admin': True}),
 
         ('admin', {'admin': False}),
 
         ('extern_type', {'extern_type': 'ldap'}),
 
         ('extern_type', {'extern_type': None}),
 
         #('extern_name', {'extern_name': 'test'}),
 
         #('extern_name', {'extern_name': None}),
 
         ('active', {'active': False}),
 
         ('active', {'active': True}),
 
         ('email', {'email': 'some@email.com'}),
 
        # ('new_password', {'new_password': 'foobar123',
 
        #                   'password_confirmation': 'foobar123'})
 
        ])
 
    def test_my_account_update(self, name, attrs):
 
        usr = fixture.create_user(self.test_user_1, password='qweqwe',
 
                                  email='testme@example.com',
 
                                  extern_type='internal',
 
                                  extern_name=self.test_user_1,
 
                                  skip_if_exists=True)
 
        params = usr.get_api_data()  # current user data
 
        params = usr.get_api_data(True)  # current user data
 
        user_id = usr.user_id
 
        self.log_user(username=self.test_user_1, password='qweqwe')
 

	
 
        params.update({'password_confirmation': ''})
 
        params.update({'new_password': ''})
 
        params.update({'extern_type': 'internal'})
 
        params.update({'extern_name': self.test_user_1})
 

	
 
        params.update(attrs)
 
        response = self.app.post(url('my_account'), params)
 

	
 
        self.checkSessionFlash(response,
 
                               'Your account was updated successfully')
 

	
 
        updated_user = User.get_by_username(self.test_user_1)
 
        updated_params = updated_user.get_api_data()
 
        updated_params = updated_user.get_api_data(True)
 
        updated_params.update({'password_confirmation': ''})
 
        updated_params.update({'new_password': ''})
 

	
 
        params['last_login'] = updated_params['last_login']
 
        if name == 'email':
 
            params['emails'] = [attrs['email']]
 
        if name == 'extern_type':
 
            #cannot update this via form, expected value is original one
 
            params['extern_type'] = "internal"
 
        if name == 'extern_name':
 
            #cannot update this via form, expected value is original one
 
            params['extern_name'] = str(user_id)
 
        if name == 'active':
 
            #my account cannot deactivate account
 
            params['active'] = True
 
        if name == 'admin':
 
            #my account cannot make you an admin !
 
            params['admin'] = False
 

	
 
        self.assertEqual(params, updated_params)
 

	
 
    def test_my_account_update_err_email_exists(self):
 
        self.log_user()
 

	
 
        new_email = 'test_regular@mail.com'  # already exisitn email
 
        response = self.app.post(url('my_account'),
 
                                params=dict(
 
                                    username='test_admin',
 
                                    new_password='test12',
 
                                    password_confirmation='test122',
 
                                    firstname='NewName',
 
                                    lastname='NewLastname',
 
                                    email=new_email,)
 
                                )
 

	
 
        response.mustcontain('This e-mail address is already taken')
 

	
 
    def test_my_account_update_err(self):
 
        self.log_user('test_regular2', 'test12')
 

	
 
        new_email = 'newmail.pl'
 
        response = self.app.post(url('my_account'),
 
                                 params=dict(
 
                                            username='test_admin',
 
                                            new_password='test12',
 
                                            password_confirmation='test122',
 
                                            firstname='NewName',
 
                                            lastname='NewLastname',
0 comments (0 inline, 0 general)