Changeset - 69e738523107
[Not reviewed]
default
0 1 0
Andrew Shadura - 10 years ago 2016-01-31 16:51:32
andrew@shadura.me
db: match case-insensitively using func.lower, not ilike

ilike() uses SQL ILIKE operator internally, which means it interprets
'%' and '_' in the match pattern as wildcards. Instead of ilike(), it's
better to turn both operands to the lower case and compare them.

This also unbreaks the test case introduced in 13d0fe6f751a.
1 file changed with 5 insertions and 5 deletions:
0 comments (0 inline, 0 general)
kallithea/model/db.py
Show inline comments
 
@@ -534,96 +534,96 @@ class User(Base, BaseModel):
 
        try:
 
            self._user_data = json.dumps(val)
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
 
                                      self.user_id, self.username)
 

	
 
    @classmethod
 
    def get_or_404(cls, id_, allow_default=True):
 
        '''
 
        Overridden version of BaseModel.get_or_404, with an extra check on
 
        the default user.
 
        '''
 
        user = super(User, cls).get_or_404(id_)
 
        if allow_default == False:
 
            if user.username == User.DEFAULT_USER:
 
                raise DefaultUserException
 
        return user
 

	
 
    @classmethod
 
    def get_by_username(cls, username, case_insensitive=False, cache=False):
 
        if case_insensitive:
 
            q = cls.query().filter(cls.username.ilike(username))
 
            q = cls.query().filter(func.lower(cls.username) == func.lower(username))
 
        else:
 
            q = cls.query().filter(cls.username == username)
 

	
 
        if cache:
 
            q = q.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_user_%s" % _hash_key(username)
 
                          )
 
            )
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get_by_api_key(cls, api_key, cache=False, fallback=True):
 
        if len(api_key) != 40 or not api_key.isalnum():
 
            return None
 

	
 
        q = cls.query().filter(cls.api_key == api_key)
 

	
 
        if cache:
 
            q = q.options(FromCache("sql_cache_short",
 
                                    "get_api_key_%s" % api_key))
 
        res = q.scalar()
 

	
 
        if fallback and not res:
 
            #fallback to additional keys
 
            _res = UserApiKeys.query() \
 
                .filter(UserApiKeys.api_key == api_key) \
 
                .filter(or_(UserApiKeys.expires == -1,
 
                            UserApiKeys.expires >= time.time())) \
 
                .first()
 
            if _res:
 
                res = _res.user
 
        return res
 

	
 
    @classmethod
 
    def get_by_email(cls, email, cache=False):
 
        q = cls.query().filter(cls.email.ilike(email))
 
        q = cls.query().filter(func.lower(cls.email) == func.lower(email))
 

	
 
        if cache:
 
            q = q.options(FromCache("sql_cache_short",
 
                                    "get_email_key_%s" % email))
 

	
 
        ret = q.scalar()
 
        if ret is None:
 
            q = UserEmailMap.query()
 
            # try fetching in alternate email map
 
            q = q.filter(UserEmailMap.email.ilike(email))
 
            q = q.filter(func.lower(UserEmailMap.email) == func.lower(email))
 
            q = q.options(joinedload(UserEmailMap.user))
 
            if cache:
 
                q = q.options(FromCache("sql_cache_short",
 
                                        "get_email_map_key_%s" % email))
 
            ret = getattr(q.scalar(), 'user', None)
 

	
 
        return ret
 

	
 
    @classmethod
 
    def get_from_cs_author(cls, author):
 
        """
 
        Tries to get User objects out of commit author string
 

	
 
        :param author:
 
        """
 
        from kallithea.lib.helpers import email, author_name
 
        # Valid email in the attribute passed, see if they're in the system
 
        _email = email(author)
 
        if _email:
 
            user = cls.get_by_email(_email)
 
            if user is not None:
 
                return user
 
        # Maybe we can match by username?
 
        _author = author_name(author)
 
@@ -830,49 +830,49 @@ class UserGroup(Base, BaseModel):
 
        if not self._group_data:
 
            return {}
 

	
 
        try:
 
            return json.loads(self._group_data)
 
        except TypeError:
 
            return {}
 

	
 
    @group_data.setter
 
    def group_data(self, val):
 
        try:
 
            self._group_data = json.dumps(val)
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
 
                                      self.users_group_id,
 
                                      self.users_group_name)
 

	
 
    @classmethod
 
    def get_by_group_name(cls, group_name, cache=False,
 
                          case_insensitive=False):
 
        if case_insensitive:
 
            q = cls.query().filter(cls.users_group_name.ilike(group_name))
 
            q = cls.query().filter(func.lower(cls.users_group_name) == func.lower(group_name))
 
        else:
 
            q = cls.query().filter(cls.users_group_name == group_name)
 
        if cache:
 
            q = q.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_group_%s" % _hash_key(group_name)
 
                          )
 
            )
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get(cls, user_group_id, cache=False):
 
        user_group = cls.query()
 
        if cache:
 
            user_group = user_group.options(FromCache("sql_cache_short",
 
                                    "get_users_group_%s" % user_group_id))
 
        return user_group.get(user_group_id)
 

	
 
    def get_api_data(self, with_members=True):
 
        user_group = self
 

	
 
        data = dict(
 
            users_group_id=user_group.users_group_id,
 
            group_name=user_group.users_group_name,
 
@@ -1499,49 +1499,49 @@ class RepoGroup(Base, BaseModel):
 
                                      self.group_name)
 

	
 
    @classmethod
 
    def _generate_choice(cls, repo_group):
 
        """Return tuple with group_id and name as html literal"""
 
        from webhelpers.html import literal
 
        if repo_group is None:
 
            return (-1, u'-- %s --' % _('top level'))
 
        return repo_group.group_id, literal(cls.SEP.join(repo_group.full_path_splitted))
 

	
 
    @classmethod
 
    def groups_choices(cls, groups):
 
        """Return tuples with group_id and name as html literal."""
 
        return sorted((cls._generate_choice(g) for g in groups),
 
                      key=lambda c: c[1].split(cls.SEP))
 

	
 
    @classmethod
 
    def url_sep(cls):
 
        return URL_SEP
 

	
 
    @classmethod
 
    def get_by_group_name(cls, group_name, cache=False, case_insensitive=False):
 
        if case_insensitive:
 
            gr = cls.query() \
 
                .filter(cls.group_name.ilike(group_name))
 
                .filter(func.lower(cls.group_name) == func.lower(group_name))
 
        else:
 
            gr = cls.query() \
 
                .filter(cls.group_name == group_name)
 
        if cache:
 
            gr = gr.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_group_%s" % _hash_key(group_name)
 
                            )
 
            )
 
        return gr.scalar()
 

	
 
    @property
 
    def parents(self):
 
        parents_recursion_limit = 10
 
        groups = []
 
        if self.parent_group is None:
 
            return groups
 
        cur_gr = self.parent_group
 
        groups.insert(0, cur_gr)
 
        cnt = 0
 
        while 1:
 
            cnt += 1
 
            gr = getattr(cur_gr, 'parent_group', None)
 
            cur_gr = cur_gr.parent_group
0 comments (0 inline, 0 general)