Changeset - 97619528c270
[Not reviewed]
default
0 2 0
Søren Løvborg - 9 years ago 2016-11-09 15:49:49
sorenl@unity3d.com
auth: remove KallitheaCrypto pseudo-class

Every caller except one already used the module-level wrapper functions.
2 files changed with 30 insertions and 42 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/auth.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.auth
 
~~~~~~~~~~~~~~~~~~
 

	
 
authentication and permission libraries
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 4, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 
import time
 
import os
 
import logging
 
import traceback
 
import hashlib
 
import itertools
 
import collections
 

	
 
from decorator import decorator
 

	
 
from pylons import request, session
 
from pylons.i18n.translation import _
 
from webhelpers.pylonslib import secure_form
 
from sqlalchemy import or_
 
from sqlalchemy.orm.exc import ObjectDeletedError
 
from sqlalchemy.orm import joinedload
 
from webob.exc import HTTPFound, HTTPBadRequest, HTTPForbidden, HTTPMethodNotAllowed
 

	
 
from kallithea import __platform__, is_windows, is_unix
 
from kallithea.config.routing import url
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.model import meta
 
from kallithea.model.meta import Session
 
from kallithea.model.user import UserModel
 
from kallithea.model.db import User, Repository, Permission, \
 
    UserToPerm, UserGroupRepoToPerm, UserGroupToPerm, UserGroupMember, \
 
    RepoGroup, UserGroupRepoGroupToPerm, UserIpMap, UserGroupUserGroupToPerm, \
 
    UserGroup, UserApiKeys
 

	
 
from kallithea.lib.utils2 import safe_str, safe_unicode, aslist
 
from kallithea.lib.utils import get_repo_slug, get_repo_group_slug, \
 
    get_user_group_slug, conditional_cache
 
from kallithea.lib.caching_query import FromCache
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class PasswordGenerator(object):
 
    """
 
    This is a simple class for generating password from different sets of
 
    characters
 
    usage::
 

	
 
        passwd_gen = PasswordGenerator()
 
        #print 8-letter password containing only big and small letters
 
            of alphabet
 
        passwd_gen.gen_password(8, passwd_gen.ALPHABETS_BIG_SMALL)
 
    """
 
    ALPHABETS_NUM = r'''1234567890'''
 
    ALPHABETS_SMALL = r'''qwertyuiopasdfghjklzxcvbnm'''
 
    ALPHABETS_BIG = r'''QWERTYUIOPASDFGHJKLZXCVBNM'''
 
    ALPHABETS_SPECIAL = r'''`-=[]\;',./~!@#$%^&*()_+{}|:"<>?'''
 
    ALPHABETS_FULL = ALPHABETS_BIG + ALPHABETS_SMALL \
 
        + ALPHABETS_NUM + ALPHABETS_SPECIAL
 
    ALPHABETS_ALPHANUM = ALPHABETS_BIG + ALPHABETS_SMALL + ALPHABETS_NUM
 
    ALPHABETS_BIG_SMALL = ALPHABETS_BIG + ALPHABETS_SMALL
 
    ALPHABETS_ALPHANUM_BIG = ALPHABETS_BIG + ALPHABETS_NUM
 
    ALPHABETS_ALPHANUM_SMALL = ALPHABETS_SMALL + ALPHABETS_NUM
 

	
 
    def gen_password(self, length, alphabet=ALPHABETS_FULL):
 
        assert len(alphabet) <= 256, alphabet
 
        l = []
 
        while len(l) < length:
 
            i = ord(os.urandom(1))
 
            if i < len(alphabet):
 
                l.append(alphabet[i])
 
        return ''.join(l)
 

	
 

	
 
class KallitheaCrypto(object):
 

	
 
    @classmethod
 
    def hash_string(cls, str_):
 
        """
 
        Cryptographic function used for password hashing based on pybcrypt
 
        or Python's own OpenSSL wrapper on windows
 

	
 
        :param password: password to hash
 
        """
 
        if is_windows:
 
            return hashlib.sha256(str_).hexdigest()
 
        elif is_unix:
 
            import bcrypt
 
            return bcrypt.hashpw(safe_str(str_), bcrypt.gensalt(10))
 
        else:
 
            raise Exception('Unknown or unsupported platform %s' \
 
                            % __platform__)
 
def get_crypt_password(password):
 
    """
 
    Cryptographic function used for password hashing based on pybcrypt
 
    or Python's own OpenSSL wrapper on windows
 

	
 
    @classmethod
 
    def hash_check(cls, password, hashed):
 
        """
 
        Checks matching password with it's hashed value, runs different
 
        implementation based on platform it runs on
 

	
 
        :param password: password
 
        :param hashed: password in hashed form
 
        """
 

	
 
        if is_windows:
 
            return hashlib.sha256(password).hexdigest() == hashed
 
        elif is_unix:
 
            import bcrypt
 
            return bcrypt.checkpw(safe_str(password), safe_str(hashed))
 
        else:
 
            raise Exception('Unknown or unsupported platform %s' \
 
                            % __platform__)
 

	
 

	
 
def get_crypt_password(password):
 
    return KallitheaCrypto.hash_string(password)
 
    :param password: password to hash
 
    """
 
    if is_windows:
 
        return hashlib.sha256(password).hexdigest()
 
    elif is_unix:
 
        import bcrypt
 
        return bcrypt.hashpw(safe_str(password), bcrypt.gensalt(10))
 
    else:
 
        raise Exception('Unknown or unsupported platform %s' \
 
                        % __platform__)
 

	
 

	
 
def check_password(password, hashed):
 
    return KallitheaCrypto.hash_check(password, hashed)
 
    """
 
    Checks matching password with it's hashed value, runs different
 
    implementation based on platform it runs on
 

	
 
    :param password: password
 
    :param hashed: password in hashed form
 
    """
 

	
 
    if is_windows:
 
        return hashlib.sha256(password).hexdigest() == hashed
 
    elif is_unix:
 
        import bcrypt
 
        return bcrypt.checkpw(safe_str(password), safe_str(hashed))
 
    else:
 
        raise Exception('Unknown or unsupported platform %s' \
 
                        % __platform__)
 

	
 

	
 
def _cached_perms_data(user_id, user_is_admin, user_inherit_default_permissions,
 
                       explicit, algo):
 
    RK = 'repositories'
 
    GK = 'repositories_groups'
 
    UK = 'user_groups'
 
    GLOBAL = 'global'
 
    PERM_WEIGHTS = Permission.PERM_WEIGHTS
 
    permissions = {RK: {}, GK: {}, UK: {}, GLOBAL: set()}
 

	
 
    def _choose_perm(new_perm, cur_perm):
 
        new_perm_val = PERM_WEIGHTS[new_perm]
 
        cur_perm_val = PERM_WEIGHTS[cur_perm]
 
        if algo == 'higherwin':
 
            if new_perm_val > cur_perm_val:
 
                return new_perm
 
            return cur_perm
 
        elif algo == 'lowerwin':
 
            if new_perm_val < cur_perm_val:
 
                return new_perm
 
            return cur_perm
 

	
 
    #======================================================================
 
    # fetch default permissions
 
    #======================================================================
 
    default_user = User.get_by_username('default', cache=True)
 
    default_user_id = default_user.user_id
 

	
 
    default_repo_perms = Permission.get_default_perms(default_user_id)
 
    default_repo_groups_perms = Permission.get_default_group_perms(default_user_id)
 
    default_user_group_perms = Permission.get_default_user_group_perms(default_user_id)
 

	
 
    if user_is_admin:
 
        #==================================================================
 
        # admin users have all rights;
 
        # based on default permissions, just set everything to admin
 
        #==================================================================
 
        permissions[GLOBAL].add('hg.admin')
 
        permissions[GLOBAL].add('hg.create.write_on_repogroup.true')
 

	
 
        # repositories
 
        for perm in default_repo_perms:
 
            r_k = perm.UserRepoToPerm.repository.repo_name
 
            p = 'repository.admin'
 
            permissions[RK][r_k] = p
 

	
 
        # repository groups
 
        for perm in default_repo_groups_perms:
 
            rg_k = perm.UserRepoGroupToPerm.group.group_name
 
            p = 'group.admin'
 
            permissions[GK][rg_k] = p
 

	
 
        # user groups
 
        for perm in default_user_group_perms:
 
            u_k = perm.UserUserGroupToPerm.user_group.users_group_name
 
            p = 'usergroup.admin'
 
            permissions[UK][u_k] = p
 
        return permissions
 

	
 
    #==================================================================
 
    # SET DEFAULTS GLOBAL, REPOS, REPOSITORY GROUPS
 
    #==================================================================
 

	
 
    # default global permissions taken from the default user
 
    default_global_perms = UserToPerm.query() \
 
        .filter(UserToPerm.user_id == default_user_id) \
 
        .options(joinedload(UserToPerm.permission))
 

	
 
    for perm in default_global_perms:
 
        permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
    # defaults for repositories, taken from default user
 
    for perm in default_repo_perms:
 
        r_k = perm.UserRepoToPerm.repository.repo_name
 
        if perm.Repository.private and not (perm.Repository.owner_id == user_id):
 
            # disable defaults for private repos,
 
            p = 'repository.none'
 
        elif perm.Repository.owner_id == user_id:
 
            # set admin if owner
 
            p = 'repository.admin'
 
        else:
 
            p = perm.Permission.permission_name
 

	
 
        permissions[RK][r_k] = p
 

	
 
    # defaults for repository groups taken from default user permission
 
    # on given group
 
    for perm in default_repo_groups_perms:
 
        rg_k = perm.UserRepoGroupToPerm.group.group_name
 
        p = perm.Permission.permission_name
 
        permissions[GK][rg_k] = p
 

	
 
    # defaults for user groups taken from default user permission
 
    # on given user group
 
    for perm in default_user_group_perms:
kallithea/lib/auth_modules/auth_internal.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.auth_modules.auth_internal
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Kallithea authentication plugin for built in internal auth
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Created on Nov 17, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 

	
 
from kallithea.lib import auth_modules
 
from kallithea.lib.compat import formatted_json, hybrid_property
 
from kallithea.model.db import User
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class KallitheaAuthPlugin(auth_modules.KallitheaAuthPluginBase):
 
    def __init__(self):
 
        pass
 

	
 
    @hybrid_property
 
    def name(self):
 
        # Also found as kallithea.lib.model.db.User.DEFAULT_AUTH_TYPE
 
        return 'internal'
 

	
 
    def settings(self):
 
        return []
 

	
 
    def user_activation_state(self):
 
        def_user_perms = User.get_default_user().AuthUser.permissions['global']
 
        return 'hg.register.auto_activate' in def_user_perms
 

	
 
    def accepts(self, user, accepts_empty=True):
 
        """
 
        Custom accepts for this auth that doesn't accept empty users. We
 
        know that user exists in database.
 
        """
 
        return super(KallitheaAuthPlugin, self).accepts(user,
 
                                                        accepts_empty=False)
 

	
 
    def auth(self, userobj, username, password, settings, **kwargs):
 
        if not userobj:
 
            log.debug('userobj was:%s skipping', userobj)
 
            return None
 
        if userobj.extern_type != self.name:
 
            log.warning("userobj:%s extern_type mismatch got:`%s` expected:`%s`",
 
                     userobj, userobj.extern_type, self.name)
 
            return None
 
        if not username:
 
            log.debug('Empty username - skipping...')
 
            return None
 

	
 
        user_data = {
 
            "username": userobj.username,
 
            "firstname": userobj.firstname,
 
            "lastname": userobj.lastname,
 
            "groups": [],
 
            "email": userobj.email,
 
            "admin": userobj.admin,
 
            "active": userobj.active,
 
            "active_from_extern": userobj.active,
 
            "extern_name": userobj.user_id,
 
        }
 

	
 
        log.debug(formatted_json(user_data))
 
        if userobj.active:
 
            from kallithea.lib import auth
 
            password_match = auth.KallitheaCrypto.hash_check(password, userobj.password)
 
            password_match = auth.check_password(password, userobj.password)
 
            if userobj.username == User.DEFAULT_USER and userobj.active:
 
                log.info('user %s authenticated correctly as anonymous user',
 
                         username)
 
                return user_data
 

	
 
            elif userobj.username == username and password_match:
 
                log.info('user %s authenticated correctly', user_data['username'])
 
                return user_data
 
            log.error("user %s had a bad password", username)
 
            return None
 
        else:
 
            log.warning('user %s tried auth but is disabled', username)
 
            return None
 

	
 
    def get_managed_fields(self):
 
        # Note: 'username' should only be editable (at least for user) if self registration is enabled
 
        return []
0 comments (0 inline, 0 general)