Changeset - 9ecd1d4836cc
[Not reviewed]
default
0 2 0
Mads Kiilerich - 11 years ago 2015-03-11 00:24:25
madski@unity3d.com
helpers: introduce user_or_none helper helper

Slight cleanup refactoring - will be useful later.
2 files changed with 24 insertions and 29 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/helpers.py
Show inline comments
 
@@ -401,241 +401,236 @@ def is_following_repo(repo_name, user_id
 

	
 
class _Message(object):
 
    """A message returned by ``Flash.pop_messages()``.
 

	
 
    Converting the message to a string returns the message text. Instances
 
    also have the following attributes:
 

	
 
    * ``message``: the message text.
 
    * ``category``: the category specified when the message was created.
 
    """
 

	
 
    def __init__(self, category, message):
 
        self.category = category
 
        self.message = message
 

	
 
    def __str__(self):
 
        return self.message
 

	
 
    __unicode__ = __str__
 

	
 
    def __html__(self):
 
        return escape(safe_unicode(self.message))
 

	
 
class Flash(_Flash):
 

	
 
    def pop_messages(self):
 
        """Return all accumulated messages and delete them from the session.
 

	
 
        The return value is a list of ``Message`` objects.
 
        """
 
        from pylons import session
 
        messages = session.pop(self.session_key, [])
 
        session.save()
 
        return [_Message(*m) for m in messages]
 

	
 
flash = Flash()
 

	
 
#==============================================================================
 
# SCM FILTERS available via h.
 
#==============================================================================
 
from kallithea.lib.vcs.utils import author_name, author_email
 
from kallithea.lib.utils2 import credentials_filter, age as _age
 
from kallithea.model.db import User, ChangesetStatus
 

	
 
age = lambda  x, y=False: _age(x, y)
 
capitalize = lambda x: x.capitalize()
 
email = author_email
 
short_id = lambda x: x[:12]
 
hide_credentials = lambda x: ''.join(credentials_filter(x))
 

	
 

	
 
def show_id(cs):
 
    """
 
    Configurable function that shows ID
 
    by default it's r123:fffeeefffeee
 

	
 
    :param cs: changeset instance
 
    """
 
    from kallithea import CONFIG
 
    def_len = safe_int(CONFIG.get('show_sha_length', 12))
 
    show_rev = str2bool(CONFIG.get('show_revision_number', False))
 

	
 
    raw_id = cs.raw_id[:def_len]
 
    if show_rev:
 
        return 'r%s:%s' % (cs.revision, raw_id)
 
    else:
 
        return '%s' % (raw_id)
 

	
 

	
 
def fmt_date(date):
 
    if date:
 
        return date.strftime("%Y-%m-%d %H:%M:%S").decode('utf8')
 

	
 
    return ""
 

	
 

	
 
def is_git(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'git'
 

	
 

	
 
def is_hg(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'hg'
 

	
 

	
 
def user_or_none(author):
 
    email = author_email(author)
 
    if email is not None:
 
        user = User.get_by_email(email, case_insensitive=True, cache=True)
 
        if user is not None:
 
            return user
 

	
 
    user = User.get_by_username(author_name(author), case_insensitive=True, cache=True)
 
    if user is not None:
 
        return user
 

	
 
    return None
 

	
 
def email_or_none(author):
 
    if not author:
 
        return None
 
    # extract email from the commit string
 
    _email = email(author)
 
    if _email:
 
        # check it against Kallithea database, and use the MAIN email for this
 
        # user
 
        user = User.get_by_email(_email, case_insensitive=True, cache=True)
 
    user = user_or_none(author)
 
        if user is not None:
 
            return user.email
 
        return _email
 
        return user.email # always use main email address - not necessarily the one used to find user
 

	
 
    # See if it contains a username we can get an email from
 
    user = User.get_by_username(author_name(author), case_insensitive=True,
 
                                cache=True)
 
    if user is not None:
 
        return user.email
 
    # extract email from the commit string
 
    email = author_email(author)
 
    if email:
 
        return email
 

	
 
    # No valid email, not a valid user in the system, none!
 
    return None
 

	
 

	
 
def person(author, show_attr="username"):
 
    """Find the user identified by 'author', return one of the users attributes,
 
    default to the username attribute, None if there is no user"""
 
    # attr to return from fetched user
 
    person_getter = lambda usr: getattr(usr, show_attr)
 

	
 
    # if author is already an instance use it for extraction
 
    if isinstance(author, User):
 
        return person_getter(author)
 

	
 
    # Valid email in the attribute passed, see if they're in the system
 
    _email = email(author)
 
    if _email:
 
        user = User.get_by_email(_email, case_insensitive=True, cache=True)
 
        if user is not None:
 
            return person_getter(user)
 

	
 
    # Maybe it's a username?
 
    _author = author_name(author)
 
    user = User.get_by_username(_author, case_insensitive=True,
 
                                cache=True)
 
    user = user_or_none(author)
 
    if user is not None:
 
        return person_getter(user)
 

	
 
    # Still nothing?  Just pass back the author name if any, else the email
 
    return _author or _email
 
    return author_name(author) or email(author)
 

	
 

	
 
def person_by_id(id_, show_attr="username"):
 
    # attr to return from fetched user
 
    person_getter = lambda usr: getattr(usr, show_attr)
 

	
 
    #maybe it's an ID ?
 
    if str(id_).isdigit() or isinstance(id_, int):
 
        id_ = int(id_)
 
        user = User.get(id_)
 
        if user is not None:
 
            return person_getter(user)
 
    return id_
 

	
 

	
 
def desc_stylize(value):
 
    """
 
    converts tags from value into html equivalent
 

	
 
    :param value:
 
    """
 
    if not value:
 
        return ''
 

	
 
    value = re.sub(r'\[see\ \=\>\ *([a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\]',
 
                   '<div class="metatag" tag="see">see =&gt; \\1 </div>', value)
 
    value = re.sub(r'\[license\ \=\>\ *([a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\]',
 
                   '<div class="metatag" tag="license"><a href="http:\/\/www.opensource.org/licenses/\\1">\\1</a></div>', value)
 
    value = re.sub(r'\[(requires|recommends|conflicts|base)\ \=\>\ *([a-zA-Z0-9\-\/]*)\]',
 
                   '<div class="metatag" tag="\\1">\\1 =&gt; <a href="/\\2">\\2</a></div>', value)
 
    value = re.sub(r'\[(lang|language)\ \=\>\ *([a-zA-Z\-\/\#\+]*)\]',
 
                   '<div class="metatag" tag="lang">\\2</div>', value)
 
    value = re.sub(r'\[([a-z]+)\]',
 
                  '<div class="metatag" tag="\\1">\\1</div>', value)
 

	
 
    return value
 

	
 

	
 
def boolicon(value):
 
    """Returns boolean value of a value, represented as small html image of true/false
 
    icons
 

	
 
    :param value: value
 
    """
 

	
 
    if value:
 
        return HTML.tag('i', class_="icon-ok")
 
    else:
 
        return HTML.tag('i', class_="icon-minus-circled")
 

	
 

	
 
def action_parser(user_log, feed=False, parse_cs=False):
 
    """
 
    This helper will action_map the specified string action into translated
 
    fancy names with icons and links
 

	
 
    :param user_log: user log instance
 
    :param feed: use output for feeds (no html and fancy icons)
 
    :param parse_cs: parse Changesets into VCS instances
 
    """
 

	
 
    action = user_log.action
 
    action_params = ' '
 

	
 
    x = action.split(':')
 

	
 
    if len(x) > 1:
 
        action, action_params = x
 

	
 
    def get_cs_links():
 
        revs_limit = 3  # display this amount always
 
        revs_top_limit = 50  # show upto this amount of changesets hidden
 
        revs_ids = action_params.split(',')
 
        deleted = user_log.repository is None
 
        if deleted:
 
            return ','.join(revs_ids)
 

	
 
        repo_name = user_log.repository.repo_name
 

	
 
        def lnk(rev, repo_name):
 
            if isinstance(rev, BaseChangeset) or isinstance(rev, AttributeDict):
 
                lazy_cs = True
 
                if getattr(rev, 'op', None) and getattr(rev, 'ref_name', None):
 
                    lazy_cs = False
 
                    lbl = '?'
 
                    if rev.op == 'delete_branch':
 
                        lbl = '%s' % _('Deleted branch: %s') % rev.ref_name
 
                        title = ''
 
                    elif rev.op == 'tag':
 
                        lbl = '%s' % _('Created tag: %s') % rev.ref_name
 
                        title = ''
 
                    _url = '#'
 

	
 
                else:
 
                    lbl = '%s' % (rev.short_id[:8])
 
                    _url = url('changeset_home', repo_name=repo_name,
kallithea/model/user.py
Show inline comments
 
@@ -254,197 +254,197 @@ class UserModel(BaseModel):
 
            raise DefaultUserException(
 
                _(u"You can't remove this user since it's"
 
                  " crucial for entire application")
 
            )
 
        if user.repositories:
 
            repos = [x.repo_name for x in user.repositories]
 
            raise UserOwnsReposException(
 
                _(u'User "%s" still owns %s repositories and cannot be '
 
                  'removed. Switch owners or remove those repositories: %s')
 
                % (user.username, len(repos), ', '.join(repos))
 
            )
 
        if user.repo_groups:
 
            repogroups = [x.group_name for x in user.repo_groups]
 
            raise UserOwnsReposException(
 
                _(u'User "%s" still owns %s repository groups and cannot be '
 
                  'removed. Switch owners or remove those repository groups: %s')
 
                % (user.username, len(repogroups), ', '.join(repogroups))
 
            )
 
        if user.user_groups:
 
            usergroups = [x.users_group_name for x in user.user_groups]
 
            raise UserOwnsReposException(
 
                _(u'User "%s" still owns %s user groups and cannot be '
 
                  'removed. Switch owners or remove those user groups: %s')
 
                % (user.username, len(usergroups), ', '.join(usergroups))
 
            )
 
        self.sa.delete(user)
 

	
 
        from kallithea.lib.hooks import log_delete_user
 
        log_delete_user(user.get_dict(), cur_user)
 

	
 
    def reset_password_link(self, data):
 
        from kallithea.lib.celerylib import tasks, run_task
 
        from kallithea.model.notification import EmailNotificationModel
 
        import kallithea.lib.helpers as h
 

	
 
        user_email = data['email']
 
        user = User.get_by_email(user_email)
 
        if user:
 
            log.debug('password reset user found %s' % user)
 
            link = h.canonical_url('reset_password_confirmation', key=user.api_key)
 
            reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET
 
            body = EmailNotificationModel().get_email_tmpl(reg_type,
 
                                                           'txt',
 
                                                           user=user.short_contact,
 
                                                           reset_url=link)
 
            html_body = EmailNotificationModel().get_email_tmpl(reg_type,
 
                                                           'html',
 
                                                           user=user.short_contact,
 
                                                           reset_url=link)
 
            log.debug('sending email')
 
            run_task(tasks.send_email, [user_email],
 
                     _("Password reset link"), body, html_body)
 
            log.info('send new password mail to %s' % user_email)
 
        else:
 
            log.debug("password reset email %s not found" % user_email)
 

	
 
        return True
 

	
 
    def reset_password(self, data):
 
        from kallithea.lib.celerylib import tasks, run_task
 
        from kallithea.lib import auth
 
        user_email = data['email']
 
        user = User.get_by_email(user_email)
 
        new_passwd = auth.PasswordGenerator().gen_password(8,
 
                        auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
 
        if user:
 
            user.password = auth.get_crypt_password(new_passwd)
 
            Session().add(user)
 
            Session().commit()
 
            log.info('change password for %s' % user_email)
 
        if new_passwd is None:
 
            raise Exception('unable to generate new password')
 

	
 
        run_task(tasks.send_email, [user_email],
 
                 _('Your new password'),
 
                 _('Your new Kallithea password:%s') % (new_passwd,))
 
        log.info('send new password mail to %s' % user_email)
 

	
 
        return True
 

	
 
    def fill_data(self, auth_user, user_id=None, api_key=None, username=None):
 
        """
 
        Fetches auth_user by user_id,or api_key if present.
 
        Fills auth_user attributes with those taken from database.
 
        Additionally sets is_authenitated if lookup fails
 
        present in database
 

	
 
        :param auth_user: instance of user to set attributes
 
        :param user_id: user id to fetch by
 
        :param api_key: api key to fetch by
 
        :param username: username to fetch by
 
        """
 
        if user_id is None and api_key is None and username is None:
 
            raise Exception('You need to pass user_id, api_key or username')
 

	
 
        dbuser = None
 
        if user_id:
 
        if user_id is not None:
 
            dbuser = self.get(user_id)
 
        elif api_key:
 
        elif api_key is not None:
 
            dbuser = self.get_by_api_key(api_key)
 
        elif username:
 
        elif username is not None:
 
            dbuser = self.get_by_username(username)
 

	
 
        if dbuser is not None and dbuser.active:
 
            log.debug('filling %s data' % dbuser)
 
            for k, v in dbuser.get_dict().iteritems():
 
                if k not in ['api_keys', 'permissions']:
 
                    setattr(auth_user, k, v)
 
            return True
 
        return False
 

	
 
    def has_perm(self, user, perm):
 
        perm = self._get_perm(perm)
 
        user = self._get_user(user)
 

	
 
        return UserToPerm.query().filter(UserToPerm.user == user)\
 
            .filter(UserToPerm.permission == perm).scalar() is not None
 

	
 
    def grant_perm(self, user, perm):
 
        """
 
        Grant user global permissions
 

	
 
        :param user:
 
        :param perm:
 
        """
 
        user = self._get_user(user)
 
        perm = self._get_perm(perm)
 
        # if this permission is already granted skip it
 
        _perm = UserToPerm.query()\
 
            .filter(UserToPerm.user == user)\
 
            .filter(UserToPerm.permission == perm)\
 
            .scalar()
 
        if _perm:
 
            return
 
        new = UserToPerm()
 
        new.user = user
 
        new.permission = perm
 
        self.sa.add(new)
 
        return new
 

	
 
    def revoke_perm(self, user, perm):
 
        """
 
        Revoke users global permissions
 

	
 
        :param user:
 
        :param perm:
 
        """
 
        user = self._get_user(user)
 
        perm = self._get_perm(perm)
 

	
 
        obj = UserToPerm.query()\
 
                .filter(UserToPerm.user == user)\
 
                .filter(UserToPerm.permission == perm)\
 
                .scalar()
 
        if obj:
 
            self.sa.delete(obj)
 

	
 
    def add_extra_email(self, user, email):
 
        """
 
        Adds email address to UserEmailMap
 

	
 
        :param user:
 
        :param email:
 
        """
 
        from kallithea.model import forms
 
        form = forms.UserExtraEmailForm()()
 
        data = form.to_python(dict(email=email))
 
        user = self._get_user(user)
 

	
 
        obj = UserEmailMap()
 
        obj.user = user
 
        obj.email = data['email']
 
        self.sa.add(obj)
 
        return obj
 

	
 
    def delete_extra_email(self, user, email_id):
 
        """
 
        Removes email address from UserEmailMap
 

	
 
        :param user:
 
        :param email_id:
 
        """
 
        user = self._get_user(user)
 
        obj = UserEmailMap.query().get(email_id)
 
        if obj:
 
            self.sa.delete(obj)
 

	
 
    def add_extra_ip(self, user, ip):
 
        """
 
        Adds ip address to UserIpMap
 

	
 
        :param user:
 
        :param ip:
 
        """
 
        from kallithea.model import forms
 
        form = forms.UserExtraIpForm()()
 
        data = form.to_python(dict(ip=ip))
0 comments (0 inline, 0 general)