Changeset - 5725fa4cfecd
[Not reviewed]
default
0 14 0
Mads Kiilerich - 6 years ago 2019-08-04 00:07:10
mads@kiilerich.com
Grafted from: 7cc8ba8bc555
cleanup: minimize use of lambda expressions - we have 'def' for that purpose

Fix some flake8 warnings "E731 do not assign a lambda expression, use a def".
14 files changed with 71 insertions and 70 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/admin/repo_groups.py
Show inline comments
 
@@ -83,57 +83,56 @@ class RepoGroupsController(BaseControlle
 
            data.update({'u_perm_%s' % p.user.username:
 
                             p.permission.permission_name})
 

	
 
        # fill repository group groups
 
        for p in repo_group.users_group_to_perm:
 
            data.update({'g_perm_%s' % p.users_group.users_group_name:
 
                             p.permission.permission_name})
 

	
 
        return data
 

	
 
    def _revoke_perms_on_yourself(self, form_result):
 
        _up = [u for u in form_result['perms_updates'] if request.authuser.username == u[0]]
 
        _new = [u for u in form_result['perms_new'] if request.authuser.username == u[0]]
 
        if _new and _new[0][1] != 'group.admin' or _up and _up[0][1] != 'group.admin':
 
            return True
 
        return False
 

	
 
    def index(self, format='html'):
 
        _list = RepoGroup.query(sorted=True).all()
 
        group_iter = RepoGroupList(_list, perm_level='admin')
 
        repo_groups_data = []
 
        _tmpl_lookup = app_globals.mako_lookup
 
        template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
 

	
 
        repo_group_name = lambda repo_group_name, children_groups: (
 
            template.get_def("repo_group_name")
 
            .render_unicode(repo_group_name, children_groups, _=_, h=h, c=c)
 
        )
 
        repo_group_actions = lambda repo_group_id, repo_group_name, gr_count: (
 
            template.get_def("repo_group_actions")
 
            .render_unicode(repo_group_id, repo_group_name, gr_count, _=_, h=h, c=c,
 
                    ungettext=ungettext)
 
        )
 
        def repo_group_name(repo_group_name, children_groups):
 
            return template.get_def("repo_group_name") \
 
                .render_unicode(repo_group_name, children_groups, _=_, h=h, c=c)
 

	
 
        def repo_group_actions(repo_group_id, repo_group_name, gr_count):
 
            return template.get_def("repo_group_actions") \
 
                .render_unicode(repo_group_id, repo_group_name, gr_count, _=_, h=h, c=c,
 
                        ungettext=ungettext)
 

	
 
        for repo_gr in group_iter:
 
            children_groups = [g.name for g in repo_gr.parents] + [repo_gr.name]
 
            repo_count = repo_gr.repositories.count()
 
            repo_groups_data.append({
 
                "raw_name": repo_gr.group_name,
 
                "group_name": repo_group_name(repo_gr.group_name, children_groups),
 
                "desc": h.escape(repo_gr.group_description),
 
                "repos": repo_count,
 
                "owner": h.person(repo_gr.owner),
 
                "action": repo_group_actions(repo_gr.group_id, repo_gr.group_name,
 
                                             repo_count)
 
            })
 

	
 
        c.data = {
 
            "sort": None,
 
            "dir": "asc",
 
            "records": repo_groups_data
 
        }
 

	
 
        return render('admin/repo_groups/repo_groups.html')
 

	
 
    def create(self):
 
        self.__load_defaults()
kallithea/controllers/admin/user_groups.py
Show inline comments
 
@@ -68,58 +68,57 @@ class UserGroupsController(BaseControlle
 
        c.group_members = [(x.user_id, x.username) for x in c.group_members_obj]
 
        c.available_members = sorted(((x.user_id, x.username) for x in
 
                                      User.query().all()),
 
                                     key=lambda u: u[1].lower())
 

	
 
    def __load_defaults(self, user_group_id):
 
        """
 
        Load defaults settings for edit, and update
 

	
 
        :param user_group_id:
 
        """
 
        user_group = UserGroup.get_or_404(user_group_id)
 
        data = user_group.get_dict()
 
        return data
 

	
 
    def index(self, format='html'):
 
        _list = UserGroup.query() \
 
                        .order_by(func.lower(UserGroup.users_group_name)) \
 
                        .all()
 
        group_iter = UserGroupList(_list, perm_level='admin')
 
        user_groups_data = []
 
        _tmpl_lookup = app_globals.mako_lookup
 
        template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
 

	
 
        user_group_name = lambda user_group_id, user_group_name: (
 
            template.get_def("user_group_name")
 
            .render_unicode(user_group_id, user_group_name, _=_, h=h, c=c)
 
        )
 
        user_group_actions = lambda user_group_id, user_group_name: (
 
            template.get_def("user_group_actions")
 
            .render_unicode(user_group_id, user_group_name, _=_, h=h, c=c)
 
        )
 
        def user_group_name(user_group_id, user_group_name):
 
            return template.get_def("user_group_name") \
 
                .render_unicode(user_group_id, user_group_name, _=_, h=h, c=c)
 

	
 
        def user_group_actions(user_group_id, user_group_name):
 
            return template.get_def("user_group_actions") \
 
                .render_unicode(user_group_id, user_group_name, _=_, h=h, c=c)
 

	
 
        for user_gr in group_iter:
 

	
 
            user_groups_data.append({
 
                "raw_name": user_gr.users_group_name,
 
                "group_name": user_group_name(user_gr.users_group_id,
 
                                              user_gr.users_group_name),
 
                "desc": h.escape(user_gr.user_group_description),
 
                "members": len(user_gr.members),
 
                "active": h.boolicon(user_gr.users_group_active),
 
                "owner": h.person(user_gr.owner.username),
 
                "action": user_group_actions(user_gr.users_group_id, user_gr.users_group_name)
 
            })
 

	
 
        c.data = {
 
            "sort": None,
 
            "dir": "asc",
 
            "records": user_groups_data
 
        }
 

	
 
        return render('admin/user_groups/user_groups.html')
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.usergroup.create.true')
 
    def create(self):
 
        users_group_form = UserGroupForm()()
 
        try:
 
            form_result = users_group_form.to_python(dict(request.POST))
kallithea/controllers/admin/users.py
Show inline comments
 
@@ -54,55 +54,55 @@ from kallithea.model.user import UserMod
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UsersController(BaseController):
 
    """REST Controller styled on the Atom Publishing Protocol"""
 

	
 
    @LoginRequired()
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def _before(self, *args, **kwargs):
 
        super(UsersController, self)._before(*args, **kwargs)
 

	
 
    def index(self, format='html'):
 
        c.users_list = User.query().order_by(User.username) \
 
                        .filter_by(is_default_user=False) \
 
                        .order_by(func.lower(User.username)) \
 
                        .all()
 

	
 
        users_data = []
 
        _tmpl_lookup = app_globals.mako_lookup
 
        template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
 

	
 
        grav_tmpl = '<div class="gravatar">%s</div>'
 

	
 
        username = lambda user_id, username: (
 
                template.get_def("user_name")
 
                .render_unicode(user_id, username, _=_, h=h, c=c))
 
        def username(user_id, username):
 
            return template.get_def("user_name") \
 
                .render_unicode(user_id, username, _=_, h=h, c=c)
 

	
 
        user_actions = lambda user_id, username: (
 
                template.get_def("user_actions")
 
                .render_unicode(user_id, username, _=_, h=h, c=c))
 
        def user_actions(user_id, username):
 
            return template.get_def("user_actions") \
 
                .render_unicode(user_id, username, _=_, h=h, c=c)
 

	
 
        for user in c.users_list:
 
            users_data.append({
 
                "gravatar": grav_tmpl % h.gravatar(user.email, size=20),
 
                "raw_name": user.username,
 
                "username": username(user.user_id, user.username),
 
                "firstname": h.escape(user.name),
 
                "lastname": h.escape(user.lastname),
 
                "last_login": h.fmt_date(user.last_login),
 
                "last_login_raw": datetime_to_time(user.last_login),
 
                "active": h.boolicon(user.active),
 
                "admin": h.boolicon(user.admin),
 
                "extern_type": user.extern_type,
 
                "extern_name": user.extern_name,
 
                "action": user_actions(user.user_id, user.username),
 
            })
 

	
 
        c.data = {
 
            "sort": None,
 
            "dir": "asc",
 
            "records": users_data
 
        }
 

	
 
        return render('admin/users/users.html')
kallithea/lib/auth_modules/auth_ldap.py
Show inline comments
 
@@ -306,49 +306,50 @@ class KallitheaAuthPlugin(auth_modules.K
 
            'base_dn': settings.get('base_dn', ''),
 
            'port': settings.get('port'),
 
            'bind_dn': settings.get('dn_user'),
 
            'bind_pass': settings.get('dn_pass'),
 
            'tls_kind': settings.get('tls_kind'),
 
            'tls_reqcert': settings.get('tls_reqcert'),
 
            'cacertdir': settings.get('cacertdir'),
 
            'ldap_filter': settings.get('filter'),
 
            'search_scope': settings.get('search_scope'),
 
            'attr_login': settings.get('attr_login'),
 
            'ldap_version': 3,
 
        }
 

	
 
        if kwargs['bind_dn'] and not kwargs['bind_pass']:
 
            log.debug('Using dynamic binding.')
 
            kwargs['bind_dn'] = kwargs['bind_dn'].replace('$login', username)
 
            kwargs['bind_pass'] = password
 
        log.debug('Checking for ldap authentication')
 

	
 
        try:
 
            aldap = AuthLdap(**kwargs)
 
            (user_dn, ldap_attrs) = aldap.authenticate_ldap(username, password)
 
            log.debug('Got ldap DN response %s', user_dn)
 

	
 
            get_ldap_attr = lambda k: ldap_attrs.get(settings.get(k), [''])[0]
 
            def get_ldap_attr(k):
 
                return ldap_attrs.get(settings.get(k), [''])[0]
 

	
 
            # old attrs fetched from Kallithea database
 
            admin = getattr(userobj, 'admin', False)
 
            email = getattr(userobj, 'email', '')
 
            firstname = getattr(userobj, 'firstname', '')
 
            lastname = getattr(userobj, 'lastname', '')
 

	
 
            user_data = {
 
                'username': username,
 
                'firstname': get_ldap_attr('attr_firstname') or firstname,
 
                'lastname': get_ldap_attr('attr_lastname') or lastname,
 
                'groups': [],
 
                'email': get_ldap_attr('attr_email') or email,
 
                'admin': admin,
 
                'extern_name': user_dn,
 
            }
 
            log.info('user %s authenticated correctly', user_data['username'])
 
            return user_data
 

	
 
        except LdapUsernameError:
 
            log.info('Error authenticating %s with LDAP: User not found', username)
 
        except LdapPasswordError:
 
            log.info('Error authenticating %s with LDAP: Password error', username)
 
        except LdapImportError:
kallithea/lib/celerylib/tasks.py
Show inline comments
 
@@ -45,64 +45,66 @@ from kallithea.lib.utils import action_l
 
from kallithea.lib.utils2 import ascii_bytes, str2bool
 
from kallithea.lib.vcs.utils import author_email
 
from kallithea.model.db import RepoGroup, Repository, Statistics, User
 

	
 

	
 
__all__ = ['whoosh_index', 'get_commits_stats', 'send_email']
 

	
 

	
 
log = celery.utils.log.get_task_logger(__name__)
 

	
 

	
 
@celerylib.task
 
@celerylib.locked_task
 
@celerylib.dbsession
 
def whoosh_index(repo_location, full_index):
 
    from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
    celerylib.get_session() # initialize database connection
 

	
 
    index_location = config['index_dir']
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location) \
 
                         .run(full_index=full_index)
 

	
 

	
 
# for js data compatibility cleans the key for person from '
 
def akc(k):
 
    return person(k).replace('"', '')
 

	
 

	
 
@celerylib.task
 
@celerylib.dbsession
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit=100):
 
    DBS = celerylib.get_session()
 
    lockkey = celerylib.__get_lockkey('get_commits_stats', repo_name, ts_min_y,
 
                            ts_max_y)
 
    lockkey_path = config.get('cache_dir') or config['app_conf']['cache_dir']  # Backward compatibility for TurboGears < 2.4
 

	
 
    log.info('running task with lockkey %s', lockkey)
 

	
 
    try:
 
        lock = celerylib.DaemonLock(os.path.join(lockkey_path, lockkey))
 

	
 
        # for js data compatibility cleans the key for person from '
 
        akc = lambda k: person(k).replace('"', "")
 

	
 
        co_day_auth_aggr = {}
 
        commits_by_day_aggregate = {}
 
        repo = Repository.get_by_repo_name(repo_name)
 
        if repo is None:
 
            return True
 

	
 
        repo = repo.scm_instance
 
        repo_size = repo.count()
 
        # return if repo have no revisions
 
        if repo_size < 1:
 
            lock.release()
 
            return True
 

	
 
        skip_date_limit = True
 
        parse_limit = int(config.get('commit_parse_limit'))
 
        last_rev = None
 
        last_cs = None
 
        timegetter = itemgetter('time')
 

	
 
        dbrepo = DBS.query(Repository) \
 
            .filter(Repository.repo_name == repo_name).scalar()
 
        cur_stats = DBS.query(Statistics) \
 
            .filter(Statistics.repository == dbrepo).scalar()
 

	
kallithea/lib/helpers.py
Show inline comments
 
@@ -481,53 +481,61 @@ def flash(message, category, logf=None):
 
        # render to HTML for storing in cookie
 
        safe_message = str(message)
 
    else:
 
        # Apply str - the message might be an exception with __str__
 
        # Escape, so we can trust the result without further escaping, without any risk of injection
 
        safe_message = html_escape(str(message))
 
    if logf is None:
 
        logf = log.info
 
        if category == 'success':
 
            logf = log.debug
 

	
 
    logf('Flash %s: %s', category, safe_message)
 

	
 
    _session_flash_messages(append=(category, safe_message))
 

	
 

	
 
def pop_flash_messages():
 
    """Return all accumulated messages and delete them from the session.
 

	
 
    The return value is a list of ``Message`` objects.
 
    """
 
    return [_Message(category, message) for category, message in _session_flash_messages(clear=True)]
 

	
 

	
 
age = lambda x, y=False: _age(x, y)
 
capitalize = lambda x: x.capitalize()
 
def age(x, y=False):
 
    return _age(x, y)
 

	
 
def capitalize(x):
 
    return x.capitalize()
 

	
 
email = author_email
 
short_id = lambda x: x[:12]
 
hide_credentials = lambda x: ''.join(credentials_filter(x))
 

	
 
def short_id(x):
 
    return x[:12]
 

	
 
def hide_credentials(x):
 
    return ''.join(credentials_filter(x))
 

	
 

	
 
def show_id(cs):
 
    """
 
    Configurable function that shows ID
 
    by default it's r123:fffeeefffeee
 

	
 
    :param cs: changeset instance
 
    """
 
    from kallithea import CONFIG
 
    def_len = safe_int(CONFIG.get('show_sha_length', 12))
 
    show_rev = str2bool(CONFIG.get('show_revision_number', False))
 

	
 
    raw_id = cs.raw_id[:def_len]
 
    if show_rev:
 
        return 'r%s:%s' % (cs.revision, raw_id)
 
    else:
 
        return raw_id
 

	
 

	
 
def fmt_date(date):
 
    if date:
 
        return date.strftime("%Y-%m-%d %H:%M:%S")
 
    return ""
 
@@ -581,57 +589,54 @@ def email_or_none(author):
 
        return email
 

	
 
    # No valid email, not a valid user in the system, none!
 
    return None
 

	
 

	
 
def person(author, show_attr="username"):
 
    """Find the user identified by 'author', return one of the users attributes,
 
    default to the username attribute, None if there is no user"""
 
    from kallithea.model.db import User
 
    # if author is already an instance use it for extraction
 
    if isinstance(author, User):
 
        return getattr(author, show_attr)
 

	
 
    value = user_attr_or_none(author, show_attr)
 
    if value is not None:
 
        return value
 

	
 
    # Still nothing?  Just pass back the author name if any, else the email
 
    return author_name(author) or email(author)
 

	
 

	
 
def person_by_id(id_, show_attr="username"):
 
    from kallithea.model.db import User
 
    # attr to return from fetched user
 
    person_getter = lambda usr: getattr(usr, show_attr)
 

	
 
    # maybe it's an ID ?
 
    if str(id_).isdigit() or isinstance(id_, int):
 
        id_ = int(id_)
 
        user = User.get(id_)
 
        if user is not None:
 
            return person_getter(user)
 
            return getattr(user, show_attr)
 
    return id_
 

	
 

	
 
def boolicon(value):
 
    """Returns boolean value of a value, represented as small html image of true/false
 
    icons
 

	
 
    :param value: value
 
    """
 

	
 
    if value:
 
        return HTML.tag('i', class_="icon-ok")
 
    else:
 
        return HTML.tag('i', class_="icon-minus-circled")
 

	
 

	
 
def action_parser(user_log, feed=False, parse_cs=False):
 
    """
 
    This helper will action_map the specified string action into translated
 
    fancy names with icons and links
 

	
 
    :param user_log: user log instance
 
    :param feed: use output for feeds (no html and fancy icons)
 
    :param parse_cs: parse Changesets into VCS instances
 
@@ -841,52 +846,49 @@ def action_parser(user_log, feed=False, 
 
        'user_closed_pull_request':    (_('[closed] pull request for'),
 
                                        get_pull_request, 'icon-ok'),
 
        'push':                        (_('[pushed] into'),
 
                                        get_cs_links, 'icon-move-up'),
 
        'push_local':                  (_('[committed via Kallithea] into repository'),
 
                                        get_cs_links, 'icon-pencil'),
 
        'push_remote':                 (_('[pulled from remote] into repository'),
 
                                        get_cs_links, 'icon-move-up'),
 
        'pull':                        (_('[pulled] from'),
 
                                        None, 'icon-move-down'),
 
        'started_following_repo':      (_('[started following] repository'),
 
                                        None, 'icon-heart'),
 
        'stopped_following_repo':      (_('[stopped following] repository'),
 
                                        None, 'icon-heart-empty'),
 
    }
 

	
 
    action_str = action_map.get(action, action)
 
    if feed:
 
        action = action_str[0].replace('[', '').replace(']', '')
 
    else:
 
        action = action_str[0] \
 
            .replace('[', '<b>') \
 
            .replace(']', '</b>')
 

	
 
    action_params_func = lambda: ""
 

	
 
    if callable(action_str[1]):
 
        action_params_func = action_str[1]
 
    action_params_func = action_str[1] if callable(action_str[1]) else (lambda: "")
 

	
 
    def action_parser_icon():
 
        action = user_log.action
 
        action_params = None
 
        x = action.split(':')
 

	
 
        if len(x) > 1:
 
            action, action_params = x
 

	
 
        ico = action_map.get(action, ['', '', ''])[2]
 
        html = """<i class="%s"></i>""" % ico
 
        return literal(html)
 

	
 
    # returned callbacks we need to call to get
 
    return [lambda: literal(action), action_params_func, action_parser_icon]
 

	
 

	
 
#==============================================================================
 
# GRAVATAR URL
 
#==============================================================================
 
def gravatar_div(email_address, cls='', size=30, **div_attributes):
 
    """Return an html literal with a span around a gravatar if they are enabled.
 
    Extra keyword parameters starting with 'div_' will get the prefix removed
 
    and '_' changed to '-' and be used as attributes on the div. The default
 
@@ -1155,49 +1157,50 @@ def linkify_others(t, l):
 
    links = []
 
    for e in urls.split(t):
 
        if e.strip() and not urls.match(e):
 
            links.append('<a class="message-link" href="%s">%s</a>' % (l, e))
 
        else:
 
            links.append(e)
 

	
 
    return ''.join(links)
 

	
 

	
 
# Global variable that will hold the actual urlify_issues function body.
 
# Will be set on first use when the global configuration has been read.
 
_urlify_issues_f = None
 

	
 

	
 
def urlify_issues(newtext, repo_name):
 
    """Urlify issue references according to .ini configuration"""
 
    global _urlify_issues_f
 
    if _urlify_issues_f is None:
 
        from kallithea import CONFIG
 
        from kallithea.model.db import URL_SEP
 
        assert CONFIG['sqlalchemy.url'] # make sure config has been loaded
 

	
 
        # Build chain of urlify functions, starting with not doing any transformation
 
        tmp_urlify_issues_f = lambda s: s
 
        def tmp_urlify_issues_f(s):
 
            return s
 

	
 
        issue_pat_re = re.compile(r'issue_pat(.*)')
 
        for k in CONFIG:
 
            # Find all issue_pat* settings that also have corresponding server_link and prefix configuration
 
            m = issue_pat_re.match(k)
 
            if m is None:
 
                continue
 
            suffix = m.group(1)
 
            issue_pat = CONFIG.get(k)
 
            issue_server_link = CONFIG.get('issue_server_link%s' % suffix)
 
            issue_sub = CONFIG.get('issue_sub%s' % suffix)
 
            if not issue_pat or not issue_server_link or issue_sub is None: # issue_sub can be empty but should be present
 
                log.error('skipping incomplete issue pattern %r: %r -> %r %r', suffix, issue_pat, issue_server_link, issue_sub)
 
                continue
 

	
 
            # Wrap tmp_urlify_issues_f with substitution of this pattern, while making sure all loop variables (and compiled regexpes) are bound
 
            try:
 
                issue_re = re.compile(issue_pat)
 
            except re.error as e:
 
                log.error('skipping invalid issue pattern %r: %r -> %r %r. Error: %s', suffix, issue_pat, issue_server_link, issue_sub, str(e))
 
                continue
 

	
 
            log.debug('issue pattern %r: %r -> %r %r', suffix, issue_pat, issue_server_link, issue_sub)
 

	
 
@@ -1207,51 +1210,51 @@ def urlify_issues(newtext, repo_name):
 
                    issue_url = match_obj.expand(issue_server_link)
 
                except (IndexError, re.error) as e:
 
                    log.error('invalid issue_url setting %r -> %r %r. Error: %s', issue_pat, issue_server_link, issue_sub, str(e))
 
                    issue_url = issue_server_link
 
                issue_url = issue_url.replace('{repo}', repo_name)
 
                issue_url = issue_url.replace('{repo_name}', repo_name.split(URL_SEP)[-1])
 
                # if issue_sub is empty use the matched issue reference verbatim
 
                if not issue_sub:
 
                    issue_text = match_obj.group()
 
                else:
 
                    try:
 
                        issue_text = match_obj.expand(issue_sub)
 
                    except (IndexError, re.error) as e:
 
                        log.error('invalid issue_sub setting %r -> %r %r. Error: %s', issue_pat, issue_server_link, issue_sub, str(e))
 
                        issue_text = match_obj.group()
 

	
 
                return (
 
                    '<a class="issue-tracker-link" href="%(url)s">'
 
                    '%(text)s'
 
                    '</a>'
 
                    ) % {
 
                     'url': issue_url,
 
                     'text': issue_text,
 
                    }
 
            tmp_urlify_issues_f = (lambda s,
 
                                          issue_re=issue_re, issues_replace=issues_replace, chain_f=tmp_urlify_issues_f:
 
                                   issue_re.sub(issues_replace, chain_f(s)))
 

	
 
            def tmp_urlify_issues_f(s, issue_re=issue_re, issues_replace=issues_replace, chain_f=tmp_urlify_issues_f):
 
                return issue_re.sub(issues_replace, chain_f(s))
 

	
 
        # Set tmp function globally - atomically
 
        _urlify_issues_f = tmp_urlify_issues_f
 

	
 
    return _urlify_issues_f(newtext)
 

	
 

	
 
def render_w_mentions(source, repo_name=None):
 
    """
 
    Render plain text with revision hashes and issue references urlified
 
    and with @mention highlighting.
 
    """
 
    s = safe_str(source)
 
    s = urlify_text(s, repo_name=repo_name)
 
    return literal('<div class="formatted-fixed">%s</div>' % s)
 

	
 

	
 
def short_ref(ref_type, ref_name):
 
    if ref_type == 'rev':
 
        return short_id(ref_name)
 
    return ref_name
 

	
 

	
 
def link_to_ref(repo_name, ref_type, ref_name, rev=None):
kallithea/lib/rcmail/response.py
Show inline comments
 
@@ -23,49 +23,51 @@
 
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 
# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 
# POSSIBILITY OF SUCH DAMAGE.
 

	
 
import mimetypes
 
import os
 
import string
 
from email import encoders
 
from email.charset import Charset
 
from email.mime.base import MIMEBase
 
from email.utils import parseaddr
 

	
 

	
 
ADDRESS_HEADERS_WHITELIST = ['From', 'To', 'Delivered-To', 'Cc']
 
DEFAULT_ENCODING = "utf-8"
 
VALUE_IS_EMAIL_ADDRESS = lambda v: '@' in v
 

	
 
def VALUE_IS_EMAIL_ADDRESS(v):
 
    return '@' in v
 

	
 

	
 
def normalize_header(header):
 
    return string.capwords(header.lower(), '-')
 

	
 

	
 
class EncodingError(Exception):
 
    """Thrown when there is an encoding error."""
 
    pass
 

	
 

	
 
class MailBase(object):
 
    """MailBase is used as the basis of lamson.mail and contains the basics of
 
    encoding an email.  You actually can do all your email processing with this
 
    class, but it's more raw.
 
    """
 
    def __init__(self, items=()):
 
        self.headers = dict(items)
 
        self.parts = []
 
        self.body = None
 
        self.content_encoding = {'Content-Type': (None, {}),
 
                                 'Content-Disposition': (None, {}),
 
                                 'Content-Transfer-Encoding': (None, {})}
 

	
kallithea/lib/vcs/backends/git/changeset.py
Show inline comments
 
@@ -414,50 +414,50 @@ class GitChangeset(BaseChangeset):
 
                filenodes.append(FileNode(obj_path, changeset=self, mode=stat))
 
            else:
 
                raise ChangesetError("Requested object should be Tree "
 
                                     "or Blob, is %r" % type(obj))
 
        nodes = dirnodes + filenodes
 
        for node in nodes:
 
            if node.path not in self.nodes:
 
                self.nodes[node.path] = node
 
        nodes.sort()
 
        return nodes
 

	
 
    def get_node(self, path):
 
        """
 
        Returns ``Node`` object from the given ``path``. If there is no node at
 
        the given ``path``, ``ChangesetError`` would be raised.
 
        """
 
        path = path.rstrip('/')
 
        if path not in self.nodes:
 
            try:
 
                id_ = self._get_id_for_path(path)
 
            except ChangesetError:
 
                raise NodeDoesNotExistError("Cannot find one of parents' "
 
                    "directories for a given path: %s" % path)
 

	
 
            _GL = lambda m: m and objects.S_ISGITLINK(m)
 
            if _GL(self._stat_modes.get(path)):
 
            stat = self._stat_modes.get(path)
 
            if stat and objects.S_ISGITLINK(stat):
 
                tree = self.repository._repo[self._tree_id]
 
                cf = ConfigFile.from_file(BytesIO(self.repository._repo.get_object(tree[b'.gitmodules'][1]).data))
 
                url = ascii_str(cf.get(('submodule', path), 'url'))
 
                node = SubModuleNode(path, url=url, changeset=ascii_str(id_),
 
                                     alias=self.repository.alias)
 
            else:
 
                obj = self.repository._repo.get_object(id_)
 

	
 
                if isinstance(obj, objects.Tree):
 
                    if path == '':
 
                        node = RootNode(changeset=self)
 
                    else:
 
                        node = DirNode(path, changeset=self)
 
                    node._tree = obj
 
                elif isinstance(obj, objects.Blob):
 
                    node = FileNode(path, changeset=self)
 
                    node._blob = obj
 
                else:
 
                    raise NodeDoesNotExistError("There is no file nor directory "
 
                        "at the given path: '%s' at revision %s"
 
                        % (path, self.short_id))
 
            # cache node
 
            self.nodes[path] = node
 
        return self.nodes[path]
kallithea/lib/vcs/backends/git/repository.py
Show inline comments
 
@@ -337,69 +337,66 @@ class GitRepository(BaseRepository):
 
            return time.mktime(self.get_changeset().date.timetuple())
 
        except RepositoryError:
 
            idx_loc = '' if self.bare else '.git'
 
            # fallback to filesystem
 
            in_path = os.path.join(self.path, idx_loc, "index")
 
            he_path = os.path.join(self.path, idx_loc, "HEAD")
 
            if os.path.exists(in_path):
 
                return os.stat(in_path).st_mtime
 
            else:
 
                return os.stat(he_path).st_mtime
 

	
 
    @LazyProperty
 
    def description(self):
 
        return safe_str(self._repo.get_description() or b'unknown')
 

	
 
    @LazyProperty
 
    def contact(self):
 
        undefined_contact = 'Unknown'
 
        return undefined_contact
 

	
 
    @property
 
    def branches(self):
 
        if not self.revisions:
 
            return {}
 
        sortkey = lambda ctx: ctx[0]
 
        _branches = [(safe_str(key), ascii_str(sha))
 
                     for key, (sha, type_) in self._parsed_refs.items() if type_ == b'H']
 
        return OrderedDict(sorted(_branches, key=sortkey, reverse=False))
 
        return OrderedDict(sorted(_branches, key=(lambda ctx: ctx[0]), reverse=False))
 

	
 
    @LazyProperty
 
    def closed_branches(self):
 
        return {}
 

	
 
    @LazyProperty
 
    def tags(self):
 
        return self._get_tags()
 

	
 
    def _get_tags(self):
 
        if not self.revisions:
 
            return {}
 

	
 
        sortkey = lambda ctx: ctx[0]
 
        _tags = [(safe_str(key), ascii_str(sha))
 
                 for key, (sha, type_) in self._parsed_refs.items() if type_ == b'T']
 
        return OrderedDict(sorted(_tags, key=sortkey, reverse=True))
 
        return OrderedDict(sorted(_tags, key=(lambda ctx: ctx[0]), reverse=True))
 

	
 
    def tag(self, name, user, revision=None, message=None, date=None,
 
            **kwargs):
 
        """
 
        Creates and returns a tag for the given ``revision``.
 

	
 
        :param name: name for new tag
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param revision: changeset id for which new tag would be created
 
        :param message: message of the tag's commit
 
        :param date: date of tag's commit
 

	
 
        :raises TagAlreadyExistError: if tag with same name already exists
 
        """
 
        if name in self.tags:
 
            raise TagAlreadyExistError("Tag %s already exists" % name)
 
        changeset = self.get_changeset(revision)
 
        message = message or "Added tag %s for commit %s" % (name,
 
            changeset.raw_id)
 
        self._repo.refs[b"refs/tags/%s" % safe_bytes(name)] = changeset._commit.id
 

	
 
        self._parsed_refs = self._get_parsed_refs()
 
        self.tags = self._get_tags()
 
        return changeset
kallithea/lib/vcs/conf/settings.py
Show inline comments
 
import os
 
import tempfile
 

	
 
from kallithea.lib.vcs.utils import aslist
 
from kallithea.lib.vcs.utils.paths import get_user_home
 

	
 

	
 
abspath = lambda * p: os.path.abspath(os.path.join(*p))
 
def abspath(*p):
 
    return os.path.abspath(os.path.join(*p))
 

	
 
VCSRC_PATH = os.environ.get('VCSRC_PATH')
 

	
 
if not VCSRC_PATH:
 
    HOME_ = get_user_home()
 
    if not HOME_:
 
        HOME_ = tempfile.gettempdir()
 

	
 
VCSRC_PATH = VCSRC_PATH or abspath(HOME_, '.vcsrc')
 
if os.path.isdir(VCSRC_PATH):
 
    VCSRC_PATH = os.path.join(VCSRC_PATH, '__init__.py')
 

	
 
# list of default encoding used in safe_str/safe_bytes methods
 
DEFAULT_ENCODINGS = aslist('utf-8')
 

	
 
# path to git executable run by run_git_command function
 
GIT_EXECUTABLE_PATH = 'git'
 
# can be also --branches --tags
 
GIT_REV_FILTER = '--all'
 

	
 
BACKENDS = {
 
    'hg': 'kallithea.lib.vcs.backends.hg.MercurialRepository',
 
    'git': 'kallithea.lib.vcs.backends.git.GitRepository',
 
}
kallithea/lib/vcs/utils/paths.py
Show inline comments
 
import os
 

	
 

	
 
abspath = lambda * p: os.path.abspath(os.path.join(*p))
 
def abspath(*p):
 
    return os.path.abspath(os.path.join(*p))
 

	
 

	
 
def get_dirs_for_path(*paths):
 
    """
 
    Returns list of directories, including intermediate.
 
    """
 
    for path in paths:
 
        head = path
 
        while head:
 
            head, _tail = os.path.split(head)
 
            if head:
 
                yield head
 
            else:
 
                # We don't need to yield empty path
 
                break
 

	
 

	
 
def get_dir_size(path):
 
    root_path = path
 
    size = 0
 
    for path, dirs, files in os.walk(root_path):
 
        for f in files:
 
            try:
 
                size += os.path.getsize(os.path.join(path, f))
kallithea/model/repo.py
Show inline comments
 
@@ -88,54 +88,53 @@ class RepoModel(object):
 
        if cache:
 
            repo = repo.options(FromCache("sql_cache_short",
 
                                          "get_repo_%s" % repo_id))
 
        return repo.scalar()
 

	
 
    def get_repo(self, repository):
 
        return Repository.guess_instance(repository)
 

	
 
    def get_by_repo_name(self, repo_name, cache=False):
 
        repo = Repository.query() \
 
            .filter(Repository.repo_name == repo_name)
 

	
 
        if cache:
 
            repo = repo.options(FromCache("sql_cache_short",
 
                                          "get_repo_%s" % repo_name))
 
        return repo.scalar()
 

	
 
    def get_all_user_repos(self, user):
 
        """
 
        Gets all repositories that user have at least read access
 

	
 
        :param user:
 
        """
 
        from kallithea.lib.auth import AuthUser
 
        user = User.guess_instance(user)
 
        repos = AuthUser(dbuser=user).permissions['repositories']
 
        access_check = lambda r: r[1] in ['repository.read',
 
                                          'repository.write',
 
                                          'repository.admin']
 
        repos = [x[0] for x in filter(access_check, repos.items())]
 
        auth_user = AuthUser(dbuser=User.guess_instance(user))
 
        repos = [repo_name
 
            for repo_name, perm in auth_user.permissions['repositories'].items()
 
            if perm in ['repository.read', 'repository.write', 'repository.admin']
 
            ]
 
        return Repository.query().filter(Repository.repo_name.in_(repos))
 

	
 
    @classmethod
 
    def _render_datatable(cls, tmpl, *args, **kwargs):
 
        from tg import tmpl_context as c, request, app_globals
 
        from tg.i18n import ugettext as _
 

	
 
        _tmpl_lookup = app_globals.mako_lookup
 
        template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
 

	
 
        tmpl = template.get_def(tmpl)
 
        kwargs.update(dict(_=_, h=h, c=c, request=request))
 
        return tmpl.render_unicode(*args, **kwargs)
 

	
 
    def get_repos_as_dict(self, repos_list, repo_groups_list=None,
 
                          admin=False,
 
                          short_name=False):
 
        """Return repository list for use by DataTable.
 
        repos_list: list of repositories - but will be filtered for read permission.
 
        repo_groups_list: added at top of list without permission check.
 
        admin: return data for action column.
 
        """
 
        _render = self._render_datatable
 
        from tg import tmpl_context as c, request
kallithea/model/validators.py
Show inline comments
 
@@ -165,53 +165,49 @@ def ValidUserGroup(edit=False, old_data=
 

	
 
    return _validator
 

	
 

	
 
def ValidRepoGroup(edit=False, old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'parent_group_id': _('Cannot assign this group as parent'),
 
            'group_exists': _('Group "%(group_name)s" already exists'),
 
            'repo_exists':
 
                _('Repository with name "%(group_name)s" already exists')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            # TODO WRITE VALIDATIONS
 
            group_name = value.get('group_name')
 
            parent_group_id = value.get('parent_group_id')
 

	
 
            # slugify repo group just in case :)
 
            slug = repo_name_slug(group_name)
 

	
 
            # check for parent of self
 
            parent_of_self = lambda: (
 
                old_data['group_id'] == parent_group_id
 
                if parent_group_id else False
 
            )
 
            if edit and parent_of_self():
 
            if edit and parent_group_id and old_data['group_id'] == parent_group_id:
 
                msg = self.message('parent_group_id', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(parent_group_id=msg)
 
                )
 

	
 
            old_gname = None
 
            if edit:
 
                old_gname = RepoGroup.get(old_data.get('group_id')).group_name
 

	
 
            if old_gname != group_name or not edit:
 

	
 
                # check group
 
                gr = RepoGroup.query() \
 
                      .filter(func.lower(RepoGroup.group_name) == func.lower(slug)) \
 
                      .filter(RepoGroup.parent_group_id == parent_group_id) \
 
                      .scalar()
 
                if gr is not None:
 
                    msg = self.message('group_exists', state, group_name=slug)
 
                    raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(group_name=msg)
 
                    )
 

	
 
                # check for same repo
 
                repo = Repository.query() \
setup.py
Show inline comments
 
#!/usr/bin/env python3
 
# -*- coding: utf-8 -*-
 
import os
 
import platform
 
import sys
 

	
 
import setuptools
 
# monkey patch setuptools to use distutils owner/group functionality
 
from setuptools.command import sdist
 

	
 

	
 
if sys.version_info < (3, 6):
 
    raise Exception('Kallithea requires Python 3.6 or later')
 

	
 

	
 
here = os.path.abspath(os.path.dirname(__file__))
 

	
 

	
 
def _get_meta_var(name, data, callback_handler=None):
 
    import re
 
    matches = re.compile(r'(?:%s)\s*=\s*(.*)' % name).search(data)
 
    if matches:
 
        if not callable(callback_handler):
 
            callback_handler = lambda v: v
 

	
 
        return callback_handler(eval(matches.groups()[0]))
 
        s = eval(matches.groups()[0])
 
        if callable(callback_handler):
 
            return callback_handler(s)
 
        return s
 

	
 
_meta = open(os.path.join(here, 'kallithea', '__init__.py'), 'r')
 
_metadata = _meta.read()
 
_meta.close()
 

	
 
callback = lambda V: ('.'.join(map(str, V[:3])) + '.'.join(V[3:]))
 
def callback(V):
 
    return '.'.join(map(str, V[:3])) + '.'.join(V[3:])
 
__version__ = _get_meta_var('VERSION', _metadata, callback)
 
__license__ = _get_meta_var('__license__', _metadata)
 
__author__ = _get_meta_var('__author__', _metadata)
 
__url__ = _get_meta_var('__url__', _metadata)
 
# defines current platform
 
__platform__ = platform.system()
 

	
 
is_windows = __platform__ in ['Windows']
 

	
 
requirements = [
 
    "alembic >= 1.0.10, < 1.5",
 
    "gearbox >= 0.1.0, < 1",
 
    "waitress >= 0.8.8, < 1.5",
 
    "WebOb >= 1.8, < 1.9",
 
    "backlash >= 0.1.2, < 1",
 
    "TurboGears2 >= 2.4, < 2.5",
 
    "tgext.routes >= 0.2.0, < 1",
 
    "Beaker >= 1.10.1, < 2",
 
    "WebHelpers2 >= 2.0, < 2.1",
 
    "FormEncode >= 1.3.1, < 1.4",
 
    "SQLAlchemy >= 1.2.9, < 1.4",
 
    "Mako >= 0.9.1, < 1.2",
 
    "Pygments >= 2.2.0, < 2.6",
 
    "Whoosh >= 2.7.1, < 2.8",
0 comments (0 inline, 0 general)