Changeset - f629e9a0c376
[Not reviewed]
default
0 8 0
Andrew Shadura - 11 years ago 2015-05-17 02:07:18
andrew@shadura.me
auth: secure password reset implementation

This is a better implementation of password reset function, which
doesn't involve sending a new password to the user's email address
in clear text, and at the same time is stateless.

The old implementation generated a new password and sent it
in clear text to whatever email assigned to the user currently,
so that any user, possibly unauthenticated, could request a reset
for any username or email. Apart from potential insecurity, this
made it possible for anyone to disrupt users' workflow by repeatedly
resetting their passwords.

The idea behind this implementation is to generate
an authentication token which is dependent on the user state
at the time before the password change takes place, so the token
is one-time and can't be reused, and also to bind the token to
the browser session.

The token is calculated as SHA1 hash of the following:

* user's identifier (number, not a name)
* timestamp
* hashed user's password
* session identifier
* per-application secret

We use numeric user's identifier, as it's fixed and doesn't change,
so renaming users doesn't affect the mechanism. Timestamp is added
to make it possible to limit the token's validness (currently hard
coded to 24h), and we don't want users to be able to fake that field
easily. Hashed user's password is needed to prevent using the token
again once the password has been changed. Session identifier is
an additional security measure to ensure someone else stealing the
token can't use it. Finally, per-application secret is just another
way to make it harder for an attacker to guess all values in an
attempt to generate a valid token.

When the token is generated, an anonymous user is directed to a
confirmation page where the timestamp and the usernames are already
preloaded, so the user needs to specify the token. User can either
click the link in the email if it's really them reading it, or to type
the token manually.

Using the right token in the same session as it was requested directs
the user to a password change form, where the user is supposed to
specify a new password (twice, of course). Upon completing the form
(which is POSTed) the password change happens and a notification
mail is sent.

The test is updated to test the basic functionality with a bad and
a good token, but it doesn't (yet) cover all code paths.

The original work from Andrew has been thorougly reviewed and heavily
modified by Søren Løvborg.
8 files changed with 276 insertions and 56 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/login.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.login
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Login controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 22, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 
import formencode
 
import urlparse
 

	
 
from formencode import htmlfill
 
from webob.exc import HTTPFound
 
from webob.exc import HTTPFound, HTTPBadRequest
 
from pylons.i18n.translation import _
 
from pylons.controllers.util import redirect
 
from pylons import request, session, tmpl_context as c, url
 

	
 
import kallithea.lib.helpers as h
 
from kallithea.lib.auth import AuthUser, HasPermissionAnyDecorator
 
from kallithea.lib.base import BaseController, log_in_user, render
 
from kallithea.lib.exceptions import UserCreationError
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.model.db import User, Setting
 
from kallithea.model.forms import LoginForm, RegisterForm, PasswordResetForm
 
from kallithea.model.forms import \
 
    LoginForm, RegisterForm, PasswordResetRequestForm, PasswordResetConfirmationForm
 
from kallithea.model.user import UserModel
 
from kallithea.model.meta import Session
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class LoginController(BaseController):
 

	
 
    def __before__(self):
 
        super(LoginController, self).__before__()
 

	
 
    def _validate_came_from(self, came_from):
 
        """Return True if came_from is valid and can and should be used"""
 
        if not came_from:
 
            return False
 

	
 
        parsed = urlparse.urlparse(came_from)
 
        server_parsed = urlparse.urlparse(url.current())
 
        allowed_schemes = ['http', 'https']
 
        if parsed.scheme and parsed.scheme not in allowed_schemes:
 
            log.error('Suspicious URL scheme detected %s for url %s',
 
                     parsed.scheme, parsed)
 
            return False
 
        if server_parsed.netloc != parsed.netloc:
 
            log.error('Suspicious NETLOC detected %s for url %s server url '
 
                      'is: %s' % (parsed.netloc, parsed, server_parsed))
 
            return False
 
        return True
 

	
 
    def _redirect_to_origin(self, origin):
 
        '''redirect to the original page, preserving any get arguments given'''
 
        request.GET.pop('came_from', None)
 
        raise HTTPFound(location=url(origin, **request.GET))
 

	
 
    def index(self):
 
        c.came_from = safe_str(request.GET.get('came_from', ''))
 
        if not self._validate_came_from(c.came_from):
 
            c.came_from = url('home')
 

	
 
        not_default = self.authuser.username != User.DEFAULT_USER
 
        ip_allowed = AuthUser.check_ip_allowed(self.authuser, self.ip_addr)
 

	
 
        # redirect if already logged in
 
        if self.authuser.is_authenticated and not_default and ip_allowed:
 
            return self._redirect_to_origin(c.came_from)
 

	
 
        if request.POST:
 
            # import Login Form validator class
 
            login_form = LoginForm()
 
            try:
 
                c.form_result = login_form.to_python(dict(request.POST))
 
                # form checks for username/password, now we're authenticated
 
                username = c.form_result['username']
 
                user = User.get_by_username(username, case_insensitive=True)
 
            except formencode.Invalid as errors:
 
                defaults = errors.value
 
                # remove password from filling in form again
 
                del defaults['password']
 
                return htmlfill.render(
 
                    render('/login.html'),
 
                    defaults=errors.value,
 
                    errors=errors.error_dict or {},
 
                    prefix_error=False,
 
                    encoding="UTF-8",
 
                    force_defaults=False)
 
            except UserCreationError as e:
 
                # container auth or other auth functions that create users on
 
                # the fly can throw this exception signaling that there's issue
 
                # with user creation, explanation should be provided in
 
                # Exception itself
 
                h.flash(e, 'error')
 
            else:
 
                log_in_user(user, c.form_result['remember'],
 
                    is_external_auth=False)
 
                return self._redirect_to_origin(c.came_from)
 

	
 
        return render('/login.html')
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.register.auto_activate',
 
                               'hg.register.manual_activate')
 
    def register(self):
 
        c.auto_active = 'hg.register.auto_activate' in User.get_default_user()\
 
            .AuthUser.permissions['global']
 

	
 
        settings = Setting.get_app_settings()
 
        captcha_private_key = settings.get('captcha_private_key')
 
        c.captcha_active = bool(captcha_private_key)
 
        c.captcha_public_key = settings.get('captcha_public_key')
 

	
 
        if request.POST:
 
            register_form = RegisterForm()()
 
            try:
 
                form_result = register_form.to_python(dict(request.POST))
 
                form_result['active'] = c.auto_active
 

	
 
                if c.captcha_active:
 
                    from kallithea.lib.recaptcha import submit
 
                    response = submit(request.POST.get('recaptcha_challenge_field'),
 
                                      request.POST.get('recaptcha_response_field'),
 
                                      private_key=captcha_private_key,
 
                                      remoteip=self.ip_addr)
 
                    if c.captcha_active and not response.is_valid:
 
                        _value = form_result
 
                        _msg = _('Bad captcha')
 
                        error_dict = {'recaptcha_field': _msg}
 
                        raise formencode.Invalid(_msg, _value, None,
 
                                                 error_dict=error_dict)
 

	
 
                UserModel().create_registration(form_result)
 
                h.flash(_('You have successfully registered into Kallithea'),
 
                        category='success')
 
                Session().commit()
 
                return redirect(url('login_home'))
 

	
 
            except formencode.Invalid as errors:
 
                return htmlfill.render(
 
                    render('/register.html'),
 
                    defaults=errors.value,
 
                    errors=errors.error_dict or {},
 
                    prefix_error=False,
 
                    encoding="UTF-8",
 
                    force_defaults=False)
 
            except UserCreationError as e:
 
                # container auth or other auth functions that create users on
 
                # the fly can throw this exception signaling that there's issue
 
                # with user creation, explanation should be provided in
 
                # Exception itself
 
                h.flash(e, 'error')
 

	
 
        return render('/register.html')
 

	
 
    def password_reset(self):
 
        settings = Setting.get_app_settings()
 
        captcha_private_key = settings.get('captcha_private_key')
 
        c.captcha_active = bool(captcha_private_key)
 
        c.captcha_public_key = settings.get('captcha_public_key')
 

	
 
        if request.POST:
 
            password_reset_form = PasswordResetForm()()
 
            password_reset_form = PasswordResetRequestForm()()
 
            try:
 
                form_result = password_reset_form.to_python(dict(request.POST))
 
                if c.captcha_active:
 
                    from kallithea.lib.recaptcha import submit
 
                    response = submit(request.POST.get('recaptcha_challenge_field'),
 
                                      request.POST.get('recaptcha_response_field'),
 
                                      private_key=captcha_private_key,
 
                                      remoteip=self.ip_addr)
 
                    if c.captcha_active and not response.is_valid:
 
                        _value = form_result
 
                        _msg = _('Bad captcha')
 
                        error_dict = {'recaptcha_field': _msg}
 
                        raise formencode.Invalid(_msg, _value, None,
 
                                                 error_dict=error_dict)
 
                UserModel().reset_password_link(form_result)
 
                h.flash(_('Your password reset link was sent'),
 
                redirect_link = UserModel().send_reset_password_email(form_result)
 
                h.flash(_('A password reset confirmation code has been sent'),
 
                            category='success')
 
                return redirect(url('login_home'))
 
                return redirect(redirect_link)
 

	
 
            except formencode.Invalid as errors:
 
                return htmlfill.render(
 
                    render('/password_reset.html'),
 
                    defaults=errors.value,
 
                    errors=errors.error_dict or {},
 
                    prefix_error=False,
 
                    encoding="UTF-8",
 
                    force_defaults=False)
 

	
 
        return render('/password_reset.html')
 

	
 
    def password_reset_confirmation(self):
 
        if request.GET and request.GET.get('key'):
 
        # This controller handles both GET and POST requests, though we
 
        # only ever perform the actual password change on POST (since
 
        # GET requests are not allowed to have side effects, and do not
 
        # receive automatic CSRF protection).
 

	
 
        # The template needs the email address outside of the form.
 
        c.email = request.params.get('email')
 

	
 
        if not request.POST:
 
            return htmlfill.render(
 
                render('/password_reset_confirmation.html'),
 
                defaults=dict(request.params),
 
                encoding='UTF-8')
 

	
 
        form = PasswordResetConfirmationForm()()
 
            try:
 
                user = User.get_by_api_key(request.GET.get('key'))
 
                data = dict(email=user.email)
 
                UserModel().reset_password(data)
 
                h.flash(_('Your password reset was successful, '
 
                          'new password has been sent to your email'),
 
                            category='success')
 
            except Exception as e:
 
                log.error(e)
 
                return redirect(url('reset_password'))
 
            form_result = form.to_python(dict(request.POST))
 
        except formencode.Invalid as errors:
 
            return htmlfill.render(
 
                render('/password_reset_confirmation.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding='UTF-8')
 

	
 
        if not UserModel().verify_reset_password_token(
 
            form_result['email'],
 
            form_result['timestamp'],
 
            form_result['token'],
 
        ):
 
            return htmlfill.render(
 
                render('/password_reset_confirmation.html'),
 
                defaults=form_result,
 
                errors={'token': _('Invalid password reset token')},
 
                prefix_error=False,
 
                encoding='UTF-8')
 

	
 
        UserModel().reset_password(form_result['email'], form_result['password'])
 
        h.flash(_('Successfully updated password'), category='success')
 
        return redirect(url('login_home'))
 

	
 
    def logout(self):
 
        session.delete()
 
        log.info('Logging out and deleting session for user')
 
        redirect(url('home'))
 

	
 
    def authentication_token(self):
 
        """Return the CSRF protection token for the session - just like it
 
        could have been screen scraped from a page with a form.
 
        Only intended for testing but might also be useful for other kinds
 
        of automation.
 
        """
 
        return h.authentication_token()
kallithea/model/forms.py
Show inline comments
 
@@ -112,199 +112,213 @@ def UserForm(edit=False, old_data={}):
 
            password_confirmation = All(
 
                v.ValidPassword(),
 
                v.UnicodeString(strip=False, min=6, not_empty=False)
 
            )
 
            chained_validators = [v.ValidPasswordsMatch('password',
 
                                                        'password_confirmation')]
 

	
 
        active = v.StringBoolean(if_missing=False)
 
        firstname = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        lastname = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        email = All(v.Email(not_empty=True), v.UniqSystemEmail(old_data))
 
        extern_name = v.UnicodeString(strip=True, if_missing=None)
 
        extern_type = v.UnicodeString(strip=True, if_missing=None)
 
    return _UserForm
 

	
 

	
 
def UserGroupForm(edit=False, old_data={}, available_members=[]):
 
    class _UserGroupForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 

	
 
        users_group_name = All(
 
            v.UnicodeString(strip=True, min=1, not_empty=True),
 
            v.ValidUserGroup(edit, old_data)
 
        )
 
        user_group_description = v.UnicodeString(strip=True, min=1,
 
                                                 not_empty=False)
 

	
 
        users_group_active = v.StringBoolean(if_missing=False)
 

	
 
        if edit:
 
            users_group_members = v.OneOf(
 
                available_members, hideList=False, testValueList=True,
 
                if_missing=None, not_empty=False
 
            )
 

	
 
    return _UserGroupForm
 

	
 

	
 
def RepoGroupForm(edit=False, old_data={}, repo_groups=[],
 
                   can_create_in_root=False):
 
    repo_group_ids = [rg[0] for rg in repo_groups]
 
    class _RepoGroupForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 

	
 
        group_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
 
                         v.SlugifyName(),
 
                         v.ValidRegex(msg=_('Name must not contain only digits'))(r'(?!^\d+$)^.+$'))
 
        group_description = v.UnicodeString(strip=True, min=1,
 
                                            not_empty=False)
 
        group_copy_permissions = v.StringBoolean(if_missing=False)
 

	
 
        if edit:
 
            #FIXME: do a special check that we cannot move a group to one of
 
            #its children
 
            pass
 

	
 
        group_parent_id = All(v.CanCreateGroup(can_create_in_root),
 
                              v.OneOf(repo_group_ids, hideList=False,
 
                                      testValueList=True,
 
                                      if_missing=None, not_empty=True),
 
                              v.Int(min=-1, not_empty=True))
 
        enable_locking = v.StringBoolean(if_missing=False)
 
        chained_validators = [v.ValidRepoGroup(edit, old_data)]
 

	
 
    return _RepoGroupForm
 

	
 

	
 
def RegisterForm(edit=False, old_data={}):
 
    class _RegisterForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        username = All(
 
            v.ValidUsername(edit, old_data),
 
            v.UnicodeString(strip=True, min=1, not_empty=True)
 
        )
 
        password = All(
 
            v.ValidPassword(),
 
            v.UnicodeString(strip=False, min=6, not_empty=True)
 
        )
 
        password_confirmation = All(
 
            v.ValidPassword(),
 
            v.UnicodeString(strip=False, min=6, not_empty=True)
 
        )
 
        active = v.StringBoolean(if_missing=False)
 
        firstname = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        lastname = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        email = All(v.Email(not_empty=True), v.UniqSystemEmail(old_data))
 

	
 
        chained_validators = [v.ValidPasswordsMatch('password',
 
                                                    'password_confirmation')]
 

	
 
    return _RegisterForm
 

	
 

	
 
def PasswordResetForm():
 
    class _PasswordResetForm(formencode.Schema):
 
def PasswordResetRequestForm():
 
    class _PasswordResetRequestForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        email = v.Email(not_empty=True)
 
    return _PasswordResetForm
 
    return _PasswordResetRequestForm
 

	
 
def PasswordResetConfirmationForm():
 
    class _PasswordResetConfirmationForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 

	
 
        email = v.UnicodeString(strip=True, not_empty=True)
 
        timestamp = v.Number(strip=True, not_empty=True)
 
        token = v.UnicodeString(strip=True, not_empty=True)
 
        password = All(v.ValidPassword(), v.UnicodeString(strip=False, min=6))
 
        password_confirm = All(v.ValidPassword(), v.UnicodeString(strip=False, min=6))
 

	
 
        chained_validators = [v.ValidPasswordsMatch('password',
 
                                                    'password_confirm')]
 
    return _PasswordResetConfirmationForm
 

	
 
def RepoForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
 
             repo_groups=[], landing_revs=[]):
 
    repo_group_ids = [rg[0] for rg in repo_groups]
 
    class _RepoForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
 
                        v.SlugifyName())
 
        repo_group = All(v.CanWriteGroup(old_data),
 
                         v.OneOf(repo_group_ids, hideList=True),
 
                         v.Int(min=-1, not_empty=True))
 
        repo_type = v.OneOf(supported_backends, required=False,
 
                            if_missing=old_data.get('repo_type'))
 
        repo_description = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        repo_private = v.StringBoolean(if_missing=False)
 
        repo_landing_rev = v.OneOf(landing_revs, hideList=True)
 
        repo_copy_permissions = v.StringBoolean(if_missing=False)
 
        clone_uri = All(v.UnicodeString(strip=True, min=1, not_empty=False))
 

	
 
        repo_enable_statistics = v.StringBoolean(if_missing=False)
 
        repo_enable_downloads = v.StringBoolean(if_missing=False)
 
        repo_enable_locking = v.StringBoolean(if_missing=False)
 

	
 
        if edit:
 
            #this is repo owner
 
            user = All(v.UnicodeString(not_empty=True), v.ValidRepoUser())
 
            # Not a real field - just for reference for validation:
 
            # clone_uri_hidden = v.UnicodeString(if_missing='')
 

	
 
        chained_validators = [v.ValidCloneUri(),
 
                              v.ValidRepoName(edit, old_data)]
 
    return _RepoForm
 

	
 

	
 
def RepoPermsForm():
 
    class _RepoPermsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        chained_validators = [v.ValidPerms(type_='repo')]
 
    return _RepoPermsForm
 

	
 

	
 
def RepoGroupPermsForm(valid_recursive_choices):
 
    class _RepoGroupPermsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        recursive = v.OneOf(valid_recursive_choices)
 
        chained_validators = [v.ValidPerms(type_='repo_group')]
 
    return _RepoGroupPermsForm
 

	
 

	
 
def UserGroupPermsForm():
 
    class _UserPermsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        chained_validators = [v.ValidPerms(type_='user_group')]
 
    return _UserPermsForm
 

	
 

	
 
def RepoFieldForm():
 
    class _RepoFieldForm(formencode.Schema):
 
        filter_extra_fields = True
 
        allow_extra_fields = True
 

	
 
        new_field_key = All(v.FieldKey(),
 
                            v.UnicodeString(strip=True, min=3, not_empty=True))
 
        new_field_value = v.UnicodeString(not_empty=False, if_missing='')
 
        new_field_type = v.OneOf(['str', 'unicode', 'list', 'tuple'],
 
                                 if_missing='str')
 
        new_field_label = v.UnicodeString(not_empty=False)
 
        new_field_desc = v.UnicodeString(not_empty=False)
 

	
 
    return _RepoFieldForm
 

	
 

	
 
def RepoForkForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
 
                 repo_groups=[], landing_revs=[]):
 
    repo_group_ids = [rg[0] for rg in repo_groups]
 
    class _RepoForkForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
 
                        v.SlugifyName())
 
        repo_group = All(v.CanWriteGroup(),
 
                         v.OneOf(repo_group_ids, hideList=True),
 
                         v.Int(min=-1, not_empty=True))
 
        repo_type = All(v.ValidForkType(old_data), v.OneOf(supported_backends))
 
        description = v.UnicodeString(strip=True, min=1, not_empty=True)
 
        private = v.StringBoolean(if_missing=False)
 
        copy_permissions = v.StringBoolean(if_missing=False)
 
        update_after_clone = v.StringBoolean(if_missing=False)
 
        fork_parent_id = v.UnicodeString()
 
        chained_validators = [v.ValidForkName(edit, old_data)]
 
        landing_rev = v.OneOf(landing_revs, hideList=True)
 

	
kallithea/model/user.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.user
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
users model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 9, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import hashlib
 
import logging
 
import time
 
import traceback
 

	
 
from pylons import config
 
from pylons.i18n.translation import _
 

	
 
from sqlalchemy.exc import DatabaseError
 

	
 
from kallithea import EXTERN_TYPE_INTERNAL
 
from kallithea.lib.utils2 import safe_unicode, generate_api_key, get_current_authuser
 
from kallithea.lib.caching_query import FromCache
 
from kallithea.model import BaseModel
 
from kallithea.model.db import User, UserToPerm, Notification, \
 
    UserEmailMap, UserIpMap
 
from kallithea.lib.exceptions import DefaultUserException, \
 
    UserOwnsReposException
 
from kallithea.model.meta import Session
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UserModel(BaseModel):
 
    password_reset_token_lifetime = 86400 # 24 hours
 

	
 
    cls = User
 

	
 
    def get(self, user_id, cache=False):
 
        user = self.sa.query(User)
 
        if cache:
 
            user = user.options(FromCache("sql_cache_short",
 
                                          "get_user_%s" % user_id))
 
        return user.get(user_id)
 

	
 
    def get_user(self, user):
 
        return self._get_user(user)
 

	
 
    def create(self, form_data, cur_user=None):
 
        if not cur_user:
 
            cur_user = getattr(get_current_authuser(), 'username', None)
 

	
 
        from kallithea.lib.hooks import log_create_user, \
 
            check_allowed_create_user
 
        _fd = form_data
 
        user_data = {
 
            'username': _fd['username'],
 
            'password': _fd['password'],
 
            'email': _fd['email'],
 
            'firstname': _fd['firstname'],
 
            'lastname': _fd['lastname'],
 
            'active': _fd['active'],
 
            'admin': False
 
        }
 
        # raises UserCreationError if it's not allowed
 
        check_allowed_create_user(user_data, cur_user)
 
        from kallithea.lib.auth import get_crypt_password
 

	
 
        new_user = User()
 
        for k, v in form_data.items():
 
            if k == 'password':
 
                v = get_crypt_password(v)
 
            if k == 'firstname':
 
                k = 'name'
 
            setattr(new_user, k, v)
 

	
 
        new_user.api_key = generate_api_key()
 
        self.sa.add(new_user)
 

	
 
        log_create_user(new_user.get_dict(), cur_user)
 
        return new_user
 

	
 
    def create_or_update(self, username, password, email, firstname='',
 
                         lastname='', active=True, admin=False,
 
                         extern_type=None, extern_name=None, cur_user=None):
 
        """
 
        Creates a new instance if not found, or updates current one
 

	
 
        :param username:
 
        :param password:
 
        :param email:
 
        :param active:
 
        :param firstname:
 
        :param lastname:
 
        :param active:
 
        :param admin:
 
        :param extern_name:
 
        :param extern_type:
 
        :param cur_user:
 
        """
 
        if not cur_user:
 
            cur_user = getattr(get_current_authuser(), 'username', None)
 

	
 
        from kallithea.lib.auth import get_crypt_password, check_password
 
        from kallithea.lib.hooks import log_create_user, \
 
            check_allowed_create_user
 
        user_data = {
 
            'username': username, 'password': password,
 
            'email': email, 'firstname': firstname, 'lastname': lastname,
 
            'active': active, 'admin': admin
 
        }
 
        # raises UserCreationError if it's not allowed
 
        check_allowed_create_user(user_data, cur_user)
 

	
 
        log.debug('Checking for %s account in Kallithea database', username)
 
        user = User.get_by_username(username, case_insensitive=True)
 
        if user is None:
 
            log.debug('creating new user %s', username)
 
            new_user = User()
 
            edit = False
 
        else:
 
            log.debug('updating user %s', username)
 
            new_user = user
 
            edit = True
 

	
 
        try:
 
            new_user.username = username
 
            new_user.admin = admin
 
            new_user.email = email
 
            new_user.active = active
 
            new_user.extern_name = safe_unicode(extern_name) \
 
                if extern_name else None
 
@@ -178,240 +184,313 @@ class UserModel(BaseModel):
 
        form_data['extern_type'] = EXTERN_TYPE_INTERNAL
 
        new_user = self.create(form_data)
 

	
 
        self.sa.add(new_user)
 
        self.sa.flush()
 

	
 
        # notification to admins
 
        subject = _('New user registration')
 
        body = (
 
            'New user registration\n'
 
            '---------------------\n'
 
            '- Username: {user.username}\n'
 
            '- Full Name: {user.full_name}\n'
 
            '- Email: {user.email}\n'
 
            ).format(user=new_user)
 
        edit_url = h.canonical_url('edit_user', id=new_user.user_id)
 
        email_kwargs = {
 
            'registered_user_url': edit_url,
 
            'new_username': new_user.username}
 
        NotificationModel().create(created_by=new_user, subject=subject,
 
                                   body=body, recipients=None,
 
                                   type_=Notification.TYPE_REGISTRATION,
 
                                   email_kwargs=email_kwargs)
 

	
 
    def update(self, user_id, form_data, skip_attrs=[]):
 
        from kallithea.lib.auth import get_crypt_password
 

	
 
        user = self.get(user_id, cache=False)
 
        if user.username == User.DEFAULT_USER:
 
            raise DefaultUserException(
 
                            _("You can't edit this user since it's "
 
                              "crucial for entire application"))
 

	
 
        for k, v in form_data.items():
 
            if k in skip_attrs:
 
                continue
 
            if k == 'new_password' and v:
 
                user.password = get_crypt_password(v)
 
            else:
 
                # old legacy thing orm models store firstname as name,
 
                # need proper refactor to username
 
                if k == 'firstname':
 
                    k = 'name'
 
                setattr(user, k, v)
 
        self.sa.add(user)
 

	
 
    def update_user(self, user, **kwargs):
 
        from kallithea.lib.auth import get_crypt_password
 

	
 
        user = self._get_user(user)
 
        if user.username == User.DEFAULT_USER:
 
            raise DefaultUserException(
 
                _("You can't edit this user since it's"
 
                  " crucial for entire application")
 
            )
 

	
 
        for k, v in kwargs.items():
 
            if k == 'password' and v:
 
                v = get_crypt_password(v)
 

	
 
            setattr(user, k, v)
 
        self.sa.add(user)
 
        return user
 

	
 
    def delete(self, user, cur_user=None):
 
        if cur_user is None:
 
            cur_user = getattr(get_current_authuser(), 'username', None)
 
        user = self._get_user(user)
 

	
 
        if user.username == User.DEFAULT_USER:
 
            raise DefaultUserException(
 
                _("You can't remove this user since it is"
 
                  " crucial for the entire application"))
 
        if user.repositories:
 
            repos = [x.repo_name for x in user.repositories]
 
            raise UserOwnsReposException(
 
                _('User "%s" still owns %s repositories and cannot be '
 
                  'removed. Switch owners or remove those repositories: %s')
 
                % (user.username, len(repos), ', '.join(repos)))
 
        if user.repo_groups:
 
            repogroups = [x.group_name for x in user.repo_groups]
 
            raise UserOwnsReposException(_(
 
                'User "%s" still owns %s repository groups and cannot be '
 
                'removed. Switch owners or remove those repository groups: %s')
 
                % (user.username, len(repogroups), ', '.join(repogroups)))
 
        if user.user_groups:
 
            usergroups = [x.users_group_name for x in user.user_groups]
 
            raise UserOwnsReposException(
 
                _('User "%s" still owns %s user groups and cannot be '
 
                  'removed. Switch owners or remove those user groups: %s')
 
                % (user.username, len(usergroups), ', '.join(usergroups)))
 
        self.sa.delete(user)
 

	
 
        from kallithea.lib.hooks import log_delete_user
 
        log_delete_user(user.get_dict(), cur_user)
 

	
 
    def reset_password_link(self, data):
 
    def get_reset_password_token(self, user, timestamp, session_id):
 
        """
 
        The token is calculated as SHA1 hash of the following:
 

	
 
         * user's identifier (number, not a name)
 
         * timestamp
 
         * hashed user's password
 
         * session identifier
 
         * per-application secret
 

	
 
        We use numeric user's identifier, as it's fixed and doesn't change,
 
        so renaming users doesn't affect the mechanism. Timestamp is added
 
        to make it possible to limit the token's validness (currently hard
 
        coded to 24h), and we don't want users to be able to fake that field
 
        easily. Hashed user's password is needed to prevent using the token
 
        again once the password has been changed. Session identifier is
 
        an additional security measure to ensure someone else stealing the
 
        token can't use it. Finally, per-application secret is just another
 
        way to make it harder for an attacker to guess all values in an
 
        attempt to generate a valid token.
 
        """
 
        return hashlib.sha1('\0'.join([
 
            str(user.user_id),
 
            str(timestamp),
 
            user.password,
 
            session_id,
 
            config.get('app_instance_uuid'),
 
        ])).hexdigest()
 

	
 
    def send_reset_password_email(self, data):
 
        """
 
        Sends email with a password reset token and link to the password
 
        reset confirmation page with all information (including the token)
 
        pre-filled. Also returns URL of that page, only without the token,
 
        allowing users to copy-paste or manually enter the token from the
 
        email.
 
        """
 
        from kallithea.lib.celerylib import tasks, run_task
 
        from kallithea.model.notification import EmailNotificationModel
 
        import kallithea.lib.helpers as h
 

	
 
        user_email = data['email']
 
        user = User.get_by_email(user_email)
 
        timestamp = int(time.time())
 
        if user is not None:
 
            log.debug('password reset user found %s', user)
 
            link = h.canonical_url('reset_password_confirmation',
 
                                   key=user.api_key)
 
            log.debug('password reset user %s found', user)
 
            token = self.get_reset_password_token(user,
 
                                                  timestamp,
 
                                                  h.authentication_token())
 
            # URL must be fully qualified; but since the token is locked to
 
            # the current browser session, we must provide a URL with the
 
            # current scheme and hostname, rather than the canonical_url.
 
            link = h.url('reset_password_confirmation', qualified=True,
 
                         email=user_email,
 
                         timestamp=timestamp,
 
                         token=token)
 

	
 
            reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET
 
            body = EmailNotificationModel().get_email_tmpl(
 
                reg_type, 'txt',
 
                user=user.short_contact,
 
                reset_token=token,
 
                reset_url=link)
 
            html_body = EmailNotificationModel().get_email_tmpl(
 
                reg_type, 'html',
 
                user=user.short_contact,
 
                reset_token=token,
 
                reset_url=link)
 
            log.debug('sending email')
 
            run_task(tasks.send_email, [user_email],
 
                     _("Password reset link"), body, html_body)
 
            log.info('send new password mail to %s', user_email)
 
        else:
 
            log.debug("password reset email %s not found", user_email)
 

	
 
        return True
 
        return h.url('reset_password_confirmation',
 
                     email=user_email,
 
                     timestamp=timestamp)
 

	
 
    def reset_password(self, data):
 
    def verify_reset_password_token(self, email, timestamp, token):
 
        from kallithea.lib.celerylib import tasks, run_task
 
        from kallithea.lib import auth
 
        user_email = data['email']
 
        import kallithea.lib.helpers as h
 
        user = User.get_by_email(email)
 
        if user is None:
 
            log.debug("user with email %s not found", email)
 
            return False
 

	
 
        token_age = int(time.time()) - int(timestamp)
 

	
 
        if token_age < 0:
 
            log.debug('timestamp is from the future')
 
            return False
 

	
 
        if token_age > UserModel.password_reset_token_lifetime:
 
            log.debug('password reset token expired')
 
            return False
 

	
 
        expected_token = self.get_reset_password_token(user,
 
                                                       timestamp,
 
                                                       h.authentication_token())
 
        log.debug('computed password reset token: %s', expected_token)
 
        log.debug('received password reset token: %s', token)
 
        return expected_token == token
 

	
 
    def reset_password(self, user_email, new_passwd):
 
        from kallithea.lib.celerylib import tasks, run_task
 
        from kallithea.lib import auth
 
        user = User.get_by_email(user_email)
 
        new_passwd = auth.PasswordGenerator().gen_password(
 
            8, auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
 
        if user is not None:
 
            user.password = auth.get_crypt_password(new_passwd)
 
            Session().add(user)
 
            Session().commit()
 
            log.info('change password for %s', user_email)
 
        if new_passwd is None:
 
            raise Exception('unable to generate new password')
 
            raise Exception('unable to set new password')
 

	
 
        run_task(tasks.send_email, [user_email],
 
                 _('Your new password'),
 
                 _('Your new Kallithea password:%s') % (new_passwd,))
 
        log.info('send new password mail to %s', user_email)
 
                 _('Password reset notification'),
 
                 _('The password to your account %s has been changed using password reset form.') % (user.username,))
 
        log.info('send password reset mail to %s', user_email)
 

	
 
        return True
 

	
 
    def has_perm(self, user, perm):
 
        perm = self._get_perm(perm)
 
        user = self._get_user(user)
 

	
 
        return UserToPerm.query().filter(UserToPerm.user == user)\
 
            .filter(UserToPerm.permission == perm).scalar() is not None
 

	
 
    def grant_perm(self, user, perm):
 
        """
 
        Grant user global permissions
 

	
 
        :param user:
 
        :param perm:
 
        """
 
        user = self._get_user(user)
 
        perm = self._get_perm(perm)
 
        # if this permission is already granted skip it
 
        _perm = UserToPerm.query()\
 
            .filter(UserToPerm.user == user)\
 
            .filter(UserToPerm.permission == perm)\
 
            .scalar()
 
        if _perm:
 
            return
 
        new = UserToPerm()
 
        new.user = user
 
        new.permission = perm
 
        self.sa.add(new)
 
        return new
 

	
 
    def revoke_perm(self, user, perm):
 
        """
 
        Revoke users global permissions
 

	
 
        :param user:
 
        :param perm:
 
        """
 
        user = self._get_user(user)
 
        perm = self._get_perm(perm)
 

	
 
        UserToPerm.query().filter(
 
            UserToPerm.user == user,
 
            UserToPerm.permission == perm,
 
        ).delete()
 

	
 
    def add_extra_email(self, user, email):
 
        """
 
        Adds email address to UserEmailMap
 

	
 
        :param user:
 
        :param email:
 
        """
 
        from kallithea.model import forms
 
        form = forms.UserExtraEmailForm()()
 
        data = form.to_python(dict(email=email))
 
        user = self._get_user(user)
 

	
 
        obj = UserEmailMap()
 
        obj.user = user
 
        obj.email = data['email']
 
        self.sa.add(obj)
 
        return obj
 

	
 
    def delete_extra_email(self, user, email_id):
 
        """
 
        Removes email address from UserEmailMap
 

	
 
        :param user:
 
        :param email_id:
 
        """
 
        user = self._get_user(user)
 
        obj = UserEmailMap.query().get(email_id)
 
        if obj is not None:
 
            self.sa.delete(obj)
 

	
 
    def add_extra_ip(self, user, ip):
 
        """
 
        Adds IP address to UserIpMap
 

	
 
        :param user:
 
        :param ip:
 
        """
 
        from kallithea.model import forms
 
        form = forms.UserExtraIpForm()()
 
        data = form.to_python(dict(ip=ip))
 
        user = self._get_user(user)
 

	
 
        obj = UserIpMap()
 
        obj.user = user
 
        obj.ip_addr = data['ip']
 
        self.sa.add(obj)
 
        return obj
 

	
 
    def delete_extra_ip(self, user, ip_id):
kallithea/templates/email_templates/password_reset.html
Show inline comments
 
## -*- coding: utf-8 -*-
 
<%inherit file="main.html"/>
 

	
 
<h4>${_('Hello %s') % user}</h4>
 

	
 
<p>${_('We received a request to create a new password for your account.')}</p>
 
<p>${_('You can generate it by clicking following URL')}:</p>
 
<p>${_('We have received a request to reset the password for your account.')}</p>
 
<p>${_('To set a new password, click the following link')}:</p>
 
<p><a href="${reset_url}">${reset_url}</a></p>
 

	
 
<p>${_("Please ignore this email if you did not request a new password .")}</p>
 
<p>${_("Should you not be able to use the link above, please type the following code into the password reset form")}: <code>${reset_token}</code></p>
 

	
 
<p>${_("If it weren't you who requested the password reset, just disregard this message.")}</p>
kallithea/templates/email_templates/password_reset.txt
Show inline comments
 
## -*- coding: utf-8 -*-
 
<%inherit file="main.txt"/>
 

	
 
${_('Hello %s') % user|n,unicode}
 

	
 
${_('We received a request to create a new password for your account.')|n,unicode}
 
${_('You can generate it by clicking following URL')|n,unicode}:
 
${_('We have received a request to reset the password for your account..')|n,unicode}
 
${_('To set a new password, click the following link')|n,unicode}:
 

	
 
${reset_url|n,unicode}
 

	
 
${_("Please ignore this email if you did not request a new password .")|n,unicode}
 
${_("Should you not be able to use the link above, please type the following code into the password reset form")|n,unicode}: ${reset_token|n,unicode}
 

	
 
${_("If it weren't you who requested the password reset, just disregard this message.")|n,unicode}
kallithea/templates/password_reset.html
Show inline comments
 
## -*- coding: utf-8 -*-
 
<%inherit file="base/root.html"/>
 

	
 
<%block name="title">
 
    ${_('Password Reset')}
 
</%block>
 

	
 
<div id="register">
 
    <%include file="/base/flash_msg.html"/>
 
    <div class="title withlogo">
 
        %if c.site_name:
 
            <h5>${_('Reset Your Password to %s') % c.site_name}</h5>
 
        %else:
 
            <h5>${_('Reset Your Password')}</h5>
 
        %endif
 
    </div>
 
    <div class="inner">
 
        ${h.form(url('password_reset'))}
 
        <div class="form">
 
            <!-- fields -->
 
            <div class="fields">
 

	
 
                 <div class="field">
 
                    <div class="label">
 
                        <label for="email">${_('Email Address')}:</label>
 
                    </div>
 
                    <div class="input">
 
                        ${h.text('email')}
 
                    </div>
 
                 </div>
 

	
 
                %if c.captcha_active:
 
                <div class="field">
 
                    <div class="label">
 
                        <label for="email">${_('Captcha')}:</label>
 
                    </div>
 
                    <div class="input">
 
                        ${h.hidden('recaptcha_field')}
 
                        <div id="recaptcha"></div>
 
                    </div>
 
                </div>
 
                %endif
 

	
 
                <div class="buttons">
 
                    <div class="nohighlight">
 
                      ${h.submit('send',_('Send Password Reset Email'),class_="btn")}
 
                          <div class="activation_msg">${_('Password reset link will be sent to the email address matching your username.')}</div>
 
                          <div class="activation_msg">${_('A password reset link will be sent to the specified email address if it is registered in the system.')}</div>
 
                    </div>
 
                </div>
 
            </div>
 
        </div>
 
        ${h.end_form()}
 
        %if c.captcha_active:
 
        <script type="text/javascript" src="https://www.google.com/recaptcha/api/js/recaptcha_ajax.js"></script>
 
        %endif
 
        <script type="text/javascript">
 
         $(document).ready(function(){
 
            $('#email').focus();
 
            %if c.captcha_active:
 
            Recaptcha.create("${c.captcha_public_key}", "recaptcha",
 
                {
 
                  theme: "white"
 
                }
 
            );
 
            %endif
 
         });
 
        </script>
 
    </div>
 
   </div>
kallithea/templates/password_reset_confirmation.html
Show inline comments
 
## -*- coding: utf-8 -*-
 
<%inherit file="base/root.html"/>
 

	
 
<%block name="title">
 
    ${_('Reset Your Password')}
 
</%block>
 

	
 
<div id="register">
 
    <%include file="/base/flash_msg.html"/>
 
    <div class="title withlogo">
 
        %if c.site_name:
 
            <h5>${_('Reset Your Password to %s') % c.site_name}</h5>
 
        %else:
 
            <h5>${_('Reset Your Password')}</h5>
 
        %endif
 
    </div>
 
    <div class="inner">
 
        ${h.form(h.url('reset_password_confirmation'), method='post')}
 
        <p>${_('You are about to set a new password for the email address %s.') % c.email}</p>
 
        <p>${_('Note that you must use the same browser session for this as the one used to request the password reset.')}</p>
 
        <div style="display: none;">
 
            ${h.hidden('email')}
 
            ${h.hidden('timestamp')}
 
        </div>
 
        <div class="form">
 
            <!-- fields -->
 
            <div class="fields">
 
                 <div class="field">
 
                    <div class="label">
 
                        <label for="token">${_('Code you received in the email')}:</label>
 
                    </div>
 
                    <div class="input">
 
                        ${h.text('token', class_='focus')}
 
                    </div>
 
                 </div>
 

	
 
                 <div class="field">
 
                    <div class="label">
 
                        <label for="password">${_('New Password')}:</label>
 
                    </div>
 
                    <div class="input">
 
                        ${h.password('password',class_='focus')}
 
                    </div>
 
                 </div>
 

	
 
                 <div class="field">
 
                    <div class="label">
 
                        <label for="password_confirm">${_('Confirm New Password')}:</label>
 
                    </div>
 
                    <div class="input">
 
                        ${h.password('password_confirm',class_='focus')}
 
                    </div>
 
                 </div>
 
                <div class="buttons">
 
                    <div class="nohighlight">
 
                      ${h.submit('send',_('Confirm'),class_="btn")}
 
                    </div>
 
                </div>
 
            </div>
 
        </div>
 
        ${h.end_form()}
 
    </div>
 
   </div>
kallithea/tests/functional/test_login.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
from __future__ import with_statement
 
import re
 
import time
 

	
 
import mock
 

	
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.lib.utils2 import generate_api_key
 
from kallithea.lib.auth import check_password
 
from kallithea.lib import helpers as h
 
from kallithea.model.api_key import ApiKeyModel
 
from kallithea.model import validators
 
from kallithea.model.db import User, Notification
 
from kallithea.model.meta import Session
 
from kallithea.model.user import UserModel
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestLoginController(TestController):
 
    def setUp(self):
 
        self.remove_all_notifications()
 
        self.assertEqual(Notification.query().all(), [])
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='login', action='index'))
 
        self.assertEqual(response.status, '200 OK')
 
        # Test response...
 

	
 
    def test_login_admin_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': TEST_USER_ADMIN_PASS})
 
        self.assertEqual(response.status, '302 Found')
 
        self.assert_authenticated_user(response, TEST_USER_ADMIN_LOGIN)
 

	
 
        response = response.follow()
 
        response.mustcontain('/%s' % HG_REPO)
 

	
 
    def test_login_regular_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS})
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        self.assert_authenticated_user(response, TEST_USER_REGULAR_LOGIN)
 

	
 
        response = response.follow()
 
        response.mustcontain('/%s' % HG_REPO)
 

	
 
    def test_login_ok_came_from(self):
 
        test_came_from = '/_admin/users'
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from=test_came_from),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': TEST_USER_ADMIN_PASS})
 
        self.assertEqual(response.status, '302 Found')
 
        response = response.follow()
 

	
 
        self.assertEqual(response.status, '200 OK')
 
        response.mustcontain('Users Administration')
 

	
 
    def test_login_do_not_remember(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS,
 
                                  'remember': False})
 

	
 
        self.assertIn('Set-Cookie', response.headers)
 
        for cookie in response.headers.getall('Set-Cookie'):
 
            self.assertFalse(re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE),
 
                'Cookie %r has expiration date, but should be a session cookie' % cookie)
 

	
 
    def test_login_remember(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS,
 
                                  'remember': True})
 

	
 
        self.assertIn('Set-Cookie', response.headers)
 
        for cookie in response.headers.getall('Set-Cookie'):
 
            self.assertTrue(re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE),
 
                'Cookie %r should have expiration date, but is a session cookie' % cookie)
 

	
 
    def test_logout(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS})
 

	
 
        # Verify that a login session has been established.
 
        response = self.app.get(url(controller='login', action='index'))
 
        response = response.follow()
 
        self.assertIn('authuser', response.session)
 

	
 
        response.click('Log Out')
 

	
 
        # Verify that the login session has been terminated.
 
        response = self.app.get(url(controller='login', action='index'))
 
        self.assertNotIn('authuser', response.session)
 

	
 
    @parameterized.expand([
 
          ('data:text/html,<script>window.alert("xss")</script>',),
 
          ('mailto:test@example.com',),
 
          ('file:///etc/passwd',),
 
          ('ftp://some.ftp.server',),
 
          ('http://other.domain/bl%C3%A5b%C3%A6rgr%C3%B8d',),
 
    ])
 
    def test_login_bad_came_froms(self, url_came_from):
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from=url_came_from),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
@@ -233,237 +236,266 @@ class TestLoginController(TestController
 
                                             'lastname': 'test'})
 
        msg = validators.UniqSystemEmail()()._messages['email_taken']
 
        response.mustcontain(msg)
 

	
 
    def test_register_err_wrong_data(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': 'xs',
 
                                             'password': 'test',
 
                                             'password_confirmation': 'test',
 
                                             'email': 'goodmailm',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 
        self.assertEqual(response.status, '200 OK')
 
        response.mustcontain('An email address must contain a single @')
 
        response.mustcontain('Enter a value 6 characters long or more')
 

	
 
    def test_register_err_username(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': 'error user',
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'goodmailm',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        response.mustcontain('Username may only contain '
 
                'alphanumeric characters underscores, '
 
                'periods or dashes and must begin with an '
 
                'alphanumeric character')
 

	
 
    def test_register_err_case_sensitive(self):
 
        usr = TEST_USER_ADMIN_LOGIN.title()
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': usr,
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'goodmailm',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        msg = validators.ValidUsername()._messages['username_exists']
 
        msg = h.html_escape(msg % {'username': usr})
 
        response.mustcontain(msg)
 

	
 
    def test_register_special_chars(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                        {'username': 'xxxaxn',
 
                                         'password': 'ąćźżąśśśś',
 
                                         'password_confirmation': 'ąćźżąśśśś',
 
                                         'email': 'goodmailm@test.plx',
 
                                         'firstname': 'test',
 
                                         'lastname': 'test'})
 

	
 
        msg = validators.ValidPassword()._messages['invalid_password']
 
        response.mustcontain(msg)
 

	
 
    def test_register_password_mismatch(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': 'xs',
 
                                             'password': '123qwe',
 
                                             'password_confirmation': 'qwe123',
 
                                             'email': 'goodmailm@test.plxa',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 
        msg = validators.ValidPasswordsMatch('password', 'password_confirmation')._messages['password_mismatch']
 
        response.mustcontain(msg)
 

	
 
    def test_register_ok(self):
 
        username = 'test_regular4'
 
        password = 'qweqwe'
 
        email = 'username@test.com'
 
        name = 'testname'
 
        lastname = 'testlastname'
 

	
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': username,
 
                                             'password': password,
 
                                             'password_confirmation': password,
 
                                             'email': email,
 
                                             'firstname': name,
 
                                             'lastname': lastname,
 
                                             'admin': True})  # This should be overriden
 
        self.assertEqual(response.status, '302 Found')
 
        self.checkSessionFlash(response, 'You have successfully registered into Kallithea')
 

	
 
        ret = Session().query(User).filter(User.username == 'test_regular4').one()
 
        self.assertEqual(ret.username, username)
 
        self.assertEqual(check_password(password, ret.password), True)
 
        self.assertEqual(ret.email, email)
 
        self.assertEqual(ret.name, name)
 
        self.assertEqual(ret.lastname, lastname)
 
        self.assertNotEqual(ret.api_key, None)
 
        self.assertEqual(ret.admin, False)
 

	
 
    #==========================================================================
 
    # PASSWORD RESET
 
    #==========================================================================
 

	
 
    def test_forgot_password_wrong_mail(self):
 
        bad_email = 'username%wrongmail.org'
 
        response = self.app.post(
 
                        url(controller='login', action='password_reset'),
 
                            {'email': bad_email, }
 
        )
 

	
 
        response.mustcontain('An email address must contain a single @')
 

	
 
    def test_forgot_password(self):
 
        response = self.app.get(url(controller='login',
 
                                    action='password_reset'))
 
        self.assertEqual(response.status, '200 OK')
 

	
 
        username = 'test_password_reset_1'
 
        password = 'qweqwe'
 
        email = 'username@python-works.com'
 
        name = 'passwd'
 
        lastname = 'reset'
 
        timestamp = int(time.time())
 

	
 
        new = User()
 
        new.username = username
 
        new.password = password
 
        new.email = email
 
        new.name = name
 
        new.lastname = lastname
 
        new.api_key = generate_api_key()
 
        Session().add(new)
 
        Session().commit()
 

	
 
        response = self.app.post(url(controller='login',
 
                                     action='password_reset'),
 
                                 {'email': email, })
 

	
 
        self.checkSessionFlash(response, 'Your password reset link was sent')
 
        self.checkSessionFlash(response, 'A password reset confirmation code has been sent')
 

	
 
        response = response.follow()
 

	
 
        # BAD KEY
 
        # BAD TOKEN
 

	
 
        token = "bad"
 

	
 
        key = "bad"
 
        response = self.app.post(url(controller='login',
 
                                     action='password_reset_confirmation'),
 
                                 {'email': email,
 
                                  'timestamp': timestamp,
 
                                  'password': "p@ssw0rd",
 
                                  'password_confirm': "p@ssw0rd",
 
                                  'token': token,
 
                                 })
 
        self.assertEqual(response.status, '200 OK')
 
        response.mustcontain('Invalid password reset token')
 

	
 
        # GOOD TOKEN
 

	
 
        # TODO: The token should ideally be taken from the mail sent
 
        # above, instead of being recalculated.
 

	
 
        token = UserModel().get_reset_password_token(
 
            User.get_by_username(username), timestamp, self.authentication_token())
 

	
 
        response = self.app.get(url(controller='login',
 
                                    action='password_reset_confirmation',
 
                                    key=key))
 
        self.assertEqual(response.status, '302 Found')
 
        self.assertTrue(response.location.endswith(url('reset_password')))
 

	
 
        # GOOD KEY
 
                                    email=email,
 
                                    timestamp=timestamp,
 
                                    token=token))
 
        self.assertEqual(response.status, '200 OK')
 
        response.mustcontain("You are about to set a new password for the email address %s" % email)
 

	
 
        key = User.get_by_username(username).api_key
 
        response = self.app.get(url(controller='login',
 
                                    action='password_reset_confirmation',
 
                                    key=key))
 
        response = self.app.post(url(controller='login',
 
                                     action='password_reset_confirmation'),
 
                                 {'email': email,
 
                                  'timestamp': timestamp,
 
                                  'password': "p@ssw0rd",
 
                                  'password_confirm': "p@ssw0rd",
 
                                  'token': token,
 
                                 })
 
        self.assertEqual(response.status, '302 Found')
 
        self.assertTrue(response.location.endswith(url('login_home')))
 

	
 
        self.checkSessionFlash(response,
 
                               ('Your password reset was successful, '
 
                                'new password has been sent to your email'))
 
        self.checkSessionFlash(response, 'Successfully updated password')
 

	
 
        response = response.follow()
 

	
 
    #==========================================================================
 
    # API
 
    #==========================================================================
 

	
 
    def _get_api_whitelist(self, values=None):
 
        config = {'api_access_controllers_whitelist': values or []}
 
        return config
 

	
 
    @parameterized.expand([
 
        ('none', None),
 
        ('empty_string', ''),
 
        ('fake_number', '123456'),
 
        ('proper_api_key', None)
 
    ])
 
    def test_access_not_whitelisted_page_via_api_key(self, test_name, api_key):
 
        whitelist = self._get_api_whitelist([])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            self.assertEqual([],
 
                             whitelist['api_access_controllers_whitelist'])
 
            if test_name == 'proper_api_key':
 
                #use builtin if api_key is None
 
                api_key = User.get_first_admin().api_key
 

	
 
            with fixture.anon_access(False):
 
                self.app.get(url(controller='changeset',
 
                                 action='changeset_raw',
 
                                 repo_name=HG_REPO, revision='tip', api_key=api_key),
 
                             status=403)
 

	
 
    @parameterized.expand([
 
        ('none', None, 302),
 
        ('empty_string', '', 302),
 
        ('fake_number', '123456', 302),
 
        ('fake_not_alnum', 'a-z', 302),
 
        ('fake_api_key', '0123456789abcdef0123456789ABCDEF01234567', 302),
 
        ('proper_api_key', None, 200)
 
    ])
 
    def test_access_whitelisted_page_via_api_key(self, test_name, api_key, code):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            self.assertEqual(['ChangesetController:changeset_raw'],
 
                             whitelist['api_access_controllers_whitelist'])
 
            if test_name == 'proper_api_key':
 
                api_key = User.get_first_admin().api_key
 

	
 
            with fixture.anon_access(False):
 
                self.app.get(url(controller='changeset',
 
                                 action='changeset_raw',
 
                                 repo_name=HG_REPO, revision='tip', api_key=api_key),
 
                             status=code)
 

	
 
    def test_access_page_via_extra_api_key(self):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            self.assertEqual(['ChangesetController:changeset_raw'],
 
                             whitelist['api_access_controllers_whitelist'])
 

	
 
            new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
 
            Session().commit()
 
            with fixture.anon_access(False):
 
                self.app.get(url(controller='changeset',
 
                                 action='changeset_raw',
 
                                 repo_name=HG_REPO, revision='tip', api_key=new_api_key.api_key),
 
                             status=200)
 

	
 
    def test_access_page_via_expired_api_key(self):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            self.assertEqual(['ChangesetController:changeset_raw'],
 
                             whitelist['api_access_controllers_whitelist'])
 

	
 
            new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
 
            Session().commit()
 
            #patch the API key and make it expired
 
            new_api_key.expires = 0
 
            Session().add(new_api_key)
 
            Session().commit()
 
            with fixture.anon_access(False):
 
                self.app.get(url(controller='changeset',
 
                                 action='changeset_raw',
 
                                 repo_name=HG_REPO, revision='tip',
 
                                 api_key=new_api_key.api_key),
 
                             status=302)
0 comments (0 inline, 0 general)