Changeset - 24db2cd42881
.hgtags
 
c097458480a5972dd75d5695b61e855fd0ab371e rhodecode-0.0.0.7.0
 
8bdec09436cb7e4a764bd2ba50b84060e30eb34f rhodecode-0.0.0.7.1
 
1a18994cdc3bdd156ee93c7c0fb8d94a88f1f640 rhodecode-0.0.0.7.2
 
a3a7c3e03b76ee264a828cb1087970bb98bbffcd rhodecode-0.0.0.7.3
 
58b46f9194c347641bfc9a26697ef413a4761971 rhodecode-0.0.0.7.4
 
710e7a75bb6b8346cee3bd0ddda67592e4790268 rhodecode-0.0.0.7.5
 
ca80f8c0056211dad33483a50b913593516d7a6c rhodecode-0.0.0.7.6
 
0cf49c29c846fefeb4e1a222e4b1850e9e3eaa62 rhodecode-0.0.0.7.7
 
702c7e565c56a49c89414e81f28571c8e5b67408 rhodecode-0.0.0.7.8
 
c12f4d19c95065f313eefcd45eac9ef507f5fa55 rhodecode-0.0.0.7.9
 
558eb7c5028f24a90b5466ed16be13b213ba1fc2 rhodecode-0.0.0.8.0
 
a9814a642e11092b243ca01721254a04633a0ffc rhodecode-0.0.0.8.1
 
ccbe729908844884aea89d00fb14a6cb92e10c06 rhodecode-0.0.0.8.2
 
ca41d544dbdfd2f81bd0304168492a26276aadb6 rhodecode-0.0.0.8.3
 
2fa16ec5822da0c6fade3dd1ed9b6c0655e5dbbf rhodecode-0.0.0.8.4
 
16ba57d8fe2317c49dbd422afd07ab497687aa02 rhodecode-0.0.0.8.5
 
53128b6b9a4ddb6ee9554cbb83a082a6d1316b42 rhodecode-0.0.1.0.0rc4
 
afd98d1f817e6a6b52172735c22160239e615a6b rhodecode-0.0.1.0.0
 
bee56f209c40a6880f2f633b02227b5ee1f8ff5a rhodecode-0.0.1.0.1
 
d85b0948e53925ebbbc49e9f7967013a04f866e9 rhodecode-0.0.1.0.2
 
d9c8dddb96af521e346f05b88d515c536eef3d17 rhodecode-0.0.1.1.0
 
344f748517814ed0408a49e392dc625f4cc37fdc rhodecode-0.0.1.1.1
 
6c01c12eafb8cc72d4c4cbd121400fad755b2862 rhodecode-0.0.1.1.2
 
4fa80e0484ef5c33feaa9c39fc66916f410ba353 rhodecode-0.0.1.1.3
 
cb77867d69d3c5931712aac486c980a42ee90745 rhodecode-0.0.1.1.5
 
008bdfdd95c8bd31ae6d89f76c75c1f49cbcd0bc rhodecode-0.0.1.1.5
 
c5af1d3c861fb36b156224e75c2f55a97f54657d rhodecode-0.0.1.1.6
 
7327a0d1584cf28d33e738048af1f6809d499451 rhodecode-0.0.1.1.7
 
bd102f45950f779995a1beae42b6eb099cdd27b3 rhodecode-0.0.1.1.7
 
c8974135732aa0ceb841cee6df66e29f089b4963 rhodecode-0.0.1.1.8
 
c252049af24cd98eef5f4143fa3abbff3c912e29 rhodecode-0.0.1.2.0
 
0b8fba8ab90b01f811a50e6e7384989cced21d38 rhodecode-0.0.1.2.1
 
22273bec00ba2fd860c60a9277d3d7229e288e18 rhodecode-0.0.1.2.2
 
1ff606a7858dbd8a5f70b3da5cc89524bd0d84f9 rhodecode-0.0.1.2.3
 
a7a282a902b207ce34e830d643c79b7ab52e3b35 rhodecode-0.0.1.2.4
 
b6b611e7722e754abebaae6e265cbb4c823d344d rhodecode-0.0.1.2.5
 
dbc82e3362a25d2aece42060089824c4342efd17 rhodecode-0.0.1.3.0
 
79a95f338fd0115b2cdb77118f39e17d22ff505c rhodecode-0.0.1.3.1
 
9ab21c5ddb84935bea5c743b4e147ed5a398b30c rhodecode-0.0.1.3.2
 
934906f028b582a254e0028ba25e5d20dd32b9cd rhodecode-0.0.1.3.3
 
af21362474e3ab5aa0e2fbb1c872356f2c16c4f3 rhodecode-0.0.1.3.4
 
0e2792e04bd316fe64335cbe6a476031ac60b29b rhodecode-0.0.1.3.5
 
edfff9f37916389144d3a3644d0a7d7adfd79b11 rhodecode-0.0.1.3.6
 
9ae95fdeca184f2404205645f06c6597b74ef2db rhodecode-0.0.1.4.0
 
909143a4dde53c46d4f24abb426ec870471c7de1 rhodecode-0.0.1.4.1
 
d998cc84cf726798486a438763053f0e1dc1b646 rhodecode-0.0.1.4.2
 
3f5d40b9dd99ccb009ea2211ee2d4b594c634946 rhodecode-0.0.1.4.3
 
3148c08cf86f1849917e2d50f7ab7766c1550b0a rhodecode-0.0.1.4.4
 
a5f0bc867edc88be23eb808693e5393a97d4c54a rhodecode-0.0.1.5.0
 
3259dc7caea48687eab018ee646ae6ad7e7ef377 rhodecode-0.0.1.5.1
 
efe23d6c178c11d575a0214181276a3452776e48 rhodecode-0.0.1.5.2
 
1a498b11f1540f5b94b6f6009298f5dc3eaad9e9 rhodecode-0.0.1.5.3
 
3447862ad8c9ceba85857774c526e39fde3a2281 rhodecode-0.0.1.5.4
 
c15d7b336af58df9f1bbc8f8957464e7ea618d4c rhodecode-0.0.1.6.0rc1
 
78b53ee0d247f90d51b028307ff5717851b6c265 rhodecode-0.0.1.6.0
 
351ad34d56321349ff5bd38f537bd768b8efef2e rhodecode-0.0.1.7.0
 
1f71ef689d2a3c9978cea6591a1f4e9107a5ca83 rhodecode-0.0.1.7.1
 
cc48c1541c7e2e84114bf92a0f9cd4b8b1341545 0.0
 
d17e88a1a88a29f6fac948c94498129e405a40d3 0.1
 
ad0ce803b40cb17fc3988373052943e041030b02 0.2
 
c6e32714336345403adf76abb6ebf9b8116fcdc7 0.2.1
 
14f488a5dc4ca6647bc6acf12534fd137e968aa8 0.2.2
 
9b3e9e242f5c97cc0c7657e5ac93dce7de61ca16 0.3
 
9bf8eb837e785b6856ccfac264e977ce3ebe1535 0.3.1
 
a84d40e9481fcea4dafadee86b03f0dd401527d6 0.3.2
 
64ea7ea0923618a0c117acebb816a6f0d162bfdb 0.3.3
 
cf635c823ea059cc3a1581b82d8672e46b682384 0.3.4
 
4cca4cc6a0a97f4c4763317184cd41aca4297630 0.3.5
 
082c9b8f0f17bd34740eb90c69bdc4c80d4b5b31 0.3.6
 
a18445b85d407294da0b7f1d8be3bedef5ffdea6 0.3.7
 
8db761c407685e7b08b800c947890035b0d67025 0.4.0rc1
 
60f726162fd6c515bd819feb423be73cad01d7d3 0.4.0rc2
 
19086c5de05f4984d7a90cd31624c45dd893f6bb 0.4.0
 
da65398a62fff50f3d241796cbf17acdea2092ef 0.4.1
 
bfa0b0a814644f0af3f492d17a9ed169cc3b89fe 0.5.0
 
d01a8e92936dbd62c76505432f60efba432e9397 0.5.1
 
aa0a637fa6f635a5e024fa56b19ed2a2dacca857 0.5.2
CONTRIBUTORS
 
List of contributors to Kallithea project:
 

	
 
    Thomas De Schampheleire <thomas.de_schampheleire@nokia.com> 2014-2020
 
    Mads Kiilerich <mads@kiilerich.com> 2016-2020
 
    Dennis Fink <dennis.fink@c3l.lu> 2020
 
    Andrej Shadura <andrew@shadura.me> 2012 2014-2017 2019
 
    Étienne Gilli <etienne.gilli@gmail.com> 2015-2017 2019
 
    Allan Nordhøy <epost@anotheragency.no> 2017-2019
 
    ssantos <ssantos@web.de> 2018-2019
 
    Adi Kriegisch <adi@cg.tuwien.ac.at> 2019
 
    Danni Randeris <danniranderis@gmail.com> 2019
 
    Edmund Wong <ewong@crazy-cat.org> 2019
 
    Elizabeth Sherrock <lizzyd710@gmail.com> 2019
 
    Hüseyin Tunç <huseyin.tunc@bulutfon.com> 2019
 
    leela <53352@protonmail.com> 2019
 
    Manuel Jacob <me@manueljacob.de> 2019
 
    Mateusz Mendel <mendelm9@gmail.com> 2019
 
    Nathan <bonnemainsnathan@gmail.com> 2019
 
    Oleksandr Shtalinberg <o.shtalinberg@gmail.com> 2019
 
    Private <adamantine.sword@gmail.com> 2019
 
    THANOS SIOURDAKIS <siourdakisthanos@gmail.com> 2019
 
    Wolfgang Scherer <wolfgang.scherer@gmx.de> 2019
 
    Христо Станев <hstanev@gmail.com> 2019
 
    Dominik Ruf <dominikruf@gmail.com> 2012 2014-2018
 
    Michal Čihař <michal@cihar.com> 2014-2015 2018
 
    Branko Majic <branko@majic.rs> 2015 2018
 
    Chris Rule <crule@aegistg.com> 2018
 
    Jesús Sánchez <jsanchezfdz95@gmail.com> 2018
 
    Patrick Vane <patrick_vane@lowentry.com> 2018
 
    Pheng Heong Tan <phtan90@gmail.com> 2018
 
    Максим Якимчук <xpinovo@gmail.com> 2018
 
    Марс Ямбар <mjambarmeta@gmail.com> 2018
 
    Mads Kiilerich <madski@unity3d.com> 2012-2017
 
    Unity Technologies 2012-2017
 
    Søren Løvborg <sorenl@unity3d.com> 2015-2017
 
    Sam Jaques <sam.jaques@me.com> 2015 2017
 
    Asterios Dimitriou <steve@pci.gr> 2016-2017
 
    Alessandro Molina <alessandro.molina@axant.it> 2017
 
    Anton Schur <tonich.sh@gmail.com> 2017
 
    Ching-Chen Mao <mao@lins.fju.edu.tw> 2017
 
    Eivind Tagseth <eivindt@gmail.com> 2017
 
    FUJIWARA Katsunori <foozy@lares.dti.ne.jp> 2017
 
    Holger Schramm <info@schramm.by> 2017
 
    Karl Goetz <karl@kgoetz.id.au> 2017
 
    Lars Kruse <devel@sumpfralle.de> 2017
 
    Marko Semet <markosemet@googlemail.com> 2017
 
    Viktar Vauchkevich <victorenator@gmail.com> 2017
 
    Takumi IINO <trot.thunder@gmail.com> 2012-2016
 
    Jan Heylen <heyleke@gmail.com> 2015-2016
 
    Robert Martinez <ntttq@inboxen.org> 2015-2016
 
    Robert Rauch <mail@robertrauch.de> 2015-2016
 
    Angel Ezquerra <angel.ezquerra@gmail.com> 2016
 
    Anton Shestakov <av6@dwimlabs.net> 2016
 
    Brandon Jones <bjones14@gmail.com> 2016
 
    Kateryna Musina <kateryna@unity3d.com> 2016
 
    Konstantin Veretennicov <kveretennicov@gmail.com> 2016
 
    Oscar Curero <oscar@naiandei.net> 2016
 
    Robert James Dennington <tinytimrob@googlemail.com> 2016
 
    timeless@gmail.com 2016
 
    YFdyh000 <yfdyh000@gmail.com> 2016
 
    Aras Pranckevičius <aras@unity3d.com> 2012-2013 2015
 
    Sean Farley <sean.michael.farley@gmail.com> 2013-2015
 
    Bradley M. Kuhn <bkuhn@sfconservancy.org> 2014-2015
 
    Christian Oyarzun <oyarzun@gmail.com> 2014-2015
 
    Joseph Rivera <rivera.d.joseph@gmail.com> 2014-2015
 
    Anatoly Bubenkov <bubenkoff@gmail.com> 2015
 
    Andrew Bartlett <abartlet@catalyst.net.nz> 2015
 
    Balázs Úr <urbalazs@gmail.com> 2015
 
    Ben Finney <ben@benfinney.id.au> 2015
 
    Daniel Hobley <danielh@unity3d.com> 2015
 
    David Avigni <david.avigni@ankapi.com> 2015
 
    Denis Blanchette <dblanchette@coveo.com> 2015
 
    duanhongyi <duanhongyi@doopai.com> 2015
 
    EriCSN Chang <ericsning@gmail.com> 2015
 
    Grzegorz Krason <grzegorz.krason@gmail.com> 2015
 
    Jiří Suchan <yed@vanyli.net> 2015
 
    Kazunari Kobayashi <kobanari@nifty.com> 2015
 
    Kevin Bullock <kbullock@ringworld.org> 2015
 
    kobanari <kobanari@nifty.com> 2015
 
    Marc Abramowitz <marc@marc-abramowitz.com> 2015
 
    Marc Villetard <marc.villetard@gmail.com> 2015
 
    Matthias Zilk <matthias.zilk@gmail.com> 2015
 
    Michael Pohl <michael@mipapo.de> 2015
 
    Michael V. DePalatis <mike@depalatis.net> 2015
 
    Morten Skaaning <mortens@unity3d.com> 2015
 
    Nick High <nick@silverchip.org> 2015
 
    Niemand Jedermann <predatorix@web.de> 2015
 
    Peter Vitt <petervitt@web.de> 2015
 
    Ronny Pfannschmidt <opensource@ronnypfannschmidt.de> 2015
 
    Tuux <tuxa@galaxie.eu.org> 2015
 
    Viktar Palstsiuk <vipals@gmail.com> 2015
 
    Ante Ilic <ante@unity3d.com> 2014
 
    Calinou <calinou@opmbx.org> 2014
 
    Daniel Anderson <daniel@dattrix.com> 2014
 
    Henrik Stuart <hg@hstuart.dk> 2014
 
    Ingo von Borstel <kallithea@planetmaker.de> 2014
 
    invision70 <invision70@gmail.com> 2014
 
    Jelmer Vernooij <jelmer@samba.org> 2014
 
    Jim Hague <jim.hague@acm.org> 2014
 
    Matt Fellows <kallithea@matt-fellows.me.uk> 2014
 
    Max Roman <max@choloclos.se> 2014
 
    Na'Tosha Bard <natosha@unity3d.com> 2014
 
    Rasmus Selsmark <rasmuss@unity3d.com> 2014
 
    SkryabinD <skryabind@gmail.com> 2014
 
    Tim Freund <tim@freunds.net> 2014
 
    Travis Burtrum <android@moparisthebest.com> 2014
 
    whosaysni <whosaysni@gmail.com> 2014
 
    Zoltan Gyarmati <mr.zoltan.gyarmati@gmail.com> 2014
 
    Marcin Kuźmiński <marcin@python-works.com> 2010-2013
 
    Nemcio <areczek01@gmail.com> 2012-2013
 
    xpol <xpolife@gmail.com> 2012-2013
 
    Andrey Mivrenik <myvrenik@gmail.com> 2013
 
    Aparkar <aparkar@icloud.com> 2013
 
    ArcheR <aleclitvinov1980@gmail.com> 2013
 
    Dennis Brakhane <brakhane@googlemail.com> 2013
 
    gnustavo <gustavo@gnustavo.com> 2013
 
    Grzegorz Rożniecki <xaerxess@gmail.com> 2013
 
    Ilya Beda <ir4y.ix@gmail.com> 2013
 
    ivlevdenis <ivlevdenis.ru@gmail.com> 2013
 
    Jonathan Sternberg <jonathansternberg@gmail.com> 2013
 
    Leonardo Carneiro <leonardo@unity3d.com> 2013
 
    Magnus Ericmats <magnus.ericmats@gmail.com> 2013
 
    Martin Vium <martinv@unity3d.com> 2013
 
    Mikhail Zholobov <legal90@gmail.com> 2013
 
    mokeev1995 <mokeev_andre@mail.ru> 2013
 
    Ruslan Bekenev <furyinbox@gmail.com> 2013
 
    shirou - しろう 2013
 
    Simon Lopez <simon.lopez@slopez.org> 2013
 
    softforwinxp <softforwinxp@gmail.com> 2013
 
    stephanj <info@stephan-jauernick.de> 2013
 
    Ton Plomp <tcplomp@gmail.com> 2013
 
    zhmylove <zhmylove@narod.ru> 2013
 
    こいんとす <tkondou@gmail.com> 2013
 
    Augusto Herrmann <augusto.herrmann@planejamento.gov.br> 2011-2012
 
    Augusto Herrmann <augusto.herrmann@gmail.com> 2012
 
    Dan Sheridan <djs@adelard.com> 2012
 
    Dies Koper <diesk@fast.au.fujitsu.com> 2012
 
    Erwin Kroon <e.kroon@smartmetersolutions.nl> 2012
 
    H Waldo G <gwaldo@gmail.com> 2012
 
    hppj <hppj@postmage.biz> 2012
 
    Indra Talip <indra.talip@gmail.com> 2012
 
    mikespook <mikespook@gmail.com> 2012
 
    nansenat16 <nansenat16@null.tw> 2012
 
    Nemcio <bogdan114@g.pl> 2012
 
    Philip Jameson <philip.j@hostdime.com> 2012
 
    Raoul Thill <raoul.thill@gmail.com> 2012
 
    Stefan Engel <mail@engel-stefan.de> 2012
 
    Tony Bussieres <t.bussieres@gmail.com> 2012
 
    Vincent Caron <vcaron@bearstech.com> 2012
 
    Vincent Duvert <vincent@duvert.net> 2012
 
    Vladislav Poluhin <nuklea@gmail.com> 2012
 
    Zachary Auclair <zach101@gmail.com> 2012
 
    Ankit Solanki <ankit.solanki@gmail.com> 2011
 
    Dmitri Kuznetsov 2011
 
    Jared Bunting <jared.bunting@peachjean.com> 2011
 
    Jason Harris <jason@jasonfharris.com> 2011
 
    Les Peabody <lpeabody@gmail.com> 2011
 
    Liad Shani <liadff@gmail.com> 2011
 
    Lorenzo M. Catucci <lorenzo@sancho.ccd.uniroma2.it> 2011
 
    Matt Zuba <matt.zuba@goodwillaz.org> 2011
 
    Nicolas VINOT <aeris@imirhil.fr> 2011
 
    Shawn K. O'Shea <shawn@eth0.net> 2011
 
    Thayne Harbaugh <thayne@fusionio.com> 2011
 
    Łukasz Balcerzak <lukaszbalcerzak@gmail.com> 2010
 
    Andrew Kesterson <andrew@aklabs.net>
 
    cejones
 
    David A. Sjøen <david.sjoen@westcon.no>
 
    James Rhodes <jrhodes@redpointsoftware.com.au>
 
    Jonas Oberschweiber <jonas.oberschweiber@d-velop.de>
 
    larikale
 
    RhodeCode GmbH
 
    Sebastian Kreutzberger <sebastian@rhodecode.com>
 
    Steve Romanow <slestak989@gmail.com>
 
    SteveCohen
 
    Thomas <thomas@rhodecode.com>
 
    Thomas Waldmann <tw-public@gmx.de>
kallithea/alembic/versions/4851d15bc437_db_migration_step_after_95c01895c006_.py
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""db: migration step after 95c01895c006 failed to add usk_public_key_idx in alembic step b74907136bc1
 

	
 
Revision ID: 4851d15bc437
 
Revises: 151b4a4e8c48
 
Create Date: 2019-11-24 02:51:14.029583
 

	
 
"""
 

	
 
# The following opaque hexadecimal identifiers ("revisions") are used
 
# by Alembic to track this migration script and its relations to others.
 
revision = '4851d15bc437'
 
down_revision = '151b4a4e8c48'
 
branch_labels = None
 
depends_on = None
 

	
 
import sqlalchemy as sa
 
from alembic import op
 

	
 

	
 
def upgrade():
 
    pass
 
    # The following upgrade step turned out to be a bad idea. A later step
 
    # "d7ec25b66e47_ssh_drop_usk_public_key_idx_again" will remove the index
 
    # again if it exists ... but we shouldn't even try to create it.
 

	
 
    #meta = sa.MetaData()
 
    #meta.reflect(bind=op.get_bind())
 

	
 
    #if not any(i.name == 'usk_public_key_idx' for i in meta.tables['user_ssh_keys'].indexes):
 
    #    with op.batch_alter_table('user_ssh_keys', schema=None) as batch_op:
 
    #        batch_op.create_index('usk_public_key_idx', ['public_key'], unique=False)
 

	
 

	
 
def downgrade():
 
    meta = sa.MetaData()
    meta.reflect(bind=op.get_bind())  # populate meta.tables before checking for the index
 
    if any(i.name == 'usk_public_key_idx' for i in meta.tables['user_ssh_keys'].indexes):
 
        with op.batch_alter_table('user_ssh_keys', schema=None) as batch_op:
 
            batch_op.drop_index('usk_public_key_idx')
kallithea/controllers/admin/repo_groups.py
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.admin.repo_groups
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Repository groups controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Mar 23, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
import formencode
 
from formencode import htmlfill
 
from tg import app_globals, request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from tg.i18n import ungettext
 
from webob.exc import HTTPForbidden, HTTPFound, HTTPInternalServerError, HTTPNotFound
 

	
 
from kallithea.config.routing import url
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasPermissionAny, HasRepoGroupPermissionLevel, HasRepoGroupPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseController, render
 
from kallithea.lib.utils2 import safe_int, safe_unicode
 
from kallithea.model.db import RepoGroup, Repository
 
from kallithea.model.forms import RepoGroupForm, RepoGroupPermsForm
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.scm import AvailableRepoGroupChoices, RepoGroupList
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class RepoGroupsController(BaseController):
 

	
 
    @LoginRequired(allow_default_user=True)
 
    def _before(self, *args, **kwargs):
 
        super(RepoGroupsController, self)._before(*args, **kwargs)
 

	
 
    def __load_defaults(self, extras=(), exclude=()):
 
        """extras is used for keeping current parent ignoring permissions
 
        exclude is used for not moving group to itself TODO: also exclude descendants
 
        Note: only admin can create top level groups
 
        """
 
        repo_groups = AvailableRepoGroupChoices([], 'admin', extras)
 
        exclude_group_ids = set(rg.group_id for rg in exclude)
 
        c.repo_groups = [rg for rg in repo_groups
 
                         if rg[0] not in exclude_group_ids]
 

	
 
    def __load_data(self, group_id):
 
        """
 
        Load default settings for edit and update
 

	
 
        :param group_id:
 
        """
 
        repo_group = RepoGroup.get_or_404(group_id)
 
        data = repo_group.get_dict()
 
        data['group_name'] = repo_group.name
 

	
 
        # fill repository group users
 
        for p in repo_group.repo_group_to_perm:
 
            data.update({'u_perm_%s' % p.user.username:
 
                             p.permission.permission_name})
 

	
 
        # fill repository group groups
 
        for p in repo_group.users_group_to_perm:
 
            data.update({'g_perm_%s' % p.users_group.users_group_name:
 
                             p.permission.permission_name})
 

	
 
        return data
 

	
 
    def _revoke_perms_on_yourself(self, form_result):
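        # Return True if the submitted permission changes would set the current
        # user's own permission on this group to something other than 'group.admin'.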
 
        _up = [u for u in form_result['perms_updates'] if request.authuser.username == u[0]]
 
        _new = [u for u in form_result['perms_new'] if request.authuser.username == u[0]]
 
        if _new and _new[0][1] != 'group.admin' or _up and _up[0][1] != 'group.admin':
 
            return True
 
        return False
 

	
 
    def index(self, format='html'):
 
        _list = RepoGroup.query(sorted=True).all()
 
        group_iter = RepoGroupList(_list, perm_level='admin')
 
        repo_groups_data = []
 
        total_records = len(group_iter)
 
        _tmpl_lookup = app_globals.mako_lookup
 
        template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
 

	
 
        repo_group_name = lambda repo_group_name, children_groups: (
 
            template.get_def("repo_group_name")
 
            .render_unicode(repo_group_name, children_groups, _=_, h=h, c=c)
 
        )
 
        repo_group_actions = lambda repo_group_id, repo_group_name, gr_count: (
 
            template.get_def("repo_group_actions")
 
            .render_unicode(repo_group_id, repo_group_name, gr_count, _=_, h=h, c=c,
 
                    ungettext=ungettext)
 
        )
 

	
 
        for repo_gr in group_iter:
 
            children_groups = [safe_unicode(g.name) for g in repo_gr.parents] + [safe_unicode(repo_gr.name)]
 
            repo_count = repo_gr.repositories.count()
 
            repo_groups_data.append({
 
                "raw_name": repo_gr.group_name,
 
                "group_name": repo_group_name(repo_gr.group_name, children_groups),
 
                "desc": h.escape(repo_gr.group_description),
 
                "repos": repo_count,
 
                "owner": h.person(repo_gr.owner),
 
                "action": repo_group_actions(repo_gr.group_id, repo_gr.group_name,
 
                                             repo_count)
 
            })
 

	
 
        c.data = {
 
            "sort": None,
 
            "dir": "asc",
 
            "records": repo_groups_data
 
        }
 

	
 
        return render('admin/repo_groups/repo_groups.html')
 

	
 
    def create(self):
 
        self.__load_defaults()
 

	
 
        # permission to create a group under the given parent_id is checked
        # here in the Form
 
        repo_group_form = RepoGroupForm(repo_groups=c.repo_groups)
 
        form_result = None
 
        try:
 
            form_result = repo_group_form.to_python(dict(request.POST))
 
            gr = RepoGroupModel().create(
 
                group_name=form_result['group_name'],
 
                group_description=form_result['group_description'],
 
                parent=form_result['parent_group_id'],
 
                owner=request.authuser.user_id, # TODO: make editable
 
                copy_permissions=form_result['group_copy_permissions']
 
            )
 
            Session().commit()
 
            # TODO: in future action_logger(, '', '', '')
 
        except formencode.Invalid as errors:
 
            return htmlfill.render(
 
                render('admin/repo_groups/repo_group_add.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during creation of repository group %s')
 
                    % request.POST.get('group_name'), category='error')
 
            if form_result is None:
 
                raise
 
            parent_group_id = form_result['parent_group_id']
 
            # TODO: maybe we should get back to the main view, not the admin one
 
            raise HTTPFound(location=url('repos_groups', parent_group=parent_group_id))
 
        h.flash(_('Created repository group %s') % gr.group_name,
 
                category='success')
 
        raise HTTPFound(location=url('repos_group_home', group_name=gr.group_name))
 

	
 
    def new(self):
 
        if HasPermissionAny('hg.admin')('group create'):
 
            # we're a global admin, so we can create TOP level groups
 
            pass
 
        else:
 
            # the parent group is passed into the creation form, so we know
            # which group it would be and can check permissions here
 
            group_id = safe_int(request.GET.get('parent_group'))
 
            group = RepoGroup.get(group_id) if group_id else None
 
            group_name = group.group_name if group else None
 
            if HasRepoGroupPermissionLevel('admin')(group_name, 'group create'):
 
                pass
 
            else:
 
                raise HTTPForbidden()
 

	
 
        self.__load_defaults()
 
        return render('admin/repo_groups/repo_group_add.html')
 

	
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def update(self, group_name):
 
        c.repo_group = RepoGroup.guess_instance(group_name)
 
        self.__load_defaults(extras=[c.repo_group.parent_group],
 
                             exclude=[c.repo_group])
 

	
 
        # TODO: kill allow_empty_group - it is only used for redundant form validation!
 
        if HasPermissionAny('hg.admin')('group edit'):
 
            # we're a global admin, so we can create TOP level groups
 
            allow_empty_group = True
 
        elif not c.repo_group.parent_group:
 
            allow_empty_group = True
 
        else:
 
            allow_empty_group = False
 
        repo_group_form = RepoGroupForm(
 
            edit=True,
 
            old_data=c.repo_group.get_dict(),
 
            repo_groups=c.repo_groups,
 
            can_create_in_root=allow_empty_group,
 
        )()
 
        try:
 
            form_result = repo_group_form.to_python(dict(request.POST))
 

	
 
            new_gr = RepoGroupModel().update(group_name, form_result)
 
            Session().commit()
 
            h.flash(_('Updated repository group %s')
 
                    % form_result['group_name'], category='success')
 
            # we now have the new name
 
            group_name = new_gr.group_name
 
            # TODO: in future action_logger(, '', '', '')
 
        except formencode.Invalid as errors:
 
            c.active = 'settings'
 
            return htmlfill.render(
 
                render('admin/repo_groups/repo_group_edit.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during update of repository group %s')
 
                    % request.POST.get('group_name'), category='error')
 

	
 
        raise HTTPFound(location=url('edit_repo_group', group_name=group_name))
 

	
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def delete(self, group_name):
 
        gr = c.repo_group = RepoGroup.guess_instance(group_name)
 
        repos = gr.repositories.all()
 
        if repos:
 
            h.flash(_('This group contains %s repositories and cannot be '
 
                      'deleted') % len(repos), category='warning')
 
            raise HTTPFound(location=url('repos_groups'))
 

	
 
        children = gr.children.all()
 
        if children:
 
            h.flash(_('This group contains %s subgroups and cannot be deleted')
                    % len(children), category='warning')
 
            raise HTTPFound(location=url('repos_groups'))
 

	
 
        try:
 
            RepoGroupModel().delete(group_name)
 
            Session().commit()
 
            h.flash(_('Removed repository group %s') % group_name,
 
                    category='success')
 
            # TODO: in future action_logger(, '', '', '')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during deletion of repository group %s')
 
                    % group_name, category='error')
 

	
 
        if gr.parent_group:
 
            raise HTTPFound(location=url('repos_group_home', group_name=gr.parent_group.group_name))
 
        raise HTTPFound(location=url('repos_groups'))
 

	
 
    def show_by_name(self, group_name):
 
        """
 
        This is a proxy that does a lookup group_name -> id, and shows
 
        the group by id view instead
 
        """
 
        group_name = group_name.rstrip('/')
 
        id_ = RepoGroup.get_by_group_name(group_name)
 
        if id_:
 
            return self.show(group_name)
 
        raise HTTPNotFound
 

	
 
    @HasRepoGroupPermissionLevelDecorator('read')
 
    def show(self, group_name):
 
        c.active = 'settings'
 

	
 
        c.group = c.repo_group = RepoGroup.guess_instance(group_name)
 

	
 
        groups = RepoGroup.query(sorted=True).filter_by(parent_group=c.group).all()
 
        repo_groups_list = self.scm_model.get_repo_groups(groups)
 

	
 
        repos_list = Repository.query(sorted=True).filter_by(group=c.group).all()
 
        c.data = RepoModel().get_repos_as_dict(repos_list,
 
                                               repo_groups_list=repo_groups_list,
 
                                               short_name=True)
 

	
 
        return render('admin/repo_groups/repo_group_show.html')
 

	
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def edit(self, group_name):
 
        c.active = 'settings'
 

	
 
        c.repo_group = RepoGroup.guess_instance(group_name)
 
        self.__load_defaults(extras=[c.repo_group.parent_group],
 
                             exclude=[c.repo_group])
 
        defaults = self.__load_data(c.repo_group.group_id)
 

	
 
        return htmlfill.render(
 
            render('admin/repo_groups/repo_group_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False
 
        )
 

	
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def edit_repo_group_advanced(self, group_name):
 
        c.active = 'advanced'
 
        c.repo_group = RepoGroup.guess_instance(group_name)
 

	
 
        return render('admin/repo_groups/repo_group_edit.html')
 

	
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def edit_repo_group_perms(self, group_name):
 
        c.active = 'perms'
 
        c.repo_group = RepoGroup.guess_instance(group_name)
 
        self.__load_defaults()
 
        defaults = self.__load_data(c.repo_group.group_id)
 

	
 
        return htmlfill.render(
 
            render('admin/repo_groups/repo_group_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False
 
        )
 

	
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def update_perms(self, group_name):
 
        """
 
        Update permissions for given repository group
 

	
 
        :param group_name:
 
        """
 

	
 
        c.repo_group = RepoGroup.guess_instance(group_name)
 
        valid_recursive_choices = ['none', 'repos', 'groups', 'all']
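        # 'recursive' controls how far the permission change propagates below this
        # group: not at all ('none'), to contained repositories ('repos'), to
        # subgroups ('groups'), or to both ('all').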
 
        form_result = RepoGroupPermsForm(valid_recursive_choices)().to_python(request.POST)
 
        if not request.authuser.is_admin:
 
            if self._revoke_perms_on_yourself(form_result):
 
                msg = _('Cannot revoke permission for yourself as admin')
 
                h.flash(msg, category='warning')
 
                raise HTTPFound(location=url('edit_repo_group_perms', group_name=group_name))
 
        recursive = form_result['recursive']
 
        # iterate over all members of this group (recursively, if requested) and
        # set the permissions!
        # this can be a potentially heavy operation
 
        RepoGroupModel()._update_permissions(c.repo_group,
 
                                             form_result['perms_new'],
 
                                             form_result['perms_updates'],
 
                                             recursive)
 
        # TODO: implement this
 
        #action_logger(request.authuser, 'admin_changed_repo_permissions',
 
        #              repo_name, request.ip_addr)
 
        Session().commit()
 
        h.flash(_('Repository group permissions updated'), category='success')
 
        raise HTTPFound(location=url('edit_repo_group_perms', group_name=group_name))
 

	
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def delete_perms(self, group_name):
 
        try:
 
            obj_type = request.POST.get('obj_type')
 
            obj_id = None
 
            if obj_type == 'user':
 
                obj_id = safe_int(request.POST.get('user_id'))
 
            elif obj_type == 'user_group':
 
                obj_id = safe_int(request.POST.get('user_group_id'))
 

	
 
            if not request.authuser.is_admin:
 
                if obj_type == 'user' and request.authuser.user_id == obj_id:
 
                    msg = _('Cannot revoke permission for yourself as admin')
 
                    h.flash(msg, category='warning')
 
                    raise Exception('revoke admin permission on self')
 
            recursive = request.POST.get('recursive', 'none')
 
            if obj_type == 'user':
 
                RepoGroupModel().delete_permission(repo_group=group_name,
 
                                                   obj=obj_id, obj_type='user',
 
                                                   recursive=recursive)
 
            elif obj_type == 'user_group':
 
                RepoGroupModel().delete_permission(repo_group=group_name,
 
                                                   obj=obj_id,
 
                                                   obj_type='user_group',
 
                                                   recursive=recursive)
 

	
 
            Session().commit()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during revoking of permission'),
 
                    category='error')
 
            raise HTTPInternalServerError()
kallithea/controllers/pullrequests.py
 
@@ -92,552 +92,553 @@ class PullrequestsController(BaseRepoCon
 
            # a revset not restricting to merge() would be better
 
            # (especially because it would get the branch point)
 
            # ... but is currently too expensive
 
            # including branches of children could be nice too
 
            peerbranches = set()
 
            for i in repo._repo.revs(
 
                b"sort(parents(branch(id(%s)) and merge()) - branch(id(%s)), -rev)",
 
                ascii_bytes(branch_rev), ascii_bytes(branch_rev),
 
            ):
 
                for abranch in repo.get_changeset(i).branches:
 
                    if abranch not in peerbranches:
 
                        n = 'branch:%s:%s' % (abranch, repo.get_changeset(abranch).raw_id)
 
                        peers.append((n, abranch))
 
                        peerbranches.add(abranch)
 

	
 
        selected = None
 
        tiprev = repo.tags.get('tip')
 
        tipbranch = None
 

	
 
        branches = []
 
        for abranch, branchrev in repo.branches.iteritems():
 
            n = 'branch:%s:%s' % (abranch, branchrev)
 
            desc = abranch
 
            if branchrev == tiprev:
 
                tipbranch = abranch
 
                desc = '%s (current tip)' % desc
 
            branches.append((n, desc))
 
            if rev == branchrev:
 
                selected = n
 
            if branch == abranch:
 
                if not rev:
 
                    selected = n
 
                branch = None
 
        if branch:  # branch not in list - it is probably closed
 
            branchrev = repo.closed_branches.get(branch)
 
            if branchrev:
 
                n = 'branch:%s:%s' % (branch, branchrev)
 
                branches.append((n, _('%s (closed)') % branch))
 
                selected = n
 
                branch = None
 
            if branch:
 
                log.debug('branch %r not found in %s', branch, repo)
 

	
 
        bookmarks = []
 
        for bookmark, bookmarkrev in repo.bookmarks.iteritems():
 
            n = 'book:%s:%s' % (bookmark, bookmarkrev)
 
            bookmarks.append((n, bookmark))
 
            if rev == bookmarkrev:
 
                selected = n
 

	
 
        tags = []
 
        for tag, tagrev in repo.tags.iteritems():
 
            if tag == 'tip':
 
                continue
 
            n = 'tag:%s:%s' % (tag, tagrev)
 
            tags.append((n, tag))
 
            # note: even if rev == tagrev, don't select the static tag - it must be chosen explicitly
 

	
 
        # prio 1: rev was selected as existing entry above
 

	
 
        # prio 2: create special entry for rev; rev _must_ be used
 
        specials = []
 
        if rev and selected is None:
 
            selected = 'rev:%s:%s' % (rev, rev)
 
            specials = [(selected, '%s: %s' % (_("Changeset"), rev[:12]))]
 

	
 
        # prio 3: most recent peer branch
 
        if peers and not selected:
 
            selected = peers[0][0]
 

	
 
        # prio 4: tip revision
 
        if not selected:
 
            if h.is_hg(repo):
 
                if tipbranch:
 
                    selected = 'branch:%s:%s' % (tipbranch, tiprev)
 
                else:
 
                    selected = 'tag:null:' + repo.EMPTY_CHANGESET
 
                    tags.append((selected, 'null'))
 
            else:
 
                if 'master' in repo.branches:
 
                    selected = 'branch:master:%s' % repo.branches['master']
 
                else:
 
                    k, v = list(repo.branches.items())[0]
 
                    selected = 'branch:%s:%s' % (k, v)
 

	
 
        groups = [(specials, _("Special")),
 
                  (peers, _("Peer branches")),
 
                  (bookmarks, _("Bookmarks")),
 
                  (branches, _("Branches")),
 
                  (tags, _("Tags")),
 
                  ]
 
        return [g for g in groups if g[0]], selected
 

	
 
    def _is_allowed_to_change_status(self, pull_request):
 
        if pull_request.is_closed():
 
            return False
 

	
 
        owner = request.authuser.user_id == pull_request.owner_id
 
        reviewer = PullRequestReviewer.query() \
 
            .filter(PullRequestReviewer.pull_request == pull_request) \
 
            .filter(PullRequestReviewer.user_id == request.authuser.user_id) \
 
            .count() != 0
 

	
 
        return request.authuser.admin or owner or reviewer
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def show_all(self, repo_name):
 
        c.from_ = request.GET.get('from_') or ''
 
        c.closed = request.GET.get('closed') or ''
 
        url_params = {}
 
        if c.from_:
 
            url_params['from_'] = 1
 
        if c.closed:
 
            url_params['closed'] = 1
 
        p = safe_int(request.GET.get('page'), 1)
 

	
 
        q = PullRequest.query(include_closed=c.closed, sorted=True)
 
        if c.from_:
 
            q = q.filter_by(org_repo=c.db_repo)
 
        else:
 
            q = q.filter_by(other_repo=c.db_repo)
 
        c.pull_requests = q.all()
 

	
 
        c.pullrequests_pager = Page(c.pull_requests, page=p, items_per_page=100, **url_params)
 

	
 
        return render('/pullrequests/pullrequest_show_all.html')
 

	
 
    @LoginRequired()
 
    def show_my(self):
 
        c.closed = request.GET.get('closed') or ''
 

	
 
        c.my_pull_requests = PullRequest.query(
 
            include_closed=c.closed,
 
            sorted=True,
 
        ).filter_by(owner_id=request.authuser.user_id).all()
 

	
 
        c.participate_in_pull_requests = []
 
        c.participate_in_pull_requests_todo = []
 
        done_status = set([ChangesetStatus.STATUS_APPROVED, ChangesetStatus.STATUS_REJECTED])
 
        for pr in PullRequest.query(
 
            include_closed=c.closed,
 
            reviewer_id=request.authuser.user_id,
 
            sorted=True,
 
        ):
 
            status = pr.user_review_status(request.authuser.user_id) # very inefficient!!!
 
            if status in done_status:
 
                c.participate_in_pull_requests.append(pr)
 
            else:
 
                c.participate_in_pull_requests_todo.append(pr)
 

	
 
        return render('/pullrequests/pullrequest_show_my.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    def index(self):
 
        org_repo = c.db_repo
 
        org_scm_instance = org_repo.scm_instance
 
        try:
 
            org_scm_instance.get_changeset()
 
        except EmptyRepositoryError as e:
 
            h.flash(_('There are no changesets yet'),
 
                    category='warning')
 
            raise HTTPFound(location=url('summary_home', repo_name=org_repo.repo_name))
 

	
 
        org_rev = request.GET.get('rev_end')
 
        # rev_start is not directly useful - its parent could however be used
 
        # as default for other and thus give a simple compare view
 
        rev_start = request.GET.get('rev_start')
 
        other_rev = None
 
        if rev_start:
 
            starters = org_repo.get_changeset(rev_start).parents
 
            if starters:
 
                other_rev = starters[0].raw_id
 
            else:
 
                other_rev = org_repo.scm_instance.EMPTY_CHANGESET
 
        branch = request.GET.get('branch')
 

	
 
        c.cs_repos = [(org_repo.repo_name, org_repo.repo_name)]
 
        c.default_cs_repo = org_repo.repo_name
 
        c.cs_refs, c.default_cs_ref = self._get_repo_refs(org_scm_instance, rev=org_rev, branch=branch)
 

	
 
        default_cs_ref_type, default_cs_branch, default_cs_rev = c.default_cs_ref.split(':')
 
        if default_cs_ref_type != 'branch':
 
            default_cs_branch = org_repo.get_changeset(default_cs_rev).branch
 

	
 
        # add org repo to other so we can open pull request against peer branches on itself
 
        c.a_repos = [(org_repo.repo_name, '%s (self)' % org_repo.repo_name)]
 

	
 
        if org_repo.parent:
 
            # add parent of this fork also and select it.
 
            # use the same branch on destination as on source, if available.
 
            c.a_repos.append((org_repo.parent.repo_name, '%s (parent)' % org_repo.parent.repo_name))
 
            c.a_repo = org_repo.parent
 
            c.a_refs, c.default_a_ref = self._get_repo_refs(
 
                    org_repo.parent.scm_instance, branch=default_cs_branch, rev=other_rev)
 

	
 
        else:
 
            c.a_repo = org_repo
 
            c.a_refs, c.default_a_ref = self._get_repo_refs(org_scm_instance, rev=other_rev)
 

	
 
        # gather forks and add to this list ... even though it is rare to
 
        # request forks to pull from their parent
 
        for fork in org_repo.forks:
 
            c.a_repos.append((fork.repo_name, fork.repo_name))
 

	
 
        return render('/pullrequests/pullrequest.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    @jsonify
 
    def repo_info(self, repo_name):
 
        repo = c.db_repo
 
        refs, selected_ref = self._get_repo_refs(repo.scm_instance)
 
        return {
 
            'description': repo.description.split('\n', 1)[0],
 
            'selected_ref': selected_ref,
 
            'refs': refs,
 
            }
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    def create(self, repo_name):
 
        repo = c.db_repo
 
        try:
 
            _form = PullRequestForm(repo.repo_id)().to_python(request.POST)
 
        except formencode.Invalid as errors:
 
            log.error(traceback.format_exc())
 
            log.error(str(errors))
 
            msg = _('Error creating pull request: %s') % errors.msg
 
            h.flash(msg, 'error')
 
            raise HTTPBadRequest
 

	
 
        # heads up: org and other might seem backward here ...
 
        org_ref = _form['org_ref'] # will have merge_rev as rev but symbolic name
 
        org_repo = Repository.guess_instance(_form['org_repo'])
 

	
 
        other_ref = _form['other_ref'] # will have symbolic name and head revision
 
        other_repo = Repository.guess_instance(_form['other_repo'])
 

	
 
        reviewers = []
 

	
 
        title = _form['pullrequest_title']
 
        description = _form['pullrequest_desc'].strip()
 
        owner = User.get(request.authuser.user_id)
 

	
 
        try:
 
            cmd = CreatePullRequestAction(org_repo, other_repo, org_ref, other_ref, title, description, owner, reviewers)
 
        except CreatePullRequestAction.ValidationError as e:
 
            h.flash(e, category='error', logf=log.error)
 
            raise HTTPNotFound
 

	
 
        try:
 
            pull_request = cmd.execute()
 
            Session().commit()
 
        except Exception:
 
            h.flash(_('Error occurred while creating pull request'),
 
                    category='error')
 
            log.error(traceback.format_exc())
 
            raise HTTPFound(location=url('pullrequest_home', repo_name=repo_name))
 

	
 
        h.flash(_('Successfully opened new pull request'),
 
                category='success')
 
        raise HTTPFound(location=pull_request.url())
 

	
 
    def create_new_iteration(self, old_pull_request, new_rev, title, description, reviewers):
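        # Build a new iteration of an existing pull request: resolve the new
        # source revision, re-resolve the unchanged target ref, and let
        # CreatePullRequestIterationAction create the new pull request.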
 
        owner = User.get(request.authuser.user_id)
 
        new_org_rev = self._get_ref_rev(old_pull_request.org_repo, 'rev', new_rev)
 
        new_other_rev = self._get_ref_rev(old_pull_request.other_repo, old_pull_request.other_ref_parts[0], old_pull_request.other_ref_parts[1])
 
        try:
 
            cmd = CreatePullRequestIterationAction(old_pull_request, new_org_rev, new_other_rev, title, description, owner, reviewers)
 
        except CreatePullRequestAction.ValidationError as e:
 
            h.flash(e, category='error', logf=log.error)
 
            raise HTTPNotFound
 

	
 
        try:
 
            pull_request = cmd.execute()
 
            Session().commit()
 
        except Exception:
 
            h.flash(_('Error occurred while creating pull request'),
 
                    category='error')
 
            log.error(traceback.format_exc())
 
            raise HTTPFound(location=old_pull_request.url())
 

	
 
        h.flash(_('New pull request iteration created'),
 
                category='success')
 
        raise HTTPFound(location=pull_request.url())
 

	
 
    # pullrequest_post for PR editing
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    def post(self, repo_name, pull_request_id):
 
        pull_request = PullRequest.get_or_404(pull_request_id)
 
        if pull_request.is_closed():
 
            raise HTTPForbidden()
 
        assert pull_request.other_repo.repo_name == repo_name
 
        # only owner or admin can update it
 
        owner = pull_request.owner_id == request.authuser.user_id
 
        repo_admin = h.HasRepoPermissionLevel('admin')(c.repo_name)
 
        if not (h.HasPermissionAny('hg.admin')() or repo_admin or owner):
 
            raise HTTPForbidden()
 

	
 
        _form = PullRequestPostForm()().to_python(request.POST)
 

	
 
        cur_reviewers = set(pull_request.get_reviewer_users())
 
        new_reviewers = set(_get_reviewer(s) for s in _form['review_members'])
 
        old_reviewers = set(_get_reviewer(s) for s in _form['org_review_members'])
 

	
 
        other_added = cur_reviewers - old_reviewers
 
        other_removed = old_reviewers - cur_reviewers
 

	
 
        if other_added:
 
            h.flash(_('Meanwhile, the following reviewers have been added: %s') %
 
                    (', '.join(u.username for u in other_added)),
 
                    category='warning')
 
        if other_removed:
 
            h.flash(_('Meanwhile, the following reviewers have been removed: %s') %
 
                    (', '.join(u.username for u in other_removed)),
 
                    category='warning')
 

	
 
        if _form['updaterev']:
 
            return self.create_new_iteration(pull_request,
 
                                      _form['updaterev'],
 
                                      _form['pullrequest_title'],
 
                                      _form['pullrequest_desc'],
 
                                      new_reviewers)
 

	
 
        added_reviewers = new_reviewers - old_reviewers - cur_reviewers
 
        removed_reviewers = (old_reviewers - new_reviewers) & cur_reviewers
 

	
 
        old_description = pull_request.description
 
        pull_request.title = _form['pullrequest_title']
 
        pull_request.description = _form['pullrequest_desc'].strip() or _('No description')
 
        pull_request.owner = User.get_by_username(_form['owner'])
 
        user = User.get(request.authuser.user_id)
 

	
 
        PullRequestModel().mention_from_description(user, pull_request, old_description)
 
        PullRequestModel().add_reviewers(user, pull_request, added_reviewers)
 
        PullRequestModel().remove_reviewers(user, pull_request, removed_reviewers)
 

	
 
        Session().commit()
 
        h.flash(_('Pull request updated'), category='success')
 

	
 
        raise HTTPFound(location=pull_request.url())
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    @jsonify
 
    def delete(self, repo_name, pull_request_id):
 
        pull_request = PullRequest.get_or_404(pull_request_id)
 
        # only owner can delete it !
 
        if pull_request.owner_id == request.authuser.user_id:
 
            PullRequestModel().delete(pull_request)
 
            Session().commit()
 
            h.flash(_('Successfully deleted pull request'),
 
                    category='success')
 
            raise HTTPFound(location=url('my_pullrequests'))
 
        raise HTTPForbidden()
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def show(self, repo_name, pull_request_id, extra=None):
 
        c.pull_request = PullRequest.get_or_404(pull_request_id)
 
        c.allowed_to_change_status = self._is_allowed_to_change_status(c.pull_request)
 
        cc_model = ChangesetCommentsModel()
 
        cs_model = ChangesetStatusModel()
 

	
 
        # repo_name is the repo the pull request was opened against,
        # i.e. other_repo must match
 
        if repo_name != c.pull_request.other_repo.repo_name:
 
            raise HTTPNotFound
 

	
 
        # load compare data into template context
 
        c.cs_repo = c.pull_request.org_repo
 
        (c.cs_ref_type,
 
         c.cs_ref_name,
 
         c.cs_rev) = c.pull_request.org_ref.split(':')
 

	
 
        c.a_repo = c.pull_request.other_repo
 
        (c.a_ref_type,
 
         c.a_ref_name,
 
         c.a_rev) = c.pull_request.other_ref.split(':') # a_rev is ancestor
 

	
 
        org_scm_instance = c.cs_repo.scm_instance # property with expensive cache invalidation check!!!
 
        c.cs_ranges = []
        for x in c.pull_request.revisions:
            try:
                c.cs_ranges.append(org_scm_instance.get_changeset(x))
            except ChangesetDoesNotExistError:
                c.cs_ranges = []
                h.flash(_('Revision %s not found in %s') % (x, c.cs_repo.repo_name),
                    'error')
                break
 
        c.cs_ranges_org = None # not stored, not important, and a moving target - could be calculated ...
 
        revs = [ctx.revision for ctx in reversed(c.cs_ranges)]
 
        c.jsdata = graph_data(org_scm_instance, revs)
 

	
 
        c.is_range = False
 
        try:
 
            if c.a_ref_type == 'rev': # this looks like a free range where target is ancestor
 
                cs_a = org_scm_instance.get_changeset(c.a_rev)
 
                root_parents = c.cs_ranges[0].parents
 
                c.is_range = cs_a in root_parents
 
                #c.merge_root = len(root_parents) > 1 # a range starting with a merge might deserve a warning
 
        except ChangesetDoesNotExistError: # probably because c.a_rev not found
 
            pass
 
        except IndexError: # probably because c.cs_ranges is empty, probably because revisions are missing
 
            pass
 

	
 
        avail_revs = set()
 
        avail_show = []
 
        c.cs_branch_name = c.cs_ref_name
 
        c.a_branch_name = None
 
        other_scm_instance = c.a_repo.scm_instance
 
        c.update_msg = ""
 
        c.update_msg_other = ""
 
        try:
 
            if not c.cs_ranges:
 
                c.update_msg = _('Error: changesets not found when displaying pull request from %s.') % c.cs_rev
 
            elif org_scm_instance.alias == 'hg' and c.a_ref_name != 'ancestor':
 
                if c.cs_ref_type != 'branch':
 
                    c.cs_branch_name = org_scm_instance.get_changeset(c.cs_ref_name).branch # use ref_type ?
 
                c.a_branch_name = c.a_ref_name
 
                if c.a_ref_type != 'branch':
 
                    try:
 
                        c.a_branch_name = other_scm_instance.get_changeset(c.a_ref_name).branch # use ref_type ?
 
                    except EmptyRepositoryError:
 
                        c.a_branch_name = 'null' # not a branch name ... but close enough
 
                # candidates: descendants of old head that are on the right branch
                #             and are not the old head itself ...
                #             and nothing at all if old head is a descendant of target ref name
 
                if not c.is_range and other_scm_instance._repo.revs('present(%s)::&%s', c.cs_ranges[-1].raw_id, c.a_branch_name):
 
                    c.update_msg = _('This pull request has already been merged to %s.') % c.a_branch_name
 
                elif c.pull_request.is_closed():
 
                    c.update_msg = _('This pull request has been closed and can not be updated.')
 
                else: # look for descendants of PR head on source branch in org repo
 
                    avail_revs = org_scm_instance._repo.revs('%s:: & branch(%s)',
 
                                                             revs[0], c.cs_branch_name)
 
                    if len(avail_revs) > 1: # more than just revs[0]
 
                        # also show changesets that are not descendants but would be merged in
 
                        targethead = other_scm_instance.get_changeset(c.a_branch_name).raw_id
 
                        if org_scm_instance.path != other_scm_instance.path:
 
                            # Note: org_scm_instance.path must come first so all
 
                            # valid revision numbers are 100% org_scm compatible
 
                            # - both for avail_revs and for revset results
 
                            hgrepo = mercurial.unionrepo.makeunionrepository(org_scm_instance.baseui,
 
                                                                   safe_bytes(org_scm_instance.path),
 
                                                                   safe_bytes(other_scm_instance.path))
 
                        else:
 
                            hgrepo = org_scm_instance._repo
 
                        show = set(hgrepo.revs('::%ld & !::parents(%s) & !::%s',
 
                                               avail_revs, revs[0], targethead))
 
                        if show:
 
                            c.update_msg = _('The following additional changes are available on %s:') % c.cs_branch_name
 
                        else:
 
                            c.update_msg = _('No additional changesets found for iterating on this pull request.')
 
                    else:
 
                        show = set()
 
                        avail_revs = set() # drop revs[0]
 
                        c.update_msg = _('No additional changesets found for iterating on this pull request.')
 

	
 
                    # TODO: handle branch heads that are not tip-most
 
                    brevs = org_scm_instance._repo.revs('%s - %ld - %s', c.cs_branch_name, avail_revs, revs[0])
 
                    if brevs:
 
                        # also show changesets that are on the branch but are neither ancestors nor descendants
 
                        show.update(org_scm_instance._repo.revs('::%ld - ::%ld - ::%s', brevs, avail_revs, c.a_branch_name))
 
                        show.add(revs[0]) # make sure graph shows this so we can see how they relate
 
                        c.update_msg_other = _('Note: Branch %s has another head: %s.') % (c.cs_branch_name,
 
                            h.short_id(org_scm_instance.get_changeset((max(brevs))).raw_id))
 

	
 
                    avail_show = sorted(show, reverse=True)
 

	
 
            elif org_scm_instance.alias == 'git':
 
                c.cs_repo.scm_instance.get_changeset(c.cs_rev) # check it exists - raise ChangesetDoesNotExistError if not
 
                c.update_msg = _("Git pull requests don't support iterating yet.")
 
        except ChangesetDoesNotExistError:
 
            c.update_msg = _('Error: some changesets not found when displaying pull request from %s.') % c.cs_rev
 

	
 
        c.avail_revs = avail_revs
 
        c.avail_cs = [org_scm_instance.get_changeset(r) for r in avail_show]
 
        c.avail_jsdata = graph_data(org_scm_instance, avail_show)
 

	
 
        raw_ids = [x.raw_id for x in c.cs_ranges]
 
        c.cs_comments = c.cs_repo.get_comments(raw_ids)
 
        c.cs_statuses = c.cs_repo.statuses(raw_ids)
 

	
 
        ignore_whitespace = request.GET.get('ignorews') == '1'
 
        line_context = safe_int(request.GET.get('context'), 3)
 
        c.ignorews_url = _ignorews_url
 
        c.context_url = _context_url
 
        fulldiff = request.GET.get('fulldiff')
 
        diff_limit = None if fulldiff else self.cut_off_limit
 

	
 
        # we swap org/other ref since we run a simple diff on one repo
 
        log.debug('running diff between %s and %s in %s',
 
                  c.a_rev, c.cs_rev, org_scm_instance.path)
 
        try:
 
            raw_diff = diffs.get_diff(org_scm_instance, rev1=safe_str(c.a_rev), rev2=safe_str(c.cs_rev),
 
                                      ignore_whitespace=ignore_whitespace, context=line_context)
 
        except ChangesetDoesNotExistError:
 
            raw_diff = safe_bytes(_("The diff can't be shown - the PR revisions could not be found."))
 
        diff_processor = diffs.DiffProcessor(raw_diff, diff_limit=diff_limit)
 
        c.limited_diff = diff_processor.limited_diff
 
        c.file_diff_data = []
 
        c.lines_added = 0
 
        c.lines_deleted = 0
 

	
 
        for f in diff_processor.parsed:
 
            st = f['stats']
 
            c.lines_added += st['added']
 
            c.lines_deleted += st['deleted']
 
            filename = f['filename']
 
            fid = h.FID('', filename)
 
            html_diff = diffs.as_html(enable_comments=True, parsed_lines=[f])
 
            c.file_diff_data.append((fid, None, f['operation'], f['old_filename'], filename, html_diff, st))
 

	
 
        # inline comments
 
        c.inline_cnt = 0
 
        c.inline_comments = cc_model.get_inline_comments(
 
                                c.db_repo.repo_id,
 
                                pull_request=pull_request_id)
 
        # count inline comments
 
        for __, lines in c.inline_comments:
 
            for comments in lines.values():
 
                c.inline_cnt += len(comments)
 
        # comments
 
        c.comments = cc_model.get_comments(c.db_repo.repo_id, pull_request=pull_request_id)
 

	
 
        # (badly named) pull-request status calculation based on reviewer votes
 
        (c.pull_request_reviewers,
 
         c.pull_request_pending_reviewers,
 
         c.current_voting_result,
 
         ) = cs_model.calculate_pull_request_result(c.pull_request)
 
        c.changeset_statuses = ChangesetStatus.STATUSES
 

	
 
        c.is_ajax_preview = False
 
        c.ancestors = None # [c.a_rev] ... but that is shown in another way
 
        return render('/pullrequests/pullrequest_show.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    @jsonify
 
    def comment(self, repo_name, pull_request_id):
 
        pull_request = PullRequest.get_or_404(pull_request_id)
 
        allowed_to_change_status = self._is_allowed_to_change_status(pull_request)
 
        return create_cs_pr_comment(repo_name, pull_request=pull_request,
 
                allowed_to_change_status=allowed_to_change_status)
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    @jsonify
 
    def delete_comment(self, repo_name, comment_id):
 
        return delete_cs_pr_comment(repo_name, comment_id)
kallithea/i18n/lb/LC_MESSAGES/kallithea.po
 
new file 100644
 
# Copyright (C) 2020 Various authors, licensing as GPLv3
 
# This file is distributed under the same license as the Kallithea project.
 

	
 
msgid ""
 
msgstr ""
 
"Report-Msgid-Bugs-To: translations@kallithea-scm.org\n"
 
"Language: lb\n"
 
"MIME-Version: 1.0\n"
 
"Content-Type: text/plain; charset=UTF-8\n"
 
"Content-Transfer-Encoding: 8bit\n"
 
"Plural-Forms: nplurals=2; plural=n != 1;\n"
 

	
 
msgid "There are no changesets yet"
 
msgstr "Et sinn nach keng Ännerungen do"
 

	
 
msgid "(closed)"
 
msgstr "(Zou)"
kallithea/lib/diffs.py
 
@@ -177,503 +177,503 @@ def wrap_to_table(html):
 
                </tr>
 
              </table>''' % html
 

	
 

	
 
def wrapped_diff(filenode_old, filenode_new, diff_limit=None,
 
                ignore_whitespace=True, line_context=3,
 
                enable_comments=False):
 
    """
 
    Returns a file diff wrapped into a table.
 
    Checks for diff_limit and presents a message if the diff is too big.
 
    """
 
    if filenode_old is None:
 
        filenode_old = FileNode(filenode_new.path, '', EmptyChangeset())
 

	
 
    op = None
 
    a_path = filenode_old.path # default, might be overridden by actual rename in diff
 
    if filenode_old.is_binary or filenode_new.is_binary:
 
        html_diff = wrap_to_table(_('Binary file'))
 
        stats = (0, 0)
 

	
 
    elif diff_limit != -1 and (
 
            diff_limit is None or
 
            (filenode_old.size < diff_limit and filenode_new.size < diff_limit)):
 

	
 
        raw_diff = get_gitdiff(filenode_old, filenode_new,
 
                                ignore_whitespace=ignore_whitespace,
 
                                context=line_context)
 
        diff_processor = DiffProcessor(raw_diff)
 
        if diff_processor.parsed: # there should be exactly one element, for the specified file
 
            f = diff_processor.parsed[0]
 
            op = f['operation']
 
            a_path = f['old_filename']
 

	
 
        html_diff = as_html(parsed_lines=diff_processor.parsed, enable_comments=enable_comments)
 
        stats = diff_processor.stat()
 

	
 
    else:
 
        html_diff = wrap_to_table(_('Changeset was too big and was cut off, use '
 
                               'diff menu to display this diff'))
 
        stats = (0, 0)
 

	
 
    if not html_diff:
 
        submodules = [o for o in [filenode_new, filenode_old] if isinstance(o, SubModuleNode)]
 
        if submodules:
 
            html_diff = wrap_to_table(h.escape('Submodule %r' % submodules[0]))
 
        else:
 
            html_diff = wrap_to_table(_('No changes detected'))
 

	
 
    cs1 = filenode_old.changeset.raw_id
 
    cs2 = filenode_new.changeset.raw_id
 

	
 
    return cs1, cs2, a_path, html_diff, stats, op
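# Illustrative call of wrapped_diff (assumed, not part of the original code):
#     cs1, cs2, a_path, html_diff, stats, op = wrapped_diff(
#         old_filenode, new_filenode, diff_limit=256 * 1024, enable_comments=True)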
 

	
 

	
 
def get_gitdiff(filenode_old, filenode_new, ignore_whitespace=True, context=3):
 
    """
 
    Returns git style diff between given ``filenode_old`` and ``filenode_new``.
 
    """
 
    # make sure we pass in default context
 
    context = context or 3
 
    submodules = [o for o in [filenode_new, filenode_old] if isinstance(o, SubModuleNode)]
 
    if submodules:
 
        return b''
 

	
 
    for filenode in (filenode_old, filenode_new):
 
        if not isinstance(filenode, FileNode):
 
            raise VCSError("Given object should be FileNode object, not %s"
 
                % filenode.__class__)
 

	
 
    repo = filenode_new.changeset.repository
 
    old_raw_id = getattr(filenode_old.changeset, 'raw_id', repo.EMPTY_CHANGESET)
 
    new_raw_id = getattr(filenode_new.changeset, 'raw_id', repo.EMPTY_CHANGESET)
 

	
 
    vcs_gitdiff = get_diff(repo, old_raw_id, new_raw_id, filenode_new.path,
 
                           ignore_whitespace, context)
 
    return vcs_gitdiff
 

	
 

	
 
def get_diff(scm_instance, rev1, rev2, path=None, ignore_whitespace=False, context=3):
 
    """
 
    A thin wrapper around vcs lib get_diff.
 
    """
 
    try:
 
        return scm_instance.get_diff(rev1, rev2, path=path,
 
                                     ignore_whitespace=ignore_whitespace, context=context)
 
    except MemoryError:
 
        h.flash('MemoryError: Diff is too big', category='error')
 
        return b''
 

	
 

	
 
NEW_FILENODE = 1
 
DEL_FILENODE = 2
 
MOD_FILENODE = 3
 
RENAMED_FILENODE = 4
 
COPIED_FILENODE = 5
 
CHMOD_FILENODE = 6
 
BIN_FILENODE = 7
 

	
 

	
 
class DiffProcessor(object):
 
    """
 
    Give it a unified or git diff and it returns a list of the files that were
 
    mentioned in the diff together with a dict of meta information that
 
    can be used to render it in a HTML template.
 
    """
 
    _diff_git_re = re.compile(b'^diff --git', re.MULTILINE)
 

	
 
    def __init__(self, diff, vcs='hg', diff_limit=None, inline_diff=True):
 
        """
 
        :param diff:   a text in diff format
 
        :param vcs: type of version control, 'hg' or 'git'
 
        :param diff_limit: the size above which a diff is considered "big";
 
            beyond that limit, cut off is triggered; set to None
 
            to show the full diff
 
        """
 
        if not isinstance(diff, bytes):
 
            raise Exception('Diff must be bytes - got %s' % type(diff))
 

	
 
        self._diff = diff
 
        self.adds = 0
 
        self.removes = 0
 
        self.diff_limit = diff_limit
 
        self.limited_diff = False
 
        self.vcs = vcs
 
        self.parsed = self._parse_gitdiff(inline_diff=inline_diff)
 

	
 
    def _parse_gitdiff(self, inline_diff):
 
        """Parse self._diff and return a list of dicts with meta info and chunks for each file.
 
        Might set limited_diff.
 
        Optionally, do an extra pass, adding extra markup to one-line changes.
 
        """
 
        _files = [] # list of dicts with meta info and chunks
 

	
 
        starts = [m.start() for m in self._diff_git_re.finditer(self._diff)]
 
        starts.append(len(self._diff))
 

	
 
        for start, end in zip(starts, starts[1:]):
 
            if self.diff_limit and end > self.diff_limit:
 
                self.limited_diff = True
 
                continue
 

	
 
            head, diff_lines = _get_header(self.vcs, buffer(self._diff, start, end - start))
 

	
 
            op = None
 
            stats = {
 
                'added': 0,
 
                'deleted': 0,
 
                'binary': False,
 
                'ops': {},
 
            }
 

	
 
            if head['deleted_file_mode']:
 
                op = 'removed'
 
                stats['binary'] = True
 
                stats['ops'][DEL_FILENODE] = 'deleted file'
 

	
 
            elif head['new_file_mode']:
 
                op = 'added'
 
                stats['binary'] = True
 
                stats['ops'][NEW_FILENODE] = 'new file %s' % head['new_file_mode']
 
            else:  # modify operation, can be cp, rename, chmod
 
                # CHMOD
 
                if head['new_mode'] and head['old_mode']:
 
                    op = 'modified'
 
                    stats['binary'] = True
 
                    stats['ops'][CHMOD_FILENODE] = ('modified file chmod %s => %s'
 
                                        % (head['old_mode'], head['new_mode']))
 
                # RENAME
 
                if (head['rename_from'] and head['rename_to']
 
                      and head['rename_from'] != head['rename_to']):
 
                    op = 'renamed'
 
                    stats['binary'] = True
 
                    stats['ops'][RENAMED_FILENODE] = ('file renamed from %s to %s'
 
                                    % (head['rename_from'], head['rename_to']))
 
                # COPY
 
                if head.get('copy_from') and head.get('copy_to'):
 
                    op = 'modified'
 
                    stats['binary'] = True
 
                    stats['ops'][COPIED_FILENODE] = ('file copied from %s to %s'
 
                                        % (head['copy_from'], head['copy_to']))
 
                # FALL BACK: detect missed old style add or remove
 
                if op is None:
 
                    if not head['a_file'] and head['b_file']:
 
                        op = 'added'
 
                        stats['binary'] = True
 
                        stats['ops'][NEW_FILENODE] = 'new file'
 

	
 
                    elif head['a_file'] and not head['b_file']:
 
                        op = 'removed'
 
                        stats['binary'] = True
 
                        stats['ops'][DEL_FILENODE] = 'deleted file'
 

	
 
                # it's neither ADD nor DELETE
 
                if op is None:
 
                    op = 'modified'
 
                    stats['binary'] = True
 
                    stats['ops'][MOD_FILENODE] = 'modified file'
 

	
 
            # a real non-binary diff
 
            if head['a_file'] or head['b_file']:
 
                chunks, added, deleted = _parse_lines(diff_lines)
 
                stats['binary'] = False
 
                stats['added'] = added
 
                stats['deleted'] = deleted
 
                # explicit mark that it's a modified file
 
                if op == 'modified':
 
                    stats['ops'][MOD_FILENODE] = 'modified file'
 
            else:  # Git binary patch (or empty diff)
 
                # Git binary patch
 
                if head['bin_patch']:
 
                    stats['ops'][BIN_FILENODE] = 'binary diff not shown'
 
                chunks = []
 

	
 
            if op == 'removed' and chunks:
 
                # a way of seeing deleted content could perhaps be nice - but
 
                # not with the current UI
 
                chunks = []
 

	
 
            chunks.insert(0, [{
 
                'old_lineno': '',
 
                'new_lineno': '',
 
                'action':     'context',
 
                'line':       msg,
 
                } for _op, msg in stats['ops'].iteritems()
 
                  if _op not in [MOD_FILENODE]])
 

	
 
            _files.append({
 
                'old_filename':     head['a_path'],
 
                'filename':         head['b_path'],
 
                'old_revision':     head['a_blob_id'],
 
                'new_revision':     head['b_blob_id'],
 
                'chunks':           chunks,
 
                'operation':        op,
 
                'stats':            stats,
 
            })
 

	
 
        if not inline_diff:
 
            return _files
 

	
 
        # highlight inline changes when one del is followed by one add
 
        for diff_data in _files:
 
            for chunk in diff_data['chunks']:
 
                lineiter = iter(chunk)
 
                try:
 
                    peekline = lineiter.next()
 
                    while True:
 
                        # find a first del line
 
                        while peekline['action'] != 'del':
 
                            peekline = lineiter.next()
 
                        delline = peekline
 
                        peekline = lineiter.next()
 
                        # if not followed by add, eat all following del lines
 
                        if peekline['action'] != 'add':
 
                            while peekline['action'] == 'del':
 
                                peekline = lineiter.next()
 
                            continue
 
                        # found an add - make sure it is the only one
 
                        addline = peekline
 
                        try:
 
                            peekline = lineiter.next()
 
                        except StopIteration:
 
                            # add was last line - ok
 
                            _highlight_inline_diff(delline, addline)
 
                            raise
 
                        if peekline['action'] != 'add':
 
                            # there was only one add line - ok
 
                            _highlight_inline_diff(delline, addline)
 
                except StopIteration:
 
                    pass
 

	
 
        return _files
 

	
 
    def stat(self):
 
        """
 
        Returns a tuple of added and removed lines for this instance
 
        """
 
        return self.adds, self.removes
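# Illustrative usage of DiffProcessor (assumed, not part of the original code):
#     dp = DiffProcessor(raw_diff_bytes, diff_limit=None)
#     for f in dp.parsed:
#         print(f['filename'], f['operation'], f['stats']['added'], f['stats']['deleted'])
#     dp.limited_diff  # True if some files were cut off because of diff_limit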
 

	
 

	
 
_escape_re = re.compile(r'(&)|(<)|(>)|(\t)|(\r)|(?<=.)( \n| $)')
 

	
 

	
 
def _escaper(string):
 
    """
 
    Do HTML escaping/markup
 
    """
 

	
 
    def substitute(m):
 
        groups = m.groups()
 
        if groups[0]:
 
            return '&amp;'
 
        if groups[1]:
 
            return '&lt;'
 
        if groups[2]:
 
            return '&gt;'
 
        if groups[3]:
 
            return '<u>\t</u>'
 
        if groups[4]:
 
            return '<u class="cr"></u>'
 
        if groups[5]:
 
            return ' <i></i>'
 
        assert False
 

	
 
    return _escape_re.sub(substitute, safe_unicode(string))
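# Illustrative examples of the markup produced by _escaper (assumed):
#     _escaper('a < b\t')      -> u'a &lt; b<u>\t</u>'
#     _escaper('foo & bar\r')  -> u'foo &amp; bar<u class="cr"></u>'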
 

	
 

	
 
_git_header_re = re.compile(br"""
 
    ^diff[ ]--git[ ]a/(?P<a_path>.+?)[ ]b/(?P<b_path>.+?)\n
 
    (?:^old[ ]mode[ ](?P<old_mode>\d+)\n
 
       ^new[ ]mode[ ](?P<new_mode>\d+)(?:\n|$))?
 
    (?:^similarity[ ]index[ ](?P<similarity_index>\d+)%\n
 
       ^rename[ ]from[ ](?P<rename_from>.+)\n
 
       ^rename[ ]to[ ](?P<rename_to>.+)(?:\n|$))?
 
    (?:^new[ ]file[ ]mode[ ](?P<new_file_mode>.+)(?:\n|$))?
 
    (?:^deleted[ ]file[ ]mode[ ](?P<deleted_file_mode>.+)(?:\n|$))?
 
    (?:^index[ ](?P<a_blob_id>[0-9A-Fa-f]+)
 
        \.\.(?P<b_blob_id>[0-9A-Fa-f]+)[ ]?(?P<b_mode>.+)?(?:\n|$))?
 
    (?:^(?P<bin_patch>GIT[ ]binary[ ]patch)(?:\n|$))?
 
    (?:^---[ ](a/(?P<a_file>.+?)|/dev/null)\t?(?:\n|$))?
 
    (?:^\+\+\+[ ](b/(?P<b_file>.+?)|/dev/null)\t?(?:\n|$))?
 
""", re.VERBOSE | re.MULTILINE)
 

	
 

	
 
_hg_header_re = re.compile(br"""
 
    ^diff[ ]--git[ ]a/(?P<a_path>.+?)[ ]b/(?P<b_path>.+?)\n
 
    (?:^old[ ]mode[ ](?P<old_mode>\d+)\n
 
       ^new[ ]mode[ ](?P<new_mode>\d+)(?:\n|$))?
 
    (?:^similarity[ ]index[ ](?P<similarity_index>\d+)%(?:\n|$))?
 
    (?:^rename[ ]from[ ](?P<rename_from>.+)\n
 
       ^rename[ ]to[ ](?P<rename_to>.+)(?:\n|$))?
 
    (?:^copy[ ]from[ ](?P<copy_from>.+)\n
 
       ^copy[ ]to[ ](?P<copy_to>.+)(?:\n|$))?
 
    (?:^new[ ]file[ ]mode[ ](?P<new_file_mode>.+)(?:\n|$))?
 
    (?:^deleted[ ]file[ ]mode[ ](?P<deleted_file_mode>.+)(?:\n|$))?
 
    (?:^index[ ](?P<a_blob_id>[0-9A-Fa-f]+)
 
        \.\.(?P<b_blob_id>[0-9A-Fa-f]+)[ ]?(?P<b_mode>.+)?(?:\n|$))?
 
    (?:^(?P<bin_patch>GIT[ ]binary[ ]patch)(?:\n|$))?
 
    (?:^---[ ](a/(?P<a_file>.+?)|/dev/null)\t?(?:\n|$))?
 
    (?:^\+\+\+[ ](b/(?P<b_file>.+?)|/dev/null)\t?(?:\n|$))?
 
""", re.VERBOSE | re.MULTILINE)
 

	
 

	
 
_header_next_check = re.compile(br'''(?!@)(?!literal )(?!delta )''')
 

	
 

	
 
def _get_header(vcs, diff_chunk):
 
    """
 
    Parses a Git diff for a single file (header and chunks) and returns a tuple with:
 

	
 
    1. A dict with meta info:
 

	
 
        a_path, b_path, similarity_index, rename_from, rename_to,
 
        old_mode, new_mode, new_file_mode, deleted_file_mode,
 
        a_blob_id, b_blob_id, b_mode, a_file, b_file
 

	
 
    2. An iterator yielding lines with simple HTML markup.
 
    """
 
    match = None
 
    if vcs == 'git':
 
        match = _git_header_re.match(diff_chunk)
 
    elif vcs == 'hg':
 
        match = _hg_header_re.match(diff_chunk)
 
    if match is None:
 
        raise Exception('diff not recognized as valid %s diff' % vcs)
 
    meta_info = match.groupdict()
 
    rest = diff_chunk[match.end():]
 
    if rest and _header_next_check.match(rest):
 
        raise Exception('cannot parse %s diff header: %r followed by %r' % (vcs, diff_chunk[:match.end()], rest[:1000]))
 
    diff_lines = (_escaper(m.group(0)) for m in re.finditer(br'.*\n|.+$', rest)) # don't split on \r as str.splitlines do
 
    return meta_info, diff_lines
 

	
 

	
 
_chunk_re = re.compile(r'^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@(.*)')
 
_newline_marker = re.compile(r'^\\ No newline at end of file')
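# Illustrative example (assumed): for a hunk header like
#     @@ -12,4 +12,6 @@ def foo():
# _chunk_re captures ('12', '4', '12', '6', ' def foo():').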
 

	
 

	
 
def _parse_lines(diff_lines):
 
    """
 
    Given an iterator of diff body lines, parse them into chunks with a dict
 
    per line, and return the chunks together with added/removed totals.
 
    """
 
    added = deleted = 0
 
    old_line = old_end = new_line = new_end = None
 

	
 
    chunks = []
 
    try:
 
        chunks = []
 
        line = diff_lines.next()
 

	
 
        while True:
 
            lines = []
 
            chunks.append(lines)
 

	
 
            match = _chunk_re.match(line)
 

	
 
            if not match:
 
                raise Exception('error parsing diff @@ line %r' % line)
 

	
 
            gr = match.groups()
 
            (old_line, old_end,
 
             new_line, new_end) = [int(x or 1) for x in gr[:-1]]
 
            old_line -= 1
 
            new_line -= 1
 

	
 
            context = len(gr) == 5
 
            old_end += old_line
 
            new_end += new_line
 

	
 
            if context:
 
                # skip context only if it's first line
 
                if int(gr[0]) > 1:
 
                    lines.append({
 
                        'old_lineno': '...',
 
                        'new_lineno': '...',
 
                        'action':     'context',
 
                        'line':       line,
 
                    })
 

	
 
            line = diff_lines.next()
 

	
 
            while old_line < old_end or new_line < new_end:
 
                if not line:
 
                    raise Exception('error parsing diff - empty line at -%s+%s' % (old_line, new_line))
 

	
 
                affects_old = affects_new = False
 

	
 
                command = line[0]
 
                if command == '+':
 
                    affects_new = True
 
                    action = 'add'
 
                    added += 1
 
                elif command == '-':
 
                    affects_old = True
 
                    action = 'del'
 
                    deleted += 1
 
                elif command == ' ':
 
                    affects_old = affects_new = True
 
                    action = 'unmod'
 
                else:
 
                    raise Exception('error parsing diff - unknown command in line %r at -%s+%s' % (line, old_line, new_line))
 

	
 
                if not _newline_marker.match(line):
 
                    old_line += affects_old
 
                    new_line += affects_new
 
                    lines.append({
 
                        'old_lineno':   affects_old and old_line or '',
 
                        'new_lineno':   affects_new and new_line or '',
 
                        'action':       action,
 
                        'line':         line[1:],
 
                    })
 

	
 
                line = diff_lines.next()
 

	
 
                if _newline_marker.match(line):
 
                    # we need to append to lines, since this is not
 
                    # counted in the line specs of diff
 
                    lines.append({
 
                        'old_lineno':   '...',
 
                        'new_lineno':   '...',
 
                        'action':       'context',
 
                        'line':         line,
 
                    })
 
                    line = diff_lines.next()
 
            if old_line > old_end:
 
                raise Exception('error parsing diff - more than %s "-" lines at -%s+%s' % (old_end, old_line, new_line))
 
            if new_line > new_end:
 
                raise Exception('error parsing diff - more than %s "+" lines at -%s+%s' % (new_end, old_line, new_line))
 
    except StopIteration:
 
        pass
 
    if old_line != old_end or new_line != new_end:
 
        raise Exception('diff processing broken when old %s<>%s or new %s<>%s line %r' % (old_line, old_end, new_line, new_end, line))
 

	
 
    return chunks, added, deleted
 

	
 
# Used for inline highlighter word split, must match the substitutions in _escaper
 
_token_re = re.compile(r'()(&amp;|&lt;|&gt;|<u>\t</u>|<u class="cr"></u>| <i></i>|\W+?)')
 

	
 

	
 
def _highlight_inline_diff(old, new):
 
    """
 
    Highlight simple add/remove in two lines given as info dicts. They are
 
    modified in place and given markup with <del>/<ins>.
 
    """
 
    assert old['action'] == 'del'
 
    assert new['action'] == 'add'
 

	
 
    oldwords = _token_re.split(old['line'])
 
    newwords = _token_re.split(new['line'])
 
    sequence = difflib.SequenceMatcher(None, oldwords, newwords)
 

	
 
    oldfragments, newfragments = [], []
 
    for tag, i1, i2, j1, j2 in sequence.get_opcodes():
 
        oldfrag = ''.join(oldwords[i1:i2])
 
        newfrag = ''.join(newwords[j1:j2])
 
        if tag != 'equal':
 
            if oldfrag:
 
                oldfrag = '<del>%s</del>' % oldfrag
 
            if newfrag:
 
                newfrag = '<ins>%s</ins>' % newfrag
 
        oldfragments.append(oldfrag)
 
        newfragments.append(newfrag)
 

	
 
    old['line'] = "".join(oldfragments)
 
    new['line'] = "".join(newfragments)
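# Illustrative example of _highlight_inline_diff (assumed):
#     old = {'action': 'del', 'line': 'foo bar'}
#     new = {'action': 'add', 'line': 'foo baz'}
#     _highlight_inline_diff(old, new)
#     old['line'] -> 'foo <del>bar</del>'
#     new['line'] -> 'foo <ins>baz</ins>'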
kallithea/lib/utils2.py
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.utils2
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
Some simple helper functions.
 
Note: all these functions should be independent of Kallithea classes, i.e.
 
models, controllers, etc., to prevent import cycles.
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jan 5, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import print_function
 

	
 
import binascii
 
import datetime
 
import json
 
import os
 
import pwd
 
import re
 
import time
 
import urllib
 

	
 
import urlobject
 
from tg.i18n import ugettext as _
 
from tg.i18n import ungettext
 
from webhelpers2.text import collapse, remove_formatting, strip_tags
 

	
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, safe_bytes, safe_str, safe_unicode  # re-export
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 

	
 

	
 
def str2bool(_str):
 
    """
 
    Returns a True/False value from the given string, trying to translate
 
    the string into a boolean
 

	
 
    :param _str: string value to translate into boolean
 
    :rtype: boolean
 
    :returns: boolean from given string
 
    """
 
    if _str is None:
 
        return False
 
    if _str in (True, False):
 
        return _str
 
    _str = str(_str).strip().lower()
 
    return _str in ('t', 'true', 'y', 'yes', 'on', '1')
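# Illustrative examples (assumed, not part of the original code):
#     str2bool('yes') -> True
#     str2bool('Off') -> False
#     str2bool(None)  -> False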
 

	
 

	
 
def aslist(obj, sep=None, strip=True):
 
    """
 
    Returns the given string split by sep as a list
 

	
 
    :param obj:
 
    :param sep:
 
    :param strip:
 
    """
 
    if isinstance(obj, (basestring)):
 
        lst = obj.split(sep)
 
        if strip:
 
            lst = [v.strip() for v in lst]
 
        return lst
 
    elif isinstance(obj, (list, tuple)):
 
        return obj
 
    elif obj is None:
 
        return []
 
    else:
 
        return [obj]
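# Illustrative examples (assumed):
#     aslist('a, b ,c', sep=',') -> ['a', 'b', 'c']
#     aslist(None)               -> []
#     aslist(('x', 'y'))         -> ('x', 'y')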
 

	
 

	
 
def convert_line_endings(line, mode):
 
    """
 
    Converts the line endings of a given line according to the given mode
 

	
 
    Available modes are::
 
        0 - Unix
 
        1 - Mac
 
        2 - DOS
 

	
 
    :param line: given line to convert
 
    :param mode: mode to convert to
 
    :rtype: str
 
    :return: converted line according to mode
 
    """
 
    if mode == 0:
 
        line = line.replace('\r\n', '\n')
 
        line = line.replace('\r', '\n')
 
    elif mode == 1:
 
        line = line.replace('\r\n', '\r')
 
        line = line.replace('\n', '\r')
 
    elif mode == 2:
 
        line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line)
 
    return line
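# Illustrative examples (assumed):
#     convert_line_endings('a\r\nb\r', 0) -> 'a\nb\n'      (to Unix)
#     convert_line_endings('a\nb\n', 2)   -> 'a\r\nb\r\n'  (to DOS)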
 

	
 

	
 
def detect_mode(line, default):
 
    """
 
    Detects the line break style of the given line; if the line break
 
    couldn't be found, the given default value is returned
 

	
 
    :param line: str line
 
    :param default: default
 
    :rtype: int
 
    :return: line end value, one of 0 - Unix, 1 - Mac, 2 - DOS
 
    """
 
    if line.endswith('\r\n'):
 
        return 2
 
    elif line.endswith('\n'):
 
        return 0
 
    elif line.endswith('\r'):
 
        return 1
 
    else:
 
        return default
 

	
 

	
 
def generate_api_key():
 
    """
 
    Generates a random (presumably unique) API key.
 

	
 
    This value is used in URLs and "Bearer" HTTP Authorization headers,
 
    which in practice means it should only contain URL-safe characters
 
    (RFC 3986):
 

	
 
        unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
 
    """
 
    # Hexadecimal certainly qualifies as URL-safe.
 
    return ascii_str(binascii.hexlify(os.urandom(20)))
 

	
 

	
 
def safe_int(val, default=None):
 
    """
 
    Returns int() of val; if val is not convertible to int, the default
 
    is returned instead
 

	
 
    :param val:
 
    :param default:
 
    """
 
    try:
 
        val = int(val)
 
    except (ValueError, TypeError):
 
        val = default
 
    return val
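# Illustrative examples (assumed):
#     safe_int('42')             -> 42
#     safe_int('n/a', default=0) -> 0
#     safe_int(None)             -> None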
 

	
 

	
 
def remove_suffix(s, suffix):
 
    if s.endswith(suffix):
 
        s = s[:-1 * len(suffix)]
 
    return s
 

	
 

	
 
def remove_prefix(s, prefix):
 
    if s.startswith(prefix):
 
        s = s[len(prefix):]
 
    return s
 

	
 

	
 
def age(prevdate, show_short_version=False, now=None):
 
    """
 
    turns a datetime into an age string.
 
    If show_short_version is True, it will generate a less accurate but shorter string,
 
    for example: 2 days ago, instead of 2 days and 23 hours ago.
 

	
 
    :param prevdate: datetime object
 
    :param show_short_version: if it should approximate the date and return a shorter string
 
    :rtype: unicode
 
    :returns: unicode words describing age
 
    """
 
    now = now or datetime.datetime.now()
 
    order = ['year', 'month', 'day', 'hour', 'minute', 'second']
 
    deltas = {}
 
    future = False
 

	
 
    if prevdate > now:
 
        now, prevdate = prevdate, now
 
        future = True
 
    if future:
 
        prevdate = prevdate.replace(microsecond=0)
 
    # Get date parts deltas
 
    from dateutil import relativedelta
 
    for part in order:
 
        d = relativedelta.relativedelta(now, prevdate)
 
        deltas[part] = getattr(d, part + 's')
 

	
 
    # Fix negative offsets (there is 1 second between 10:59:59 and 11:00:00,
 
    # not 1 hour, -59 minutes and -59 seconds)
 
    for num, length in [(5, 60), (4, 60), (3, 24)]:  # seconds, minutes, hours
 
        part = order[num]
 
        carry_part = order[num - 1]
 

	
 
        if deltas[part] < 0:
 
            deltas[part] += length
 
            deltas[carry_part] -= 1
 

	
 
    # Same thing for days except that the increment depends on the (variable)
 
    # number of days in the month
 
    month_lengths = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
 
    if deltas['day'] < 0:
 
        if prevdate.month == 2 and (prevdate.year % 4 == 0 and
 
            (prevdate.year % 100 != 0 or prevdate.year % 400 == 0)
 
        ):
 
            deltas['day'] += 29
 
        else:
 
            deltas['day'] += month_lengths[prevdate.month - 1]
 

	
 
        deltas['month'] -= 1
 

	
 
    if deltas['month'] < 0:
 
        deltas['month'] += 12
 
        deltas['year'] -= 1
 

	
 
    # In short version, we want nicer handling of ages of more than a year
 
    if show_short_version:
 
        if deltas['year'] == 1:
 
            # ages between 1 and 2 years: show as months
 
            deltas['month'] += 12
 
            deltas['year'] = 0
 
        if deltas['year'] >= 2:
 
            # ages 2+ years: round
 
            if deltas['month'] > 6:
 
                deltas['year'] += 1
 
                deltas['month'] = 0
 

	
 
    # Format the result
 
    fmt_funcs = {
 
        'year': lambda d: ungettext(u'%d year', '%d years', d) % d,
 
        'month': lambda d: ungettext(u'%d month', '%d months', d) % d,
 
        'day': lambda d: ungettext(u'%d day', '%d days', d) % d,
 
        'hour': lambda d: ungettext(u'%d hour', '%d hours', d) % d,
 
        'minute': lambda d: ungettext(u'%d minute', '%d minutes', d) % d,
 
        'second': lambda d: ungettext(u'%d second', '%d seconds', d) % d,
 
    }
 

	
 
    for i, part in enumerate(order):
 
        value = deltas[part]
 
        if value == 0:
 
            continue
 

	
 
        if i < 5:
 
            sub_part = order[i + 1]
 
            sub_value = deltas[sub_part]
 
        else:
 
            sub_value = 0
 

	
 
        if sub_value == 0 or show_short_version:
 
            if future:
 
                return _('in %s') % fmt_funcs[part](value)
 
            else:
 
                return _('%s ago') % fmt_funcs[part](value)
 
        if future:
 
            return _('in %s and %s') % (fmt_funcs[part](value),
 
                fmt_funcs[sub_part](sub_value))
 
        else:
 
            return _('%s and %s ago') % (fmt_funcs[part](value),
 
                fmt_funcs[sub_part](sub_value))
 

	
 
    return _('just now')
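# Illustrative examples (assumed), pinning 'now' for reproducibility:
#     age(datetime.datetime(2020, 1, 1), now=datetime.datetime(2020, 1, 3, 23, 0))
#         -> u'2 days and 23 hours ago'
#     age(datetime.datetime(2020, 1, 1), show_short_version=True,
#         now=datetime.datetime(2020, 1, 3, 23, 0))
#         -> u'2 days ago'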
 

	
 

	
 
def uri_filter(uri):
 
    """
 
    Removes user:password from given url string
 

	
 
    :param uri:
 
    :rtype: list
 
    :returns: list of [proto, host, port] strings with credentials removed
 
    """
 
    if not uri:
 
        return ''
 

	
 
    proto = ''
 

	
 
    for pat in ('https://', 'http://', 'git://'):
 
        if uri.startswith(pat):
 
            uri = uri[len(pat):]
 
            proto = pat
 
            break
 

	
 
    # remove passwords and username
 
    uri = uri[uri.find('@') + 1:]
 

	
 
    # get the port
 
    cred_pos = uri.find(':')
 
    if cred_pos == -1:
 
        host, port = uri, None
 
    else:
 
        host, port = uri[:cred_pos], uri[cred_pos + 1:]
 

	
 
    return [_f for _f in [proto, host, port] if _f]
 

	
 

	
 
def credentials_filter(uri):
 
    """
 
    Returns a url with credentials removed
 

	
 
    :param uri:
 
    """
 

	
 
    uri = uri_filter(uri)
 
    # check if we have port
 
    if len(uri) > 2 and uri[2]:
 
        uri[2] = ':' + uri[2]
 

	
 
    return ''.join(uri)
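# Illustrative example (assumed):
#     credentials_filter('https://user:secret@example.com:8080/repo')
#         -> 'https://example.com:8080/repo'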
 

	
 

	
 
def get_clone_url(clone_uri_tmpl, prefix_url, repo_name, repo_id, username=None):
 
    parsed_url = urlobject.URLObject(prefix_url)
 
    prefix = safe_unicode(urllib.unquote(parsed_url.path.rstrip('/')))
 
    try:
 
        system_user = pwd.getpwuid(os.getuid()).pw_name
 
    except Exception: # TODO: support all systems - especially Windows
 
        system_user = 'kallithea' # hardcoded default value ...
 
    args = {
 
        'scheme': parsed_url.scheme,
 
        'user': safe_unicode(urllib.quote(safe_str(username or ''))),
 
        'netloc': parsed_url.netloc + prefix,  # like "hostname:port/prefix" (with optional ":port" and "/prefix")
 
        'prefix': prefix, # undocumented, empty or starting with /
 
        'repo': repo_name,
 
        'repoid': str(repo_id),
 
        'system_user': safe_unicode(system_user),
 
        'hostname': parsed_url.hostname,
 
    }
 
    url = re.sub('{([^{}]+)}', lambda m: args.get(m.group(1), m.group(0)), clone_uri_tmpl)
 

	
 
    # remove leading @ sign if it's present. Case of empty user
 
    url_obj = urlobject.URLObject(url)
 
    if not url_obj.username:
 
        url_obj = url_obj.with_username(None)
 

	
 
    return safe_unicode(url_obj)
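# Illustrative example (assumed values):
#     get_clone_url('{scheme}://{user}@{netloc}/{repo}',
#                   'https://example.com', 'group/myrepo', 7, username='bob')
#         -> u'https://bob@example.com/group/myrepo'
# With an empty username, the '@' is dropped by the with_username(None) handling above.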
 

	
 

	
 
def get_changeset_safe(repo, rev):
 
    """
 
    Safe version of get_changeset: if the changeset doesn't exist for the
 
    repo, a dummy EmptyChangeset is returned instead
 

	
 
    :param repo:
 
    :param rev:
 
    """
 
    from kallithea.lib.vcs.backends.base import BaseRepository
 
    from kallithea.lib.vcs.exceptions import RepositoryError
 
    from kallithea.lib.vcs.backends.base import EmptyChangeset
 
    if not isinstance(repo, BaseRepository):
 
        raise Exception('You must pass a Repository '
 
                        'object as first argument, got %s' % type(repo))
 

	
 
    try:
 
        cs = repo.get_changeset(rev)
 
    except (RepositoryError, LookupError):
 
        cs = EmptyChangeset(requested_revision=rev)
 
    return cs
 

	
 

	
 
def datetime_to_time(dt):
 
    if dt:
 
        return time.mktime(dt.timetuple())
 

	
 

	
 
def time_to_datetime(tm):
 
    if tm:
 
        if isinstance(tm, basestring):
 
            try:
 
                tm = float(tm)
 
            except ValueError:
 
                return
 
        return datetime.datetime.fromtimestamp(tm)
 

	
 

	
 
# Must match regexp in kallithea/public/js/base.js MentionsAutoComplete()
 
# Check char before @ - it must not look like we are in an email addresses.
 
# Matching is greedy so we don't have to look beyond the end.
 
MENTIONS_REGEX = re.compile(r'(?:^|(?<=[^a-zA-Z0-9]))@([a-zA-Z0-9][-_.a-zA-Z0-9]*[a-zA-Z0-9])')
 

	
 

	
 
def extract_mentioned_usernames(text):
 
    r"""
 
    Returns list of (possible) usernames @mentioned in given text.
 

	
 
    >>> extract_mentioned_usernames('@1-2.a_X,@1234 not@not @ddd@not @n @ee @ff @gg, @gg;@hh @n\n@zz,')
 
    ['1-2.a_X', '1234', 'ddd', 'ee', 'ff', 'gg', 'gg', 'hh', 'zz']
 
    """
 
    return MENTIONS_REGEX.findall(text)
 

	
 

	
 
def extract_mentioned_users(text):
 
    """ Returns set of actual database Users @mentioned in given text. """
 
    from kallithea.model.db import User
 
    result = set()
 
    for name in extract_mentioned_usernames(text):
 
        user = User.get_by_username(name, case_insensitive=True)
 
        if user is not None and not user.is_default_user:
 
            result.add(user)
 
    return result
 

	
 

	
 
class AttributeDict(dict):
 
    def __getattr__(self, attr):
 
        return self.get(attr, None)
 
    __setattr__ = dict.__setitem__
 
    __delattr__ = dict.__delitem__
 

	
 

	
 
def obfuscate_url_pw(engine):
 
    from sqlalchemy.engine import url as sa_url
 
    from sqlalchemy.exc import ArgumentError
 
    try:
 
        _url = sa_url.make_url(engine or '')
 
    except ArgumentError:
 
        return engine
 
    if _url.password:
 
        _url.password = 'XXXXX'
 
    return str(_url)
 

	
 

	
 
class HookEnvironmentError(Exception): pass
 

	
 

	
 
def get_hook_environment():
 
    """
 
    Get hook context by deserializing the global KALLITHEA_EXTRAS environment
 
    variable.
 

	
 
    Called early in Git out-of-process hooks to get .ini config path so the
 
    basic environment can be configured properly. Also used in all hooks to get
 
    information about the action that triggered it.
 
    """
 

	
 
    try:
 
        kallithea_extras = os.environ['KALLITHEA_EXTRAS']
 
    except KeyError:
 
        raise HookEnvironmentError("Environment variable KALLITHEA_EXTRAS not found")
 

	
 
    extras = json.loads(kallithea_extras)
 
    try:
 
        for k in ['username', 'repository', 'scm', 'action', 'ip', 'config']:
 
            extras[k]
 
    except KeyError:
 
        raise HookEnvironmentError('Missing key %s in KALLITHEA_EXTRAS %s' % (k, extras))
 

	
 
    return AttributeDict(extras)
 

	
 

	
 
def set_hook_environment(username, ip_addr, repo_name, repo_alias, action=None):
 
    """Prepare global context for running hooks by serializing data in the
 
    global KALLITHEA_EXTRAS environment variable.
 

	
 
    Most importantly, this allows Git hooks to do proper logging and updating of
 
    caches after pushes.
 

	
 
    Must always be called before anything with hooks is invoked.
 
    """
 
    from kallithea import CONFIG
 
    extras = {
 
        'ip': ip_addr, # used in log_push/pull_action action_logger
 
        'username': username,
 
        'action': action or 'push_local', # used in log_push_action_raw_ids action_logger
 
        'repository': repo_name,
 
        'scm': repo_alias, # used to pick hack in log_push_action_raw_ids
 
        'config': CONFIG['__file__'], # used by git hook to read config
 
    }
 
    os.environ['KALLITHEA_EXTRAS'] = json.dumps(extras)
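# Illustrative round trip between the two hook environment helpers (assumed):
#     set_hook_environment('joe', '127.0.0.1', 'myrepo', 'hg', action='push')
#     get_hook_environment().repository -> 'myrepo'
#     get_hook_environment().action     -> 'push'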
 

	
 

	
 
def get_current_authuser():
 
    """
 
    Gets kallithea user from threadlocal tmpl_context variable if it's
 
    defined, else returns None.
 
    """
 
    from tg import tmpl_context
 
    if hasattr(tmpl_context, 'authuser'):
 
        return tmpl_context.authuser
 

	
 
    return None
 

	
 

	
 
class OptionalAttr(object):
 
    """
 
    Special Optional Option that defines other attribute. Example::
 

	
 
        def test(apiuser, userid=Optional(OAttr('apiuser'))):
 
            user = Optional.extract(userid)
 
            # calls
 

	
 
    """
 

	
 
    def __init__(self, attr_name):
 
        self.attr_name = attr_name
 

	
 
    def __repr__(self):
 
        return '<OptionalAttr:%s>' % self.attr_name
 

	
 
    def __call__(self):
 
        return self
 

	
 

	
 
# alias
 
OAttr = OptionalAttr
 

	
 

	
 
class Optional(object):
 
    """
 
    Defines an optional parameter::
 

	
 
        param = param.getval() if isinstance(param, Optional) else param
 
        param = param() if isinstance(param, Optional) else param
 

	
 
    is equivalent of::
 

	
 
        param = Optional.extract(param)
 

	
 
    """
 

	
 
    def __init__(self, type_):
 
        self.type_ = type_
 

	
 
    def __repr__(self):
 
        return '<Optional:%s>' % self.type_.__repr__()
 

	
 
    def __call__(self):
 
        return self.getval()
 

	
 
    def getval(self):
 
        """
 
        returns value from this Optional instance
 
        """
 
        if isinstance(self.type_, OAttr):
 
            # use params name
 
            return self.type_.attr_name
 
        return self.type_
 

	
 
    @classmethod
 
    def extract(cls, val):
 
        """
 
        Extracts value from Optional() instance
 

	
 
        :param val:
 
        :return: original value if it's not Optional instance else
 
            value of instance
 
        """
 
        if isinstance(val, cls):
 
            return val.getval()
 
        return val
 

	
 

	
 
def urlreadable(s, _cleanstringsub=re.compile('[^-a-zA-Z0-9./]+').sub):
 
    return _cleanstringsub('_', safe_str(s)).rstrip('_')
 

	
 

	
 
def recursive_replace(str_, replace=' '):
 
    """
 
    Recursively replaces repeated occurrences of the given character with a single instance
 

	
 
    :param str_: given string
 
    :param replace: char to find and replace multiple instances
 

	
 
    Examples::
 
    >>> recursive_replace("Mighty---Mighty-Bo--sstones",'-')
 
    'Mighty-Mighty-Bo-sstones'
 
    """
 

	
 
    if str_.find(replace * 2) == -1:
 
        return str_
 
    else:
 
        str_ = str_.replace(replace * 2, replace)
 
        return recursive_replace(str_, replace)
 

	
 

	
 
def repo_name_slug(value):
 
    """
 
    Return a slug of the repository name.
 
    This function is called on each repository creation/modification
 
    to prevent bad repository names.
 
    """
 

	
 
    slug = remove_formatting(value)
 
    slug = strip_tags(slug)
 

	
 
    for c in r"""`?=[]\;'"<>,/~!@#$%^&*()+{}|: """:
 
        slug = slug.replace(c, '-')
 
    slug = recursive_replace(slug, '-')
 
    slug = collapse(slug, '-')
 
    return slug
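# Illustrative example (assumed):
#     repo_name_slug('My Repo Name') -> 'My-Repo-Name'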
 

	
 

	
 
def ask_ok(prompt, retries=4, complaint='Yes or no please!'):
 
    while True:
 
        ok = raw_input(prompt)
 
        if ok in ('y', 'ye', 'yes'):
 
            return True
 
        if ok in ('n', 'no', 'nop', 'nope'):
 
            return False
 
        retries = retries - 1
 
        if retries < 0:
 
            raise IOError
 
        print(complaint)
kallithea/lib/vcs/backends/git/ssh.py
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 
import os
 

	
 
from kallithea.lib.hooks import log_pull_action
 
from kallithea.lib.utils import make_ui
 
from kallithea.lib.vcs.backends.ssh import BaseSshHandler
 
from kallithea.lib.vcs.utils import safe_str, safe_unicode
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class GitSshHandler(BaseSshHandler):
 
    vcs_type = 'git'
 

	
 
    @classmethod
 
    def make(cls, ssh_command_parts):
 
        r"""
 
        >>> import shlex
 

	
 
        >>> GitSshHandler.make(shlex.split("git-upload-pack '/foo bar'")).repo_name
 
        u'foo bar'
 
        >>> GitSshHandler.make(shlex.split("git-upload-pack '/foo bar'")).verb
 
        'git-upload-pack'
 
        >>> GitSshHandler.make(shlex.split(" git-upload-pack /blåbærgrød ")).repo_name # might not be necessary to support no quoting ... but we can
 
        u'bl\xe5b\xe6rgr\xf8d'
 
        >>> GitSshHandler.make(shlex.split('''git-upload-pack "/foo'bar"''')).repo_name
 
        u"foo'bar"
 
        >>> GitSshHandler.make(shlex.split("git-receive-pack '/foo'")).repo_name
 
        u'foo'
 
        >>> GitSshHandler.make(shlex.split("git-receive-pack '/foo'")).verb
 
        'git-receive-pack'
 

	
 
        >>> GitSshHandler.make(shlex.split("/bin/git-upload-pack '/foo'")) # ssh-serve will report 'SSH command %r is not supported'
 
        >>> GitSshHandler.make(shlex.split('''git-upload-pack /foo bar''')) # ssh-serve will report 'SSH command %r is not supported'
 
        >>> shlex.split("git-upload-pack '/foo'bar' x") # ssh-serve will report: Error parsing SSH command "...": No closing quotation
 
        Traceback (most recent call last):
 
        ValueError: No closing quotation
 
        >>> GitSshHandler.make(shlex.split('hg -R foo serve --stdio')) # not handled here
 
        """
 
        if (len(ssh_command_parts) == 2 and
 
            ssh_command_parts[0] in ['git-upload-pack', 'git-receive-pack'] and
 
            ssh_command_parts[1].startswith('/')
 
        ):
 
            return cls(safe_unicode(ssh_command_parts[1][1:]), ssh_command_parts[0])
 

	
 
        return None
 

	
 
    def __init__(self, repo_name, verb):
 
        self.repo_name = repo_name
 
        BaseSshHandler.__init__(self, repo_name)
 
        self.verb = verb
 

	
 
    def _serve(self):
 
        if self.verb == 'git-upload-pack': # action 'pull'
 
            # base class called set_hook_environment - action is hardcoded to 'pull'
 
            log_pull_action(ui=make_ui(), repo=self.db_repo.scm_instance._repo)
 
        else: # probably verb 'git-receive-pack', action 'push'
 
            if not self.allow_push:
 
                self.exit('Push access to %r denied' % safe_str(self.repo_name))
 
            # Note: push logging is handled by Git post-receive hook
 

	
 
        # git shell is not a real shell but uses shell-inspired quoting *inside* the argument.
 
        # Per https://github.com/git/git/blob/v2.22.0/quote.c#L12 :
 
        # The path must be "'" quoted, but "'" and "!" must exit the quoting and be "\" escaped
 
        quoted_abspath = "'%s'" % self.db_repo.repo_full_path.replace("'", r"'\''").replace("!", r"'\!'")
 
        newcmd = ['git', 'shell', '-c', "%s %s" % (self.verb, quoted_abspath)]
 
        log.debug('Serving: %s', newcmd)
 
        os.execvp(newcmd[0], newcmd)
 
        self.exit("Failed to exec 'git' as %s" % newcmd)
kallithea/lib/vcs/backends/hg/ssh.py
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 

	
 
import mercurial.hg
 
import mercurial.wireprotoserver
 

	
 
from kallithea.lib.utils import make_ui
 
from kallithea.lib.vcs.backends.ssh import BaseSshHandler
 
from kallithea.lib.vcs.utils import safe_bytes, safe_unicode
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class MercurialSshHandler(BaseSshHandler):
 
    vcs_type = 'hg'
 

	
 
    @classmethod
 
    def make(cls, ssh_command_parts):
 
        r"""
 
        >>> import shlex
 

	
 
        >>> MercurialSshHandler.make(shlex.split('hg -R "foo bar" serve --stdio')).repo_name
 
        u'foo bar'
 
        >>> MercurialSshHandler.make(shlex.split(' hg -R blåbærgrød serve --stdio ')).repo_name
 
        u'bl\xe5b\xe6rgr\xf8d'
 
        >>> MercurialSshHandler.make(shlex.split('''hg -R 'foo"bar' serve --stdio''')).repo_name
 
        u'foo"bar'
 

	
 
        >>> MercurialSshHandler.make(shlex.split('/bin/hg -R "foo" serve --stdio'))
 
        >>> MercurialSshHandler.make(shlex.split('''hg -R "foo"bar" serve --stdio''')) # ssh-serve will report: Error parsing SSH command "...": invalid syntax
 
        Traceback (most recent call last):
 
        ValueError: No closing quotation
 
        >>> MercurialSshHandler.make(shlex.split('git-upload-pack "/foo"')) # not handled here
 
        """
 
        if ssh_command_parts[:2] == ['hg', '-R'] and ssh_command_parts[3:] == ['serve', '--stdio']:
 
            return cls(safe_unicode(ssh_command_parts[2]))
 

	
 
        return None
 

	
 
    def __init__(self, repo_name):
 
        self.repo_name = repo_name
 

	
 
    def _serve(self):
 
        # Note: we want a repo with config based on .hg/hgrc and can thus not use self.db_repo.scm_instance._repo.ui
 
        baseui = make_ui(repo_path=self.db_repo.repo_full_path)
 
        if not self.allow_push:
 
            baseui.setconfig(b'hooks', b'pretxnopen._ssh_reject', b'python:kallithea.lib.hooks.rejectpush')
 
            baseui.setconfig(b'hooks', b'prepushkey._ssh_reject', b'python:kallithea.lib.hooks.rejectpush')
 

	
 
        repo = mercurial.hg.repository(baseui, safe_bytes(self.db_repo.repo_full_path))
 
        log.debug("Starting Mercurial sshserver for %s", self.db_repo.repo_full_path)
 
        mercurial.wireprotoserver.sshserver(baseui, repo).serve_forever()
kallithea/lib/vcs/backends/ssh.py
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
vcs.backends.ssh
 
~~~~~~~~~~~~~~~~~
 

	
 
SSH backend for all available SCMs
 
"""
 

	
 
import datetime
 
import logging
 
import sys
 

	
 
from kallithea.lib.auth import AuthUser, HasPermissionAnyMiddleware
 
from kallithea.lib.utils2 import set_hook_environment
 
from kallithea.lib.vcs.utils import safe_str
 
from kallithea.model.db import Repository, User, UserSshKeys
 
from kallithea.model.meta import Session
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class BaseSshHandler(object):
 
    # Protocol for setting properties:
 
    # Set by sub class:
 
    #   vcs_type: 'hg' or 'git'
 
    # Set by make() / __init__():
 
    #   repo_name: requested repo name - only validated by serve()
 
    # Set by serve() - must not be accessed before:
 
    #   db_repo: repository db object
 
    #   authuser: user that has been authenticated - like request.authuser ... which isn't used here
 
    #   allow_push: false for read-only access to the repo
 

	
 
    # Set defaults, in case .exit should be called early
 
    vcs_type = None
 
    repo_name = None
 

	
 
    @staticmethod
 
    def make(ssh_command):
 
        """Factory function. Given a command as invoked over SSH (and preserved
 
        in SSH_ORIGINAL_COMMAND when run as authorized_keys command), return a
 
        handler if the command looks ok, else return None.
 
        """
 
        raise NotImplementedError
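    # Illustrative dispatch sketch (assumed; the actual ssh-serve wiring lives elsewhere):
    #     for handler_cls in (MercurialSshHandler, GitSshHandler):
    #         handler = handler_cls.make(shlex.split(ssh_original_command))
    #         if handler is not None:
    #             handler.serve(user_id, key_id, client_ip)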
 

	
 
    def __init__(self, repo_name):
 
        self.repo_name = repo_name.rstrip('/')
 

	
 
    def serve(self, user_id, key_id, client_ip):
 
        """Verify basic sanity of the repository, and that the user is
 
        valid and has access - then serve the native VCS protocol for
 
        repository access."""
 
        dbuser = User.get(user_id)
 
        if dbuser is None:
 
            self.exit('User %r not found' % user_id)
 
        self.authuser = AuthUser.make(dbuser=dbuser, ip_addr=client_ip)
 
        log.info('Authorized user %s from SSH %s trusting user id %s and key id %s for %r', dbuser, client_ip, user_id, key_id, self.repo_name)
 
        if self.authuser is None: # not ok ... already kind of authenticated by SSH ... but not really authorized ...
 
            self.exit('User %s from %s cannot be authorized' % (dbuser.username, client_ip))
 

	
 
        ssh_key = UserSshKeys.get(key_id)
 
        if ssh_key is None:
 
            self.exit('SSH key %r not found' % key_id)
 
        ssh_key.last_seen = datetime.datetime.now()
 
        Session().commit()
 

	
 
        if HasPermissionAnyMiddleware('repository.write',
 
                                      'repository.admin')(self.authuser, self.repo_name):
 
            self.allow_push = True
 
        elif HasPermissionAnyMiddleware('repository.read')(self.authuser, self.repo_name):
 
            self.allow_push = False
 
        else:
 
            self.exit('Access to %r denied' % safe_str(self.repo_name))
 

	
 
        self.db_repo = Repository.get_by_repo_name(self.repo_name)
 
        if self.db_repo is None:
 
            self.exit("Repository '%s' not found" % self.repo_name)
 
        assert self.db_repo.repo_name == self.repo_name
 

	
 
        # Set global hook environment up for 'push' actions.
 
        # If pull actions should be served, the actual hook invocation will be
 
        # hardcoded to 'pull' when log_pull_action is invoked (directly on Git,
 
        # or through the Mercurial 'outgoing' hook).
 
        # For push actions, the action in global hook environment is used (in
 
        # handle_git_post_receive when it is called as Git post-receive hook,
 
        # or in log_push_action through the Mercurial 'changegroup' hook).
 
        set_hook_environment(self.authuser.username, client_ip, self.repo_name, self.vcs_type, 'push')
 
        return self._serve()
 

	
 
    def _serve(self):
 
        """Serve the native protocol for repository access."""
 
        raise NotImplementedError
 

	
 
    def exit(self, error):
 
        log.info('abort serving %s %s: %s', self.vcs_type, self.repo_name, error)
 
        sys.stderr.write('abort: %s\n' % error)
 
        sys.exit(1)
kallithea/lib/vcs/nodes.py
 
# -*- coding: utf-8 -*-
 
"""
 
    vcs.nodes
 
    ~~~~~~~~~
 

	
 
    Module holding everything related to vcs nodes.
 

	
 
    :created_on: Apr 8, 2010
 
    :copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
 
"""
 

	
 
import functools
 
import mimetypes
 
import posixpath
 
import stat
 

	
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.exceptions import NodeError, RemovedFileNodeError
 
from kallithea.lib.vcs.utils import safe_bytes, safe_str, safe_unicode
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 

	
 

	
 
class NodeKind:
 
    SUBMODULE = -1
 
    DIR = 1
 
    FILE = 2
 

	
 

	
 
class NodeState:
 
    ADDED = u'added'
 
    CHANGED = u'changed'
 
    NOT_CHANGED = u'not changed'
 
    REMOVED = u'removed'
 

	
 

	
 
class NodeGeneratorBase(object):
 
    """
 
    Base class for removed added and changed filenodes, it's a lazy generator
 
    class that will create filenodes only on iteration or call
 

	
 
    The len method doesn't need to create filenodes at all
 
    """
 

	
 
    def __init__(self, current_paths, cs):
 
        self.cs = cs
 
        self.current_paths = current_paths
 

	
 
    def __getitem__(self, key):
 
        assert isinstance(key, slice), key
 
        for p in self.current_paths[key]:
 
            yield self.cs.get_node(p)
 

	
 
    def __len__(self):
 
        return len(self.current_paths)
 

	
 
    def __iter__(self):
 
        for p in self.current_paths:
 
            yield self.cs.get_node(p)
 

	
 

	
 
class AddedFileNodesGenerator(NodeGeneratorBase):
 
    """
 
    Class holding Added files for current changeset
 
    """
 
    pass
 

	
 

	
 
class ChangedFileNodesGenerator(NodeGeneratorBase):
 
    """
 
    Class holding Changed files for current changeset
 
    """
 
    pass
 

	
 

	
 
class RemovedFileNodesGenerator(NodeGeneratorBase):
 
    """
 
    Class holding removed files for current changeset
 
    """
 
    def __iter__(self):
 
        for p in self.current_paths:
 
            yield RemovedFileNode(path=p)
 

	
 
    def __getitem__(self, key):
 
        assert isinstance(key, slice), key
 
        for p in self.current_paths[key]:
 
            yield RemovedFileNode(path=p)
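
# As the NodeGeneratorBase docstring notes, these generators stay cheap until a
# filenode is actually needed; a rough usage sketch, where 'cs' stands for any
# changeset object exposing get_node():
added = AddedFileNodesGenerator(['setup.py', 'README.rst'], cs)
len(added)                    # -> 2, computed without building a single FileNode
for node in added:            # nodes are created one at a time, only here
    print(node.path)
removed = RemovedFileNodesGenerator(['old.txt'], cs)
next(iter(removed)).state     # -> NodeState.REMOVED, no changeset lookup at all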
 

	
 

	
 
@functools.total_ordering
 
class Node(object):
 
    """
 
    Simplest class representing file or directory on repository.  SCM backends
 
    should use ``FileNode`` and ``DirNode`` subclasses rather than ``Node``
 
    directly.
 

	
 
    A node's ``path`` cannot start with a slash, as we operate on *relative*

    paths only. Moreover, every single node is identified by its ``path``

    attribute, so it cannot end with a slash either, as that could lead to mistakes.
 
    """
 

	
 
    def __init__(self, path, kind):
 
        if path.startswith('/'):
 
            raise NodeError("Cannot initialize Node objects with slash at "
 
                            "the beginning as only relative paths are supported")
 
        self.path = safe_str(path.rstrip('/'))  # we store paths as str
 
        if path == '' and kind != NodeKind.DIR:
 
            raise NodeError("Only DirNode and its subclasses may be "
 
                            "initialized with empty path")
 
        self.kind = kind
 
        #self.dirs, self.files = [], []
 
        if self.is_root() and not self.is_dir():
 
            raise NodeError("Root node cannot be FILE kind")
 

	
 
    @LazyProperty
 
    def parent(self):
 
        parent_path = self.get_parent_path()
 
        if parent_path:
 
            if self.changeset:
 
                return self.changeset.get_node(parent_path)
 
            return DirNode(parent_path)
 
        return None
 

	
 
    @LazyProperty
 
    def name(self):
 
        """
 
        Returns the name of the node, i.e. the last part of its path.
 
        """
 
        return safe_unicode(self.path.rstrip('/').split('/')[-1])
 

	
 
    def _get_kind(self):
 
        return self._kind
 

	
 
    def _set_kind(self, kind):
 
        if hasattr(self, '_kind'):
 
            raise NodeError("Cannot change node's kind")
 
        else:
 
            self._kind = kind
 
            # Post setter check (path's trailing slash)
 
            if self.path.endswith('/'):
 
                raise NodeError("Node's path cannot end with slash")
 

	
 
    kind = property(_get_kind, _set_kind)
 

	
 
    def __eq__(self, other):
 
        if type(self) is not type(other):
 
            return False
 
        if self._kind != other._kind:
 
            return False
 
        if self.path != other.path:
 
            return False
 
        if self.is_file():
 
            return self.content == other.content
 
        else:
 
            # For DirNode's check without entering each dir
 
            self_nodes_paths = list(sorted(n.path for n in self.nodes))
 
            other_nodes_paths = list(sorted(n.path for n in other.nodes))
 
            return self_nodes_paths == other_nodes_paths
 

	
 
    def __lt__(self, other):
 
        if self._kind < other._kind:
 
            return True
 
        if self._kind > other._kind:
 
            return False
 
        if self.path < other.path:
 
            return True
 
        if self.path > other.path:
 
            return False
 
        if self.is_file():
 
            return self.content < other.content
 
        else:
 
            # For DirNode's check without entering each dir
 
            self_nodes_paths = list(sorted(n.path for n in self.nodes))
 
            other_nodes_paths = list(sorted(n.path for n in other.nodes))
 
            return self_nodes_paths < other_nodes_paths
 

	
 
    def __repr__(self):
 
        return '<%s %r>' % (self.__class__.__name__, self.path)
 

	
 
    def get_parent_path(self):
 
        """
 
        Returns node's parent path or empty string if node is root.
 
        """
 
        if self.is_root():
 
            return ''
 
        return posixpath.dirname(self.path.rstrip('/')) + '/'
 

	
 
    def is_file(self):
 
        """
 
        Returns ``True`` if node's kind is ``NodeKind.FILE``, ``False``
 
        otherwise.
 
        """
 
        return self.kind == NodeKind.FILE
 

	
 
    def is_dir(self):
 
        """
 
        Returns ``True`` if node's kind is ``NodeKind.DIR``, ``False``
 
        otherwise.
 
        """
 
        return self.kind == NodeKind.DIR
 

	
 
    def is_root(self):
 
        """
 
        Returns ``True`` if node is a root node and ``False`` otherwise.
 
        """
 
        return self.kind == NodeKind.DIR and self.path == ''
 

	
 
    def is_submodule(self):
 
        """
 
        Returns ``True`` if node's kind is ``NodeKind.SUBMODULE``, ``False``
 
        otherwise.
 
        """
 
        return self.kind == NodeKind.SUBMODULE
 

	
 
    @LazyProperty
 
    def added(self):
 
        return self.state is NodeState.ADDED
 

	
 
    @LazyProperty
 
    def changed(self):
 
        return self.state is NodeState.CHANGED
 

	
 
    @LazyProperty
 
    def not_changed(self):
 
        return self.state is NodeState.NOT_CHANGED
 

	
 
    @LazyProperty
 
    def removed(self):
 
        return self.state is NodeState.REMOVED
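
# With functools.total_ordering, __eq__ and __lt__ above give nodes a full
# ordering: kind first (directories before files), then path, then content.
# A small sketch using the FileNode/DirNode subclasses defined below:
a = FileNode('docs/a.txt', content=b'x')
b = FileNode('docs/b.txt', content=b'x')
assert a < b                                      # same kind, ordered by path
assert a != FileNode('docs/a.txt', content=b'y')  # same path, different content
assert DirNode('docs') < a                        # NodeKind.DIR sorts before NodeKind.FILE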
 

	
 

	
 
class FileNode(Node):
 
    """
 
    Class representing file nodes.
 

	
 
    :attribute: path: path to the node, relative to repository's root
 
    :attribute: content: if given, sets the content of the file

    :attribute: changeset: if given, content is fetched lazily from it the first time it is accessed
 
    :attribute: mode: octal stat mode for a node. Default is 0100644.
 
    """
 

	
 
    def __init__(self, path, content=None, changeset=None, mode=None):
 
        """
 
        Only one of ``content`` and ``changeset`` may be given. Passing both
 
        would raise ``NodeError`` exception.
 

	
 
        :param path: relative path to the node
 
        :param content: content may be passed to constructor
 
        :param changeset: if given, will use it to lazily fetch content
 
        :param mode: octal representation of ST_MODE (i.e. 0100644)
 
        """
 

	
 
        if content and changeset:
 
            raise NodeError("Cannot use both content and changeset")
 
        super(FileNode, self).__init__(path, kind=NodeKind.FILE)
 
        self.changeset = changeset
 
        if not isinstance(content, bytes) and content is not None:
 
            # File content is one thing that inherently must be bytes ... but
 
            # VCS module tries to be "user friendly" and support unicode ...
 
            content = safe_bytes(content)
 
        self._content = content
 
        self._mode = mode or 0o100644
 

	
 
    @LazyProperty
 
    def mode(self):
 
        """
 
        Lazily returns the mode of the FileNode. If ``changeset`` is not set,

        uses the value given at initialization, or 0100644 (the default).
 
        """
 
        if self.changeset:
 
            mode = self.changeset.get_file_mode(self.path)
 
        else:
 
            mode = self._mode
 
        return mode
 

	
 
    @property
 
    def content(self):
 
        """
 
        Lazily returns the byte content of the FileNode.
 
        """
 
        if self.changeset:
 
            content = self.changeset.get_file_content(self.path)
 
        else:
 
            content = self._content
 
        return content
 

	
 
    @LazyProperty
 
    def size(self):
 
        if self.changeset:
 
            return self.changeset.get_file_size(self.path)
 
        raise NodeError("Cannot retrieve size of the file without related "
 
            "changeset attribute")
 

	
 
    @LazyProperty
 
    def message(self):
 
        if self.changeset:
 
            return self.last_changeset.message
 
        raise NodeError("Cannot retrieve message of the file without related "
 
            "changeset attribute")
 

	
 
    @LazyProperty
 
    def last_changeset(self):
 
        if self.changeset:
 
            return self.changeset.get_file_changeset(self.path)
 
        raise NodeError("Cannot retrieve last changeset of the file without "
 
            "related changeset attribute")
 

	
 
    def get_mimetype(self):
 
        """
 
        Mimetype is guessed from the file's name and, as a fallback, its content.
 
        """
 

	
 
        mtype, encoding = mimetypes.guess_type(self.name)
 

	
 
        if mtype is None:
 
            if self.is_binary:
 
                mtype = 'application/octet-stream'
 
                encoding = None
 
            else:
 
                mtype = 'text/plain'
 
                encoding = None
 

	
 
                # try with pygments
 
                from pygments import lexers
 
                try:
 
                    mt = lexers.get_lexer_for_filename(self.name).mimetypes
 
                except lexers.ClassNotFound:
 
                    mt = None
 

	
 
                if mt:
 
                    mtype = mt[0]
 

	
 
        return mtype, encoding
 

	
 
    @LazyProperty
 
    def mimetype(self):
 
        """
 
        Wrapper around the full mimetype info; returns only the type part of

        the guessed mimetype, without the encoding. Use ``get_mimetype`` to

        fetch the full ``(type, encoding)`` pair.
 
        """
 
        return self.get_mimetype()[0]
 

	
 
    @LazyProperty
 
    def mimetype_main(self):
 
        return self.mimetype.split('/')[0]
 

	
 
    @LazyProperty
 
    def lexer(self):
 
        """
 
        Returns a Pygments lexer for this file, guessed from the file's name

        and content, falling back to a plain text lexer.
 
        """
 
        from pygments import lexers
 
        try:
 
            lexer = lexers.guess_lexer_for_filename(self.name, safe_unicode(self.content), stripnl=False)
 
        except lexers.ClassNotFound:
 
            lexer = lexers.TextLexer(stripnl=False)
 
 
        return lexer
 

	
 
    @LazyProperty
 
    def lexer_alias(self):
 
        """
 
        Returns first alias of the lexer guessed for this file.
 
        """
 
        return self.lexer.aliases[0]
 

	
 
    @LazyProperty
 
    def history(self):
 
        """
 
        Returns a list of changeset for this file in which the file was changed
 
        """
 
        if self.changeset is None:
 
            raise NodeError('Unable to get changeset for this FileNode')
 
        return self.changeset.get_file_history(self.path)
 

	
 
    @LazyProperty
 
    def annotate(self):
 
        """
 
        Returns a list of three-element tuples of (lineno, changeset, line)
 
        """
 
        if self.changeset is None:
 
            raise NodeError('Unable to get changeset for this FileNode')
 
        return self.changeset.get_file_annotate(self.path)
 

	
 
    @LazyProperty
 
    def state(self):
 
        if not self.changeset:
 
            raise NodeError("Cannot check state of the node if it's not "
 
                "linked with changeset")
 
        elif self.path in (node.path for node in self.changeset.added):
 
            return NodeState.ADDED
 
        elif self.path in (node.path for node in self.changeset.changed):
 
            return NodeState.CHANGED
 
        else:
 
            return NodeState.NOT_CHANGED
 

	
 
    @property
 
    def is_binary(self):
 
        """
 
        Returns True if file has binary content.
 
        """
 
        return b'\0' in self.content
 

	
 
    def is_browser_compatible_image(self):
 
        return self.mimetype in [
 
            "image/gif",
 
            "image/jpeg",
 
            "image/png",
 
            "image/bmp"
 
        ]
 

	
 
    @LazyProperty
 
    def extension(self):
 
        """Returns filenode extension"""
 
        return self.name.split('.')[-1]
 

	
 
    @property
 
    def is_executable(self):
 
        """
 
        Returns ``True`` if file has executable flag turned on.
 
        """
 
        return bool(self.mode & stat.S_IXUSR)
 

	
 
    def __repr__(self):
 
        return '<%s %r @ %s>' % (self.__class__.__name__, self.path,
 
                                 getattr(self.changeset, 'short_id', ''))
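
# A few illustrative probes of the FileNode behaviour above, using standalone
# nodes (no changeset); exact results depend on the local mimetypes and
# Pygments data, hence the hedged comments:
FileNode('logo.png', content=b'\x89PNG').get_mimetype()   # -> ('image/png', None), guessed from the name
FileNode('blob', content=b'\x00\x01').get_mimetype()      # -> ('application/octet-stream', None), binary content
FileNode('notes', content=b'plain text').get_mimetype()   # -> ('text/plain', None), unless Pygments knows the name
FileNode('app.py', content=b'print(1)').lexer_alias       # -> usually 'python'
FileNode('run.sh', content=b'#!/bin/sh\n', mode=0o100755).is_executable  # -> True (the default 0o100644 gives False)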
 

	
 

	
 
class RemovedFileNode(FileNode):
 
    """
 
    Dummy FileNode class - trying to access any public attribute except path,
 
    name, kind or state (or methods/attributes checking those) would raise
 
    RemovedFileNodeError.
 
    """
 
    ALLOWED_ATTRIBUTES = [
 
        'name', 'path', 'state', 'is_root', 'is_file', 'is_dir', 'kind',
 
        'added', 'changed', 'not_changed', 'removed'
 
    ]
 

	
 
    def __init__(self, path):
 
        """
 
        :param path: relative path to the node
 
        """
 
        super(RemovedFileNode, self).__init__(path=path)
 

	
 
    def __getattribute__(self, attr):
 
        if attr.startswith('_') or attr in RemovedFileNode.ALLOWED_ATTRIBUTES:
 
            return super(RemovedFileNode, self).__getattribute__(attr)
 
        raise RemovedFileNodeError("Cannot access attribute %s on "
 
            "RemovedFileNode" % attr)
 

	
 
    @LazyProperty
 
    def state(self):
 
        return NodeState.REMOVED
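
# In practice the whitelist above means a RemovedFileNode can still be listed
# and displayed, while any attempt to read data that no longer exists fails
# loudly; a small sketch:
node = RemovedFileNode('docs/old.rst')
node.name      # -> u'old.rst', allowed
node.state     # -> NodeState.REMOVED, allowed
node.content   # raises RemovedFileNodeError: 'content' is not in ALLOWED_ATTRIBUTES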
 

	
 

	
 
class DirNode(Node):
 
    """
 
    DirNode stores the list of files and directories within this node.

    Nodes may be used standalone, but within a repository context they

    lazily fetch data from the same repository's changeset.
 
    """
 

	
 
    def __init__(self, path, nodes=(), changeset=None):
 
        """
 
        Only one of ``nodes`` and ``changeset`` may be given. Passing both
 
        would raise ``NodeError`` exception.
 

	
 
        :param path: relative path to the node
 
        :param nodes: iterable of child nodes that may be passed to the constructor
 
        :param changeset: if given, will use it to lazily fetch content
 
        :param size: always 0 for ``DirNode``
 
        """
 
        if nodes and changeset:
 
            raise NodeError("Cannot use both nodes and changeset")
 
        super(DirNode, self).__init__(path, NodeKind.DIR)
 
        self.changeset = changeset
 
        self._nodes = nodes
 

	
 
    @LazyProperty
 
    def content(self):
 
        raise NodeError("%s represents a dir and has no ``content`` attribute"
 
            % self)
 

	
 
    @LazyProperty
 
    def nodes(self):
 
        if self.changeset:
 
            nodes = self.changeset.get_nodes(self.path)
 
        else:
 
            nodes = self._nodes
 
        self._nodes_dict = dict((node.path, node) for node in nodes)
 
        return sorted(nodes)
 

	
 
    @LazyProperty
 
    def files(self):
 
        return sorted((node for node in self.nodes if node.is_file()))
 

	
 
    @LazyProperty
 
    def dirs(self):
 
        return sorted((node for node in self.nodes if node.is_dir()))
 

	
 
    def __iter__(self):
 
        for node in self.nodes:
 
            yield node
 

	
 
    def get_node(self, path):
 
        """
 
        Returns a node from within this particular ``DirNode``, so it is

        possible to fetch e.g. the node located at 'docs/api/index.rst'

        starting from the node 'docs'. In order to access deeper nodes one

        must fetch the nodes between them first - this would work::
 

	
 
           docs = root.get_node('docs')
 
           docs.get_node('api').get_node('index.rst')
 

	
 
        :param: path - relative to the current node
 

	
 
        .. note::
 
           To access nodes lazily (as in the example above) the node has to be

           initialized with a related changeset object - without it the node

           is out of context and knows nothing about anything other than its

           nearest (same level) nodes.
 
        """
 
        try:
 
            path = path.rstrip('/')
 
            if path == '':
 
                raise NodeError("Cannot retrieve node without path")
 
            self.nodes  # access nodes first in order to set _nodes_dict
 
            paths = path.split('/')
 
            if len(paths) == 1:
 
                if not self.is_root():
 
                    path = '/'.join((self.path, paths[0]))
 
                else:
 
                    path = paths[0]
 
                return self._nodes_dict[path]
 
            elif len(paths) > 1:
 
                if self.changeset is None:
 
                    raise NodeError("Cannot access deeper "
 
                                    "nodes without changeset")
 
                else:
 
                    path1, path2 = paths[0], '/'.join(paths[1:])
 
                    return self.get_node(path1).get_node(path2)
 
            else:
 
                raise KeyError
 
        except KeyError:
 
            raise NodeError("Node does not exist at %s" % path)
 

	
 
    @LazyProperty
 
    def state(self):
 
        raise NodeError("Cannot access state of DirNode")
 

	
 
    @LazyProperty
 
    def size(self):
 
        size = 0
 
        for root, dirs, files in self.changeset.walk(self.path):
 
            for f in files:
 
                size += f.size
 

	
 
        return size
 

	
 
    def __repr__(self):
 
        return '<%s %r @ %s>' % (self.__class__.__name__, self.path,
 
                                 getattr(self.changeset, 'short_id', ''))
 

	
 

	
 
class RootNode(DirNode):
 
    """
 
    DirNode being the root node of the repository.
 
    """
 

	
 
    def __init__(self, nodes=(), changeset=None):
 
        super(RootNode, self).__init__(path='', nodes=nodes,
 
            changeset=changeset)
 

	
 
    def __repr__(self):
 
        return '<%s>' % self.__class__.__name__
 

	
 

	
 
class SubModuleNode(Node):
 
    """
 
    represents a SubModule of Git or SubRepo of Mercurial
 
    """
 
    is_binary = False
 
    size = 0
 

	
 
    def __init__(self, name, url, changeset=None, alias=None):
 
        # Note: Doesn't call Node.__init__!
 
        self.path = name
 
        self.kind = NodeKind.SUBMODULE
 
        self.alias = alias
 
        # we have to use EmptyChangeset here since this can point to svn/git/hg
 
        # submodules we cannot get from repository
 
        self.changeset = EmptyChangeset(str(changeset), alias=alias)
 
        self.url = url
 

	
 
    def __repr__(self):
 
        return '<%s %r @ %s>' % (self.__class__.__name__, self.path,
 
                                 getattr(self.changeset, 'short_id', ''))
 

	
 
    @LazyProperty
 
    def name(self):
 
        """
 
        Returns the name of the node, i.e. the last part of its path.
 
        """
 
        org = safe_unicode(self.path.rstrip('/').split('/')[-1])
 
        return u'%s @ %s' % (org, self.changeset.short_id)
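
# Taken together, the classes in this module can be used without any repository
# at all; a minimal standalone-tree sketch (anything size- or history-related
# would still require a changeset):
files = [FileNode('setup.py', content=b''), FileNode('docs/index.rst', content=b'')]
root = RootNode(nodes=files)
root.files                        # -> both FileNodes, sorted
root.get_node('setup.py').name    # -> u'setup.py'
root.get_node('docs/index.rst')   # raises NodeError: deeper paths need a changeset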
kallithea/templates/about.html
 
## -*- coding: utf-8 -*-
 
<%inherit file="/base/base.html"/>
 
<%block name="title">
 
    ${_('About')}
 
</%block>
 
<%block name="header_menu">
 
    ${self.menu('about')}
 
</%block>
 
<%def name="main()">
 

	
 
<div class="panel panel-primary">
 
  <div class="panel-heading">
 
    <h5 class="panel-title">${_('About')} Kallithea</h5>
 
  </div>
 

	
 
  <div class="panel-body panel-about">
 
  <p><a href="https://kallithea-scm.org/">Kallithea</a> is a project of the
 
  <a href="http://sfconservancy.org/">Software Freedom Conservancy, Inc.</a>
 
  and is released under the terms of the
 
  <a href="http://www.gnu.org/copyleft/gpl.html">GNU General Public License,
 
  v 3.0 (GPLv3)</a>.</p>
 

	
 
  <p>Kallithea is copyrighted by various authors, including but not
 
  necessarily limited to the following:</p>
 
  <ul>
 

	
 
  <li>Copyright &copy; 2012&ndash;2020, Mads Kiilerich</li>
 
  <li>Copyright &copy; 2014&ndash;2020, Thomas De Schampheleire</li>
 
  <li>Copyright &copy; 2020, Dennis Fink</li>
 
  <li>Copyright &copy; 2012, 2014&ndash;2017, 2019, Andrej Shadura</li>
 
  <li>Copyright &copy; 2015&ndash;2017, 2019, Étienne Gilli</li>
 
  <li>Copyright &copy; 2017&ndash;2019, Allan Nordhøy</li>
 
  <li>Copyright &copy; 2018&ndash;2019, ssantos</li>
 
  <li>Copyright &copy; 2019, Adi Kriegisch</li>
 
  <li>Copyright &copy; 2019, Danni Randeris</li>
 
  <li>Copyright &copy; 2019, Edmund Wong</li>
 
  <li>Copyright &copy; 2019, Elizabeth Sherrock</li>
 
  <li>Copyright &copy; 2019, Hüseyin Tunç</li>
 
  <li>Copyright &copy; 2019, leela</li>
 
  <li>Copyright &copy; 2019, Manuel Jacob</li>
 
  <li>Copyright &copy; 2019, Mateusz Mendel</li>
 
  <li>Copyright &copy; 2019, Nathan</li>
 
  <li>Copyright &copy; 2019, Oleksandr Shtalinberg</li>
 
  <li>Copyright &copy; 2019, Private</li>
 
  <li>Copyright &copy; 2019, THANOS SIOURDAKIS</li>
 
  <li>Copyright &copy; 2019, Wolfgang Scherer</li>
 
  <li>Copyright &copy; 2019, Христо Станев</li>
 
  <li>Copyright &copy; 2012, 2014&ndash;2018, Dominik Ruf</li>
 
  <li>Copyright &copy; 2014&ndash;2015, 2018, Michal Čihař</li>
 
  <li>Copyright &copy; 2015, 2018, Branko Majic</li>
 
  <li>Copyright &copy; 2018, Chris Rule</li>
 
  <li>Copyright &copy; 2018, Jesús Sánchez</li>
 
  <li>Copyright &copy; 2018, Patrick Vane</li>
 
  <li>Copyright &copy; 2018, Pheng Heong Tan</li>
 
  <li>Copyright &copy; 2018, Максим Якимчук</li>
 
  <li>Copyright &copy; 2018, Марс Ямбар</li>
 
  <li>Copyright &copy; 2012&ndash;2017, Unity Technologies</li>
 
  <li>Copyright &copy; 2015&ndash;2017, Søren Løvborg</li>
 
  <li>Copyright &copy; 2015, 2017, Sam Jaques</li>
 
  <li>Copyright &copy; 2016&ndash;2017, Asterios Dimitriou</li>
 
  <li>Copyright &copy; 2017, Alessandro Molina</li>
 
  <li>Copyright &copy; 2017, Anton Schur</li>
 
  <li>Copyright &copy; 2017, Ching-Chen Mao</li>
 
  <li>Copyright &copy; 2017, Eivind Tagseth</li>
 
  <li>Copyright &copy; 2017, FUJIWARA Katsunori</li>
 
  <li>Copyright &copy; 2017, Holger Schramm</li>
 
  <li>Copyright &copy; 2017, Karl Goetz</li>
 
  <li>Copyright &copy; 2017, Lars Kruse</li>
 
  <li>Copyright &copy; 2017, Marko Semet</li>
 
  <li>Copyright &copy; 2017, Viktar Vauchkevich</li>
 
  <li>Copyright &copy; 2012&ndash;2016, Takumi IINO</li>
 
  <li>Copyright &copy; 2015&ndash;2016, Jan Heylen</li>
 
  <li>Copyright &copy; 2015&ndash;2016, Robert Martinez</li>
 
  <li>Copyright &copy; 2015&ndash;2016, Robert Rauch</li>
 
  <li>Copyright &copy; 2016, Angel Ezquerra</li>
 
  <li>Copyright &copy; 2016, Anton Shestakov</li>
 
  <li>Copyright &copy; 2016, Brandon Jones</li>
 
  <li>Copyright &copy; 2016, Kateryna Musina</li>
 
  <li>Copyright &copy; 2016, Konstantin Veretennicov</li>
 
  <li>Copyright &copy; 2016, Oscar Curero</li>
 
  <li>Copyright &copy; 2016, Robert James Dennington</li>
 
  <li>Copyright &copy; 2016, timeless@gmail.com</li>
 
  <li>Copyright &copy; 2016, YFdyh000</li>
 
  <li>Copyright &copy; 2012&ndash;2013, 2015, Aras Pranckevičius</li>
 
  <li>Copyright &copy; 2014&ndash;2015, Bradley M. Kuhn</li>
 
  <li>Copyright &copy; 2014&ndash;2015, Christian Oyarzun</li>
 
  <li>Copyright &copy; 2014&ndash;2015, Joseph Rivera</li>
 
  <li>Copyright &copy; 2014&ndash;2015, Sean Farley</li>
 
  <li>Copyright &copy; 2015, Anatoly Bubenkov</li>
 
  <li>Copyright &copy; 2015, Andrew Bartlett</li>
 
  <li>Copyright &copy; 2015, Balázs Úr</li>
 
  <li>Copyright &copy; 2015, Ben Finney</li>
 
  <li>Copyright &copy; 2015, Daniel Hobley</li>
 
  <li>Copyright &copy; 2015, David Avigni</li>
 
  <li>Copyright &copy; 2015, Denis Blanchette</li>
 
  <li>Copyright &copy; 2015, duanhongyi</li>
 
  <li>Copyright &copy; 2015, EriCSN Chang</li>
 
  <li>Copyright &copy; 2015, Grzegorz Krason</li>
 
  <li>Copyright &copy; 2015, Jiří Suchan</li>
 
  <li>Copyright &copy; 2015, Kazunari Kobayashi</li>
 
  <li>Copyright &copy; 2015, Kevin Bullock</li>
 
  <li>Copyright &copy; 2015, kobanari</li>
 
  <li>Copyright &copy; 2015, Marc Abramowitz</li>
 
  <li>Copyright &copy; 2015, Marc Villetard</li>
 
  <li>Copyright &copy; 2015, Matthias Zilk</li>
 
  <li>Copyright &copy; 2015, Michael Pohl</li>
 
  <li>Copyright &copy; 2015, Michael V. DePalatis</li>
 
  <li>Copyright &copy; 2015, Morten Skaaning</li>
 
  <li>Copyright &copy; 2015, Nick High</li>
 
  <li>Copyright &copy; 2015, Niemand Jedermann</li>
 
  <li>Copyright &copy; 2015, Peter Vitt</li>
 
  <li>Copyright &copy; 2015, Ronny Pfannschmidt</li>
 
  <li>Copyright &copy; 2015, Tuux</li>
 
  <li>Copyright &copy; 2015, Viktar Palstsiuk</li>
 
  <li>Copyright &copy; 2014, Ante Ilic</li>
 
  <li>Copyright &copy; 2014, Calinou</li>
 
  <li>Copyright &copy; 2014, Daniel Anderson</li>
 
  <li>Copyright &copy; 2014, Henrik Stuart</li>
 
  <li>Copyright &copy; 2014, Ingo von Borstel</li>
 
  <li>Copyright &copy; 2014, invision70</li>
 
  <li>Copyright &copy; 2014, Jelmer Vernooij</li>
 
  <li>Copyright &copy; 2014, Jim Hague</li>
 
  <li>Copyright &copy; 2014, Matt Fellows</li>
 
  <li>Copyright &copy; 2014, Max Roman</li>
 
  <li>Copyright &copy; 2014, Na'Tosha Bard</li>
 
  <li>Copyright &copy; 2014, Rasmus Selsmark</li>
 
  <li>Copyright &copy; 2014, SkryabinD</li>
 
  <li>Copyright &copy; 2014, Tim Freund</li>
 
  <li>Copyright &copy; 2014, Travis Burtrum</li>
 
  <li>Copyright &copy; 2014, whosaysni</li>
 
  <li>Copyright &copy; 2014, Zoltan Gyarmati</li>
 
  <li>Copyright &copy; 2010&ndash;2013, Marcin Kuźmiński</li>
 
  <li>Copyright &copy; 2010&ndash;2013, RhodeCode GmbH</li>
 
  <li>Copyright &copy; 2011, 2013, Aparkar</li>
 
  <li>Copyright &copy; 2012&ndash;2013, Nemcio</li>
 
  <li>Copyright &copy; 2012&ndash;2013, xpol</li>
 
  <li>Copyright &copy; 2013, Andrey Mivrenik</li>
 
  <li>Copyright &copy; 2013, ArcheR</li>
 
  <li>Copyright &copy; 2013, Dennis Brakhane</li>
 
  <li>Copyright &copy; 2013, gnustavo</li>
 
  <li>Copyright &copy; 2013, Grzegorz Rożniecki</li>
 
  <li>Copyright &copy; 2013, Ilya Beda</li>
 
  <li>Copyright &copy; 2013, ivlevdenis</li>
 
  <li>Copyright &copy; 2013, Jonathan Sternberg</li>
 
  <li>Copyright &copy; 2013, Leonardo Carneiro</li>
 
  <li>Copyright &copy; 2013, Magnus Ericmats</li>
 
  <li>Copyright &copy; 2013, Martin Vium</li>
 
  <li>Copyright &copy; 2013, Mikhail Zholobov</li>
 
  <li>Copyright &copy; 2013, mokeev1995</li>
 
  <li>Copyright &copy; 2013, Ruslan Bekenev</li>
 
  <li>Copyright &copy; 2013, shirou - しろう</li>
 
  <li>Copyright &copy; 2013, Simon Lopez</li>
 
  <li>Copyright &copy; 2013, softforwinxp</li>
 
  <li>Copyright &copy; 2013, stephanj</li>
 
  <li>Copyright &copy; 2013, zhmylove</li>
 
  <li>Copyright &copy; 2013, こいんとす</li>
 
  <li>Copyright &copy; 2011&ndash;2012, Augusto Herrmann</li>
 
  <li>Copyright &copy; 2012, Dan Sheridan</li>
 
  <li>Copyright &copy; 2012, H Waldo G</li>
 
  <li>Copyright &copy; 2012, hppj</li>
 
  <li>Copyright &copy; 2012, Indra Talip</li>
 
  <li>Copyright &copy; 2012, mikespook</li>
 
  <li>Copyright &copy; 2012, nansenat16</li>
 
  <li>Copyright &copy; 2012, Philip Jameson</li>
 
  <li>Copyright &copy; 2012, Raoul Thill</li>
 
  <li>Copyright &copy; 2012, Tony Bussieres</li>
 
  <li>Copyright &copy; 2012, Vincent Duvert</li>
 
  <li>Copyright &copy; 2012, Vladislav Poluhin</li>
 
  <li>Copyright &copy; 2012, Zachary Auclair</li>
 
  <li>Copyright &copy; 2011, Ankit Solanki</li>
 
  <li>Copyright &copy; 2011, Dmitri Kuznetsov</li>
 
  <li>Copyright &copy; 2011, Jared Bunting</li>
 
  <li>Copyright &copy; 2011, Jason Harris</li>
 
  <li>Copyright &copy; 2011, Les Peabody</li>
 
  <li>Copyright &copy; 2011, Liad Shani</li>
 
  <li>Copyright &copy; 2011, Lorenzo M. Catucci</li>
 
  <li>Copyright &copy; 2011, Matt Zuba</li>
 
  <li>Copyright &copy; 2011, Nicolas VINOT</li>
 
  <li>Copyright &copy; 2011, Shawn K. O'Shea</li>
 
  <li>Copyright &copy; 2010, Łukasz Balcerzak</li>
 

	
 
## We did not list the following copyright holders, given that they appeared
 
## to use for-profit company affiliations in their contribution in the
 
## Mercurial log and therefore we did not know if copyright was theirs or
 
## their company's.
 
## Copyright &copy; 2011 Thayne Harbaugh <thayne@fusionio.com>
 
## Copyright &copy; 2012 Dies Koper <diesk@fast.au.fujitsu.com>
 
## Copyright &copy; 2012 Erwin Kroon <e.kroon@smartmetersolutions.nl>
 
## Copyright &copy; 2012 Vincent Caron <vcaron@bearstech.com>
 
##
 
## These contributors' contributions may not be copyrightable:
 
## philip.j@hostdime.com in 2012
 
## Stefan Engel <mail@engel-stefan.de> in 2012
 
## Ton Plomp <tcplomp@gmail.com> in 2013
 
##
 
  </ul>
 

	
 
  <p>The above are the copyright holders who have submitted direct
 
  contributions to the Kallithea repository.</p>
 

	
 
  <p>In the <a href="https://kallithea-scm.org/repos/kallithea">Kallithea
 
  source code</a>, there is a
 
  <a href="https://kallithea-scm.org/repos/kallithea/files/tip/LICENSE.md">list
 
  of third-party libraries and code that Kallithea incorporates</a>.</p>
 

	
 
  <p>The front-end contains a <a href="${h.url('/LICENSES.txt')}">list of
 
  software that is used to build the front-end</a> but isn't distributed as a
 
  part of Kallithea.</p>
 

	
 
  </div>
 
</div>
 

	
 
</%def>
kallithea/tests/functional/test_login.py
 
@@ -32,502 +32,503 @@ class TestLoginController(base.TestContr
 
    def test_login_admin_ok(self):
 
        response = self.app.post(base.url(controller='login', action='index'),
 
                                 {'username': base.TEST_USER_ADMIN_LOGIN,
 
                                  'password': base.TEST_USER_ADMIN_PASS,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        assert response.status == '302 Found'
 
        self.assert_authenticated_user(response, base.TEST_USER_ADMIN_LOGIN)
 

	
 
        response = response.follow()
 
        response.mustcontain('/%s' % base.HG_REPO)
 

	
 
    def test_login_regular_ok(self):
 
        response = self.app.post(base.url(controller='login', action='index'),
 
                                 {'username': base.TEST_USER_REGULAR_LOGIN,
 
                                  'password': base.TEST_USER_REGULAR_PASS,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        assert response.status == '302 Found'
 
        self.assert_authenticated_user(response, base.TEST_USER_REGULAR_LOGIN)
 

	
 
        response = response.follow()
 
        response.mustcontain('/%s' % base.HG_REPO)
 

	
 
    def test_login_regular_email_ok(self):
 
        response = self.app.post(base.url(controller='login', action='index'),
 
                                 {'username': base.TEST_USER_REGULAR_EMAIL,
 
                                  'password': base.TEST_USER_REGULAR_PASS,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        assert response.status == '302 Found'
 
        self.assert_authenticated_user(response, base.TEST_USER_REGULAR_LOGIN)
 

	
 
        response = response.follow()
 
        response.mustcontain('/%s' % base.HG_REPO)
 

	
 
    def test_login_ok_came_from(self):
 
        test_came_from = '/_admin/users'
 
        response = self.app.post(base.url(controller='login', action='index',
 
                                     came_from=test_came_from),
 
                                 {'username': base.TEST_USER_ADMIN_LOGIN,
 
                                  'password': base.TEST_USER_ADMIN_PASS,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        assert response.status == '302 Found'
 
        response = response.follow()
 

	
 
        assert response.status == '200 OK'
 
        response.mustcontain('Users Administration')
 

	
 
    def test_login_do_not_remember(self):
 
        response = self.app.post(base.url(controller='login', action='index'),
 
                                 {'username': base.TEST_USER_REGULAR_LOGIN,
 
                                  'password': base.TEST_USER_REGULAR_PASS,
 
                                  'remember': False,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        assert 'Set-Cookie' in response.headers
 
        for cookie in response.headers.getall('Set-Cookie'):
 
            assert not re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE), 'Cookie %r has expiration date, but should be a session cookie' % cookie
 

	
 
    def test_login_remember(self):
 
        response = self.app.post(base.url(controller='login', action='index'),
 
                                 {'username': base.TEST_USER_REGULAR_LOGIN,
 
                                  'password': base.TEST_USER_REGULAR_PASS,
 
                                  'remember': True,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        assert 'Set-Cookie' in response.headers
 
        for cookie in response.headers.getall('Set-Cookie'):
 
            assert re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE), 'Cookie %r should have expiration date, but is a session cookie' % cookie
 

	
 
    def test_logout(self):
 
        response = self.app.post(base.url(controller='login', action='index'),
 
                                 {'username': base.TEST_USER_REGULAR_LOGIN,
 
                                  'password': base.TEST_USER_REGULAR_PASS,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        # Verify that a login session has been established.
 
        response = self.app.get(base.url(controller='login', action='index'))
 
        response = response.follow()
 
        assert 'authuser' in response.session
 

	
 
        response.click('Log Out')
 

	
 
        # Verify that the login session has been terminated.
 
        response = self.app.get(base.url(controller='login', action='index'))
 
        assert 'authuser' not in response.session
 

	
 
    @base.parametrize('url_came_from', [
 
          ('data:text/html,<script>window.alert("xss")</script>',),
 
          ('mailto:test@example.com',),
 
          ('file:///etc/passwd',),
 
          ('ftp://ftp.example.com',),
 
          ('http://other.example.com/bl%C3%A5b%C3%A6rgr%C3%B8d',),
 
          ('//evil.example.com/',),
 
          ('/\r\nX-Header-Injection: boo',),
 
          ('/invälid_url_bytes',),
 
          ('non-absolute-path',),
 
    ])
 
    def test_login_bad_came_froms(self, url_came_from):
 
        response = self.app.post(base.url(controller='login', action='index',
 
                                     came_from=url_came_from),
 
                                 {'username': base.TEST_USER_ADMIN_LOGIN,
 
                                  'password': base.TEST_USER_ADMIN_PASS,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()},
 
                                 status=400)
 

	
 
    def test_login_short_password(self):
 
        response = self.app.post(base.url(controller='login', action='index'),
 
                                 {'username': base.TEST_USER_ADMIN_LOGIN,
 
                                  'password': 'as',
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        assert response.status == '200 OK'
 

	
 
        response.mustcontain('Enter 3 characters or more')
 

	
 
    def test_login_wrong_username_password(self):
 
        response = self.app.post(base.url(controller='login', action='index'),
 
                                 {'username': 'error',
 
                                  'password': 'test12',
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        response.mustcontain('Invalid username or password')
 

	
 
    def test_login_non_ascii(self):
 
        response = self.app.post(base.url(controller='login', action='index'),
 
                                 {'username': base.TEST_USER_REGULAR_LOGIN,
 
                                  'password': 'blåbærgrød',
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        response.mustcontain('>Invalid username or password<')
 

	
 
    # verify that get arguments are correctly passed along login redirection
 

	
 
    @base.parametrize('args', [
 
        {'foo':'one', 'bar':'two'},
 
        {'blue': u'blå', 'green': u'grøn'},
 
    ])
 
    def test_redirection_to_login_form_preserves_get_args(self, args):
 
        with fixture.anon_access(False):
 
            response = self.app.get(base.url(controller='summary', action='index',
 
                                        repo_name=base.HG_REPO,
 
                                        **args))
 
            assert response.status == '302 Found'
 
            came_from = urlparse.parse_qs(urlparse.urlparse(response.location).query)['came_from'][0]
 
            came_from_qs = urlparse.parse_qsl(urlparse.urlparse(came_from).query)
 
            assert sorted(came_from_qs) == sorted((k, v.encode('utf-8')) for k, v in args.items())
 

	
 
    @base.parametrize('args,args_encoded', [
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå', 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_login_form_preserves_get_args(self, args, args_encoded):
 
        response = self.app.get(base.url(controller='login', action='index',
 
                                    came_from=base.url('/_admin/users', **args)))
 
        came_from = urlparse.parse_qs(urlparse.urlparse(response.form.action).query)['came_from'][0]
 
        for encoded in args_encoded:
 
            assert encoded in came_from
 

	
 
    @base.parametrize('args,args_encoded', [
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå', 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_redirection_after_successful_login_preserves_get_args(self, args, args_encoded):
 
        response = self.app.post(base.url(controller='login', action='index',
 
                                     came_from=base.url('/_admin/users', **args)),
 
                                 {'username': base.TEST_USER_ADMIN_LOGIN,
 
                                  'password': base.TEST_USER_ADMIN_PASS,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        assert response.status == '302 Found'
 
        for encoded in args_encoded:
 
            assert encoded in response.location
 

	
 
    @base.parametrize('args,args_encoded', [
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå', 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_login_form_after_incorrect_login_preserves_get_args(self, args, args_encoded):
 
        response = self.app.post(base.url(controller='login', action='index',
 
                                     came_from=base.url('/_admin/users', **args)),
 
                                 {'username': 'error',
 
                                  'password': 'test12',
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        response.mustcontain('Invalid username or password')
 
        came_from = urlparse.parse_qs(urlparse.urlparse(response.form.action).query)['came_from'][0]
 
        for encoded in args_encoded:
 
            assert encoded in came_from
 

	
 
    #==========================================================================
 
    # REGISTRATIONS
 
    #==========================================================================
 
    def test_register(self):
 
        response = self.app.get(base.url(controller='login', action='register'))
 
        response.mustcontain('Sign Up')
 

	
 
    def test_register_err_same_username(self):
 
        uname = base.TEST_USER_ADMIN_LOGIN
 
        response = self.app.post(base.url(controller='login', action='register'),
 
                                            {'username': uname,
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'goodmail@example.com',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test',
 
                                             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        with test_context(self.app):
 
            msg = validators.ValidUsername()._messages['username_exists']
 
        msg = h.html_escape(msg % {'username': uname})
 
        response.mustcontain(msg)
 

	
 
    def test_register_err_same_email(self):
 
        response = self.app.post(base.url(controller='login', action='register'),
 
                                            {'username': 'test_admin_0',
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': base.TEST_USER_ADMIN_EMAIL,
 
                                             'firstname': 'test',
 
                                             'lastname': 'test',
 
                                             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        with test_context(self.app):
 
            msg = validators.UniqSystemEmail()()._messages['email_taken']
 
        response.mustcontain(msg)
 

	
 
    def test_register_err_same_email_case_sensitive(self):
 
        response = self.app.post(base.url(controller='login', action='register'),
 
                                            {'username': 'test_admin_1',
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': base.TEST_USER_ADMIN_EMAIL.title(),
 
                                             'firstname': 'test',
 
                                             'lastname': 'test',
 
                                             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        with test_context(self.app):
 
            msg = validators.UniqSystemEmail()()._messages['email_taken']
 
        response.mustcontain(msg)
 

	
 
    def test_register_err_wrong_data(self):
 
        response = self.app.post(base.url(controller='login', action='register'),
 
                                            {'username': 'xs',
 
                                             'password': 'test',
 
                                             'password_confirmation': 'test',
 
                                             'email': 'goodmailm',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test',
 
                                             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        assert response.status == '200 OK'
 
        response.mustcontain('An email address must contain a single @')
 
        response.mustcontain('Enter a value 6 characters long or more')
 

	
 
    def test_register_err_username(self):
 
        response = self.app.post(base.url(controller='login', action='register'),
 
                                            {'username': 'error user',
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'goodmailm',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test',
 
                                             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        response.mustcontain('Username may only contain '
 
                'alphanumeric characters underscores, '
 
                'periods or dashes and must begin with an '
 
                'alphanumeric character')
 

	
 
    def test_register_err_case_sensitive(self):
 
        usr = base.TEST_USER_ADMIN_LOGIN.title()
 
        response = self.app.post(base.url(controller='login', action='register'),
 
                                            {'username': usr,
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'goodmailm',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test',
 
                                             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        with test_context(self.app):
 
            msg = validators.ValidUsername()._messages['username_exists']
 
        msg = h.html_escape(msg % {'username': usr})
 
        response.mustcontain(msg)
 

	
 
    def test_register_special_chars(self):
 
        response = self.app.post(base.url(controller='login', action='register'),
 
                                        {'username': 'xxxaxn',
 
                                         'password': 'ąćźżąśśśś',
 
                                         'password_confirmation': 'ąćźżąśśśś',
 
                                         'email': 'goodmailm@test.plx',
 
                                         'firstname': 'test',
 
                                         'lastname': 'test',
 
                                         '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        with test_context(self.app):
 
            msg = validators.ValidPassword()._messages['invalid_password']
 
        response.mustcontain(msg)
 

	
 
    def test_register_password_mismatch(self):
 
        response = self.app.post(base.url(controller='login', action='register'),
 
                                            {'username': 'xs',
 
                                             'password': '123qwe',
 
                                             'password_confirmation': 'qwe123',
 
                                             'email': 'goodmailm@test.plxa',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test',
 
                                             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        with test_context(self.app):
 
            msg = validators.ValidPasswordsMatch('password', 'password_confirmation')._messages['password_mismatch']
 
        response.mustcontain(msg)
 

	
 
    def test_register_ok(self):
 
        username = 'test_regular4'
 
        password = 'qweqwe'
 
        email = 'user4@example.com'
 
        name = 'testname'
 
        lastname = 'testlastname'
 

	
 
        response = self.app.post(base.url(controller='login', action='register'),
 
                                            {'username': username,
 
                                             'password': password,
 
                                             'password_confirmation': password,
 
                                             'email': email,
 
                                             'firstname': name,
 
                                             'lastname': lastname,
 
                                             'admin': True,
 
                                             '_session_csrf_secret_token': self.session_csrf_secret_token()})  # This should be overridden
 
        assert response.status == '302 Found'
 
        self.checkSessionFlash(response, 'You have successfully registered with Kallithea')
 

	
 
        ret = Session().query(User).filter(User.username == 'test_regular4').one()
 
        assert ret.username == username
 
        assert check_password(password, ret.password) == True
 
        assert ret.email == email
 
        assert ret.name == name
 
        assert ret.lastname == lastname
 
        assert ret.api_key is not None
 
        assert ret.admin == False
 

	
 
    #==========================================================================
 
    # PASSWORD RESET
 
    #==========================================================================
 

	
 
    def test_forgot_password_wrong_mail(self):
 
        bad_email = 'username%wrongmail.org'
 
        response = self.app.post(
 
                        base.url(controller='login', action='password_reset'),
 
                            {'email': bad_email,
 
                             '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        response.mustcontain('An email address must contain a single @')
 

	
 
    def test_forgot_password(self):
 
        response = self.app.get(base.url(controller='login',
 
                                    action='password_reset'))
 
        assert response.status == '200 OK'
 

	
 
        username = 'test_password_reset_1'
 
        password = 'qweqwe'
 
        email = 'username@example.com'
 
        name = u'passwd'
 
        lastname = u'reset'
 
        timestamp = int(time.time())
 

	
 
        new = User()
 
        new.username = username
 
        new.password = password
 
        new.email = email
 
        new.name = name
 
        new.lastname = lastname
 
        new.api_key = generate_api_key()
 
        Session().add(new)
 
        Session().commit()
 

	
 
        token = UserModel().get_reset_password_token(
 
            User.get_by_username(username), timestamp, self.session_csrf_secret_token())
 

	
 
        collected = []
 
        def mock_send_email(recipients, subject, body='', html_body='', headers=None, author=None):
 
            collected.append((recipients, subject, body, html_body))
 

	
 
        with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', mock_send_email), \

                mock.patch.object(time, 'time', lambda: timestamp):
 
            response = self.app.post(base.url(controller='login',
 
                                         action='password_reset'),
 
                                     {'email': email,
 
                                      '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        self.checkSessionFlash(response, 'A password reset confirmation code has been sent')
 

	
 
        ((recipients, subject, body, html_body),) = collected
 
        assert recipients == ['username@example.com']
 
        assert subject == 'Password reset link'
 
        assert '\n%s\n' % token in body
 
        (confirmation_url,) = (line for line in body.splitlines() if line.startswith('http://'))
 
        assert ' href="%s"' % confirmation_url.replace('&', '&amp;').replace('@', '%40') in html_body
 

	
 
        d = urlparse.parse_qs(urlparse.urlparse(confirmation_url).query)
 
        assert d['token'] == [token]
 
        assert d['timestamp'] == [str(timestamp)]
 
        assert d['email'] == [email]
 

	
 
        response = response.follow()
 

	
 
        # BAD TOKEN
 

	
 
        bad_token = "bad"
 

	
 
        response = self.app.post(base.url(controller='login',
 
                                     action='password_reset_confirmation'),
 
                                 {'email': email,
 
                                  'timestamp': timestamp,
 
                                  'password': "p@ssw0rd",
 
                                  'password_confirm': "p@ssw0rd",
 
                                  'token': bad_token,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 })
 
        assert response.status == '200 OK'
 
        response.mustcontain('Invalid password reset token')
 

	
 
        # GOOD TOKEN
 

	
 
        response = self.app.get(confirmation_url)
 
        assert response.status == '200 OK'
 
        response.mustcontain("You are about to set a new password for the email address %s" % email)
 
        response.mustcontain('<form action="%s" method="post">' % base.url(controller='login', action='password_reset_confirmation'))
 
        response.mustcontain('value="%s"' % self.session_csrf_secret_token())
 
        response.mustcontain('value="%s"' % token)
 
        response.mustcontain('value="%s"' % timestamp)
 
        response.mustcontain('value="username@example.com"')
 

	
 
        # fake a submit of that form
 
        response = self.app.post(base.url(controller='login',
 
                                     action='password_reset_confirmation'),
 
                                 {'email': email,
 
                                  'timestamp': timestamp,
 
                                  'password': "p@ssw0rd",
 
                                  'password_confirm': "p@ssw0rd",
 
                                  'token': token,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 })
 
        assert response.status == '302 Found'
 
        self.checkSessionFlash(response, 'Successfully updated password')
 

	
 
        response = response.follow()
 

	
 
    #==========================================================================
 
    # API
 
    #==========================================================================
 

	
 
    def _api_key_test(self, api_key, status):
 
        """Verifies HTTP status code for accessing an auth-requiring page,
 
        using the given api_key URL parameter as well as using the API key
 
        with bearer authentication.
 

	
 
        If api_key is None, no api_key is passed at all. If api_key is True,
 
        a real, working API key is used.
 
        """
 
        with fixture.anon_access(False):
 
            if api_key is None:
 
                params = {}
 
                headers = {}
 
            else:
 
                if api_key is True:
 
                    api_key = User.get_first_admin().api_key
 
                params = {'api_key': api_key}
 
                headers = {'Authorization': 'Bearer ' + str(api_key)}
 

	
 
            self.app.get(base.url(controller='changeset', action='changeset_raw',
 
                             repo_name=base.HG_REPO, revision='tip', **params),
 
                         status=status)
 

	
 
            self.app.get(base.url(controller='changeset', action='changeset_raw',
 
                             repo_name=base.HG_REPO, revision='tip'),
 
                         headers=headers,
 
                         status=status)
 

	
 
    @base.parametrize('test_name,api_key,code', [
 
        ('none', None, 302),
 
        ('empty_string', '', 403),
 
        ('fake_number', '123456', 403),
 
        ('fake_not_alnum', 'a-z', 403),
 
        ('fake_api_key', '0123456789abcdef0123456789ABCDEF01234567', 403),
 
        ('proper_api_key', True, 200)
 
    ])
 
    def test_access_page_via_api_key(self, test_name, api_key, code):
 
        self._api_key_test(api_key, code)
 

	
 
    def test_access_page_via_extra_api_key(self):
 
        new_api_key = ApiKeyModel().create(base.TEST_USER_ADMIN_LOGIN, u'test')
 
        Session().commit()
 
        self._api_key_test(new_api_key.api_key, status=200)
 

	
 
    def test_access_page_via_expired_api_key(self):
 
        new_api_key = ApiKeyModel().create(base.TEST_USER_ADMIN_LOGIN, u'test')
 
        Session().commit()
 
        # patch the API key and make it expired
 
        new_api_key.expires = 0
 
        Session().commit()
 
        self._api_key_test(new_api_key.api_key, status=403)
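
# For reference, the two access styles _api_key_test exercises map onto plain
# HTTP requests roughly as follows (a sketch using the third-party 'requests'
# library; the URL and key below are placeholders, not real Kallithea routes):
import requests

API_KEY = '0123456789abcdef0123456789abcdef01234567'       # placeholder key
PAGE = 'https://kallithea.example.com/some/protected/page'  # placeholder URL
requests.get(PAGE, params={'api_key': API_KEY})                        # key as URL parameter
requests.get(PAGE, headers={'Authorization': 'Bearer %s' % API_KEY})   # bearer authentication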
kallithea/tests/other/test_vcs_operations.py
 
@@ -33,594 +33,620 @@ import re
 
import tempfile
 
import time
 
import urllib2
 
from subprocess import PIPE, Popen
 
from tempfile import _RandomNameSequence
 

	
 
import pytest
 

	
 
from kallithea import CONFIG
 
from kallithea.lib.utils2 import ascii_bytes
 
from kallithea.model.db import CacheInvalidation, Repository, Ui, User, UserIpMap, UserLog
 
from kallithea.model.meta import Session
 
from kallithea.model.ssh_key import SshKeyModel
 
from kallithea.model.user import UserModel
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
DEBUG = True
 
HOST = '127.0.0.1:4999'  # test host
 

	
 
fixture = Fixture()
 

	
 

	
 
# Parameterize different kinds of VCS testing - both the kind of VCS and the
 
# access method (HTTP/SSH)
 

	
 
# Mixins for using HTTP and SSH URLs
 
class HttpVcsTest(object):
 
    @staticmethod
 
    def repo_url_param(webserver, repo_name, **kwargs):
 
        return webserver.repo_url(repo_name, **kwargs)
 

	
 
class SshVcsTest(object):
 
    public_keys = {
 
        base.TEST_USER_REGULAR_LOGIN: u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== kallithea@localhost',
 
        base.TEST_USER_ADMIN_LOGIN: u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUq== kallithea@localhost',
 
    }
 

	
 
    @classmethod
 
    def repo_url_param(cls, webserver, repo_name, username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS, client_ip=base.IP_ADDR):
 
        user = User.get_by_username(username)
 
        if user.ssh_keys:
 
            ssh_key = user.ssh_keys[0]
 
        else:
 
            sshkeymodel = SshKeyModel()
 
            ssh_key = sshkeymodel.create(user, u'test key', cls.public_keys[user.username])
 
            Session().commit()
 

	
 
        return cls._ssh_param(repo_name, user, ssh_key, client_ip)
 

	
 
# Mixins for using Mercurial and Git
 
class HgVcsTest(object):
 
    repo_type = 'hg'
 
    repo_name = base.HG_REPO
 

	
 
class GitVcsTest(object):
 
    repo_type = 'git'
 
    repo_name = base.GIT_REPO
 

	
 
# Combine mixins to give the combinations we want to parameterize tests with
 
class HgHttpVcsTest(HgVcsTest, HttpVcsTest):
 
    pass
 

	
 
class GitHttpVcsTest(GitVcsTest, HttpVcsTest):
 
    pass
 

	
 
class HgSshVcsTest(HgVcsTest, SshVcsTest):
 
    @staticmethod
 
    def _ssh_param(repo_name, user, ssh_key, client_ip):
 
        # Specify a custom ssh command on the command line
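 
        # hg invokes ui.ssh as '<ssh-cmd> <host> <remote command>', so inside the
        # bash -c wrapper "$2" is the remote hg command; exporting it as
        # SSH_ORIGINAL_COMMAND and running kallithea-cli ssh-serve directly
        # emulates what sshd would do for a Kallithea-managed authorized_keys entry.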
 
        return r"""--config ui.ssh="bash -c 'SSH_ORIGINAL_COMMAND=\"\$2\" SSH_CONNECTION=\"%s 1024 127.0.0.1 22\" kallithea-cli ssh-serve -c %s %s %s' --" ssh://someuser@somehost/%s""" % (
 
            client_ip,
 
            CONFIG['__file__'],
 
            user.user_id,
 
            ssh_key.user_ssh_key_id,
 
            repo_name)
 

	
 
class GitSshVcsTest(GitVcsTest, SshVcsTest):
 
    @staticmethod
 
    def _ssh_param(repo_name, user, ssh_key, client_ip):
 
        # Set a custom ssh command in the global environment
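 
        # git picks up the ssh command from the GIT_SSH_COMMAND environment
        # variable, so set it in the test process environment.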
 
        os.environ['GIT_SSH_COMMAND'] = r"""bash -c 'SSH_ORIGINAL_COMMAND="$2" SSH_CONNECTION="%s 1024 127.0.0.1 22" kallithea-cli ssh-serve -c %s %s %s' --""" % (
 
            client_ip,
 
            CONFIG['__file__'],
 
            user.user_id,
 
            ssh_key.user_ssh_key_id)
 
        return "ssh://someuser@somehost/%s" % repo_name
 

	
 
parametrize_vcs_test = base.parametrize('vt', [
 
    HgHttpVcsTest,
 
    GitHttpVcsTest,
 
    HgSshVcsTest,
 
    GitSshVcsTest,
 
])
 
parametrize_vcs_test_hg = base.parametrize('vt', [
 
    HgHttpVcsTest,
 
    HgSshVcsTest,
 
])
 
parametrize_vcs_test_http = base.parametrize('vt', [
 
    HgHttpVcsTest,
 
    GitHttpVcsTest,
 
])
 

	
 
class Command(object):
 

	
 
    def __init__(self, cwd):
 
        self.cwd = cwd
 

	
 
    def execute(self, *args, **environ):
 
        """
 
        Runs command on the system with given ``args`` using simple space
 
        join without safe quoting.
 
        """
 
        command = ' '.join(args)
 
        ignoreReturnCode = environ.pop('ignoreReturnCode', False)
 
        if DEBUG:
 
            print('*** CMD %s ***' % command)
 
        testenv = dict(os.environ)
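 
        # force English messages and neutralize any user-level Mercurial
        # configuration so that the output asserted on below is stable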
 
        testenv['LANG'] = 'en_US.UTF-8'
 
        testenv['LANGUAGE'] = 'en_US:en'
 
        testenv['HGPLAIN'] = ''
 
        testenv['HGRCPATH'] = ''
 
        testenv.update(environ)
 
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd, env=testenv)
 
        stdout, stderr = p.communicate()
 
        if DEBUG:
 
            if stdout:
 
                print('stdout:', stdout)
 
            if stderr:
 
                print('stderr:', stderr)
 
        if not ignoreReturnCode:
 
            assert p.returncode == 0
 
        return stdout, stderr
 

	
 

	
 
def _get_tmp_dir(prefix='vcs_operations-', suffix=''):
 
    return tempfile.mkdtemp(dir=base.TESTS_TMP_PATH, prefix=prefix, suffix=suffix)
 

	
 

	
 
def _add_files(vcs, dest_dir, files_no=3):
 
    """
 
    Generate some files, add them to the dest_dir repo and commit them.
 
    vcs is 'git' or 'hg' and defines which VCS the files are committed with.
 

	
 
    :param vcs: 'hg' or 'git'
 
    :param dest_dir: path of the local working copy to add the files to
 
    """
 
    added_file = '%ssetup.py' % _RandomNameSequence().next()
 
    open(os.path.join(dest_dir, added_file), 'a').close()
 
    Command(dest_dir).execute(vcs, 'add', added_file)
 

	
 
    email = 'me@example.com'
 
    if os.name == 'nt':
 
        author_str = 'User <%s>' % email
 
    else:
 
        author_str = 'User ǝɯɐᴎ <%s>' % email
 
    for i in xrange(files_no):
 
        cmd = """echo "added_line%s" >> %s""" % (i, added_file)
 
        Command(dest_dir).execute(cmd)
 
        if vcs == 'hg':
 
            cmd = """hg commit -m "committed new %s" -u "%s" "%s" """ % (
 
                i, author_str, added_file
 
            )
 
        elif vcs == 'git':
 
            cmd = """git commit -m "committed new %s" --author "%s" "%s" """ % (
 
                i, author_str, added_file
 
            )
 
        # git commit needs EMAIL on some machines
 
        Command(dest_dir).execute(cmd, EMAIL=email)
 

	
 
def _add_files_and_push(webserver, vt, dest_dir, clone_url, ignoreReturnCode=False, files_no=3):
 
    _add_files(vt.repo_type, dest_dir, files_no=files_no)
 
    # PUSH it back
 
    stdout = stderr = None
 
    if vt.repo_type == 'hg':
 
        stdout, stderr = Command(dest_dir).execute('hg push -f --verbose', clone_url, ignoreReturnCode=ignoreReturnCode)
 
    elif vt.repo_type == 'git':
 
        stdout, stderr = Command(dest_dir).execute('git push -f --verbose', clone_url, "master", ignoreReturnCode=ignoreReturnCode)
 

	
 
    return stdout, stderr
 

	
 

	
 
def _check_outgoing(vcs, cwd, clone_url):
 
    if vcs == 'hg':
 
        # hg removes the password from default URLs, so we have to provide it here via the clone_url
 
        return Command(cwd).execute('hg -q outgoing', clone_url, ignoreReturnCode=True)
 
    elif vcs == 'git':
 
        Command(cwd).execute('git remote update')
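 
        # list commits that exist locally but have not been pushed to origin/master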
 
        return Command(cwd).execute('git log origin/master..master')
 

	
 

	
 
def set_anonymous_access(enable=True):
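 
    # Anonymous access is modelled by the built-in "default" user; toggling its
    # active flag turns anonymous access on or off.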
 
    user = User.get_default_user()
 
    user.active = enable
 
    Session().commit()
 
    if enable != User.get_default_user().active:
 
        raise Exception('Cannot set anonymous access')
 

	
 

	
 
#==============================================================================
 
# TESTS
 
#==============================================================================
 

	
 

	
 
def _check_proper_git_push(stdout, stderr):
 
    assert 'fatal' not in stderr
 
    assert 'rejected' not in stderr
 
    assert 'Pushing to' in stderr
 
    assert 'master -> master' in stderr
 

	
 

	
 
@pytest.mark.usefixtures("test_context_fixture")
 
class TestVCSOperations(base.TestController):
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        # DISABLE ANONYMOUS ACCESS
 
        set_anonymous_access(False)
 

	
 
    @pytest.fixture()
 
    def testhook_cleanup(self):
 
        yield
 
        # remove hook
 
        for hook in ['prechangegroup', 'pretxnchangegroup', 'preoutgoing', 'changegroup', 'outgoing', 'incoming']:
 
            entry = Ui.get_by_key('hooks', '%s.testhook' % hook)
 
            if entry:
 
                Session().delete(entry)
 
        Session().commit()
 

	
 
    @pytest.fixture(scope="module")
 
    def testfork(self):
 
        # create fork so the repo stays untouched
 
        git_fork_name = u'%s_fork%s' % (base.GIT_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(base.GIT_REPO, git_fork_name)
 
        hg_fork_name = u'%s_fork%s' % (base.HG_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(base.HG_REPO, hg_fork_name)
 
        return {'git': git_fork_name, 'hg': hg_fork_name}
 

	
 
    @parametrize_vcs_test
 
    def test_clone_repo_by_admin(self, webserver, vt):
 
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir())
 

	
 
        if vt.repo_type == 'git':
 
            assert 'Cloning into' in stdout + stderr
 
            assert stderr == '' or stdout == ''
 
        elif vt.repo_type == 'hg':
 
            assert 'requesting all changes' in stdout
 
            assert 'adding changesets' in stdout
 
            assert 'adding manifests' in stdout
 
            assert 'adding file changes' in stdout
 
            assert stderr == ''
 

	
 
    @parametrize_vcs_test_http
 
    def test_clone_wrong_credentials(self, webserver, vt):
 
        clone_url = vt.repo_url_param(webserver, vt.repo_name, password='bad!')
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        if vt.repo_type == 'git':
 
            assert 'fatal: Authentication failed' in stderr
 
        elif vt.repo_type == 'hg':
 
            assert 'abort: authorization failed' in stderr
 

	
 
    def test_clone_git_dir_as_hg(self, webserver):
 
        clone_url = HgHttpVcsTest.repo_url_param(webserver, base.GIT_REPO)
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'HTTP Error 404: Not Found' in stderr or ("not a valid repository" in stdout and 'abort:' in stderr)
 

	
 
    def test_clone_hg_repo_as_git(self, webserver):
 
        clone_url = GitHttpVcsTest.repo_url_param(webserver, base.HG_REPO)
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'not found' in stderr
 

	
 
    @parametrize_vcs_test
 
    def test_clone_non_existing_path(self, webserver, vt):
 
        clone_url = vt.repo_url_param(webserver, 'trololo')
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        if vt.repo_type == 'git':
 
            assert 'not found' in stderr or 'abort: Access to %r denied' % 'trololo' in stderr
 
        elif vt.repo_type == 'hg':
 
            assert 'HTTP Error 404: Not Found' in stderr or ('abort: no suitable response from remote hg' in stderr and 'remote: abort: Access to %r denied' % 'trololo' in stdout)
 

	
 
    @parametrize_vcs_test
 
    def test_push_new_repo(self, webserver, vt):
 
        # Clear the log so we know what is added
 
        UserLog.query().delete()
 
        Session().commit()
 

	
 
        # Create an empty server repo using the API
 
        repo_name = u'new_%s_%s' % (vt.repo_type, _RandomNameSequence().next())
 
        usr = User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
 
        params = {
 
            "id": 7,
 
            "api_key": usr.api_key,
 
            "method": 'create_repo',
 
            "args": dict(repo_name=repo_name,
 
                         owner=base.TEST_USER_ADMIN_LOGIN,
 
                         repo_type=vt.repo_type),
 
        }
 
        req = urllib2.Request(
 
            'http://%s:%s/_admin/api' % webserver.server_address,
 
            data=ascii_bytes(json.dumps(params)),
 
            headers={'content-type': 'application/json'})
 
        response = urllib2.urlopen(req)
 
        result = json.loads(response.read())
 
        # Expect something like:
 
        # {u'result': {u'msg': u'Created new repository `new_XXX`', u'task': None, u'success': True}, u'id': 7, u'error': None}
 
        assert result[u'result'][u'success']
 

	
 
        # Create local clone of the empty server repo
 
        local_clone_dir = _get_tmp_dir()
 
        clone_url = vt.repo_url_param(webserver, repo_name)
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, local_clone_dir)
 

	
 
        # Make 3 commits and push to the empty server repo.
 
        # The server repo doesn't have any other heads than the
 
        # refs/heads/master we are pushing, but the `git log` in the push hook
 
        # should still list the 3 commits.
 
        stdout, stderr = _add_files_and_push(webserver, vt, local_clone_dir, clone_url=clone_url)
 
        if vt.repo_type == 'git':
 
            _check_proper_git_push(stdout, stderr)
 
        elif vt.repo_type == 'hg':
 
            assert 'pushing to ' in stdout
 
            assert 'remote: added ' in stdout
 

	
 
        # Verify that we got the right events in UserLog. Expect something like:
 
        # <UserLog('id:new_git_XXX:started_following_repo')>
 
        # <UserLog('id:new_git_XXX:user_created_repo')>
 
        # <UserLog('id:new_git_XXX:pull')>
 
        # <UserLog('id:new_git_XXX:push:aed9d4c1732a1927da3be42c47eb9afdc200d427,d38b083a07af10a9f44193486959a96a23db78da,4841ff9a2b385bec995f4679ef649adb3f437622')>
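        # Reduce each action to (name, number of changeset hashes after ':'; 0 if none).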
 
        action_parts = [ul.action.split(':', 1) for ul in UserLog.query().order_by(UserLog.user_log_id)]
 
        assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == ([
 
            (u'started_following_repo', 0),
 
            (u'user_created_repo', 0),
 
            (u'pull', 0),
 
            (u'push', 3)]
 
            if vt.repo_type == 'git' else [
 
            (u'started_following_repo', 0),
 
            (u'user_created_repo', 0),
 
            # (u'pull', 0), # Mercurial outgoing hook is not called for empty clones
 
            (u'push', 3)])
 

	
 
    @parametrize_vcs_test
 
    def test_push_new_file(self, webserver, testfork, vt):
 
        UserLog.query().delete()
 
        Session().commit()
 

	
 
        dest_dir = _get_tmp_dir()
 
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)
 

	
 
        clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type])
 
        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, clone_url=clone_url)
 

	
 
        if vt.repo_type == 'git':
 
            _check_proper_git_push(stdout, stderr)
 
        elif vt.repo_type == 'hg':
 
            assert 'pushing to' in stdout
 
            assert 'Repository size' in stdout
 
            assert 'Last revision is now' in stdout
 

	
 
        action_parts = [ul.action.split(':', 1) for ul in UserLog.query().order_by(UserLog.user_log_id)]
 
        assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == \
 
            [(u'pull', 0), (u'push', 3)]
 

	
 
    @parametrize_vcs_test
 
    def test_pull(self, webserver, testfork, vt):
 
        UserLog.query().delete()
 
        Session().commit()
 

	
 
        dest_dir = _get_tmp_dir()
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'init', dest_dir)
 

	
 
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
 
        stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url)
 

	
 
        if vt.repo_type == 'git':
 
            assert 'FETCH_HEAD' in stderr
 
        elif vt.repo_type == 'hg':
 
            assert 'new changesets' in stdout
 

	
 
        action_parts = [ul.action for ul in UserLog.query().order_by(UserLog.user_log_id)]
 
        assert action_parts == [u'pull']
 

	
 
        # Test handling of URLs with extra '/' around repo_name
 
        stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url.replace('/' + vt.repo_name, '/./%s/' % vt.repo_name), ignoreReturnCode=True)
 
        if issubclass(vt, HttpVcsTest):
 
            if vt.repo_type == 'git':
 
                # NOTE: when pulling from http://hostname/./vcs_test_git/ , the git client will normalize that and issue an HTTP request to /vcs_test_git/info/refs
 
                assert 'Already up to date.' in stdout
 
            else:
 
                assert vt.repo_type == 'hg'
 
                assert "abort: HTTP Error 404: Not Found" in stderr
 
        else:
 
            assert issubclass(vt, SshVcsTest)
 
            if vt.repo_type == 'git':
 
                assert "abort: Access to './%s' denied" % vt.repo_name in stderr
 
            else:
 
                assert "abort: Access to './%s' denied" % vt.repo_name in stdout
 

	
 
        stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url.replace('/' + vt.repo_name, '/%s/' % vt.repo_name), ignoreReturnCode=True)
 
        if vt.repo_type == 'git':
 
            assert 'Already up to date.' in stdout
 
        else:
 
            assert vt.repo_type == 'hg'
 
            assert "no changes found" in stdout
 
        assert "denied" not in stderr
 
        assert "denied" not in stdout
 
        assert "404" not in stdout
 

	
 
    @parametrize_vcs_test
 
    def test_push_invalidates_cache(self, webserver, testfork, vt):
 
        pre_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in Repository.query().filter(Repository.repo_name == testfork[vt.repo_type])]
 

	
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               == testfork[vt.repo_type]).scalar()
 
        if not key:
 
            key = CacheInvalidation(testfork[vt.repo_type], testfork[vt.repo_type])
 
            Session().add(key)
 

	
 
        key.cache_active = True
 
        Session().commit()
 

	
 
        dest_dir = _get_tmp_dir()
 
        clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type])
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, files_no=1, clone_url=clone_url)
 

	
 
        if vt.repo_type == 'git':
 
            _check_proper_git_push(stdout, stderr)
 

	
 
        post_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in Repository.query().filter(Repository.repo_name == testfork[vt.repo_type])]
 
        assert pre_cached_tip != post_cached_tip
 

	
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               == testfork[vt.repo_type]).all()
 
        assert key == []
 

	
 
    @parametrize_vcs_test_http
 
    def test_push_wrong_credentials(self, webserver, vt):
 
        dest_dir = _get_tmp_dir()
 
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)
 

	
 
        clone_url = webserver.repo_url(vt.repo_name, username='bad', password='name')
 
        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir,
 
                                             clone_url=clone_url, ignoreReturnCode=True)
 

	
 
        if vt.repo_type == 'git':
 
            assert 'fatal: Authentication failed' in stderr
 
        elif vt.repo_type == 'hg':
 
            assert 'abort: authorization failed' in stderr
 

	
 
    @parametrize_vcs_test
 
    def test_push_with_readonly_credentials(self, webserver, vt):
 
        UserLog.query().delete()
 
        Session().commit()
 

	
 
        dest_dir = _get_tmp_dir()
 
        clone_url = vt.repo_url_param(webserver, vt.repo_name, username=base.TEST_USER_REGULAR_LOGIN, password=base.TEST_USER_REGULAR_PASS)
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, ignoreReturnCode=True, clone_url=clone_url)
 

	
 
        if vt.repo_type == 'git':
 
            assert 'The requested URL returned error: 403' in stderr or 'abort: Push access to %r denied' % str(vt.repo_name) in stderr
 
        elif vt.repo_type == 'hg':
 
            assert 'abort: HTTP Error 403: Forbidden' in stderr or ('abort: push failed on remote' in stderr and 'remote: Push access to %r denied' % str(vt.repo_name) in stdout)
 

	
 
        action_parts = [ul.action.split(':', 1) for ul in UserLog.query().order_by(UserLog.user_log_id)]
 
        assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == \
 
            [(u'pull', 0)]
 

	
 
    @parametrize_vcs_test
 
    def test_push_back_to_wrong_url(self, webserver, vt):
 
        dest_dir = _get_tmp_dir()
 
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(
 
            webserver, vt, dest_dir, clone_url='http://%s:%s/tmp' % (
 
                webserver.server_address[0], webserver.server_address[1]),
 
            ignoreReturnCode=True)
 

	
 
        if vt.repo_type == 'git':
 
            assert 'not found' in stderr
 
        elif vt.repo_type == 'hg':
 
            assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    @parametrize_vcs_test
 
    def test_ip_restriction(self, webserver, vt):
 
        user_model = UserModel()
 
        try:
 
            # Add IP constraint that excludes the test context:
 
            user_model.add_extra_ip(base.TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
 
            Session().commit()
 
            # IP permissions are cached, need to wait for the cache in the server process to expire
 
            time.sleep(1.5)
 
            clone_url = vt.repo_url_param(webserver, vt.repo_name)
 
            stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
            if vt.repo_type == 'git':
 
                # The message apparently changed in Git 1.8.3, so match it loosely.
 
                assert re.search(r'\b403\b', stderr) or 'abort: User test_admin from 127.0.0.127 cannot be authorized' in stderr
 
            elif vt.repo_type == 'hg':
 
                assert 'abort: HTTP Error 403: Forbidden' in stderr or 'remote: abort: User test_admin from 127.0.0.127 cannot be authorized' in stdout
 
        finally:
 
            # release IP restrictions
 
            for ip in UserIpMap.query():
 
                UserIpMap.delete(ip.ip_id)
 
            Session().commit()
 
            # IP permissions are cached, need to wait for the cache in the server process to expire
 
            time.sleep(1.5)
 

	
 
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir())
 

	
 
        if vt.repo_type == 'git':
 
            assert 'Cloning into' in stdout + stderr
 
            assert stderr == '' or stdout == ''
 
        elif vt.repo_type == 'hg':
 
            assert 'requesting all changes' in stdout
 
            assert 'adding changesets' in stdout
 
            assert 'adding manifests' in stdout
 
            assert 'adding file changes' in stdout
 

	
 
            assert stderr == ''
 

	
 
    @parametrize_vcs_test_hg # git hooks don't work like hg hooks
 
    def test_custom_hooks_preoutgoing(self, testhook_cleanup, webserver, testfork, vt):
 
        # set preoutgoing to a failing hook (a truthy return value means failure)
 
        Ui.create_or_update_hook('preoutgoing.testhook', 'python:kallithea.tests.fixture.failing_test_hook')
 
        Session().commit()
 
        # clone repo
 
        clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type], username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS)
 
        dest_dir = _get_tmp_dir()
 
        stdout, stderr = Command(base.TESTS_TMP_PATH) \
 
            .execute(vt.repo_type, 'clone', clone_url, dest_dir, ignoreReturnCode=True)
 
        if vt.repo_type == 'hg':
 
            assert 'preoutgoing.testhook hook failed' in stdout
 
        elif vt.repo_type == 'git':
 
            assert 'error: 406' in stderr
 

	
 
    @parametrize_vcs_test_hg # git hooks don't work like hg hooks
 
    def test_custom_hooks_prechangegroup(self, testhook_cleanup, webserver, testfork, vt):
 
        # set prechangegroup to failing hook (returns exit code 1)
 
        Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.failing_test_hook')
 
        Session().commit()
 
        # clone repo
 
        clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type], username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS)
 
        dest_dir = _get_tmp_dir()
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, clone_url,
 
                                             ignoreReturnCode=True)
 
        assert 'failing_test_hook failed' in stdout + stderr
 
        assert 'Traceback' not in stdout + stderr
 
        assert 'prechangegroup.testhook hook failed' in stdout + stderr
 
        # there are still outgoing changesets
 
        stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url)
 
        assert stdout != ''
 

	
 
        # set prechangegroup hook to exception throwing method
 
        Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.exception_test_hook')
 
        Session().commit()
 
        # re-try to push
 
        stdout, stderr = Command(dest_dir).execute('%s push' % vt.repo_type, clone_url, ignoreReturnCode=True)
 
        if vt is HgHttpVcsTest:
 
            # like with 'hg serve...' 'HTTP Error 500: INTERNAL SERVER ERROR' should be returned
 
            assert 'HTTP Error 500: INTERNAL SERVER ERROR' in stderr
 
        elif vt is HgSshVcsTest:
 
            assert 'remote: Exception: exception_test_hook threw an exception' in stdout
 
        else:
 
            assert False
 
        # there are still outgoing changesets
 
        stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url)
 
        assert stdout != ''
 

	
 
        # set prechangegroup hook to a passing hook (returns False, i.e. success)
 
        Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.passing_test_hook')
 
        Session().commit()
 
        # re-try to push
 
        stdout, stderr = Command(dest_dir).execute('%s push' % vt.repo_type, clone_url, ignoreReturnCode=True)
 
        assert 'passing_test_hook succeeded' in stdout + stderr
 
        assert 'Traceback' not in stdout + stderr
 
        assert 'prechangegroup.testhook hook failed' not in stdout + stderr
 
        # no more outgoing changesets
 
        stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url)
 
        assert stdout == ''
 
        assert stderr == ''
 

	
 
    def test_add_submodule_git(self, webserver, testfork):
 
        dest_dir = _get_tmp_dir()
 
        clone_url = GitHttpVcsTest.repo_url_param(webserver, base.GIT_REPO)
 

	
 
        fork_url = GitHttpVcsTest.repo_url_param(webserver, testfork['git'])
 

	
 
        # add submodule
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute('git clone', fork_url, dest_dir)
 
        stdout, stderr = Command(dest_dir).execute('git submodule add', clone_url, 'testsubmodule')
 
        stdout, stderr = Command(dest_dir).execute('git commit -am "added testsubmodule pointing to', clone_url, '"', EMAIL=base.TEST_USER_ADMIN_EMAIL)
 
        stdout, stderr = Command(dest_dir).execute('git push', fork_url, 'master')
 

	
 
        # check for testsubmodule link in files page
 
        self.log_user()
 
        response = self.app.get(base.url(controller='files', action='index',
 
                                    repo_name=testfork['git'],
 
                                    revision='tip',
 
                                    f_path='/'))
 
        # check _repo_files_url that will be used to reload as AJAX
 
        response.mustcontain('var _repo_files_url = ("/%s/files/");' % testfork['git'])
 

	
 
        response.mustcontain('<a class="submodule-dir" href="%s" target="_blank"><i class="icon-file-submodule"></i><span>testsubmodule @ ' % clone_url)
 

	
 
        # check that following a submodule link actually works - and redirects
 
        response = self.app.get(base.url(controller='files', action='index',
 
                                    repo_name=testfork['git'],
 
                                    revision='tip',
 
                                    f_path='/testsubmodule'),
 
                                status=302)
 
        assert response.location == clone_url
scripts/make-release
 
#!/bin/bash
 
set -e
 
set -x
 

	
 
cleanup()
 
{
 
  echo "Removing venv $venv"
 
  rm  -rf "$venv"
 
}
 

	
 
echo "Checking that you are NOT inside a virtualenv"
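 
# [ -z ... ] fails when VIRTUAL_ENV is set, and set -e then aborts the script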
 
[ -z "$VIRTUAL_ENV" ]
 

	
 
venv=$(mktemp -d --tmpdir kallithea-release-XXXXX)
 
trap cleanup EXIT
 

	
 
echo "Setting up a fresh virtualenv in $venv"
 
virtualenv -p python2 "$venv"
 
. "$venv/bin/activate"
 

	
 
echo "Install/verify tools needed for building and uploading stuff"
 
pip install --upgrade -e . -r dev_requirements.txt twine python-ldap python-pam
 

	
 
echo "Clean up, update copyrights, and revert to a clean checkout"
 
scripts/run-all-cleanup
 
scripts/update-copyrights.py
 
hg up -cr .
 

	
 
echo "Make release build from clean checkout in build/"
 
rm -rf build dist
 
hg archive build
 
cd build
 

	
 
echo "Check that each entry in MANIFEST.in matches something"
 
sed -e 's/[^ ]*[ ]*\([^ ]*\).*/\1/g' MANIFEST.in | xargs ls -lad
 

	
 
echo "Build dist"
 
python2 setup.py compile_catalog
 
python2 setup.py sdist
 

	
 
echo "Verify VERSION from kallithea/__init__.py"
 
namerel=$(cd dist && echo Kallithea-*.tar.gz)
 
namerel=${namerel%.tar.gz}
 
version=${namerel#Kallithea-}
 
ls -l $(pwd)/dist/$namerel.tar.gz
 
echo "Releasing Kallithea $version in directory $namerel"
 

	
 
echo "Verify dist file content"
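 
# Compare the hg manifest against the tarball listing, ignoring files that are
# generated at build time (compiled .mo catalogs, egg-info metadata, PKG-INFO).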
 
diff -u <((hg mani | grep -v '^\.hg\|^kallithea/i18n/en/LC_MESSAGES/kallithea.mo$') | LANG=C sort) <(tar tf dist/Kallithea-$version.tar.gz | sed "s|^$namerel/||" | grep . | grep -v '^kallithea/i18n/.*/LC_MESSAGES/kallithea.mo$\|^Kallithea.egg-info/\|^PKG-INFO$\|/$' | LANG=C sort)
 

	
 
echo "Verify docs build"
 
python2 setup.py build_sphinx # the results are not actually used, but we want to make sure it builds
 

	
 
echo "Shortlog for inclusion in the release announcement"
 
scripts/shortlog.py "only('.', branch('stable') & tagged() & public() & not '.')"
 

	
 
cat - << EOT
 

	
 
Now, make sure
 
* all tests are passing
 
* release note is ready
 
* announcement is ready
 
* source has been pushed to https://kallithea-scm.org/repos/kallithea
 

	
 
EOT
 

	
 
echo "Verify current revision is tagged for $version"
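 
# The revset is empty unless the working directory parent has the $version tag;
# grep . then exits non-zero and set -e aborts the script.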
 
hg log -r "'$version'&." | grep .
 

	
 
echo -n "Enter \"pypi\" to upload Kallithea $version to pypi: "
 
read answer
 
[ "$answer" = "pypi" ]
 

	
 
echo "Rebuild readthedocs for docs.kallithea-scm.org"
 
xdg-open https://readthedocs.org/projects/kallithea/
 
curl -X POST http://readthedocs.org/build/kallithea
 
xdg-open https://readthedocs.org/builds/kallithea/
 
xdg-open https://readthedocs.org/projects/kallithea/builds
 
xdg-open http://docs.kallithea-scm.org/en/latest/ # or whatever the branch is
 

	
 
twine upload dist/*
 
xdg-open https://pypi.python.org/pypi/Kallithea