Changeset - c59e914c4887
[Not reviewed]
default
0 6 0
Mads Kiilerich - 6 years ago 2020-02-03 16:08:50
mads@kiilerich.com
Grafted from: 54b0176aa9e6
py3: use exception .args instead of .message

Args seems slightly more fragile and *could* introduce problems for trivial use
if args is empty. But .message is gone.
6 files changed with 15 insertions and 15 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/admin/my_account.py
Show inline comments
 
@@ -186,111 +186,111 @@ class MyAccountController(BaseController
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_perms(self):
 
        c.active = 'perms'
 
        self.__load_data()
 
        c.perm_user = AuthUser(user_id=request.authuser.user_id)
 

	
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_emails(self):
 
        c.active = 'emails'
 
        self.__load_data()
 

	
 
        c.user_email_map = UserEmailMap.query() \
 
            .filter(UserEmailMap.user == c.user).all()
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_emails_add(self):
 
        email = request.POST.get('new_email')
 

	
 
        try:
 
            UserModel().add_extra_email(request.authuser.user_id, email)
 
            Session().commit()
 
            h.flash(_("Added email %s to user") % email, category='success')
 
        except formencode.Invalid as error:
 
            msg = error.error_dict['email']
 
            h.flash(msg, category='error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during email saving'),
 
                    category='error')
 
        raise HTTPFound(location=url('my_account_emails'))
 

	
 
    def my_account_emails_delete(self):
 
        email_id = request.POST.get('del_email_id')
 
        user_model = UserModel()
 
        user_model.delete_extra_email(request.authuser.user_id, email_id)
 
        Session().commit()
 
        h.flash(_("Removed email from user"), category='success')
 
        raise HTTPFound(location=url('my_account_emails'))
 

	
 
    def my_account_api_keys(self):
 
        c.active = 'api_keys'
 
        self.__load_data()
 
        show_expired = True
 
        c.lifetime_values = [
 
            (str(-1), _('Forever')),
 
            (str(5), _('5 minutes')),
 
            (str(60), _('1 hour')),
 
            (str(60 * 24), _('1 day')),
 
            (str(60 * 24 * 30), _('1 month')),
 
        ]
 
        c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
 
        c.user_api_keys = ApiKeyModel().get_api_keys(request.authuser.user_id,
 
                                                     show_expired=show_expired)
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_api_keys_add(self):
 
        lifetime = safe_int(request.POST.get('lifetime'), -1)
 
        description = request.POST.get('description')
 
        ApiKeyModel().create(request.authuser.user_id, description, lifetime)
 
        Session().commit()
 
        h.flash(_("API key successfully created"), category='success')
 
        raise HTTPFound(location=url('my_account_api_keys'))
 

	
 
    def my_account_api_keys_delete(self):
 
        api_key = request.POST.get('del_api_key')
 
        if request.POST.get('del_api_key_builtin'):
 
            user = User.get(request.authuser.user_id)
 
            user.api_key = generate_api_key()
 
            Session().commit()
 
            h.flash(_("API key successfully reset"), category='success')
 
        elif api_key:
 
            ApiKeyModel().delete(api_key, request.authuser.user_id)
 
            Session().commit()
 
            h.flash(_("API key successfully deleted"), category='success')
 

	
 
        raise HTTPFound(location=url('my_account_api_keys'))
 

	
 
    @IfSshEnabled
 
    def my_account_ssh_keys(self):
 
        c.active = 'ssh_keys'
 
        self.__load_data()
 
        c.user_ssh_keys = SshKeyModel().get_ssh_keys(request.authuser.user_id)
 
        return render('admin/my_account/my_account.html')
 

	
 
    @IfSshEnabled
 
    def my_account_ssh_keys_add(self):
 
        description = request.POST.get('description')
 
        public_key = request.POST.get('public_key')
 
        try:
 
            new_ssh_key = SshKeyModel().create(request.authuser.user_id,
 
                                               description, public_key)
 
            Session().commit()
 
            SshKeyModel().write_authorized_keys()
 
            h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        except SshKeyModelException as e:
 
            h.flash(e.args[0], category='error')
 
        raise HTTPFound(location=url('my_account_ssh_keys'))
 

	
 
    @IfSshEnabled
 
    def my_account_ssh_keys_delete(self):
 
        fingerprint = request.POST.get('del_public_key_fingerprint')
 
        try:
 
            SshKeyModel().delete(fingerprint, request.authuser.user_id)
 
            Session().commit()
 
            SshKeyModel().write_authorized_keys()
 
            h.flash(_("SSH key successfully deleted"), category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        except SshKeyModelException as e:
 
            h.flash(e.args[0], category='error')
 
        raise HTTPFound(location=url('my_account_ssh_keys'))
kallithea/controllers/admin/users.py
Show inline comments
 
@@ -359,113 +359,113 @@ class UsersController(BaseController):
 
        email = request.POST.get('new_email')
 
        user_model = UserModel()
 

	
 
        try:
 
            user_model.add_extra_email(id, email)
 
            Session().commit()
 
            h.flash(_("Added email %s to user") % email, category='success')
 
        except formencode.Invalid as error:
 
            msg = error.error_dict['email']
 
            h.flash(msg, category='error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during email saving'),
 
                    category='error')
 
        raise HTTPFound(location=url('edit_user_emails', id=id))
 

	
 
    def delete_email(self, id):
 
        user = self._get_user_or_raise_if_default(id)
 
        email_id = request.POST.get('del_email_id')
 
        user_model = UserModel()
 
        user_model.delete_extra_email(id, email_id)
 
        Session().commit()
 
        h.flash(_("Removed email from user"), category='success')
 
        raise HTTPFound(location=url('edit_user_emails', id=id))
 

	
 
    def edit_ips(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'ips'
 
        c.user_ip_map = UserIpMap.query() \
 
            .filter(UserIpMap.user == c.user).all()
 

	
 
        c.default_user_ip_map = UserIpMap.query() \
 
            .filter(UserIpMap.user == User.get_default_user()).all()
 

	
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def add_ip(self, id):
 
        ip = request.POST.get('new_ip')
 
        user_model = UserModel()
 

	
 
        try:
 
            user_model.add_extra_ip(id, ip)
 
            Session().commit()
 
            h.flash(_("Added IP address %s to user whitelist") % ip, category='success')
 
        except formencode.Invalid as error:
 
            msg = error.error_dict['ip']
 
            h.flash(msg, category='error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred while adding IP address'),
 
                    category='error')
 

	
 
        if 'default_user' in request.POST:
 
            raise HTTPFound(location=url('admin_permissions_ips'))
 
        raise HTTPFound(location=url('edit_user_ips', id=id))
 

	
 
    def delete_ip(self, id):
 
        ip_id = request.POST.get('del_ip_id')
 
        user_model = UserModel()
 
        user_model.delete_extra_ip(id, ip_id)
 
        Session().commit()
 
        h.flash(_("Removed IP address from user whitelist"), category='success')
 

	
 
        if 'default_user' in request.POST:
 
            raise HTTPFound(location=url('admin_permissions_ips'))
 
        raise HTTPFound(location=url('edit_user_ips', id=id))
 

	
 
    @IfSshEnabled
 
    def edit_ssh_keys(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'ssh_keys'
 
        c.user_ssh_keys = SshKeyModel().get_ssh_keys(c.user.user_id)
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @IfSshEnabled
 
    def ssh_keys_add(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 

	
 
        description = request.POST.get('description')
 
        public_key = request.POST.get('public_key')
 
        try:
 
            new_ssh_key = SshKeyModel().create(c.user.user_id,
 
                                               description, public_key)
 
            Session().commit()
 
            SshKeyModel().write_authorized_keys()
 
            h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        except SshKeyModelException as e:
 
            h.flash(e.args[0], category='error')
 
        raise HTTPFound(location=url('edit_user_ssh_keys', id=c.user.user_id))
 

	
 
    @IfSshEnabled
 
    def ssh_keys_delete(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 

	
 
        fingerprint = request.POST.get('del_public_key_fingerprint')
 
        try:
 
            SshKeyModel().delete(fingerprint, c.user.user_id)
 
            Session().commit()
 
            SshKeyModel().write_authorized_keys()
 
            h.flash(_("SSH key successfully deleted"), category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        except SshKeyModelException as e:
 
            h.flash(e.args[0], category='error')
 
        raise HTTPFound(location=url('edit_user_ssh_keys', id=c.user.user_id))
kallithea/controllers/api/api.py
Show inline comments
 
@@ -2246,178 +2246,178 @@ class ApiController(JSONRPCController):
 
          result : null
 
          error :  {
 
            "failed to create gist"
 
          }
 

	
 
        """
 
        try:
 
            if isinstance(owner, Optional):
 
                owner = request.authuser.user_id
 

	
 
            owner = get_user_or_error(owner)
 
            description = Optional.extract(description)
 
            gist_type = Optional.extract(gist_type)
 
            lifetime = Optional.extract(lifetime)
 

	
 
            gist = GistModel().create(description=description,
 
                                      owner=owner,
 
                                      ip_addr=request.ip_addr,
 
                                      gist_mapping=files,
 
                                      gist_type=gist_type,
 
                                      lifetime=lifetime)
 
            Session().commit()
 
            return dict(
 
                msg='created new gist',
 
                gist=gist.get_api_data()
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to create gist')
 

	
 
    # def update_gist(self, gistid, files, owner=Optional(OAttr('apiuser')),
 
    #                 gist_type=Optional(Gist.GIST_PUBLIC),
 
    #                 gist_lifetime=Optional(-1), gist_description=Optional('')):
 
    #     gist = get_gist_or_error(gistid)
 
    #     updates = {}
 

	
 
    # permission check inside
 
    def delete_gist(self, gistid):
 
        """
 
        Deletes existing gist
 

	
 
        :param gistid: id of gist to delete
 
        :type gistid: str
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "deleted gist ID: <gist_id>",
 
            "gist": null
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to delete gist ID:<gist_id>"
 
          }
 

	
 
        """
 
        gist = get_gist_or_error(gistid)
 
        if not HasPermissionAny('hg.admin')():
 
            if gist.owner_id != request.authuser.user_id:
 
                raise JSONRPCError('gist `%s` does not exist' % (gistid,))
 

	
 
        try:
 
            GistModel().delete(gist)
 
            Session().commit()
 
            return dict(
 
                msg='deleted gist ID:%s' % (gist.gist_access_id,),
 
                gist=None
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to delete gist ID:%s'
 
                               % (gist.gist_access_id,))
 

	
 
    # permission check inside
 
    def get_changesets(self, repoid, start=None, end=None, start_date=None,
 
                       end_date=None, branch_name=None, reverse=False, with_file_list=False, max_revisions=None):
 
        repo = get_repo_or_error(repoid)
 
        if not HasRepoPermissionLevel('read')(repo.repo_name):
 
            raise JSONRPCError('Access denied to repo %s' % repo.repo_name)
 

	
 
        format = "%Y-%m-%dT%H:%M:%S"
 
        try:
 
            return [e.__json__(with_file_list) for e in
 
                repo.scm_instance.get_changesets(start,
 
                                                 end,
 
                                                 datetime.strptime(start_date, format) if start_date else None,
 
                                                 datetime.strptime(end_date, format) if end_date else None,
 
                                                 branch_name,
 
                                                 reverse, max_revisions)]
 
        except EmptyRepositoryError as e:
 
            raise JSONRPCError(e.message)
 
            raise JSONRPCError('Repository is empty')
 

	
 
    # permission check inside
 
    def get_changeset(self, repoid, raw_id, with_reviews=Optional(False)):
 
        repo = get_repo_or_error(repoid)
 
        if not HasRepoPermissionLevel('read')(repo.repo_name):
 
            raise JSONRPCError('Access denied to repo %s' % repo.repo_name)
 
        changeset = repo.get_changeset(raw_id)
 
        if isinstance(changeset, EmptyChangeset):
 
            raise JSONRPCError('Changeset %s does not exist' % raw_id)
 

	
 
        info = dict(changeset.as_dict())
 

	
 
        with_reviews = Optional.extract(with_reviews)
 
        if with_reviews:
 
            reviews = ChangesetStatusModel().get_statuses(
 
                                repo.repo_name, raw_id)
 
            info["reviews"] = reviews
 

	
 
        return info
 

	
 
    # permission check inside
 
    def get_pullrequest(self, pullrequest_id):
 
        """
 
        Get given pull request by id
 
        """
 
        pull_request = PullRequest.get(pullrequest_id)
 
        if pull_request is None:
 
            raise JSONRPCError('pull request `%s` does not exist' % (pullrequest_id,))
 
        if not HasRepoPermissionLevel('read')(pull_request.org_repo.repo_name):
 
            raise JSONRPCError('not allowed')
 
        return pull_request.get_api_data()
 

	
 
    # permission check inside
 
    def comment_pullrequest(self, pull_request_id, comment_msg=u'', status=None, close_pr=False):
 
        """
 
        Add comment, close and change status of pull request.
 
        """
 
        apiuser = get_user_or_error(request.authuser.user_id)
 
        pull_request = PullRequest.get(pull_request_id)
 
        if pull_request is None:
 
            raise JSONRPCError('pull request `%s` does not exist' % (pull_request_id,))
 
        if (not HasRepoPermissionLevel('read')(pull_request.org_repo.repo_name)):
 
            raise JSONRPCError('No permission to add comment. User needs at least reading permissions'
 
                               ' to the source repository.')
 
        owner = apiuser.user_id == pull_request.owner_id
 
        reviewer = apiuser.user_id in [reviewer.user_id for reviewer in pull_request.reviewers]
 
        if close_pr and not (apiuser.admin or owner):
 
            raise JSONRPCError('No permission to close pull request. User needs to be admin or owner.')
 
        if status and not (apiuser.admin or owner or reviewer):
 
            raise JSONRPCError('No permission to change pull request status. User needs to be admin, owner or reviewer.')
 
        if pull_request.is_closed():
 
            raise JSONRPCError('pull request is already closed')
 

	
 
        comment = ChangesetCommentsModel().create(
 
            text=comment_msg,
 
            repo=pull_request.org_repo.repo_id,
 
            author=apiuser.user_id,
 
            pull_request=pull_request.pull_request_id,
 
            f_path=None,
 
            line_no=None,
 
            status_change=ChangesetStatus.get_status_lbl(status),
 
            closing_pr=close_pr
 
        )
 
        action_logger(apiuser,
 
                      'user_commented_pull_request:%s' % pull_request_id,
 
                      pull_request.org_repo, request.ip_addr)
 
        if status:
 
            ChangesetStatusModel().set_status(
 
                pull_request.org_repo_id,
 
                status,
 
                apiuser.user_id,
 
                comment,
 
                pull_request=pull_request_id
 
            )
 
        if close_pr:
 
            PullRequestModel().close_pull_request(pull_request_id)
 
            action_logger(apiuser,
 
                          'user_closed_pull_request:%s' % pull_request_id,
 
                          pull_request.org_repo, request.ip_addr)
 
        Session().commit()
 
        return True
kallithea/lib/rcmail/response.py
Show inline comments
 
@@ -246,196 +246,196 @@ class MailResponse(object):
 
        """
 
        return self.to_message().as_string()
 

	
 
    def _encode_attachment(self, filename=None, content_type=None, data=None,
 
                           disposition=None, part=None):
 
        """
 
        Used internally to take the attachments mentioned in self.attachments
 
        and do the actual encoding in a lazy way when you call to_message.
 
        """
 
        if part:
 
            self.base.parts.append(part)
 
        elif filename:
 
            if not data:
 
                data = open(filename).read()
 

	
 
            self.base.attach_file(filename, data, content_type,
 
                                  disposition or 'attachment')
 
        else:
 
            self.base.attach_text(data, content_type)
 

	
 
        ctype = self.base.content_encoding['Content-Type'][0]
 

	
 
        if ctype and not ctype.startswith('multipart'):
 
            self.base.content_encoding['Content-Type'] = ('multipart/mixed', {})
 

	
 
    def to_message(self):
 
        """
 
        Figures out all the required steps to finally craft the
 
        message you need and return it.  The resulting message
 
        is also available as a self.base attribute.
 

	
 
        What is returned is a Python email API message you can
 
        use with those APIs.  The self.base attribute is the raw
 
        lamson.encoding.MailBase.
 
        """
 
        del self.base.parts[:]
 

	
 
        if self.Body and self.Html:
 
            self.multipart = True
 
            self.base.content_encoding['Content-Type'] = (
 
                'multipart/alternative', {})
 

	
 
        if self.multipart:
 
            self.base.body = None
 
            if self.Body:
 
                self.base.attach_text(self.Body, 'text/plain')
 

	
 
            if self.Html:
 
                self.base.attach_text(self.Html, 'text/html')
 

	
 
            for args in self.attachments:
 
                self._encode_attachment(**args)
 

	
 
        elif self.Body:
 
            self.base.body = self.Body
 
            self.base.content_encoding['Content-Type'] = ('text/plain', {})
 

	
 
        elif self.Html:
 
            self.base.body = self.Html
 
            self.base.content_encoding['Content-Type'] = ('text/html', {})
 

	
 
        return to_message(self.base, separator=self.separator)
 

	
 
    def all_parts(self):
 
        """
 
        Returns all the encoded parts.  Only useful for debugging
 
        or inspecting after calling to_message().
 
        """
 
        return self.base.parts
 

	
 
    def keys(self):
 
        return self.base.keys()
 

	
 

	
 
def to_message(mail, separator="; "):
 
    """
 
    Given a MailBase message, this will construct a MIMEPart
 
    that is canonicalized for use with the Python email API.
 
    """
 
    ctype, params = mail.content_encoding['Content-Type']
 

	
 
    if not ctype:
 
        if mail.parts:
 
            ctype = 'multipart/mixed'
 
        else:
 
            ctype = 'text/plain'
 
    else:
 
        if mail.parts:
 
            assert ctype.startswith(("multipart", "message")), \
 
                   "Content type should be multipart or message, not %r" % ctype
 

	
 
    # adjust the content type according to what it should be now
 
    mail.content_encoding['Content-Type'] = (ctype, params)
 

	
 
    try:
 
        out = MIMEPart(ctype, **params)
 
    except TypeError as exc:  # pragma: no cover
 
    except TypeError as e:  # pragma: no cover
 
        raise EncodingError("Content-Type malformed, not allowed: %r; "
 
                            "%r (Python ERROR: %s" %
 
                            (ctype, params, exc.message))
 
                            "%r (Python ERROR: %s)" %
 
                            (ctype, params, e.args[0]))
 

	
 
    for k in mail.keys():
 
        if k in ADDRESS_HEADERS_WHITELIST:
 
            out[k] = header_to_mime_encoding(
 
                                         mail[k],
 
                                         not_email=False,
 
                                         separator=separator
 
                                     )
 
        else:
 
            out[k] = header_to_mime_encoding(
 
                                         mail[k],
 
                                         not_email=True
 
                                    )
 

	
 
    out.extract_payload(mail)
 

	
 
    # go through the children
 
    for part in mail.parts:
 
        out.attach(to_message(part))
 

	
 
    return out
 

	
 

	
 
class MIMEPart(MIMEBase):
 
    """
 
    A reimplementation of nearly everything in email.mime to be more useful
 
    for actually attaching things.  Rather than one class for every type of
 
    thing you'd encode, there's just this one, and it figures out how to
 
    encode what you ask it.
 
    """
 
    def __init__(self, type, **params):
 
        self.maintype, self.subtype = type.split('/')
 
        MIMEBase.__init__(self, self.maintype, self.subtype, **params)
 

	
 
    def add_text(self, content):
 
        # this is text, so encode it in canonical form
 
        try:
 
            encoded = content.encode('ascii')
 
            charset = 'ascii'
 
        except UnicodeError:
 
            encoded = content.encode('utf-8')
 
            charset = 'utf-8'
 

	
 
        self.set_payload(encoded, charset=charset)
 

	
 
    def extract_payload(self, mail):
 
        if mail.body is None:
 
            return  # only None, '' is still ok
 

	
 
        ctype, ctype_params = mail.content_encoding['Content-Type']
 
        cdisp, cdisp_params = mail.content_encoding['Content-Disposition']
 

	
 
        assert ctype, ("Extract payload requires that mail.content_encoding "
 
                       "have a valid Content-Type.")
 

	
 
        if ctype.startswith("text/"):
 
            self.add_text(mail.body)
 
        else:
 
            if cdisp:
 
                # replicate the content-disposition settings
 
                self.add_header('Content-Disposition', cdisp, **cdisp_params)
 

	
 
            self.set_payload(mail.body)
 
            encoders.encode_base64(self)
 

	
 
    def __repr__(self):
 
        return "<MIMEPart '%s/%s': %r, %r, multipart=%r>" % (
 
            self.subtype,
 
            self.maintype,
 
            self['Content-Type'],
 
            self['Content-Disposition'],
 
            self.is_multipart())
 

	
 

	
 
def header_to_mime_encoding(value, not_email=False, separator=", "):
 
    if not value:
 
        return ""
 

	
 
    encoder = Charset(DEFAULT_ENCODING)
 
    if isinstance(value, list):
 
        return separator.join(properly_encode_header(
 
            v, encoder, not_email) for v in value)
 
    else:
 
        return properly_encode_header(value, encoder, not_email)
 

	
 

	
 
def properly_encode_header(value, encoder, not_email):
 
    """
 
    The only thing special (weird) about this function is that it tries
 
    to do a fast check to see if the header value has an email address in
 
    it.  Since random headers could have an email address, and email addresses
 
    have weird special formatting rules, we have to check for it.
 

	
 
    Normally this works fine, but in Librelist, we need to "obfuscate" email
 
    addresses by changing the '@' to '-AT-'.  This is where
 
    VALUE_IS_EMAIL_ADDRESS exists.  It's a simple lambda returning True/False
kallithea/lib/vcs/backends/hg/repository.py
Show inline comments
 
@@ -91,224 +91,224 @@ class MercurialRepository(BaseRepository
 
        return len(self.revisions) == 0
 

	
 
    @LazyProperty
 
    def revisions(self):
 
        """
 
        Returns list of revisions' ids, in ascending order.  Being lazy
 
        attribute allows external tools to inject shas from cache.
 
        """
 
        return self._get_all_revisions()
 

	
 
    @LazyProperty
 
    def name(self):
 
        return os.path.basename(self.path)
 

	
 
    @LazyProperty
 
    def branches(self):
 
        return self._get_branches()
 

	
 
    @LazyProperty
 
    def closed_branches(self):
 
        return self._get_branches(normal=False, closed=True)
 

	
 
    @LazyProperty
 
    def allbranches(self):
 
        """
 
        List all branches, including closed branches.
 
        """
 
        return self._get_branches(closed=True)
 

	
 
    def _get_branches(self, normal=True, closed=False):
 
        """
 
        Gets branches for this repository
 
        Returns only not closed branches by default
 

	
 
        :param closed: return also closed branches for mercurial
 
        :param normal: return also normal branches
 
        """
 

	
 
        if self._empty:
 
            return {}
 

	
 
        bt = OrderedDict()
 
        for bn, _heads, node, isclosed in sorted(self._repo.branchmap().iterbranches()):
 
            if isclosed:
 
                if closed:
 
                    bt[safe_unicode(bn)] = ascii_str(mercurial.node.hex(node))
 
            else:
 
                if normal:
 
                    bt[safe_unicode(bn)] = ascii_str(mercurial.node.hex(node))
 
        return bt
 

	
 
    @LazyProperty
 
    def tags(self):
 
        """
 
        Gets tags for this repository
 
        """
 
        return self._get_tags()
 

	
 
    def _get_tags(self):
 
        if self._empty:
 
            return {}
 

	
 
        return OrderedDict(sorted(
 
            ((safe_unicode(n), ascii_str(mercurial.node.hex(h))) for n, h in self._repo.tags().items()),
 
            reverse=True,
 
            key=lambda x: x[0],  # sort by name
 
        ))
 

	
 
    def tag(self, name, user, revision=None, message=None, date=None,
 
            **kwargs):
 
        """
 
        Creates and returns a tag for the given ``revision``.
 

	
 
        :param name: name for new tag
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param revision: changeset id for which new tag would be created
 
        :param message: message of the tag's commit
 
        :param date: date of tag's commit
 

	
 
        :raises TagAlreadyExistError: if tag with same name already exists
 
        """
 
        if name in self.tags:
 
            raise TagAlreadyExistError("Tag %s already exists" % name)
 
        changeset = self.get_changeset(revision)
 
        local = kwargs.setdefault('local', False)
 

	
 
        if message is None:
 
            message = "Added tag %s for changeset %s" % (name,
 
                changeset.short_id)
 

	
 
        if date is None:
 
            date = safe_bytes(datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S'))
 

	
 
        try:
 
            mercurial.tags.tag(self._repo, safe_bytes(name), changeset._ctx.node(), safe_bytes(message), local, safe_bytes(user), date)
 
        except mercurial.error.Abort as e:
 
            raise RepositoryError(e.message)
 
            raise RepositoryError(e.args[0])
 

	
 
        # Reinitialize tags
 
        self.tags = self._get_tags()
 
        tag_id = self.tags[name]
 

	
 
        return self.get_changeset(revision=tag_id)
 

	
 
    def remove_tag(self, name, user, message=None, date=None):
 
        """
 
        Removes tag with the given ``name``.
 

	
 
        :param name: name of the tag to be removed
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param message: message of the tag's removal commit
 
        :param date: date of tag's removal commit
 

	
 
        :raises TagDoesNotExistError: if tag with given name does not exists
 
        """
 
        if name not in self.tags:
 
            raise TagDoesNotExistError("Tag %s does not exist" % name)
 
        if message is None:
 
            message = "Removed tag %s" % name
 
        if date is None:
 
            date = safe_bytes(datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S'))
 
        local = False
 

	
 
        try:
 
            mercurial.tags.tag(self._repo, safe_bytes(name), mercurial.commands.nullid, safe_bytes(message), local, safe_bytes(user), date)
 
            self.tags = self._get_tags()
 
        except mercurial.error.Abort as e:
 
            raise RepositoryError(e.message)
 
            raise RepositoryError(e.args[0])
 

	
 
    @LazyProperty
 
    def bookmarks(self):
 
        """
 
        Gets bookmarks for this repository
 
        """
 
        return self._get_bookmarks()
 

	
 
    def _get_bookmarks(self):
 
        if self._empty:
 
            return {}
 

	
 
        return OrderedDict(sorted(
 
            ((safe_unicode(n), ascii_str(h)) for n, h in self._repo._bookmarks.items()),
 
            reverse=True,
 
            key=lambda x: x[0],  # sort by name
 
        ))
 

	
 
    def _get_all_revisions(self):
 
        return [ascii_str(self._repo[x].hex()) for x in self._repo.filtered(b'visible').changelog.revs()]
 

	
 
    def get_diff(self, rev1, rev2, path='', ignore_whitespace=False,
 
                  context=3):
 
        """
 
        Returns (git like) *diff*, as plain text. Shows changes introduced by
 
        ``rev2`` since ``rev1``.
 

	
 
        :param rev1: Entry point from which diff is shown. Can be
 
          ``self.EMPTY_CHANGESET`` - in this case, patch showing all
 
          the changes since empty state of the repository until ``rev2``
 
        :param rev2: Until which revision changes should be shown.
 
        :param ignore_whitespace: If set to ``True``, would not show whitespace
 
          changes. Defaults to ``False``.
 
        :param context: How many lines before/after changed lines should be
 
          shown. Defaults to ``3``. If negative value is passed-in, it will be
 
          set to ``0`` instead.
 
        """
 

	
 
        # Negative context values make no sense, and will result in
 
        # errors. Ensure this does not happen.
 
        if context < 0:
 
            context = 0
 

	
 
        if hasattr(rev1, 'raw_id'):
 
            rev1 = getattr(rev1, 'raw_id')
 

	
 
        if hasattr(rev2, 'raw_id'):
 
            rev2 = getattr(rev2, 'raw_id')
 

	
 
        # Check if given revisions are present at repository (may raise
 
        # ChangesetDoesNotExistError)
 
        if rev1 != self.EMPTY_CHANGESET:
 
            self.get_changeset(rev1)
 
        self.get_changeset(rev2)
 
        if path:
 
            file_filter = mercurial.match.exact(path)
 
        else:
 
            file_filter = None
 

	
 
        return b''.join(mercurial.patch.diff(self._repo, rev1, rev2, match=file_filter,
 
                          opts=mercurial.mdiff.diffopts(git=True,
 
                                        showfunc=True,
 
                                        ignorews=ignore_whitespace,
 
                                        context=context)))
 

	
 
    @classmethod
 
    def _check_url(cls, url, repoui=None):
 
        """
 
        Function will check given url and try to verify if it's a valid
 
        link. Sometimes it may happened that mercurial will issue basic
 
        auth request that can cause whole API to hang when used from python
 
        or other external calls.
 

	
 
        On failures it'll raise urllib2.HTTPError, exception is also thrown
 
        when the return code is non 200
 
        """
 
        # check first if it's not an local url
 
        if os.path.isdir(url) or url.startswith(b'file:'):
 
            return True
 

	
 
        if url.startswith(b'ssh:'):
 
            # in case of invalid uri or authentication issues, sshpeer will
 
            # throw an exception.
 
            mercurial.sshpeer.instance(repoui or mercurial.ui.ui(), url, False).lookup(b'tip')
 
            return True
 

	
 
        url_prefix = None
 
        if b'+' in url[:url.find(b'://')]:
 
            url_prefix, url = url.split(b'+', 1)
 

	
 
        handlers = []
 
        url_obj = mercurial.util.url(url)
 
        test_uri, authinfo = url_obj.authinfo()
 
        url_obj.passwd = b'*****'
 
        cleaned_uri = str(url_obj)
 

	
kallithea/model/ssh_key.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.ssh_key
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
SSH key model for Kallithea
 

	
 
"""
 

	
 
import errno
 
import logging
 
import os
 
import stat
 
import tempfile
 

	
 
from tg import config
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib import ssh
 
from kallithea.lib.utils2 import str2bool
 
from kallithea.lib.vcs.exceptions import RepositoryError
 
from kallithea.model.db import User, UserSshKeys
 
from kallithea.model.meta import Session
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class SshKeyModelException(RepositoryError):
 
    """Exception raised by SshKeyModel methods to report errors"""
 

	
 

	
 
class SshKeyModel(object):
 

	
 
    def create(self, user, description, public_key):
 
        """
 
        :param user: user or user_id
 
        :param description: description of SshKey
 
        :param publickey: public key text
 
        Will raise SshKeyModelException on errors
 
        """
 
        try:
 
            keytype, pub, comment = ssh.parse_pub_key(public_key)
 
        except ssh.SshKeyParseError as e:
 
            raise SshKeyModelException(_('SSH key %r is invalid: %s') % (public_key, e.message))
 
            raise SshKeyModelException(_('SSH key %r is invalid: %s') % (public_key, e.args[0]))
 
        if not description.strip():
 
            description = comment.strip()
 

	
 
        user = User.guess_instance(user)
 

	
 
        new_ssh_key = UserSshKeys()
 
        new_ssh_key.user_id = user.user_id
 
        new_ssh_key.description = description
 
        new_ssh_key.public_key = public_key
 

	
 
        for ssh_key in UserSshKeys.query().filter(UserSshKeys.fingerprint == new_ssh_key.fingerprint).all():
 
            raise SshKeyModelException(_('SSH key %s is already used by %s') %
 
                                       (new_ssh_key.fingerprint, ssh_key.user.username))
 

	
 
        Session().add(new_ssh_key)
 

	
 
        return new_ssh_key
 

	
 
    def delete(self, fingerprint, user):
 
        """
 
        Deletes ssh key with given fingerprint for the given user.
 
        Will raise SshKeyModelException on errors
 
        """
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys.fingerprint == fingerprint)
 

	
 
        user = User.guess_instance(user)
 
        ssh_key = ssh_key.filter(UserSshKeys.user_id == user.user_id)
 

	
 
        ssh_key = ssh_key.scalar()
 
        if ssh_key is None:
 
            raise SshKeyModelException(_('SSH key with fingerprint %r found') % fingerprint)
 
        Session().delete(ssh_key)
 

	
 
    def get_ssh_keys(self, user):
 
        user = User.guess_instance(user)
 
        user_ssh_keys = UserSshKeys.query() \
 
            .filter(UserSshKeys.user_id == user.user_id).all()
 
        return user_ssh_keys
 

	
 
    def write_authorized_keys(self):
 
        if not str2bool(config.get('ssh_enabled', False)):
 
            log.error("Will not write SSH authorized_keys file - ssh_enabled is not configured")
 
            return
 
        authorized_keys = config.get('ssh_authorized_keys')
 
        kallithea_cli_path = config.get('kallithea_cli_path', 'kallithea-cli')
 
        if not authorized_keys:
 
            log.error('Cannot write SSH authorized_keys file - ssh_authorized_keys is not configured')
 
            return
 
        log.info('Writing %s', authorized_keys)
 

	
 
        authorized_keys_dir = os.path.dirname(authorized_keys)
 
        try:
 
            os.makedirs(authorized_keys_dir)
 
            os.chmod(authorized_keys_dir, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) # ~/.ssh/ must be 0700
 
        except OSError as exception:
 
            if exception.errno != errno.EEXIST:
 
                raise
 
        # Now, test that the directory is or was created in a readable way by previous.
 
        if not (os.path.isdir(authorized_keys_dir) and
 
                os.access(authorized_keys_dir, os.W_OK)):
 
            raise SshKeyModelException("Directory of authorized_keys cannot be written to so authorized_keys file %s cannot be written" % (authorized_keys))
 

	
 
        # Make sure we don't overwrite a key file with important content
 
        if os.path.exists(authorized_keys):
 
            with open(authorized_keys) as f:
 
                for l in f:
 
                    if not l.strip() or l.startswith('#'):
 
                        pass # accept empty lines and comments
 
                    elif ssh.SSH_OPTIONS in l and ' ssh-serve ' in l:
 
                        pass # Kallithea entries are ok to overwrite
 
                    else:
 
                        raise SshKeyModelException("Safety check failed, found %r line in %s - please remove it if Kallithea should manage the file" % (l.strip(), authorized_keys))
 

	
 
        fh, tmp_authorized_keys = tempfile.mkstemp('.authorized_keys', dir=os.path.dirname(authorized_keys))
 
        with os.fdopen(fh, 'w') as f:
 
            f.write("# WARNING: This .ssh/authorized_keys file is managed by Kallithea. Manual editing or adding new entries will make Kallithea back off.\n")
 
            for key in UserSshKeys.query().join(UserSshKeys.user).filter(User.active == True):
 
                f.write(ssh.authorized_keys_line(kallithea_cli_path, config['__file__'], key))
 
        os.chmod(tmp_authorized_keys, stat.S_IRUSR | stat.S_IWUSR)
 
        # This preliminary remove is needed for Windows, not for Unix.
 
        # TODO In Python 3, the remove+rename sequence below should become os.replace.
 
        if os.path.exists(authorized_keys):
 
            os.remove(authorized_keys)
 
        os.rename(tmp_authorized_keys, authorized_keys)
0 comments (0 inline, 0 general)