Changeset - bf3546d1cd77
[Not reviewed]
default
0 5 0
Søren Løvborg - 9 years ago 2016-11-09 14:17:34
sorenl@unity3d.com
db: clean up SQLAlchemy session flushes

Many calls to Session().flush() were completely superfluous and have
been removed. (See also the note on "flush" in the contributor docs.)
For the remaining calls, a comment has been added to explain why it's
necessary.
5 files changed with 15 insertions and 19 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/auth_modules/__init__.py
Show inline comments
 
@@ -279,25 +279,24 @@ class KallitheaExternalAuthPlugin(Kallit
 
                      self.name)
 
            user = UserModel().create_or_update(
 
                username=username,
 
                password=passwd,
 
                email=user_data["email"],
 
                firstname=user_data["firstname"],
 
                lastname=user_data["lastname"],
 
                active=user_data["active"],
 
                admin=user_data["admin"],
 
                extern_name=user_data["extern_name"],
 
                extern_type=self.name
 
            )
 
            Session().flush()
 
            # enforce user is just in given groups, all of them has to be ones
 
            # created from plugins. We store this info in _group_data JSON field
 
            groups = user_data['groups'] or []
 
            UserGroupModel().enforce_groups(user, groups, self.name)
 
            Session().commit()
 
        return user_data
 

	
 

	
 
def importplugin(plugin):
 
    """
 
    Imports and returns the authentication plugin in the module named by plugin
 
    (e.g., plugin='kallithea.lib.auth_modules.auth_internal'). Returns the
kallithea/model/gist.py
Show inline comments
 
@@ -113,25 +113,25 @@ class GistModel(BaseModel):
 
        gist_expires = time.time() + (lifetime * 60) if lifetime != -1 else -1
 
        log.debug('set GIST expiration date to: %s',
 
                  time_to_datetime(gist_expires)
 
                   if gist_expires != -1 else 'forever')
 
        #create the Database version
 
        gist = Gist()
 
        gist.gist_description = description
 
        gist.gist_access_id = gist_id
 
        gist.gist_owner = owner.user_id
 
        gist.gist_expires = gist_expires
 
        gist.gist_type = safe_unicode(gist_type)
 
        self.sa.add(gist)
 
        self.sa.flush()
 
        self.sa.flush() # make database assign gist.gist_id
 
        if gist_type == Gist.GIST_PUBLIC:
 
            # use DB ID for easy to use GIST ID
 
            gist_id = safe_unicode(gist.gist_id)
 
            gist.gist_access_id = gist_id
 
            self.sa.add(gist)
 

	
 
        gist_repo_path = os.path.join(GIST_STORE_LOC, gist_id)
 
        log.debug('Creating new %s GIST repo in %s', gist_type, gist_repo_path)
 
        repo = RepoModel()._create_filesystem_repo(
 
            repo_name=gist_id, repo_type='hg', repo_group=GIST_STORE_LOC)
 

	
 
        processed_mapping = {}
 
@@ -202,26 +202,24 @@ class GistModel(BaseModel):
 
            elif v['org_filename'] and not v['filename']:
 
                op = 'del'
 
            else:
 
                op = 'mod'
 

	
 
            v['op'] = op
 
            gist_mapping_op[k] = v
 

	
 
        gist.gist_description = description
 
        gist.gist_expires = gist_expires
 
        gist.owner = owner
 
        gist.gist_type = gist_type
 
        self.sa.add(gist)
 
        self.sa.flush()
 

	
 
        message = 'updated file'
 
        message += 's: ' if len(gist_mapping) > 1 else ': '
 
        message += ', '.join([x for x in gist_mapping])
 

	
 
        #fake Kallithea Repository object
 
        fake_repo = AttributeDict(dict(
 
            repo_name=gist_repo.path,
 
            scm_instance_no_cache=lambda: gist_repo,
 
        ))
 

	
 
        self._store_metadata(gist_repo, gist.gist_id, gist.gist_access_id,
kallithea/model/user.py
Show inline comments
 
@@ -83,25 +83,26 @@ class UserModel(BaseModel):
 
        check_allowed_create_user(user_data, cur_user)
 
        from kallithea.lib.auth import get_crypt_password
 

	
 
        new_user = User()
 
        for k, v in form_data.items():
 
            if k == 'password':
 
                v = get_crypt_password(v)
 
            if k == 'firstname':
 
                k = 'name'
 
            setattr(new_user, k, v)
 

	
 
        new_user.api_key = generate_api_key()
 
        self.sa.add(new_user)
 
        Session().add(new_user)
 
        Session().flush() # make database assign new_user.user_id
 

	
 
        log_create_user(new_user.get_dict(), cur_user)
 
        return new_user
 

	
 
    def create_or_update(self, username, password, email, firstname=u'',
 
                         lastname=u'', active=True, admin=False,
 
                         extern_type=None, extern_name=None, cur_user=None):
 
        """
 
        Creates a new instance if not found, or updates current one
 

	
 
        :param username:
 
        :param password:
 
@@ -155,45 +156,45 @@ class UserModel(BaseModel):
 
            if not edit:
 
                new_user.api_key = generate_api_key()
 

	
 
            # set password only if creating an user or password is changed
 
            password_change = new_user.password and \
 
                not check_password(password, new_user.password)
 
            if not edit or password_change:
 
                reason = 'new password' if edit else 'new user'
 
                log.debug('Updating password reason=>%s', reason)
 
                new_user.password = get_crypt_password(password) \
 
                    if password else ''
 

	
 
            self.sa.add(new_user)
 
            if user is None:
 
                Session().add(new_user)
 
                Session().flush() # make database assign new_user.user_id
 

	
 
            if not edit:
 
                log_create_user(new_user.get_dict(), cur_user)
 

	
 
            return new_user
 
        except (DatabaseError,):
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def create_registration(self, form_data):
 
        from kallithea.model.notification import NotificationModel
 
        import kallithea.lib.helpers as h
 

	
 
        form_data['admin'] = False
 
        form_data['extern_type'] = User.DEFAULT_AUTH_TYPE
 
        form_data['extern_name'] = ''
 
        new_user = self.create(form_data)
 

	
 
        self.sa.add(new_user)
 
        self.sa.flush()
 

	
 
        # notification to admins
 
        subject = _('New user registration')
 
        body = (
 
            u'New user registration\n'
 
            '---------------------\n'
 
            '- Username: {user.username}\n'
 
            '- Full Name: {user.full_name}\n'
 
            '- Email: {user.email}\n'
 
            ).format(user=new_user)
 
        edit_url = h.canonical_url('edit_user', id=new_user.user_id)
 
        email_kwargs = {
 
            'registered_user_url': edit_url,
kallithea/model/user_group.py
Show inline comments
 
@@ -123,36 +123,37 @@ class UserGroupModel(BaseModel):
 
            return new_user_group
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def update(self, user_group, form_data):
 

	
 
        try:
 
            user_group = self._get_user_group(user_group)
 

	
 
            for k, v in form_data.items():
 
                if k == 'users_group_members':
 
                    user_group.members = []
 
                    self.sa.flush()
 
                    members_list = []
 
                    if v:
 
                        v = [v] if isinstance(v, basestring) else v
 
                        for u_id in set(v):
 
                            member = UserGroupMember(user_group.users_group_id, u_id)
 
                            members_list.append(member)
 
                    setattr(user_group, 'members', members_list)
 
                            self.sa.add(member)
 
                    user_group.members = members_list
 
                setattr(user_group, k, v)
 

	
 
            self.sa.add(user_group)
 
            # Flush to make db assign users_group_member_id to newly
 
            # created UserGroupMembers.
 
            self.sa.flush()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def delete(self, user_group, force=False):
 
        """
 
        Deletes user group, unless force flag is used
 
        raises exception if there are members in that group, else deletes
 
        group and users
 

	
 
        :param user_group:
 
        :param force:
kallithea/tests/models/test_settings.py
Show inline comments
 
from kallithea.model.meta import Session
 
from kallithea.model.db import Setting
 

	
 

	
 
name = 'spam-setting-name'
 

	
 
def test_passing_list_setting_value_results_in_string_valued_setting():
 
    assert Setting.get_by_name(name) is None
 
    setting = Setting.create_or_update(name, ['spam', 'eggs'])
 
    Session().flush()
 
    Session().flush() # must flush so we can delete it below
 
    try:
 
        assert Setting.get_by_name(name) is not None
 
        # Quirk: list value is stringified.
 
        assert Setting.get_by_name(name).app_settings_value \
 
               == "['spam', 'eggs']"
 
        assert Setting.get_by_name(name).app_settings_type == 'unicode'
 
    finally:
 
        Session().delete(setting)
 
        Session().flush()
 

	
 
def test_list_valued_setting_creation_requires_manual_value_formatting():
 
    assert Setting.get_by_name(name) is None
 
    # Quirk: need manual formatting of list setting value.
 
    setting = Setting.create_or_update(name, 'spam,eggs', type='list')
 
    Session().flush()
 
    Session().flush() # must flush so we can delete it below
 
    try:
 
        assert setting.app_settings_value == ['spam', 'eggs']
 
    finally:
 
        Session().delete(setting)
 
        Session().flush()
 

	
 
def test_list_valued_setting_update():
 
    assert Setting.get_by_name(name) is None
 
    setting = Setting.create_or_update(name, 'spam', type='list')
 
    Session().flush()
 
    Session().flush() # must flush so we can delete it below
 
    try:
 
        assert setting.app_settings_value == [u'spam']
 
        # Assign back setting value.
 
        setting.app_settings_value = setting.app_settings_value
 
        # Quirk: value is stringified on write and listified on read.
 
        assert setting.app_settings_value == ["[u'spam']"]
 
        setting.app_settings_value = setting.app_settings_value
 
        assert setting.app_settings_value == ["[u\"[u'spam']\"]"]
 
    finally:
 
        Session().delete(setting)
 
        Session().flush()
0 comments (0 inline, 0 general)