Changeset - a87aa385f21c
[Not reviewed]
beta
0 7 0
Marcin Kuzminski - 14 years ago 2011-12-06 03:06:01
marcin@python-works.com
fixed repo_create permission by adding missing commit statements
- added few tests for checking permission in UserModel
- added __json__() into get_dict() to fetch from it hybrid_properties and any additional custom properties
- code garden
7 files changed with 194 insertions and 52 deletions:
0 comments (0 inline, 0 general)
rhodecode/controllers/admin/users.py
Show inline comments
 
@@ -197,7 +197,7 @@ class UsersController(BaseController):
 
            user_model.grant_perm(id, perm)
 
            h.flash(_("Granted 'repository create' permission to user"),
 
                    category='success')
 

	
 
            Session.commit()
 
        else:
 
            perm = Permission.get_by_key('hg.create.repository')
 
            user_model.revoke_perm(id, perm)
 
@@ -206,5 +206,5 @@ class UsersController(BaseController):
 
            user_model.grant_perm(id, perm)
 
            h.flash(_("Revoked 'repository create' permission to user"),
 
                    category='success')
 

	
 
            Session.commit()
 
        return redirect(url('edit_user', id=id))
rhodecode/lib/db_manage.py
Show inline comments
 
@@ -245,12 +245,20 @@ class DbManage(object):
 
            self.create_user(username, password, email, True)
 
        else:
 
            log.info('creating admin and regular test users')
 
            self.create_user('test_admin', 'test12',
 
                             'test_admin@mail.com', True)
 
            self.create_user('test_regular', 'test12',
 
                             'test_regular@mail.com', False)
 
            self.create_user('test_regular2', 'test12',
 
                             'test_regular2@mail.com', False)
 
            from rhodecode.tests import TEST_USER_ADMIN_LOGIN,\
 
            TEST_USER_ADMIN_PASS ,TEST_USER_ADMIN_EMAIL,TEST_USER_REGULAR_LOGIN,\
 
            TEST_USER_REGULAR_PASS,TEST_USER_REGULAR_EMAIL,\
 
            TEST_USER_REGULAR2_LOGIN,TEST_USER_REGULAR2_PASS,\
 
            TEST_USER_REGULAR2_EMAIL
 

	
 
            self.create_user(TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS,
 
                             TEST_USER_ADMIN_EMAIL, True)
 
            
 
            self.create_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS,
 
                             TEST_USER_REGULAR_EMAIL, False)
 
            
 
            self.create_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS,
 
                             TEST_USER_REGULAR2_EMAIL, False)
 

	
 
    def create_ui_settings(self):
 
        """Creates ui settings, fills out hooks
rhodecode/model/db.py
Show inline comments
 
@@ -82,8 +82,8 @@ class ModelSerializer(json.JSONEncoder):
 
            return json.JSONEncoder.default(self, obj)
 

	
 
class BaseModel(object):
 
    """Base Model for all classess
 

	
 
    """
 
    Base Model for all classess
 
    """
 

	
 
    @classmethod
 
@@ -91,13 +91,20 @@ class BaseModel(object):
 
        """return column names for this model """
 
        return class_mapper(cls).c.keys()
 

	
 
    def get_dict(self):
 
        """return dict with keys and values corresponding
 
        to this model data """
 
    def get_dict(self, serialized=False):
 
        """
 
        return dict with keys and values corresponding
 
        to this model data 
 
        """
 

	
 
        d = {}
 
        for k in self._get_keys():
 
            d[k] = getattr(self, k)
 
        
 
        # also use __json__() if present to get additional fields
 
        if hasattr(self, '__json__'):
 
            for k,val in self.__json__().iteritems():
 
                d[k] = val 
 
        return d
 

	
 
    def get_appstruct(self):
 
@@ -350,6 +357,11 @@ class User(Base, BaseModel):
 
        log.debug('updated user %s lastlogin', self.username)
 

	
 

	
 
    def __json__(self):
 
        return dict(email=self.email,
 
                    full_name=self.full_name)
 

	
 

	
 
class UserLog(Base, BaseModel):
 
    __tablename__ = 'user_logs'
 
    __table_args__ = {'extend_existing':True}
rhodecode/model/user.py
Show inline comments
 
@@ -281,9 +281,10 @@ class UserModel(BaseModel):
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def delete(self, user_id):
 
    def delete(self, user):
 
        user = self.__get_user(user)
 
        
 
        try:
 
            user = self.get(user_id, cache=False)
 
            if user.username == 'default':
 
                raise DefaultUserException(
 
                                _("You can't remove this user since it's"
 
@@ -470,35 +471,37 @@ class UserModel(BaseModel):
 

	
 
        return user
 

	
 

	
 

	
 
    def has_perm(self, user, perm):
 
        if not isinstance(perm, Permission):
 
            raise Exception('perm needs to be an instance of Permission class')
 
            raise Exception('perm needs to be an instance of Permission class '
 
                            'got %s instead' % type(perm))
 

	
 
        user = self.__get_user(user)
 

	
 
        return UserToPerm.query().filter(UserToPerm.user == user.user)\
 
        return UserToPerm.query().filter(UserToPerm.user == user)\
 
            .filter(UserToPerm.permission == perm).scalar() is not None
 

	
 
    def grant_perm(self, user, perm):
 
        if not isinstance(perm, Permission):
 
            raise Exception('perm needs to be an instance of Permission class')
 
            raise Exception('perm needs to be an instance of Permission class '
 
                            'got %s instead' % type(perm))
 

	
 
        user = self.__get_user(user)
 

	
 
        new = UserToPerm()
 
        new.user = user.user
 
        new.user = user
 
        new.permission = perm
 
        self.sa.add(new)
 

	
 

	
 
    def revoke_perm(self, user, perm):
 
        if not isinstance(perm, Permission):
 
            raise Exception('perm needs to be an instance of Permission class')
 
            raise Exception('perm needs to be an instance of Permission class '
 
                            'got %s instead' % type(perm))
 
        
 
        user = self.__get_user(user)
 
        
 
        obj = UserToPerm.query().filter(UserToPerm.user == user.user)\
 
                .filter(UserToPerm.permission == perm).one()
 
        obj = UserToPerm.query().filter(UserToPerm.user == user)\
 
                .filter(UserToPerm.permission == perm).scalar()
 
        if obj:
 
        self.sa.delete(obj)
rhodecode/tests/__init__.py
Show inline comments
 
@@ -31,9 +31,13 @@ time.tzset()
 

	
 
log = logging.getLogger(__name__)
 

	
 
__all__ = ['environ', 'url', 'TestController', 'TESTS_TMP_PATH', 'HG_REPO',
 
__all__ = [
 
    'environ', 'url', 'TestController', 'TESTS_TMP_PATH', 'HG_REPO',
 
           'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK', 'GIT_FORK',
 
           'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS' ]
 
    'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
 
    'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
 
    'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL'
 
]
 

	
 
# Invoke websetup with the current config file
 
# SetupCommand('setup-app').run([config_file])
 
@@ -48,6 +52,16 @@ environ = {}
 
TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
 
TEST_USER_ADMIN_LOGIN = 'test_admin'
 
TEST_USER_ADMIN_PASS = 'test12'
 
TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
 

	
 
TEST_USER_REGULAR_LOGIN = 'test_regular'
 
TEST_USER_REGULAR_PASS = 'test12'
 
TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
 

	
 
TEST_USER_REGULAR2_LOGIN = 'test_regular2'
 
TEST_USER_REGULAR2_PASS = 'test12'
 
TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
 

	
 
HG_REPO = 'vcs_test_hg'
 
GIT_REPO = 'vcs_test_git'
 

	
rhodecode/tests/functional/test_admin_users.py
Show inline comments
 
from rhodecode.tests import *
 
from rhodecode.model.db import User
 
from rhodecode.model.db import User, Permission
 
from rhodecode.lib.auth import check_password
 
from sqlalchemy.orm.exc import NoResultFound
 
from rhodecode.model.user import UserModel
 

	
 
class TestAdminUsersController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('users'))
 
        # Test response...
 

	
 
@@ -21,7 +23,8 @@ class TestAdminUsersController(TestContr
 
        lastname = 'lastname'
 
        email = 'mail@mail.com'
 

	
 
        response = self.app.post(url('users'), {'username':username,
 
        response = self.app.post(url('users'), 
 
                                 {'username':username,
 
                                               'password':password,
 
                                               'password_confirmation':password_confirmation,
 
                                               'name':name,
 
@@ -30,21 +33,21 @@ class TestAdminUsersController(TestContr
 
                                               'email':email})
 

	
 

	
 
        assert '''created user %s''' % (username) in response.session['flash'][0], 'No flash message about new user'
 
        self.assertTrue('''created user %s''' % (username) in 
 
                        response.session['flash'][0])
 

	
 
        new_user = self.Session.query(User).filter(User.username == username).one()
 

	
 
        new_user = self.Session.query(User).\
 
            filter(User.username == username).one()
 

	
 
        assert new_user.username == username, 'wrong info about username'
 
        assert check_password(password, new_user.password) == True , 'wrong info about password'
 
        assert new_user.name == name, 'wrong info about name'
 
        assert new_user.lastname == lastname, 'wrong info about lastname'
 
        assert new_user.email == email, 'wrong info about email'
 

	
 
        self.assertEqual(new_user.username,username)
 
        self.assertEqual(check_password(password, new_user.password),True)
 
        self.assertEqual(new_user.name,name)
 
        self.assertEqual(new_user.lastname,lastname)
 
        self.assertEqual(new_user.email,email)
 

	
 
        response.follow()
 
        response = response.follow()
 
        assert """edit">newtestuser</a>""" in response.body
 
        self.assertTrue("""edit">newtestuser</a>""" in response.body)
 

	
 
    def test_create_err(self):
 
        self.log_user()
 
@@ -61,9 +64,9 @@ class TestAdminUsersController(TestContr
 
                                               'lastname':lastname,
 
                                               'email':email})
 

	
 
        assert """<span class="error-message">Invalid username</span>""" in response.body
 
        assert """<span class="error-message">Please enter a value</span>""" in response.body
 
        assert """<span class="error-message">An email address must contain a single @</span>""" in response.body
 
        self.assertTrue("""<span class="error-message">Invalid username</span>""" in response.body)
 
        self.assertTrue("""<span class="error-message">Please enter a value</span>""" in response.body)
 
        self.assertTrue("""<span class="error-message">An email address must contain a single @</span>""" in response.body)
 

	
 
        def get_user():
 
            self.Session.query(User).filter(User.username == username).one()
 
@@ -71,6 +74,7 @@ class TestAdminUsersController(TestContr
 
        self.assertRaises(NoResultFound, get_user), 'found user in database'
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_user'))
 

	
 
    def test_new_as_xml(self):
 
@@ -100,14 +104,17 @@ class TestAdminUsersController(TestContr
 

	
 
        response = response.follow()
 

	
 
        new_user = self.Session.query(User).filter(User.username == username).one()
 
        new_user = self.Session.query(User)\
 
            .filter(User.username == username).one()
 
        response = self.app.delete(url('user', id=new_user.user_id))
 

	
 
        assert """successfully deleted user""" in response.session['flash'][0], 'No info about user deletion'
 
        self.assertTrue("""successfully deleted user""" in 
 
                        response.session['flash'][0])
 

	
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('user', id=1), params=dict(_method='delete'))
 
        response = self.app.post(url('user', id=1), 
 
                                 params=dict(_method='delete'))
 

	
 
    def test_show(self):
 
        response = self.app.get(url('user', id=1))
 
@@ -116,7 +123,57 @@ class TestAdminUsersController(TestContr
 
        response = self.app.get(url('formatted_user', id=1, format='xml'))
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('edit_user', id=1))
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        response = self.app.get(url('edit_user', id=user.user_id))
 

	
 

	
 
    def test_add_perm_create_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.create.none')
 
        perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 

	
 

	
 
        #User should have None permission on creation repository
 
        self.assertEqual(UserModel().has_perm(user, perm_none), False)
 
        self.assertEqual(UserModel().has_perm(user, perm_create), False)
 

	
 
        response = self.app.post(url('user_perm', id=user.user_id),
 
                                 params=dict(_method='put',
 
                                             create_repo_perm=True))
 

	
 
        perm_none = Permission.get_by_key('hg.create.none')
 
        perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        #User should have None permission on creation repository
 
        self.assertEqual(UserModel().has_perm(user, perm_none), False)
 
        self.assertEqual(UserModel().has_perm(user, perm_create), True)
 

	
 
    def test_revoke_perm_create_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.create.none')
 
        perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
        user = User.get_by_username(TEST_USER_REGULAR2_LOGIN)
 

	
 

	
 
        #User should have None permission on creation repository
 
        self.assertEqual(UserModel().has_perm(user, perm_none), False)
 
        self.assertEqual(UserModel().has_perm(user, perm_create), False)
 

	
 
        response = self.app.post(url('user_perm', id=user.user_id),
 
                                 params=dict(_method='put'))
 

	
 
        perm_none = Permission.get_by_key('hg.create.none')
 
        perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
        user = User.get_by_username(TEST_USER_REGULAR2_LOGIN)
 
        #User should have None permission on creation repository
 
        self.assertEqual(UserModel().has_perm(user, perm_none), True)
 
        self.assertEqual(UserModel().has_perm(user, perm_create), False)
 

	
 
    def test_edit_as_xml(self):
 
        response = self.app.get(url('formatted_edit_user', id=1, format='xml'))
rhodecode/tests/test_models.py
Show inline comments
 
@@ -5,7 +5,7 @@ from rhodecode.tests import *
 
from rhodecode.model.repos_group import ReposGroupModel
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.db import RepoGroup, User, Notification, UserNotification, \
 
    UsersGroup, UsersGroupMember
 
    UsersGroup, UsersGroupMember, Permission
 
from sqlalchemy.exc import IntegrityError
 
from rhodecode.model.user import UserModel
 

	
 
@@ -158,6 +158,9 @@ class TestReposGroups(unittest.TestCase)
 
        self.assertEqual(r.repo_name, os.path.join('g2', 'g1', r.just_name))
 

	
 
class TestUser(unittest.TestCase):
 
    def __init__(self, methodName='runTest'):
 
        Session.remove()
 
        super(TestUser, self).__init__(methodName=methodName)
 

	
 
    def test_create_and_remove(self):
 
        usr = UserModel().create_or_update(username=u'test_user', password=u'qweqwe',
 
@@ -184,6 +187,7 @@ class TestUser(unittest.TestCase):
 
class TestNotifications(unittest.TestCase):
 

	
 
    def __init__(self, methodName='runTest'):
 
        Session.remove()
 
        self.u1 = UserModel().create_or_update(username=u'u1',
 
                                        password=u'qweqwe',
 
                                        email=u'u1@rhodecode.org',
 
@@ -214,6 +218,8 @@ class TestNotifications(unittest.TestCas
 
        Session.commit()
 
        self.assertEqual(Notification.query().all(), [])
 

	
 
    def tearDown(self):
 
        self._clean_notifications()
 

	
 
    def test_create_notification(self):
 
        self.assertEqual([], Notification.query().all())
 
@@ -239,7 +245,6 @@ class TestNotifications(unittest.TestCas
 
        self.assertEqual(len(unotification), len(usrs))
 
        self.assertEqual([x.user.user_id for x in unotification], usrs)
 

	
 
        self._clean_notifications()
 

	
 
    def test_user_notifications(self):
 
        self.assertEqual([], Notification.query().all())
 
@@ -257,7 +262,6 @@ class TestNotifications(unittest.TestCas
 

	
 
        self.assertEqual(sorted([x.notification for x in u3.notifications]),
 
                         sorted([notification2, notification1]))
 
        self._clean_notifications()
 

	
 
    def test_delete_notifications(self):
 
        self.assertEqual([], Notification.query().all())
 
@@ -280,7 +284,6 @@ class TestNotifications(unittest.TestCas
 
                                             == notification).all()
 
        self.assertEqual(un, [])
 

	
 
        self._clean_notifications()
 

	
 
    def test_delete_association(self):
 

	
 
@@ -329,8 +332,6 @@ class TestNotifications(unittest.TestCas
 
                            .scalar()
 
        self.assertNotEqual(u2notification, None)
 

	
 
        self._clean_notifications()
 

	
 
    def test_notification_counter(self):
 
        self._clean_notifications()
 
        self.assertEqual([], Notification.query().all())
 
@@ -359,4 +360,51 @@ class TestNotifications(unittest.TestCas
 
                         .get_unread_cnt_for_user(self.u2), 1)
 
        self.assertEqual(NotificationModel()
 
                         .get_unread_cnt_for_user(self.u3), 2)
 
        self._clean_notifications()
 

	
 
class TestUsers(unittest.TestCase):
 

	
 
    def __init__(self, methodName='runTest'):
 
        super(TestUsers, self).__init__(methodName=methodName)
 

	
 
    def setUp(self):
 
        self.u1 = UserModel().create_or_update(username=u'u1',
 
                                        password=u'qweqwe',
 
                                        email=u'u1@rhodecode.org',
 
                                        name=u'u1', lastname=u'u1')        
 

	
 
    def tearDown(self):
 
        perm = Permission.query().all()
 
        for p in perm:
 
            UserModel().revoke_perm(self.u1, p)
 
            
 
        UserModel().delete(self.u1)
 
        Session.commit()
 

	
 
    def test_add_perm(self):
 
        perm = Permission.query().all()[0]
 
        UserModel().grant_perm(self.u1, perm)
 
        Session.commit()
 
        self.assertEqual(UserModel().has_perm(self.u1, perm), True)
 

	
 
    def test_has_perm(self):
 
        perm = Permission.query().all()
 
        for p in perm:
 
            has_p = UserModel().has_perm(self.u1, p)
 
            self.assertEqual(False, has_p)
 

	
 
    def test_revoke_perm(self):
 
        perm = Permission.query().all()[0]
 
        UserModel().grant_perm(self.u1, perm)
 
        Session.commit()
 
        self.assertEqual(UserModel().has_perm(self.u1, perm), True)
 

	
 
        #revoke
 
        UserModel().revoke_perm(self.u1, perm)
 
        Session.commit()
 
        self.assertEqual(UserModel().has_perm(self.u1, perm),False)
 
        
 
    
 

	
 
        
 

	
 

	
0 comments (0 inline, 0 general)